netty系列之:channelPipeline详解
文章目錄
- 簡介
- ChannelPipeline
- 事件傳遞
- DefaultChannelPipeline
- 總結
簡介
我們在介紹channel的時候提到過,幾乎channel中所有的實現都是通過channelPipeline進行的,作為一個pipline,它到底是如何工作的呢?
一起來看看吧。
ChannelPipeline
ChannelPipeline是一個interface,它繼承了三個接口,分別是ChannelInboundInvoker,ChannelOutboundInvoker和Iterable:
public interface ChannelPipelineextends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>>繼承自ChannelInboundInvoker,表示ChannelPipeline可以觸發channel inboud的一些事件,比如:
ChannelInboundInvoker fireChannelRegistered(); ChannelInboundInvoker fireChannelUnregistered(); ChannelInboundInvoker fireChannelActive(); ChannelInboundInvoker fireChannelInactive(); ChannelInboundInvoker fireExceptionCaught(Throwable cause); ChannelInboundInvoker fireUserEventTriggered(Object event); ChannelInboundInvoker fireChannelRead(Object msg); ChannelInboundInvoker fireChannelReadComplete(); ChannelInboundInvoker fireChannelWritabilityChanged();繼承自ChannelOutboundInvoker,表示ChannelPipeline可以進行一些channel的主動操作,如:bind,connect,disconnect,close,deregister,read,write,flush等操作。
繼承自Iterable,表示ChannelPipeline是可遍歷的,為什么ChannelPipeline是可遍歷的呢?
因為ChannelPipeline中可以添加一個或者多個ChannelHandler,ChannelPipeline可以看做是一個ChannelHandler的集合。
比如ChannelPipeline提供了一系列的添加ChannelHandler的方法:
ChannelPipeline addFirst(String name, ChannelHandler handler); ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); ChannelPipeline addFirst(ChannelHandler... handlers);ChannelPipeline addLast(String name, ChannelHandler handler); ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); ChannelPipeline addLast(ChannelHandler... handlers); ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler); ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);可以從前面添加,也可以從后面添加,或者從特定的位置添加handler。
另外還可以從pipeline中刪除特定的channelHandler,或者移出和替換特定位置的handler:
ChannelPipeline remove(ChannelHandler handler); ChannelHandler remove(String name); ChannelHandler removeFirst(); ChannelHandler removeLast(); ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);當然,更少不了對應的查詢操作:
ChannelHandler first(); ChannelHandler last(); ChannelHandler get(String name); List<String> names();還可以根據傳入的ChannelHandler獲得handler對應的ChannelHandlerContext。
ChannelHandlerContext context(ChannelHandler handler);ChannelPipeline中還有一些觸發channel相關的事件,如:
ChannelPipeline fireChannelRegistered();ChannelPipeline fireChannelUnregistered();ChannelPipeline fireChannelActive();ChannelPipeline fireChannelInactive();ChannelPipeline fireExceptionCaught(Throwable cause);ChannelPipeline fireUserEventTriggered(Object event);ChannelPipeline fireChannelRead(Object msg);ChannelPipeline fireChannelReadComplete();ChannelPipeline fireChannelWritabilityChanged();事件傳遞
那么有些朋友可能會問了,既然ChannelPipeline中包含了很多個handler,那么handler中的事件是怎么傳遞的呢?
其實這些事件是通過調用ChannelHandlerContext中的相應方法來觸發的。
對于Inbound事件來說,可以調用下面的方法,進行事件的傳遞:
ChannelHandlerContext.fireChannelRegistered() ChannelHandlerContext.fireChannelActive() ChannelHandlerContext.fireChannelRead(Object) ChannelHandlerContext.fireChannelReadComplete() ChannelHandlerContext.fireExceptionCaught(Throwable) ChannelHandlerContext.fireUserEventTriggered(Object) ChannelHandlerContext.fireChannelWritabilityChanged() ChannelHandlerContext.fireChannelInactive() ChannelHandlerContext.fireChannelUnregistered()對于Outbound事件來說,可以調用下面的方法,進行事件的傳遞:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise) ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext.write(Object, ChannelPromise) ChannelHandlerContext.flush() ChannelHandlerContext.read() ChannelHandlerContext.disconnect(ChannelPromise) ChannelHandlerContext.close(ChannelPromise) ChannelHandlerContext.deregister(ChannelPromise)具體而言,就是在handler中調用ChannelHandlerContext中對應的方法:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("Connected!");ctx.fireChannelActive();}}public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) {System.out.println("Closing ..");ctx.close(promise);}}DefaultChannelPipeline
ChannelPipeline有一個官方的實現叫做DefaultChannelPipeline,因為對于pipeline來說,主要的功能就是進行handler的管理和事件傳遞,相對于而言功能比較簡單,但是他也有一些特別的實現地方,比如它有兩個AbstractChannelHandlerContext類型的head和tail。
我們知道ChannelPipeline實際上是很多handler的集合,那么這些集合是怎么進行存儲的呢?這種存儲的數據結構就是AbstractChannelHandlerContext。每個AbstractChannelHandlerContext中都有一個next節點和一個prev節點,用來組成一個雙向鏈表。
同樣的在DefaultChannelPipeline中使用head和tail來將封裝好的handler存儲起來。
注意,這里的head和tail雖然都是AbstractChannelHandlerContext,但是兩者有稍許不同。先看下head和tail的定義:
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}在DefaultChannelPipeline的構造函數中,對tail和head進行初始化,其中tail是TailContext,而head是HeadContext。
其中TailContext實現了ChannelInboundHandler接口:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler而HeadContext實現了ChannelOutboundHandler和ChannelInboundHandler接口:
final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler下面我們以addFirst方法為例,來看一下handler是怎么被加入pipline的:
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);name = filterName(name, handler);newCtx = newContext(group, name, handler);addFirst0(newCtx);// If the registered is false it means that the channel was not registered on an eventLoop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}callHandlerAdded0(newCtx);return this;}它的工作邏輯是首先根據傳入的handler構建一個新的context,然后調用addFirst0方法,將context加入AbstractChannelHandlerContext組成的雙向鏈表中:
private void addFirst0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext nextCtx = head.next;newCtx.prev = head;newCtx.next = nextCtx;head.next = newCtx;nextCtx.prev = newCtx;}然后調用callHandlerAdded0方法來觸發context的handlerAdded方法。
總結
channelPipeline負責管理channel的各種handler,在DefaultChannelPipeline中使用了AbstractChannelHandlerContext的head和tail來對多個handler進行存儲,同時借用這個鏈式結構對handler進行各種管理,非常方便。
本文已收錄于 http://www.flydean.com/04-3-netty-channelpipeline/
最通俗的解讀,最深刻的干貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!
歡迎關注我的公眾號:「程序那些事」,懂技術,更懂你!
總結
以上是生活随笔為你收集整理的netty系列之:channelPipeline详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dart系列之:集合使用实践
- 下一篇: netty系列之:channelHand