Netty 4.0 新的特性及需要注意的地方
這篇文章和你一起過下Netty的主發(fā)行版本的一些顯著的改變和新特性,讓你在把你的應用程序轉換到新版本的時候有個概念。
項目結構改變
Netty的包名從org.jboss.netty改為io.netty,因為我們不在是JBoss.org的一部分了。
二進制JAR包被分為了多個子模塊以便用戶能夠從類路徑中去掉非必需的特性。當前的結構如下:
模塊描述
| netty | project parent |
| common | utility and logging |
| buffer | buffer API |
| transport | channel API and its core implementations |
| transport-rtrx | RTRX transport implementation |
| transport-sctp | SCTP transport implementation |
| transport-udt | UDT transport implementation |
| handler | channel handlers |
| codec | codec framework |
| codec-http | HTTP, Web Sockets, SPDY, and RTSP codec |
| codec-socks | Socks codec |
| example | examples |
| all | generates an all-in-one JAR |
| tarball | generates a tarball distribution |
所有的Netty的Jar(除了netty-all外)包現在都是OSGI的bundle,能夠用在你喜歡的OSGI容器上。
常用API的變化
- 現在Netty里的大部分操作都支持簡潔的方法鏈。
- 不能配置的getter現在都沒有了get/is前綴 (如Channel.getRemoteAddress()→Channel.remoteAddress())
Buffer API變化
ChannelBuffer?→?ByteBuf
由于上文所提到的結構上的變化,buffer API現在可以作為一個單獨的包被使用。為此,ChannelBuffer這個類型名也不那么講得通了,而應該變更為ByteBuf。
用來創(chuàng)建新buffer的功能類ChannelBuffers被拆分為兩個功能類:Unpooled和BufUtil。就像這個名字所暗示的,4.0引入了一個新的池化的ByteBufs,它可以通過ByteBuf的分配器(Allocator)的對應實現ByteBufAllocator來獲得。
大多數的buffer變成了動態(tài)的,具備可配置的最大容量
在3.x時期,buffer分為固定和動態(tài)兩種類型。一個固定buffer的容量在創(chuàng)建之后就無法改變,而動態(tài)buffer的容量在write*(譯者按:writeByte,writeInt,writeLong...)方法需要更多空間時自動擴容。
從4.0開始,所有buffer都變成了動態(tài)的。但是,相對于之前的動態(tài)進行了優(yōu)化。你可以更容易也更安全的對一個buffer的容量進行擴大和縮小。之所以說它容易是因為有一個新的ByteBuf.capacity(int newCapacity)的方法。說它安全是因為你可以設置一個容量的最大值,以防止容量沒有限制的增大。
| 1 | // 不要再使用 dynamicBuffer() - 使用 buffer(). |
| 2 | ByteBuf buf = ByteBuf.buffer(); |
| 8 | // 縮減buffer的容量 (最后的512個byte被丟棄) |
唯一的例外是那些使用wrappedBuffer方法創(chuàng)建的,包裝(warp)了一個buffer或一個byte數組的buffer。你無法擴大它的容量,因為這樣會使包裝一個已有buffer的目的是去意義——減少內存的復制。如果你想要在包裝了一個buffer之后改變它的容量,你應該重新創(chuàng)建一個擁有足夠容量的buffer,然后將你想要包裝的那個buffer的內容復制過來。
新接口: CompositeByteBuf
一個新的名叫CompositeByteBuf的接口為組合buffer(composite buffer)的實現定義了多種高級的操作。一個用戶可以使用組合buffer,以只比隨機訪問大一點的代價達到一個批量內存復制的目的。要創(chuàng)建一個新的組合buffer,可以像以前一樣使用Unpooled.wrappedBuffer(... 譯者注:此處省略號應該是指省略方法參數,下同)或Unpooled.compositeBuffer(...)。
可預知的NIO buffer轉型
在3.x中,ChannelBuffer.toByteBuffer()以及它的其他變體所提供的約定并不那么明確。用戶無法確定這些方法會返回一個擁有共享數據的視圖buffer還是一個擁有獨立數據的通過復制得到的buffer。4.0將toByteBuffer()替換為ByteBuf.nioBufferCount(),nioBuffer(),以及nioBUffers()。如果調用nioBufferCount()返回0,用戶總是可以通過調用copy().nioBuffer()來獲得一個復制的buffer。
對小字節(jié)序變更的支持
對小字節(jié)序的支持經歷了重大變化。在之前的版本中,一個用戶為了得到一個小字節(jié)序的buffer有兩種選擇:特別指定一個LittleEndianHeapChannelBufferFactory;用目標字節(jié)序將已存在的buffer包裝起來。4.0添加了一個新方法,ByteBuf.order(ByteOrder)。這個方法返回當前buffer對象的一個具有指定字節(jié)序的視圖buffer:
| 01 | import?io.netty.buffer.ByteBuf; |
| 02 | import?io.netty.buffer.Unpooled; |
| 03 | import?java.nio.ByteOrder; |
| 05 | ByteBuf buf = Unpooled.buffer(4); |
| 08 | System.out.format("%08x%n", buf.getInt(0)); |
| 10 | ByteBuf leBuf = buf.order(ByteOrder.LITTLE_ENDIAN); |
| 12 | System.out.format("%08x%n", leBuf.getInt(0)); |
| 15 | assert?buf == buf.order(ByteOrder.BIG_ENDIAN); |
Pooled ByteBuf
前面已經提到Netty引入了pooledByteBufinstances。這在很多方面都很實用,舉列如下:
- 限制了GC壓力,這是因為使用unpooled ByteBufs會造成沉重的分配與再分配問題
- Better handling of direct (native)ByteBuf更好的處理直接(本地)的ByteBuf
- 一個ByteBuf 可以被一個ByteBufAllocator包含.
| 01 | public?interface?ByteBufAllocator { |
| 04 | ????ByteBuf buffer(int?initialCapacity); |
| 05 | ????ByteBuf buffer(int?initialCapacity,?int?maxCapacity); |
| 06 | ????ByteBuf heapBuffer(); |
| 07 | ????ByteBuf heapBuffer(int?initialCapacity); |
| 08 | ????ByteBuf heapBuffer(int?initialCapacity,?int?maxCapacity); |
| 09 | ????ByteBuf directBuffer(); |
| 10 | ????ByteBuf directBuffer(int?initialCapacity); |
| 11 | ????ByteBuf directBuffer(int?initialCapacity,?int?maxCapacity); |
| 12 | ????ByteBuf ioBuffer(); |
| 14 | ????CompositeByteBuf compositeBuffer(); |
| 15 | ????CompositeByteBuf compositeBuffer(int?maxNumComponents); |
| 16 | ????CompositeByteBuf compositeHeapBuffer(); |
| 17 | ????CompositeByteBuf compositeHeapBuffer(int?maxNumComponents); |
| 18 | ????CompositeByteBuf compositeDirectBuffer(); |
| 19 | ????CompositeByteBuf compositeDirectBuffer(int?maxNumComponents); |
要想從一個handler那里獲取當前的?ByteBufAllocator,可以使用ChannelHandlerContext.alloc()或Channel.alloc()方法:
| 2 | ByteBuf buf = channel.alloc().buffer(512); |
| 6 | ChannelHandlerContext ctx = ... |
| 7 | ByteBuf buf2 = ctx.alloc().buffer(512); |
一旦一個ByteBuf被寫入遠程節(jié)點,它會再次自動的釋放進入釋放到池(the pool)里。
默認的ByteBufAllocator為PooledByteBufAllocator.如果你不希望使用buffer pooling或使用你自己的allocator,你可以運用Channel.config().setAllocator(..),以及一個可供選擇的?allocator,比如UnpooledByteBufAllocator。
Channel API的變化
在4.0中,許多io.netty.channel包中的類都經歷大量修改,因此文本上的簡單搜索-替換是無法讓你基于3.x的程序遷移到4.0上。這個部分會嘗試將這些重大變更背后的思考過程展示出來,而不只是簡單地作為展示所有變更。
翻新后的ChannelHandler接口
Upstream → Inbound, Downstream → Outbound
對于初學者來說,術語'upstream'(譯者注:直譯為向上游,有點像TCP/IP協議棧中從下往上,從物理層最終到達應用層這么一個流程)和'downstream'有點讓人迷惑。在4.0中,只要可能,都會使用'inbound'(譯者注:直譯為開往內地的,相對于upstream確實更貼切,即指數據從外部網絡經歷層層filter到達我們的處理邏輯)和'outbound'來替換他們。
新的ChannelHandler繼承層次
在3.x時代,ChannelHandler只是一個標記接口,而在ChannelUpstreamHandler、ChannelDownstreamHandler、LifeCycleAwareChannelHandler定義了具體的處理器方法。在Netty 4中,ChannelHandler將LifeCycleAwareChannelHandler接口和一堆實現輔助方法融合到了一起,具體見代碼:
| 01 | public?interface?ChannelHandler { |
| 03 | ????void?beforeAdd(ChannelHandlerContext ctx)?throws?Exception; |
| 04 | ????void?afterAdd(ChannelHandlerContext ctx)?throws?Exception; |
| 05 | ????void?beforeRemove(ChannelHandlerContext ctx)?throws?Exception; |
| 06 | ????void?afterRemove(ChannelHandlerContext ctx)?throws?Exception; |
| 08 | ????void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception; |
| 09 | ????void?userEventTriggered(ChannelHandlerContext ctx, Object evt)?throws?Exception; |
下方的圖表描述了這個新的類型集成層次:
fixme(原文中還沒有插入此圖)
事件對象從ChannelHandler中消失了
在3.x時代,所有的I/O操作都會創(chuàng)建一個新的ChannelEvent對象。對每個讀或寫的操作,還會額外創(chuàng)建一個新的ChannelBuffer對象。由于將資源管理和buffer的池化交給了JVM,這實際上極大地簡化了Netty的內部實現。但是,基于Netty開發(fā)的應用在高負載下運行時,有時會觀察到GC(Garbage Collection)的壓力增大或變化不定,這些問題的根源也來自于這里。
4.0通過把事件對象替換為直接與類型相對應(譯者注:原文為strongly typed,但是我覺得直譯為強類型不太容易理解)的方法調用,幾乎完全避免了事件對象的創(chuàng)建。3.x中,有類似于handleUpstream()和handleDownstream()這種能夠捕獲所有相關類型事件的處理器方法,4.0中你將不會再看到它們的身影了。所有的事件類型現在都有各自對應的處理器方法:
| 02 | void?handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)?throws?Exception; |
| 03 | void?handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)?throws?Exception; |
| 06 | void?channelRegistered(ChannelHandlerContext ctx)?throws?Exception; |
| 07 | void?channelUnregistered(ChannelHandlerContext ctx)?throws?Exception; |
| 08 | void?channelActive(ChannelHandlerContext ctx)?throws?Exception; |
| 09 | void?channelInactive(ChannelHandlerContext ctx)?throws?Exception; |
| 10 | void?inboundBufferUpdated(ChannelHandlerContext ctx)?throws?Exception; |
| 12 | void?bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelFuture future)?throws?Exception; |
| 14 | ????????ChannelHandlerContext ctx, SocketAddress remoteAddress, |
| 15 | ????????SocketAddress localAddress, ChannelFuture future)?throws?Exception; |
| 16 | void?disconnect(ChannelHandlerContext ctx, ChannelFuture future)?throws?Exception; |
| 17 | void?close(ChannelHandlerContext ctx, ChannelFuture future)?throws?Exception; |
| 18 | void?deregister(ChannelHandlerContext ctx, ChannelFuture future)?throws?Exception; |
| 19 | void?flush(ChannelHandlerContext ctx, ChannelFuture future)?throws?Exception; |
| 20 | void?read(ChannelHandlerContext ctx); |
| 21 | void?sendFile(ChannelHandlerContext ctx, FileRegion region, ChannelPromise promise)?throws?Exception; |
ChannelHandlerContext類也被修改來反映上述提到的變化:
?
| 5 | ctx.fireInboundBufferUpdated(); |
所有這些變化意味著用戶無法去擴展ChannelEvent這個已經不存在的接口了。那用戶要怎樣才能定義他或她自己的事件類型呢,就像IdleStateEvent?4.0中的ChannelHandler有一個處理器方法叫做userEventTriggered(),它就是被設計用來滿足這種特殊的用戶需求。
Simplified channel state model
在3.x中,當一個新的Channel被創(chuàng)建并連接成功,至少三個ChannelStateEvent會被觸發(fā):channelOpen、channelBound以及channelConnected。當一個Channel關閉,則對應channelDisconnected、channelUnbound以及channelClosed三個事件。
fixme
但是,觸發(fā)這么多事件的意義并不那么明顯。如果在一個Channel進入可讀或可寫的狀態(tài)時通知用戶,想來會更有幫助。
fixme
channelOpen、channelBound和channelConnected被合并為channelActive。channelDisconnected、channelUnbound和channelClosed被合并為channelInactive。類似的,Channel.isBound()和Channel.isConnected()也被合并為了Channel.isActive()。
需要注意的是,channelRegistered和channelUnregistered這兩個事件與channelOpen和channelClosed具有的意義是不一樣的。它們(channelRegistered和channelUnregistered)是在支持Channel的動態(tài)注冊、注銷以及再注冊時被引入的,就像下圖所示:
fixme
每個處理器的緩存
不像3.x那樣在每次讀操作都簡歷一個新堆里的緩存來觸發(fā)上游的MessageEvent,4.0不會每次都創(chuàng)建新的 緩存。它直接從socket中讀取數據到由用戶的ChannelInboundByteHandler和ChannelInboundMessageHandler實現創(chuàng)建的入站緩存。
因為由上述處理器創(chuàng)建的入站緩存直到關聯的通道關閉前都會重用,所以在上面的GC和內存帶寬消耗都能保持較小。同樣,當接收到的數據被銷毀時用戶已經完成操作,codec的實現就變得更簡單和有效了。
在創(chuàng)建出站緩存時也是差不多的(不會新建)。用戶的ChannelOutBoundBYteHandler和ChannelOutboundMessageHandler來操作。
不需要每條消息都有一個事件
4.0里不再有了messageReceived或writeRequested處理器方法。它們被inboundBufferUpdated和flush代替了。用戶的入隊一個或多個消息到一個入站(或出站)緩存同時會出發(fā)一個inboundBUfferUpdated(或flush)事件。
| 01 | public?void?inboundBufferUpdated(ChannelHandlerContext ctx) { |
| 02 | ????Queue<MyMessage> in = ctx.inboundMessageBuffer(); |
| 03 | ????Queue<MyNewMessage> out = ctx.nextInboundMessageBuffer(); |
| 05 | ????????MyMessage m = in.poll(); |
| 06 | ????????if?(m ==?null) { |
| 09 | ????????MyNewMessage decoded = decode(m); |
| 10 | ????????out.add(decoded); |
| 12 | ????ctx.fireInboundBufferUpdated(); |
| 15 | public?void?flush(ChannelHandlerContext ctx, ChannelFuture future) { |
| 16 | ????Queue<MyNewMessage> in = ctx.outboundMessageBuffer(); |
| 17 | ????Queue<MyMessage> out = ctx.nextOutboundMessageBuffer(); |
| 19 | ????????MyNewMessage m = in.poll(); |
| 20 | ????????if?(m ==?null) { |
| 23 | ????????MyMessage encoded = encode(m); |
| 24 | ????????out.add(encoded); |
作為選擇,用戶能夠在每個單獨的入站(或出站)消息中觸發(fā)這樣的事件來模擬老的行為,盡管相對新方法來說效率更低。
消息處理器 vs. 字節(jié)處理器
在3.x里一個MessageEvent持有一個任意的對象。它能夠是一個ChannelBuffer或是一個用戶自定義的對象,它們都是同樣對待的:
| 02 | public?void?messageReceived(ChannelHandlerContext ctx, MessageEvent evt) { |
| 03 | ????Object msg = evt.getMessage(); |
| 04 | ????if?(msg?instanceof?ChannelBuffer) { |
| 05 | ????????ChannelBuffer buf = (ChannelBuffer) msg; |
| 08 | ????????MyMessage myMsg = (MyMessage) msg; |
在4.0里,它們就分別對待了,因為一個處理器不再處理一個獨立的消息,而是處理多種多樣的消息:
| 01 | public?void?inboundBufferUpdated(ChannelHandlerContext ctx) { |
| 02 | ????if?(ctx.hasInboundByteBuffer()) { |
| 03 | ????????ByteBuf buf = ctx.inboundByteBuffer(); |
| 06 | ????????Queue<MyMessage> buf = ctx.inboundMessageBuffer(); |
| 08 | ????????????MyMessage msg = buf.poll(); |
| 09 | ????????????if?(buf ==?null) { |
你可能發(fā)現一個ServerChannel的處理器是一個入站緩存是Queue<Channel>的入站處理器是較為有趣的。
處理器適配器
大多數用戶都發(fā)現創(chuàng)建和管理它的生命周期是繁瑣的,因此它支持用戶擴展預定義好的適配器類來使得更方便:
- ChannelHandlerAdapter
- ChannelStateHandlerAdapter
- ChannelOperationHandlerAdapter
- ChannelInboundMessageHandlerAdapter
- ChannelInboundByteHandlerAdapter
- ChannelOutboundMessageHandlerAdapter
- ChannelOutboundByteHandlerAdapter
明智的和不易出錯的入站流量掛起
3.x有一個由Channel.setReadable(boolean)提供的不是很明顯的入站流量掛起機制。它引入了在ChannelHandler之間的復雜交互操作,同時處理器由于不正確實現而很容易互相干擾。
4.0里,新的名為read()的出站操作增加了。如果你使用Channel.config().setAutoRead(false)來關閉默認的auto-read標志,Netty不會讀入任何東西,直到你顯式地調用read()操作。一旦你發(fā)布的read()操作完成,同時通道再次停止讀,一個名為channelReadSuspended()的入站事件會觸發(fā)一遍你能夠重新發(fā)布另外的read()操作。你同樣也可以攔截read()操作來執(zhí)行更多高級的流量控制。
暫停接收傳入的連接
在3.x里,沒有方法讓一個用戶告訴Netty來廳子接收傳入連接,除非是阻塞I/O線程或者關閉服務器socket。在aotu-read標志沒有設置的時候,4.0涉及到的read()操作就像一個普通的通道。
半關閉socket
TCP和SCTP允許用戶關閉一個socket的出站流量而不用完全關閉它。這樣的socket被稱為“半關閉socket”,同時用戶能夠通過調用SocketChannel.shutdownOutput()方法來獲取一個半關閉socket。如果一個遠端關閉了出站通道,SocketChannel.read(..)會返回-1,這看上去并沒有和一個關閉了的鏈接有什么區(qū)別。
3.x沒有shutdownOutput()操作。同樣,它總是在SocketChannel.read(..)返回-1的時候關閉鏈接。
要支持半關閉socket,4.0增加了SocketChannel.shutdownOutput()方法,同時用戶能設置“ALLOW_HALF_CLOSURE”的ChanneOption來阻止Netty在SocketChannel.read(..)返回-1的時候自動關閉鏈接。
靈活的I/O線程分配
在3.x里,一個Channel是由ChannelFactory創(chuàng)建的,同時新創(chuàng)建的Channel會自動注冊到一個隱藏的I/O線程。4.0使用新的名為EventLoopGroup的接口來替換ChannelFactory,它由一個或多個EventLoop來構成。同樣,一個新的Channel不會自動注冊到EventLoopGroup,但用戶可以顯式調用EventLoopGroup.register()來注冊。
感謝這個變化(舉例來說,分離了ChannelFactory和I/O線程),用戶可以注冊不同的Channel實現到同一個EventLoopGroup,或者同一個Channel實現到不同的EventLoopGroup。例如,你可以運行一個NIO服務器socket,NIO UDP socket,以及虛擬機內部的通道在同一個I/O線程里。在編寫一個需要最小延遲的代理服務器時這確實很有用。
能夠從一個已存在的jdk套接字上創(chuàng)建一個Channel
3.x沒提供方法從已存在的jdk套接字(如java.nio.channels.SocketChannel)創(chuàng)建一個新的通道。現在你可以用4.0這樣做了。
取消注冊和重新注冊一個Channel從/到一個I/O線程
一旦一個新的Channel在3.x里創(chuàng)建,它完全綁定到一個單一的I/O線程上,直到它底層的socket關閉。在4.0里,用戶能夠從I/O線程里取消注冊一個Channel來完全控制它底層jdk套接字。例如,你能夠利用Netty提供的高層次無阻塞I/O的優(yōu)勢來解決復雜的協議,然后取消注冊Channel并且切換到阻塞模式來在可能的最大吞吐量下傳輸一個文件。當然,它能夠再次注冊已經取消了注冊的Channel。
| 01 | java.nio.channels.FileChannel myFile = ...; |
| 02 | java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open(); |
| 04 | // Perform some blocking operation here. |
| 08 | SocketChannel ch =?new?NioSocketChannel(mySocket); |
| 09 | EventLoopGroup group = ...; |
| 13 | // Deregister from Netty. |
| 14 | ch.deregister().sync(); |
| 16 | // Perform some blocking operation here. |
| 17 | mySocket.configureBlocking(false); |
| 18 | myFile.transferFrom(mySocket, ...); |
| 20 | // Register back again to another event loop group. |
| 21 | EventLoopGroup anotherGroup = ...; |
| 22 | anotherGroup.register(ch); |
調度任意的任務到一個I/O線程里運行
當一個Channel被注冊到EventLoopGroup時,Channel實際上是注冊到由EventLoopGroup管理EventLoop中的一個。EventLoop實現了java.utilconcurrent.ScheduledExecutorService接口。這意味著用戶可以在一個用戶通道歸屬的I/O線程里執(zhí)行或調度一個任意的Runnable或Callable。隨著新的娘好定義的線程模型的到來(稍后會介紹),它變得極其容易地編寫一個線程安全的處理器。
| 01 | public?class?MyHandler?extends?ChannelOutboundMessageHandlerAdapter { |
| 03 | ????public?void?flush(ChannelHandlerContext ctx, ChannelFuture f) { |
| 07 | ????????// Schedule a write timeout. |
| 08 | ????????ctx.executor().schedule(new?MyWriteTimeoutTask(),?30, TimeUnit.SECONDS); |
| 14 | ????public?static?void?main(String[] args)?throws?Exception { |
| 15 | ????????// Run an arbitrary task from an I/O thread. |
| 16 | ????????Channel ch = ...; |
| 17 | ????????ch.executor().execute(new?Runnable() { ... }); |
簡化的關閉
releaseExternalResources()不必再用了。你可以通過調用EventLoopGroup.shutdown()直接地關閉所有打開的連接同時使所有I/O線程停止,就像你使用java.util.concurrent.ExecutorService.shutdown()關閉你的線程池一樣。
類型安全的ChannelOptions
有兩個方法來配置Netty的Channel的socket參數。第一個是明確地調用ChannelConfig的setter,例如SocketChannelConfig.setTcpNoDelay(true)。這是最為類型安全的方法。另外一個是調用ChannelConfig.setOption()方法。有時候你不得不決定在運行時的時候socket要配置什么選項,同時這個方法在這種情況下有點不切實際。然而,在3.x里它是容易出錯的,因為一個用戶必需用一對字符串和對象來指定選項。如果用戶調用了錯誤的選項名或者值,他或她將會趙宇到一個ClassCastException或指定的選項甚至可能會默默地忽略了。
4.0引入了名為ChannelOption的新的類型,它提供了類型安全地訪問socket選項。
| 01 | ChannelConfig cfg = ...; |
| 04 | cfg.setOption("tcpNoDelay",?true); |
| 05 | cfg.setOption("tcpNoDelay",?0);??// Runtime ClassCastException |
| 06 | cfg.setOption("tcpNoDelays",?true);?// Typo in the option name - ignored silently |
| 09 | cfg.setOption(ChannelOption.TCP_NODELAY,?true); |
| 10 | cfg.setOption(ChannelOption.TCP_NODELAY,?0);?// Compile error |
AttributeMap
在回應用戶指令里,你可以附加任意的對象到Channel和ChannelHandlerContext。一個名為AttributeMap的新接口被加入了,它被Channel和ChannelHandlerContext繼承。作為替代,ChannelLocal和Channel.attachment被移除。這些屬性會在他們關聯的Channel被垃圾回收的同時回收。
| 01 | public?class?MyHandler?extends?ChannelInboundMessageHandlerAdapter<MyMessage> { |
| 03 | ????private?static?final?AttributeKey<MyState> STATE = |
| 04 | ????????????new?AttributeKey<MyState>("MyHandler.state"); |
| 07 | ????public?void?channelRegistered(ChannelHandlerContext ctx) { |
| 08 | ????????ctx.attr(STATE).set(new?MyState()); |
| 09 | ????????ctx.fireChannelRegistered(); |
| 13 | ????public?void?messageReceived(ChannelHandlerContext ctx, MyMessage msg) { |
| 14 | ????????MyState state = ctx.attr(STATE).get(); |
?
新的bootstrap API
bootstrap API已經重頭重寫,盡管它的目的還是一樣;它執(zhí)行需要使服務器或客戶端運行的典型步驟,通常能在樣板代碼里找到。新的bootstrap同樣采取了流暢的接口。
| 01 | public?static?void?main(String[] args)?throws?Exception { |
| 02 | ????// Configure the server. |
| 03 | ????ServerBootstrap b =?new?ServerBootstrap(); |
| 05 | ????????b.group(new?NioEventLoopGroup(),?new?NioEventLoopGroup()) |
| 06 | ?????????.channel(new?NioServerSocketChannel()) |
| 07 | ?????????.option(ChannelOption.SO_BACKLOG,?100) |
| 08 | ?????????.localAddress(8080) |
| 09 | ?????????.childOption(ChannelOption.TCP_NODELAY,?true) |
| 10 | ?????????.childHandler(new?ChannelInitializer<SocketChannel>() { |
| 12 | ?????????????public?void?initChannel(SocketChannel ch)?throws?Exception { |
| 13 | ?????????????????ch.pipeline().addLast(handler1, handler2, ...); |
| 17 | ????????// Start the server. |
| 18 | ????????ChannelFuture f = b.bind().sync(); |
| 20 | ????????// Wait until the server socket is closed. |
| 21 | ????????f.channel().closeFuture().sync(); |
| 23 | ????????// Shut down all event loops to terminate all threads. |
?
ChannelPipelineFactory → ChannelInitializer
和你在上面的例子注意到的一樣,ChannelPipelineFactory不再存在了。而是由ChannelInitializer來替換,它給予了在Channel和ChannelPipeline的配置的更多控制。
請注意,你不能自己創(chuàng)建一個新的ChannelPipeline。通過觀察目前為止的用例報告,Netty項目隊伍總結到讓用戶去創(chuàng)建自己的管道實現或者是繼承默認的實現是沒有好處的。因此,ChannelPipeline不再讓用戶創(chuàng)建。ChannelPipeline由Channel自動創(chuàng)建。
ChannelFuture拆分為ChannelFuture和ChannelPromise
ChannelFuture已經被拆分為ChannelFuture和ChannelPromise了。這不僅僅是讓異步操作里的生產者和消費者間的約定更明顯,同樣也是得在使用從鏈中返回的ChannelFuture更加安全,因為ChannelFuture的狀態(tài)是不能改變的。
由于這個編號,一些方法現在都采用ChannelPromiser而不是ChannelFuture來改變它的狀態(tài)。
良好定義的線程模型
在3.x里并沒有良好設計的線程模型,盡管曾經要修復線程模型在3.5的不一致性。4.0定義的一個嚴格的線程模型來幫助用戶編寫ChannelHandler而不必擔心太多關于線程安全的東西。
- Netty將不會再同步地調用ChannelHandler的方法了,除非ChannelHandler由@Shareable注解。這不會理會處理器方法的類似——入站、操作、或者是生命周期時間處理器方法。
- 用戶不再需要同步入站或出站的事件處理器方法。
- 4.0不允許加入加入一個ChannelHandler超過一次,除非它由@Sharable注解。
- 每個由Netty調用的ChannelHandler的方法之間的關系總是happens-before。
- 用戶不用定義一個volatile字段來保存處理器的狀態(tài)。
- 用戶能夠在他加入一個處理器到ChannelPipeline的時候指定EventExecutor。
- 如果有指定,ChannelHandler的處理器方法總是由自動的EventExecutor來調用
- 如果沒指定,處理器方法總是由它關聯的Channel注冊到的EventLoop來調用。
- 聲明到一個處理器的EventExecutor和EventLoop總是單線程的。
- 處理器方法總是由相同線程調用。
- 如果指定了多線程的EventExecutor或EventLoop,線程中的一個會被選擇,然后選擇到的線程將會被使用,直到取消注冊。
- 如果在相同管道里的兩個處理器聲明到不同的EventExecutor,它們會同時被調用。如果多個一個處理器去訪問共享數據,用戶需要注意線程安全,即使共享數據只能被相同管道里的處理器訪問。
- 加入到ChannelFuture的ChannelFutureListener總是由關聯到future相關的Channel的EventLoop線程調用。
不再有ExecutionHandler ——它包含到核心里
在你加入一個ChannelHandler到一個ChannelPipeline來告訴管道總是通過指定的EventExecutor調用加入的ChannelHander處理器的方法的時候,你可以指定一個EventExecutor。
| 2 | ChannelPipeline p = ch.pipeline(); |
| 3 | EventExecutor e1 =?new?DefaultEventExecutor(16); |
| 4 | EventExecutor e2 =?new?DefaultEventExecutor(8); |
| 6 | p.addLast(new?MyProtocolCodec()); |
| 7 | p.addLast(e1,?new?MyDatabaseAccessingHandler()); |
| 8 | p.addLast(e2,?new?MyHardDiskAccessingHandler()); |
EventExecutor是EventLoop的超類,同時也繼承了ScheduledExecutorService。
fixme
編碼解碼器框架變化
在編碼解碼器框架里有實質性的內部改變,因為4.0需要一個處理器來創(chuàng)建和管理它的緩存(看這篇文章的每個處理器緩存部分。)然而,從用戶角度來看這些變化都不是很大的。
- 核心編碼界面器類移到io.netty.handler.codec包里。
- FrameDecoder重命名為ByteToMessageDecoder。
- OneToOneEncoder和OneToOneDecoder由MessageToMessageEncoder和MessageToMessageDecoder替換。
- decode(),decodeLast(),encode()的方法前面稍微改變了來支持泛型同時移除冗余參數。
編碼解碼器嵌入器→ EmbeddedChannel
編碼解碼器嵌入器已經被 io.netty.channel.embedded.EmbeddedByteChannel和EmbeddedMessageChannel替換了。EmbeddedChannel允許用戶對任何包含編碼解碼器的管道進行單元測試。
HTTP編碼解碼器
HTTP解碼器現在在每個HTTP消息中總生成多個消息對象:
| 1 | 1???????* HttpRequest / HttpResponse |
| 3 | 1???????* LastHttpContent |
要看更多的細節(jié),請到轉到已更新了的HttpSnoopServer例子。如果你希望為一個單一的HTTP消息處理多個消息,你可以把HttpObjectAggregator放入管道里。HttpObjectAggregator會把多個消息轉換為一個單一的FullHttpRequest或是FullHttpResponse。
傳輸實現的變化
下面是傳輸協議新加入的東西:
- 使用NIO.2AsynchronousSocketChannel的AIO套接字傳輸實現。
- OIO SCTP 傳輸實現
- UDT 傳輸實現
用例學習:移植示例Factorial
這部分粗略地展示把示例Factorial從3.0移植到4.0的步驟。示例Factorial已經移植到4.0了,它放在io.netty.example.factorial包里。請瀏覽示例的源代碼來看下每一處的變化。
移植服務端
重寫FactorialServer.run()方法來使用新的 bootstrap API。不再有ChannelFactory了。 由你自己去實例化NioEventLoop(一個是用來接收接入的鏈接,另外的就用來處理接收到的鏈接)。從命名FactorialServerPipelineFactory為FactorialServerInitializer。讓它繼承ChannelInitializer<Channel>。取代創(chuàng)建新的ChannelPipeline,通過Channel.pipeline()來獲取。讓FactorialServerHandler繼承sChannelInboundMessageHandlerAdapter<BigInteger>。用channelInactive()來替換channelDisconnected()。handleUpstream()不能再使用了。讓BigIntegerDecoder繼承ByteToMessageDecoder<BigInteger>。讓NumberEncoder繼承MessageToByteEncoder<Number>。encode()不在返回一個緩存了。由ByteToMessageDecoder來提供填充編碼好的數據到緩存里。移植客戶端
大部分和移植服務端差不多,但你要在你編寫一個潛在的大數據流時要多注意下。
重寫FactorialClient.run()方法來使用新的bootstrap API。重命名FactorialClientPipelineFactory為FactorialClientInitializer。使FactorialClientHandler繼承ChannelInboundMessageHandlerAdapter<BigInteger>在這一點,你發(fā)現在4.0里沒有了Channel.isWritable()或者channelInterestChanged()。作為代替,你自己來管理那些未決定的寫操作。新的sendNumbers()看起來如下: | 01 | private?void?sendNumbers() { |
| 02 | ????// Do not send more than 4096 numbers. |
| 03 | ????boolean?finished =?false; |
| 04 | ????MessageBuf<Object> out = ctx.nextOutboundMessageBuffer(); |
| 05 | ????while?(out.size() <?4096) { |
| 06 | ????????if?(i <= count) { |
| 07 | ????????????out.add(Integer.valueOf(i)); |
| 10 | ????????????finished =?true; |
| 15 | ????ChannelFuture f = ctx.flush(); |
| 17 | ????????f.addListener(numberSender); |
| 21 | private?final?ChannelFutureListener numberSender =?new?ChannelFutureListener() { |
| 23 | ????public?void?operationComplete(ChannelFuture future)?throws?Exception { |
| 24 | ????????if?(future.isSuccess()) { |
| 25 | ????????????sendNumbers(); |
本文地址:http://www.oschina.net/translate/netty-4-0-new-and-noteworthy
原文地址:http://netty.io/wiki/new-and-noteworthy.html
本文中的所有譯文僅用于學習和交流目的,轉載請務必注明文章譯者、出處、和本文鏈接
我們的翻譯工作遵照?CC 協議,如果我們的工作有侵犯到您的權益,請及時聯系我們 轉自:http://www.oschina.net/translate/netty-4-0-new-and-noteworthy?print
轉載于:https://www.cnblogs.com/blogsme/p/3599138.html
總結
以上是生活随笔為你收集整理的Netty 4.0 新的特性及需要注意的地方的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。