Netty学习笔记(五)Pipeline
Pipeline是Netty中的另一核心組件,前面在說過在Channel進行初始化的時候最后創建一系列的重要對象,其中就有Pipeline
我們看下Netty官網對于Pipeline的定義
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel
Pipeline就是由一系列處理Channel的inbound和outbound事件的ChannelHandlers組成的集合。那我們來看下其具體的實現
Pipeline初始化
AbstractChannel
protected AbstractChannel(Channel parent, ChannelId id) {this.parent = parent;this.id = id;unsafe = newUnsafe();pipeline = newChannelPipeline();} protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}protected DefaultChannelPipeline(Channel channel) {//保存channel的信息到pipelinethis.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}從Pipeline的初始化代碼可以看出,Pipeline本質是一個雙向鏈表,每個節點都是ChannelHandlerContext的對象,節點里保存了Pipeline的信息。這個鏈表的頭是 HeadContext,鏈表的尾是 TailContext,并且每個ChannelHandlerContext 中又關聯著一個ChannelHandler。
可以看下HeadContext和TailContext的類圖
不難看出HeadContext實現了ChannelOutboundHandler接口 , TailContext實現了ChannelInboundHandler 接口,二者都實現了ChannelHandlerContext接口,可以說 head 和tail 既是一個ChannelHandler,又是一個ChannelHandlerContext (這里不太明白為什么Head實現了ChannelInboundHandler 接口,但是Inboud屬性又是false ??)
再看下HeadContext的構造方法的代碼
有兩個重要的參數inbound,outbound,用于標識節點的inbound和outbound屬性,HeadContext傳入了inbound=false,outbound=true , TailContext則相反,傳入了inbound=true,outbound=false
初始化后的Pipeline結構如下所示:
Pipeline context添加
在使用Netty框架的時候,我們一般會在進行bootstrap或者serverBootstrap初始化的時候通過如下的方式向Pipeline中添加節點
bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定義協議編碼器.addLast("frameEncoder", new LengthFieldPrepender(4))//對象參數類型編碼器.addLast("encoder", new ObjectEncoder())// 對象參數類型解碼器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(myHandler);}});調用handler時傳入了 ChannelInitializer對象,它提供了一個initChannel()方法讓我們實現自己的初始化操作,在初始化的時候我們自己又定義了對pipeline的添加操作addLast
這里的ChannelInitializer是一個ChannelInboundHanler對象,這里的channelHandler會在客戶端啟動的是執行如下代碼的時候加入到pipeline中
那么我們來看下這個pipeline的addLast方法到底做了什么?
DefaultChannelPipeline實現了多個重載的addLast方法,最后會調用如下代碼
主要分為四步:
- 校驗handler是否重復
- 創建context節點
- 添加context節點到Pipeline尾部
- 執行回調通知
注意下,這里有一個callHandlerCallbackLater方法,當channel還沒有被注冊的時候,會創建一個PendingHandlerAddedTask對象,它包含一個新增的context對象,后面會用到,先mark一下
判斷handler是否重復
private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}根據handler的added標識判斷該handler是否已經添加過,如果handler不是sharable共享的且已經添加過就報錯,否則將added置為true
這里的isSharable()就是通過該類上是否有Sharable注解來實現的,為了提高效率,Netty對其做了緩存
創建Context節點
newCtx = newContext(group, filterName(name, handler), handler);先根據name和handler為這個context創建一個唯一的name
private String filterName(String name, ChannelHandler handler) {if (name == null) {return generateName(handler);}checkDuplicateName(name);return name;}如果name為空,就根據handler來自動生成一個name,默認是handler的類名+"#0",生成后直接校驗該name是否重復;如果name是用戶指定的,就直接校驗name是否重復,重復的話就報錯給用戶
private String generateName(ChannelHandler handler) {Map<Class<?>, String> cache = nameCaches.get();Class<?> handlerType = handler.getClass();String name = cache.get(handlerType);if (name == null) {//根據handler類名生成默認的name handlerClassName+"#0"name = generateName0(handlerType);cache.put(handlerType, name);}//這里的context0就是用來檢測pipeline里是否已有同名的context//如果同名則將最后的數字遞增,一直到沒有重名為止if (context0(name) != null) {String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.for (int i = 1;; i ++) {String newName = baseName + i;if (context0(newName) == null) {name = newName;break;}}}return name;} //從head節點開始一直向下遍歷到tail,判斷Pipeline中是否已有同名的context private AbstractChannelHandlerContext context0(String name) {AbstractChannelHandlerContext context = head.next;while (context != tail) {if (context.name().equals(name)) {return context;}context = context.next;}return null;}接下來就是根據校驗后的name生成對應的context節點,然后保存對應的handler,pipeline,executor,inbound,outbound屬性字段。這里的inbound,outbound就是根據其實現的接口類型來判斷的
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {//這里的group=nullreturn new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered = executor == null || executor instanceof OrderedEventExecutor;}添加context到Pipeline尾部
private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}就是一個簡單的雙向鏈表的操作,把context節點加入到tail節點前的最后一個節點
自此,我們可以得到一個如下的Pipeline
執行回調通知
調用的是如下方法: callHandlerAdded0(newCtx);
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.handler().handlerAdded(ctx);ctx.setAddComplete();} catch (Throwable t) {}}final void setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return;}}}節點添加之后,調用handler的handlerAdded通知ChannelHandler,并嘗試設置context的狀態為ADD_COMPLETE
setAddComplete的退出只有兩種可能:
1.handler已經被移除 此時狀態為REMOVE_COMPLETE
2.成功設置Handler狀態為ADD_COMPLETE
initChannel的執行
現在我們已經添加了ChannelInitializer這個自定義的handler到Pipeline,但是我們真正想要添加到Pipeline的應該是定義在initChannel里的操作,那么這個方法又是在哪里執行的呢?
搜索一下ChannelInitializer代碼,我們可以發現initChannel()方法在channelRegistered和handlerAdded方法里都有調用,那么實際是在哪個方法回調里執行的呢?我們從客戶端的啟動開始再次分析下流程
調用鏈如下:
Bootstrap.doResolveAndConnect()
–>AbstractBootstrap.initAndRegister()
–>SingleThreadEventLoop.register(channel)
–>SingleThreadEventLoop.register(promise)
–>AbstractChannel.register(eventLoop, promise)
–>AbstractChannel.register0(promise)
–>AbstractNioChannel.doRegister()
我們看下這里的register0方法
private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}在注冊成功之后會先調用pipeline.invokeHandlerAddedIfNeeded,再調用pipeline.fireChannelRegistered()方法
先來看下invokeHandlerAddedIfNeeded方法:
這里的firstRegistration=true,所以會執行對應的分支,說明當注冊channel到eventLoop成功后,就可以通過handlerAdded回調方法去添加的我們定義channelHandler , 接下去看看這個方法是怎么做的
private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;// This Channel itself was registered.registered = true;pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;// Null out so it can be GC'ed.this.pendingHandlerCallbackHead = null;}// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside// the EventLoop.PendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {task.execute();task = task.next;}}先會拿到pendingHandlerCallbackHead(實現了runnable接口)對象,它就是前面在執行addLast(ChannelInitializer)時創建的,后面就是遍歷這個鏈表,調用節點的execute()執行
@Overridevoid execute() {EventExecutor executor = ctx.executor();if (executor.inEventLoop()) {callHandlerAdded0(ctx);} else {executor.execute(this); }}private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {}}final void callHandlerAdded() throws Exception {// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates// any pipeline events ctx.handler() will miss them because the state will not allow it.if (setAddComplete()) {handler().handlerAdded(this);}}后面的邏輯就非常清晰了,獲取ChannelInitinal的handler并調用其handlerAdded方法,從而執行我們實現的initChannel方法,嘗試將我們自己定義的channelHandler添加到Pipeline,也就是會執行如下代碼:
socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定義協議編碼器.addLast("frameEncoder", new LengthFieldPrepender(4))//對象參數類型編碼器.addLast("encoder", new ObjectEncoder())// 對象參數類型解碼器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(myHandler);}前面說到執行完invokeHandlerAddedIfNeeded,后面還有一個fireChannelRegistered的方法方法回調,我們來簡單看下,代碼如下:
public final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);return this;}static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRegistered();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});}}private void invokeChannelRegistered() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {notifyHandlerException(t);}} else {//對于Head節點,這里會執行這個分支代碼fireChannelRegistered();}}也就是說會從Head節點開始一直向下遍歷,找到每個Inbound屬性為true的節點,然后調用其channelRegistered方法做回調通知
此時我們就會得到如下的一個Pipeline(這里只畫最后的自定義handler)
添加完上述的channelHandler之后,還有最后一步remove(ctx),也就是說會將當前的context(ChannelInitializer從Pipeline移除),就剩下如下的Pipeline雙向鏈表了
移除的邏輯和添加類似,就不再講了
總結
1.Pipeline底層是一個雙向鏈表結構,添加和刪除節點就是維護這個雙向鏈表
2.Pipeline每個節點context都會有一個唯一name,默認是HandlerClassName+"#0"
3.客戶端初始化添加用戶自定義Handler到Pipeline,會先添加ChannelInitializer到Pipeline,在調用jdk底層NIO API完成注冊后會添加自定義的Handler到Pipeline,然后從Pipeline移除ChannelInitializer
總結
以上是生活随笔為你收集整理的Netty学习笔记(五)Pipeline的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty学习笔记(四)EventLoo
- 下一篇: Netty学习笔记(六)Pipeline