日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty学习笔记(五)Pipeline

發布時間:2024/4/11 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty学习笔记(五)Pipeline 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Pipeline是Netty中的另一核心組件,前面在說過在Channel進行初始化的時候最后創建一系列的重要對象,其中就有Pipeline
我們看下Netty官網對于Pipeline的定義

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel

Pipeline就是由一系列處理Channel的inbound和outbound事件的ChannelHandlers組成的集合。那我們來看下其具體的實現

Pipeline初始化

AbstractChannel

protected AbstractChannel(Channel parent, ChannelId id) {this.parent = parent;this.id = id;unsafe = newUnsafe();pipeline = newChannelPipeline();} protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}protected DefaultChannelPipeline(Channel channel) {//保存channel的信息到pipelinethis.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}

從Pipeline的初始化代碼可以看出,Pipeline本質是一個雙向鏈表,每個節點都是ChannelHandlerContext的對象,節點里保存了Pipeline的信息。這個鏈表的頭是 HeadContext,鏈表的尾是 TailContext,并且每個ChannelHandlerContext 中又關聯著一個ChannelHandler。

可以看下HeadContext和TailContext的類圖


不難看出HeadContext實現了ChannelOutboundHandler接口 , TailContext實現了ChannelInboundHandler 接口,二者都實現了ChannelHandlerContext接口,可以說 head 和tail 既是一個ChannelHandler,又是一個ChannelHandlerContext (這里不太明白為什么Head實現了ChannelInboundHandler 接口,但是Inboud屬性又是false ??)
再看下HeadContext的構造方法的代碼

HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, false, true);unsafe = pipeline.channel().unsafe();setAddComplete();}

有兩個重要的參數inbound,outbound,用于標識節點的inbound和outbound屬性,HeadContext傳入了inbound=false,outbound=true , TailContext則相反,傳入了inbound=true,outbound=false

初始化后的Pipeline結構如下所示:

Pipeline context添加

在使用Netty框架的時候,我們一般會在進行bootstrap或者serverBootstrap初始化的時候通過如下的方式向Pipeline中添加節點

bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定義協議編碼器.addLast("frameEncoder", new LengthFieldPrepender(4))//對象參數類型編碼器.addLast("encoder", new ObjectEncoder())// 對象參數類型解碼器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(myHandler);}});

調用handler時傳入了 ChannelInitializer對象,它提供了一個initChannel()方法讓我們實現自己的初始化操作,在初始化的時候我們自己又定義了對pipeline的添加操作addLast
這里的ChannelInitializer是一個ChannelInboundHanler對象,這里的channelHandler會在客戶端啟動的是執行如下代碼的時候加入到pipeline中

void init(Channel channel) throws Exception {ChannelPipeline p = channel.pipeline();//這里的config.handler()調用的就是bootstrap.handler(),也就是我們自定義的ChannelInitializer對象p.addLast(config.handler()); }

那么我們來看下這個pipeline的addLast方法到底做了什么?
DefaultChannelPipeline實現了多個重載的addLast方法,最后會調用如下代碼

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {@Overridepublic void run() {callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;}

主要分為四步:

  • 校驗handler是否重復
  • 創建context節點
  • 添加context節點到Pipeline尾部
  • 執行回調通知
    注意下,這里有一個callHandlerCallbackLater方法,當channel還沒有被注冊的時候,會創建一個PendingHandlerAddedTask對象,它包含一個新增的context對象,后面會用到,先mark一下
判斷handler是否重復
private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}

根據handler的added標識判斷該handler是否已經添加過,如果handler不是sharable共享的且已經添加過就報錯,否則將added置為true
這里的isSharable()就是通過該類上是否有Sharable注解來實現的,為了提高效率,Netty對其做了緩存

public boolean isSharable() {Class<?> clazz = getClass();Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = cache.get(clazz);if (sharable == null) {sharable = clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;}
創建Context節點
newCtx = newContext(group, filterName(name, handler), handler);

先根據name和handler為這個context創建一個唯一的name

private String filterName(String name, ChannelHandler handler) {if (name == null) {return generateName(handler);}checkDuplicateName(name);return name;}

如果name為空,就根據handler來自動生成一個name,默認是handler的類名+"#0",生成后直接校驗該name是否重復;如果name是用戶指定的,就直接校驗name是否重復,重復的話就報錯給用戶

private String generateName(ChannelHandler handler) {Map<Class<?>, String> cache = nameCaches.get();Class<?> handlerType = handler.getClass();String name = cache.get(handlerType);if (name == null) {//根據handler類名生成默認的name handlerClassName+"#0"name = generateName0(handlerType);cache.put(handlerType, name);}//這里的context0就是用來檢測pipeline里是否已有同名的context//如果同名則將最后的數字遞增,一直到沒有重名為止if (context0(name) != null) {String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.for (int i = 1;; i ++) {String newName = baseName + i;if (context0(newName) == null) {name = newName;break;}}}return name;} //從head節點開始一直向下遍歷到tail,判斷Pipeline中是否已有同名的context private AbstractChannelHandlerContext context0(String name) {AbstractChannelHandlerContext context = head.next;while (context != tail) {if (context.name().equals(name)) {return context;}context = context.next;}return null;}

接下來就是根據校驗后的name生成對應的context節點,然后保存對應的handler,pipeline,executor,inbound,outbound屬性字段。這里的inbound,outbound就是根據其實現的接口類型來判斷的

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {//這里的group=nullreturn new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;this.executor = executor;this.inbound = inbound;this.outbound = outbound;// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered = executor == null || executor instanceof OrderedEventExecutor;}
添加context到Pipeline尾部
private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}

就是一個簡單的雙向鏈表的操作,把context節點加入到tail節點前的最后一個節點
自此,我們可以得到一個如下的Pipeline

執行回調通知

調用的是如下方法: callHandlerAdded0(newCtx);

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.handler().handlerAdded(ctx);ctx.setAddComplete();} catch (Throwable t) {}}final void setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return;}}}

節點添加之后,調用handler的handlerAdded通知ChannelHandler,并嘗試設置context的狀態為ADD_COMPLETE
setAddComplete的退出只有兩種可能:
1.handler已經被移除 此時狀態為REMOVE_COMPLETE
2.成功設置Handler狀態為ADD_COMPLETE

initChannel的執行

現在我們已經添加了ChannelInitializer這個自定義的handler到Pipeline,但是我們真正想要添加到Pipeline的應該是定義在initChannel里的操作,那么這個方法又是在哪里執行的呢?
搜索一下ChannelInitializer代碼,我們可以發現initChannel()方法在channelRegistered和handlerAdded方法里都有調用,那么實際是在哪個方法回調里執行的呢?我們從客戶端的啟動開始再次分析下流程

調用鏈如下:
Bootstrap.doResolveAndConnect()
–>AbstractBootstrap.initAndRegister()
–>SingleThreadEventLoop.register(channel)
–>SingleThreadEventLoop.register(promise)
–>AbstractChannel.register(eventLoop, promise)
–>AbstractChannel.register0(promise)
–>AbstractNioChannel.doRegister()

我們看下這里的register0方法

private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}

在注冊成功之后會先調用pipeline.invokeHandlerAddedIfNeeded,再調用pipeline.fireChannelRegistered()方法
先來看下invokeHandlerAddedIfNeeded方法:

final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();if (firstRegistration) {firstRegistration = false;// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,// that were added before the registration was done.callHandlerAddedForAllHandlers();}}

這里的firstRegistration=true,所以會執行對應的分支,說明當注冊channel到eventLoop成功后,就可以通過handlerAdded回調方法去添加的我們定義channelHandler , 接下去看看這個方法是怎么做的

private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;// This Channel itself was registered.registered = true;pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;// Null out so it can be GC'ed.this.pendingHandlerCallbackHead = null;}// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside// the EventLoop.PendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {task.execute();task = task.next;}}

先會拿到pendingHandlerCallbackHead(實現了runnable接口)對象,它就是前面在執行addLast(ChannelInitializer)時創建的,后面就是遍歷這個鏈表,調用節點的execute()執行

@Overridevoid execute() {EventExecutor executor = ctx.executor();if (executor.inEventLoop()) {callHandlerAdded0(ctx);} else {executor.execute(this); }}private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {}}final void callHandlerAdded() throws Exception {// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates// any pipeline events ctx.handler() will miss them because the state will not allow it.if (setAddComplete()) {handler().handlerAdded(this);}}

后面的邏輯就非常清晰了,獲取ChannelInitinal的handler并調用其handlerAdded方法,從而執行我們實現的initChannel方法,嘗試將我們自己定義的channelHandler添加到Pipeline,也就是會執行如下代碼:

socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定義協議編碼器.addLast("frameEncoder", new LengthFieldPrepender(4))//對象參數類型編碼器.addLast("encoder", new ObjectEncoder())// 對象參數類型解碼器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(myHandler);}

前面說到執行完invokeHandlerAddedIfNeeded,后面還有一個fireChannelRegistered的方法方法回調,我們來簡單看下,代碼如下:

public final ChannelPipeline fireChannelRegistered() {AbstractChannelHandlerContext.invokeChannelRegistered(head);return this;}static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRegistered();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});}}private void invokeChannelRegistered() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {notifyHandlerException(t);}} else {//對于Head節點,這里會執行這個分支代碼fireChannelRegistered();}}

也就是說會從Head節點開始一直向下遍歷,找到每個Inbound屬性為true的節點,然后調用其channelRegistered方法做回調通知

此時我們就會得到如下的一個Pipeline(這里只畫最后的自定義handler)

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {//移除當前的ChannelIntinalizer節點remove(ctx);}return true;}return false;}

添加完上述的channelHandler之后,還有最后一步remove(ctx),也就是說會將當前的context(ChannelInitializer從Pipeline移除),就剩下如下的Pipeline雙向鏈表了

移除的邏輯和添加類似,就不再講了

總結

1.Pipeline底層是一個雙向鏈表結構,添加和刪除節點就是維護這個雙向鏈表
2.Pipeline每個節點context都會有一個唯一name,默認是HandlerClassName+"#0"
3.客戶端初始化添加用戶自定義Handler到Pipeline,會先添加ChannelInitializer到Pipeline,在調用jdk底層NIO API完成注冊后會添加自定義的Handler到Pipeline,然后從Pipeline移除ChannelInitializer

總結

以上是生活随笔為你收集整理的Netty学习笔记(五)Pipeline的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。