七、pipeLine概述
pipeline的初始化
pipeline在創建Channel的時候被創建
在前面幾節可以看到在服務端和客戶端創建Channel的時候都會調用AbstractChannel構造方法創建Pipeline:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
進入newChannelPipeline方法,最終會調用DefaultChannelPipeline()方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在該方法中創建了tail和head,并指明兩者關系為雙向鏈表結構。
并將Channel和Pipeline綁定。
Pipeline節點數據結構:ChannelHandlerContext
ChannelHandlerContext繼承自三個接口AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker。
AttributeMap中為一個Map結構,存放了一些屬性。
ChannelInboundInvoker和ChannelOutboundInvoker定義了一些事件如傳播讀事件、綁定事件、異常事件等。
Pipeline中的兩大哨兵:head和tail
在初始化ChannelPipeline時,我們看到在其中創建了HeadContext和TailContext。
TailContext實現了ChannelInboundHandler,說明是一個inbound處理器,再看一下構造函數:inbound標識為true
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME,inbound: true, outbound: false);
setAddComplete();
}
//本身是一個節點,包含的處理器也是自身
public ChannelHandler handler() {
return this;
}
HeadContext實現了ChannelOutboundHandler和ChannelInboundHandler:但在構造函數中inbound標識為false,outbound標識為true
//unsafe主要是讀寫等相關的操作
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
//設置為Channel的unsafe
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
添加和刪除ChannelHandler
添加ChannelHandler
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//流水線管理子通道中的Handler處理器
//向子通道流水線中添加一個Handler處理器
ch.pipeline().addLast(new SimpleNettyServerHandler());
}
});
跟進addlast方法,到達DefaultChannelPipeline.addlast():
判斷是否重復添加checkMultiplicity(handler);
如果未添加并且不是可共享(利用反射獲取注解Sharable 判斷),則將added屬性置為true
創建節點并添加至鏈表
創建節點
newCtx = newContext(group, filterName(name, handler), handler);
filterName方法
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
//創建一個名字
return generateName(handler);
}
//檢查是否重復
checkDuplicateName(name);
return name;
}
newContext方法返回DefaultChannelHandlerContext.
new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
添加節點
addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
回調完成添加事件
//在此處,若executor 為null則獲取channel().eventLoop()賦值給executor
EventExecutor executor = newCtx.executor();
//判斷是否為當前線程
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
//在當前線程則直接執行
callHandlerAdded0(newCtx);
在此處添加handler,
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
最終調用的為ChannelInitializer的handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
initChannel(ctx)回調至初始化添加代碼,將自定義的Handler添加到Channel上。
最終移除ChannelHandlerContext
刪除ChannelHandler
找到節點
getContextOrDie(handler);
---》
//遍歷查找節點并返回
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
鏈表刪除節點
assert ctx != head && ctx != tail;
//標準鏈表刪除
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
回調刪除Handler事件
//獲取當前executor
EventExecutor executor = ctx.executor();
//判斷是否在當前線程
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
callHandlerRemoved0(ctx);
----》回調到Handler的handlerRemoved方法
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
事件的傳播
Context 包裝 handler,多個 Context 在 pipeline 中形成了雙向鏈表,入站方向叫 inbound,由 head 節點開始,出站方法叫 outbound ,由 tail 節點開始。而節點中間的傳遞通過 AbstractChannelHandlerContext 類內部的 fire 系列方法,找到當前節點的下一個節點不斷的循環傳播。
Handler涉及的環節包括:數據包解碼、業務處理、目標數據編碼、把數據寫入到通道中,有出站和入站兩種操作類型:
如下圖:
入站處理,觸發的方向為:自底向上,由Netty的內部(如通道)到ChannelInboundHandler入站處理器。
出站方向,觸發的方向為:自頂向下,從ChannelOutboundHandler出站處理器到Netty的內部(如通道)。
按照這種方向來分,前面數據包解碼、業務處理這兩個環節屬于入站處理器的工作,后面目標數據編碼、把數據包寫到通道中屬于出站處理器的工作。
ChannelInboundHandler通道入站處理器
當數據或者信息如占到Netty通道時,Netty將觸發入站處理器ChannelInboundHandler對應的入站API,進行入站操作。
ChannelInboundHandler的主要操作如下:
inbound事件的傳播
以ChannelRead事件為例
handler之間的傳播信息通過fireXXX方法:其區別是從哪個節點開始傳播。
ctx.fireChannelRead(msg); 從當前節點往下傳播事件
ctx.channel().pipeline().fireChannelRead(msg);從頭節點HeadContext開始傳播
新建一個ChannelInboundHandler,如下:
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandlerA: "+msg);
ctx.fireChannelRead(msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().pipeline().fireChannelRead("hello world");
}
}
在ChannelActive處打上斷點,進入:
從head開始傳播:
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
....
//此處next為headContext,調用headContext的involveChannelRead方法:
next.invokeChannelRead(m);
....
((ChannelInboundHandler) handler()).channelRead(this, msg);
....
//沒有做處理,繼續向下傳播
ctx.fireChannelRead(msg);
....
invokeChannelRead(findContextInbound(), msg);
看一下findContextInbound()方法,找到下一個Inbound的ctx
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
繼續向下:
next.invokeChannelRead(m);
....
//此處調用的為InboundHandlerA 的ChannelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg)
繼續向下,可以看到
此處findContextInbound返回的為TailContext
invokeChannelRead(findContextInbound(), msg);
....
//未處理信息的處理
onUnhandledInboundMessage(msg){
....
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
//釋放資源
ReferenceCountUtil.release(msg);
}
....
}
Tips: SimpleInboundHandler會幫你自動釋放資源
outbound事件的傳播
當業務處理完成后,需要操作Java NIO底層通道時,通過一系列的ChannelOutboundHandler通道出站處理器,完成Netty通道到底層的操作。比方說,建立底層連接、斷開底層連接、寫入底層Java NIO通道等。ChannelOutboundHandler接口定義了大部分的出站操作,如下:
出站處理的方向是通過上層Netty通道去操作底層Java IO通道。
public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerA: " + msg);
ctx.write(msg, promise);
}
//模擬向客戶端發送消息
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> {
//從頭結點開始傳播
ctx.channel().write("hello world");
//從當前節點開始往下傳播ctx.write("hello world");
},3, TimeUnit.SECONDS);
}
}
在handlerAdded處打上斷點,跟進:
pipeline.write(msg);
....
//調用tailcontext的write方法
tail.write(msg);
....
//newPromise包裝Channel
write(msg, newPromise());
....
write(msg, false, promise);
....
AbstractChannelHandlerContext next = findContextOutbound(){
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
....
next.invokeWrite(m, promise);
調用到自定義的OutboundHandlerA的write方法。
當處理完后,繼續ctx.write方法則會調用到HeadContext的unsafe.write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
總結
以上是生活随笔為你收集整理的七、pipeLine概述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dom和bom
- 下一篇: mac 无法识别seagate硬盘、无