日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

七、pipeLine概述

發布時間:2023/12/13 综合教程 58 生活家
生活随笔 收集整理的這篇文章主要介紹了 七、pipeLine概述 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

pipeline的初始化

pipeline在創建Channel的時候被創建
在前面幾節可以看到在服務端和客戶端創建Channel的時候都會調用AbstractChannel構造方法創建Pipeline:

  protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

進入newChannelPipeline方法,最終會調用DefaultChannelPipeline()方法:

  protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

在該方法中創建了tail和head,并指明兩者關系為雙向鏈表結構。
并將Channel和Pipeline綁定。

Pipeline節點數據結構:ChannelHandlerContext
ChannelHandlerContext繼承自三個接口AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker。
AttributeMap中為一個Map結構,存放了一些屬性。
ChannelInboundInvoker和ChannelOutboundInvoker定義了一些事件如傳播讀事件、綁定事件、異常事件等。

Pipeline中的兩大哨兵:head和tail

在初始化ChannelPipeline時,我們看到在其中創建了HeadContext和TailContext。
TailContext實現了ChannelInboundHandler,說明是一個inbound處理器,再看一下構造函數:inbound標識為true

 TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME,inbound: true, outbound: false);
            setAddComplete();
        }
        //本身是一個節點,包含的處理器也是自身
        public ChannelHandler handler() {
            return this;
        }

HeadContext實現了ChannelOutboundHandler和ChannelInboundHandler:但在構造函數中inbound標識為false,outbound標識為true

        //unsafe主要是讀寫等相關的操作
        private final Unsafe unsafe;
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            //設置為Channel的unsafe
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        @Override
        public ChannelHandler handler() {
          return this;
        }        

添加和刪除ChannelHandler

添加ChannelHandler

   serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    //流水線管理子通道中的Handler處理器
                    //向子通道流水線中添加一個Handler處理器
                    ch.pipeline().addLast(new SimpleNettyServerHandler());
                }
            });

跟進addlast方法,到達DefaultChannelPipeline.addlast():

判斷是否重復添加checkMultiplicity(handler);
如果未添加并且不是可共享(利用反射獲取注解Sharable 判斷),則將added屬性置為true
創建節點并添加至鏈表

創建節點

 newCtx = newContext(group, filterName(name, handler), handler);

filterName方法

private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
           //創建一個名字
            return generateName(handler);
        }
        //檢查是否重復
        checkDuplicateName(name);
        return name;
    }

newContext方法返回DefaultChannelHandlerContext.

new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);

添加節點

addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

回調完成添加事件

 //在此處,若executor 為null則獲取channel().eventLoop()賦值給executor
 EventExecutor executor = newCtx.executor();
            //判斷是否為當前線程
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
            //在當前線程則直接執行
            callHandlerAdded0(newCtx);

在此處添加handler,

   ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();

最終調用的為ChannelInitializer的handlerAdded方法

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }
        try {
             initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }

initChannel(ctx)回調至初始化添加代碼,將自定義的Handler添加到Channel上。
最終移除ChannelHandlerContext

刪除ChannelHandler

找到節點

getContextOrDie(handler);
---》
//遍歷查找節點并返回
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);

鏈表刪除節點

assert ctx != head && ctx != tail;
//標準鏈表刪除
private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

回調刪除Handler事件

   //獲取當前executor
   EventExecutor executor = ctx.executor();
          //判斷是否在當前線程
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
             callHandlerRemoved0(ctx);
             ----》回調到Handler的handlerRemoved方法
              try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }

事件的傳播

Context 包裝 handler,多個 Context 在 pipeline 中形成了雙向鏈表,入站方向叫 inbound,由 head 節點開始,出站方法叫 outbound ,由 tail 節點開始。而節點中間的傳遞通過 AbstractChannelHandlerContext 類內部的 fire 系列方法,找到當前節點的下一個節點不斷的循環傳播。
Handler涉及的環節包括:數據包解碼、業務處理、目標數據編碼、把數據寫入到通道中,有出站和入站兩種操作類型:
如下圖:

入站處理,觸發的方向為:自底向上,由Netty的內部(如通道)到ChannelInboundHandler入站處理器。
出站方向,觸發的方向為:自頂向下,從ChannelOutboundHandler出站處理器到Netty的內部(如通道)。
按照這種方向來分,前面數據包解碼、業務處理這兩個環節屬于入站處理器的工作,后面目標數據編碼、把數據包寫到通道中屬于出站處理器的工作。

ChannelInboundHandler通道入站處理器

當數據或者信息如占到Netty通道時,Netty將觸發入站處理器ChannelInboundHandler對應的入站API,進行入站操作。
ChannelInboundHandler的主要操作如下:

inbound事件的傳播

以ChannelRead事件為例
handler之間的傳播信息通過fireXXX方法:其區別是從哪個節點開始傳播。

ctx.fireChannelRead(msg); 從當前節點往下傳播事件
ctx.channel().pipeline().fireChannelRead(msg);從頭節點HeadContext開始傳播
新建一個ChannelInboundHandler,如下:

public class InboundHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandlerA:  "+msg);
        ctx.fireChannelRead(msg);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().pipeline().fireChannelRead("hello world");
    }
}

在ChannelActive處打上斷點,進入:
從head開始傳播:

 AbstractChannelHandlerContext.invokeChannelRead(head, msg);
....
//此處next為headContext,調用headContext的involveChannelRead方法:
next.invokeChannelRead(m);
....
 ((ChannelInboundHandler) handler()).channelRead(this, msg);
....
 //沒有做處理,繼續向下傳播
ctx.fireChannelRead(msg);
....
invokeChannelRead(findContextInbound(), msg);

看一下findContextInbound()方法,找到下一個Inbound的ctx

  AbstractChannelHandlerContext ctx = this;
       do {
           ctx = ctx.next;
       } while (!ctx.inbound);
       return ctx;

繼續向下:

next.invokeChannelRead(m);
....
//此處調用的為InboundHandlerA 的ChannelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg)

繼續向下,可以看到
此處findContextInbound返回的為TailContext

invokeChannelRead(findContextInbound(), msg);
....
//未處理信息的處理
onUnhandledInboundMessage(msg){
  ....
  try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
        //釋放資源
            ReferenceCountUtil.release(msg);
        }
....
}

Tips: SimpleInboundHandler會幫你自動釋放資源

outbound事件的傳播

當業務處理完成后,需要操作Java NIO底層通道時,通過一系列的ChannelOutboundHandler通道出站處理器,完成Netty通道到底層的操作。比方說,建立底層連接、斷開底層連接、寫入底層Java NIO通道等。ChannelOutboundHandler接口定義了大部分的出站操作,如下:

出站處理的方向是通過上層Netty通道去操作底層Java IO通道。

public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerA:  " + msg);
        ctx.write(msg, promise);
    }
    //模擬向客戶端發送消息
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> {
           //從頭結點開始傳播
            ctx.channel().write("hello world");
          //從當前節點開始往下傳播ctx.write("hello world");  
        },3, TimeUnit.SECONDS);
    }
}

在handlerAdded處打上斷點,跟進:

pipeline.write(msg);
....
//調用tailcontext的write方法
tail.write(msg);
....
//newPromise包裝Channel
write(msg, newPromise());
....
write(msg, false, promise);
....
AbstractChannelHandlerContext next = findContextOutbound(){
     AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
        }
....
next.invokeWrite(m, promise);

調用到自定義的OutboundHandlerA的write方法。
當處理完后,繼續ctx.write方法則會調用到HeadContext的unsafe.write方法:

  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

總結

以上是生活随笔為你收集整理的七、pipeLine概述的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。