日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Netty学习笔记(二)Netty服务端流程启动分析

發(fā)布時(shí)間:2024/4/11 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty学习笔记(二)Netty服务端流程启动分析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

先貼下在NIO和Netty里啟動(dòng)服務(wù)端的代碼

public class NioServer { /*** 指定端口號(hào)啟動(dòng)服務(wù)* */public boolean startServer(int port){try {selector = Selector.open();//打開監(jiān)聽通道ServerSocketChannel server = ServerSocketChannel.open();//默認(rèn)configureBlocking為true,如果為 true,此通道將被置于阻塞模式;如果為 false.則此通道將被置于非阻塞模式server.configureBlocking(false);//監(jiān)聽客戶端連接請(qǐng)求server.register(selector, SelectionKey.OP_ACCEPT);//綁定端口server.bind(new InetSocketAddress(this.port));System.out.println("服務(wù)端啟動(dòng)成功,監(jiān)聽端口:" + port);}catch (Exception e){System.out.println("服務(wù)器啟動(dòng)失敗");return false;}return true;} } //定義主線程池EventLoopGroup bossGroup = new NioEventLoopGroup();//定義工作線程池EventLoopGroup workerGroup = new NioEventLoopGroup();//類似于ServerSocketServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class)//定義工作線程的處理函數(shù).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉(zhuǎn)化對(duì)應(yīng)的傳輸數(shù)據(jù) 從字節(jié)流到目標(biāo)對(duì)象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關(guān)的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})//boss線程池的最大線程數(shù).option(ChannelOption.SO_BACKLOG, 128)//工作線程保持長(zhǎng)連接.childOption(ChannelOption.SO_KEEPALIVE, true);//綁定端口啟動(dòng)netty服務(wù)端ChannelFuture future = serverBootstrap.bind(ZKConfig.SERVER_PORT).sync();

和客戶端的EventLoopGroup不同,服務(wù)端需要指定兩個(gè)EventLoopGroup,這是因?yàn)榉?wù)端需要兩個(gè)線程池,bossGroup--用于處理客戶端的連接請(qǐng)求;另一個(gè)workerGroup,用于處理與各個(gè)客戶端連接的IO 操作(這與Reactor模型有關(guān))

前面的方法都是對(duì)各個(gè)對(duì)象進(jìn)行賦值,先從啟動(dòng)的方法開始看:

------AbstractBootStrap public ChannelFuture bind(int inetPort) {return this.bind(new InetSocketAddress(inetPort)); }public ChannelFuture bind(SocketAddress localAddress) {//檢驗(yàn)group,channelFactory等必須屬性是否有賦值this.validate();if (localAddress == null) {throw new NullPointerException("localAddress");} else {return this.doBind(localAddress);}}private ChannelFuture doBind(final SocketAddress localAddress) {//進(jìn)行channel的初始化和注冊(cè)final ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();...ChannelPromise promise = channel.newPromise();//執(zhí)行實(shí)際的端口綁定doBind0(regFuture, channel, localAddress, promise);...}

這里有兩個(gè)重要的方法initAndRegister()和doBind0()

initAndRegister

先來看下initAndRegister方法,大致可以分成以下三個(gè)步驟:

  • 創(chuàng)建channel
  • 初始化channel
  • 注冊(cè)channel到Selector
final ChannelFuture initAndRegister() {Channel channel = null;try {//通過channelFactory創(chuàng)建Channelchannel = this.channelFactory.newChannel();//初始化channel并預(yù)設(shè)參數(shù)this.init(channel);} catch (Throwable var3) {...}//注冊(cè)channel到SelectorChannelFuture regFuture = this.config().group().register(channel);...return regFuture;}

Channel創(chuàng)建

channel = this.channelFactory.newChannel();

調(diào)用了ReflectiveChannelFactory的newChannel方法,最后通過反射創(chuàng)建了channel對(duì)象

public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");} else {this.clazz = clazz;}}public T newChannel() {try {//調(diào)用clazz默認(rèn)的構(gòu)造方法返回channel對(duì)象,這里的clazz就是上面構(gòu)造函數(shù)里的clazzreturn (Channel)this.clazz.newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);}}

這里創(chuàng)建的channel就是如下代碼設(shè)置的Class

源碼里的channel方法如下:

public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");} else {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));} }

NioServerSocketChannel的構(gòu)造方法如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {//先通過newSocket方法創(chuàng)建channel,然后調(diào)用重載的this方法this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException var2) {throw new ChannelException("Failed to open a server socket.", var2);}}//最后是調(diào)用了SelectorProvider的openServerSocketChannel方法 public ServerSocketChannel openServerSocketChannel() throws IOException {//調(diào)用了java nio包創(chuàng)建了一個(gè)ServerSocketChannel對(duì)象return new ServerSocketChannelImpl(this);}//創(chuàng)建一個(gè)NioServerSocketChannelConfig的配置對(duì)象public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {super((Channel)null, channel, 16);//這里this.javaChannel().socket()其實(shí)就是調(diào)用了Java底層的nio包的api拿到ServerSocketthis.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this,this.javaChannel().socket());}

上面的super方法會(huì)調(diào)用NioServerSocketChannel父類的構(gòu)造方法直到AbstractChannel類(這是所有Channel的頂層父類)
初始化了id,unsafe.pipeline三個(gè)對(duì)象

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { //調(diào)用父類--AbstractChannel的構(gòu)造方法,這里的parent=null//readInterestOp是SelectionKey.OP_ACCEPT事件super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {//設(shè)置channel的阻塞模式 使用非阻塞模式 你可以在nio的demo也看到這樣的代碼ch.configureBlocking(false);} catch (IOException var7) {try {ch.close();} catch (IOException var6) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", var6);}}throw new ChannelException("Failed to enter non-blocking mode.", var7);}} protected AbstractChannel(Channel parent) {this.parent = parent;this.id = this.newId();this.unsafe = this.newUnsafe();this.pipeline = this.newChannelPipeline();}

初始化Channel

看下ServerBootStrap的實(shí)現(xiàn)

void init(Channel channel) throws Exception {//1.獲得options和attrsMap<ChannelOption<?>, Object> options = this.options0();synchronized(options) {channel.config().setOptions(options);}Map<AttributeKey<?>, Object> attrs = this.attrs0();synchronized(attrs) {Iterator i$ = attrs.entrySet().iterator();while(true) {if (!i$.hasNext()) {break;}Entry<AttributeKey<?>, Object> e = (Entry)i$.next();AttributeKey<Object> key = (AttributeKey)e.getKey();channel.attr(key).set(e.getValue());}}//2.創(chuàng)建pipeline并獲取childOptions和childAttrsChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;final Entry[] currentChildOptions;synchronized(this.childOptions) {currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));}final Entry[] currentChildAttrs;synchronized(this.childAttrs) {currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));}//3.為pipeline添加ChannelHandlerp.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}

主要可以分為四步:

1.獲取我們?cè)赟erverBootstrap里設(shè)置的options和attrs屬性 并給channel和config賦值
2.設(shè)置childOptions和childAttrs
3.為pipeline添加ChannelHandler
4.添加ChannelInitializer執(zhí)行器,為添加ServerBootstrapAcceptor連接器做準(zhǔn)備

強(qiáng)調(diào)一下這里是先添加了一個(gè)ChannelInitializer的handler,在它的initChannel方法里添加ServerBootstrapAcceptor
ServerBootstrapAcceptor連接器的真正添加是在register完成之后的回調(diào)里進(jìn)行的,我們就先簡(jiǎn)單看下連接器里有哪些屬性和主要行為

在連接器里把channel和對(duì)應(yīng)的workGroup的eventLoop進(jìn)行了綁定,在ServerBootStrap的channelRead方法里

ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);...try {//這里的childGroup是從構(gòu)造方法傳入的workerGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {...}});} catch (Throwable t) {forceClose(child, t);}}

ServerBootstrapAcceptor 中的 childGroup 是在構(gòu)造方法里傳入的 currentChildGroup,也就是 workerGroup 對(duì)象。
而這里的 Channel 是NioSocketChannel 的實(shí)例,因此這里的childGroup 的 register()方法就是將 workerGroup 中的
某個(gè) EventLoop 和 NioSocketChannel 關(guān)聯(lián)上了。(至于這里的channelRead怎么調(diào)用到的放到最后做一個(gè)分析,最好看完EventLoopGroup 和 Pipeline相關(guān)的源碼有個(gè)基本了解后再看)。

在channel上進(jìn)行注冊(cè)register

執(zhí)行? ? ? ?

ChannelFuture regFuture = this.config().group().register(channel);

最后還是會(huì)調(diào)用AbstractChannel里的register方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");} else if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {//把channel和當(dāng)前的EventLoop進(jìn)行綁定,這里的eventLoop是來自于前面定義的BossGroupAbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {//核心方法this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}} private void register0(ChannelPromise promise) {try {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;//這里的invokeHandlerAddedIfNeeded會(huì)回調(diào)ChannelInitializer的handlerAdded反方,從而將ServerBootstrapAcceptor真正添加到Pipeline中pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();//這里的isActive()方法會(huì)返回false,底層調(diào)用NioServerSocketChannel的isActive判斷,此時(shí)bind操作還沒完成if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}-----NioServerSocketChannel@Overridepublic boolean isActive() {return javaChannel().socket().isBound();}

最終調(diào)用了AbstractNioChannel里的doRegister方法 來進(jìn)行事件注冊(cè)

protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) { eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}

完成事件注冊(cè)之后,會(huì)調(diào)用pipeline.invokeHandlerAddedIfNeeded(); 從而調(diào)用ChannelInitializer的initChannel方法(這里的執(zhí)行流程在Netty學(xué)習(xí)筆記(五)Pipeline?一文里有介紹,就不詳細(xì)講了),也就是執(zhí)行如下代碼,真正將ServerBootstrapAcceptor添加到Pipeline

public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});

doBind()端口綁定

調(diào)用的時(shí)序圖如下:

這里的調(diào)用鏈其實(shí)就是Pipeline的一個(gè)傳播過程,感興趣的可以看下:Netty學(xué)習(xí)筆記(五)Pipeline,Netty學(xué)習(xí)筆記(六)Pipeline的傳播機(jī)制

最后調(diào)用的AbstractChannel bind方法源碼如下:

public final void bind(SocketAddress localAddress, ChannelPromise promise) {this.assertEventLoop();if (promise.setUncancellable() && this.ensureOpen(promise)) {...//wasActive=falseboolean wasActive = AbstractChannel.this.isActive();try {AbstractChannel.this.doBind(localAddress);} catch (Throwable var5) {this.safeSetFailure(promise, var5);this.closeIfClosed();return;}//執(zhí)行完上述的doBind方法后,isActive()方法返回true,會(huì)調(diào)用fireChannelActive進(jìn)行回調(diào)通知if (!wasActive && AbstractChannel.this.isActive()) {this.invokeLater(new Runnable() {public void run() {AbstractChannel.this.pipeline.fireChannelActive();}});}this.safeSetSuccess(promise);}}

然后調(diào)用了NioServerSocketChannel的doBind0方法,在這里調(diào)用了java 底層的nio 包

private void doBind0(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {this.javaChannel().bind(localAddress);} else {this.javaChannel().socket().bind(localAddress);}}

主要做了兩件事

1.調(diào)用jdk底層接口進(jìn)行端口綁定

2.調(diào)用pipeline.fireChannelActive(); 進(jìn)行回調(diào)通知

ServerBootstrapAcceptor分析

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

當(dāng)服務(wù)端綁定端口啟動(dòng)后,會(huì)啟動(dòng)reactor線程(也就是NioEventLoop),reactor不斷檢測(cè)是否有新的事件發(fā)生,直到檢測(cè)出有accept事件發(fā)生,會(huì)調(diào)用unsafe.read()方法

進(jìn)入到對(duì)應(yīng)的實(shí)現(xiàn)類(AbstractNioMessageChannel.NioMessageUnsafe)

private final List<Object> readBuf = new ArrayList<Object>(); public void read() {assert eventLoop().inEventLoop();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();boolean closed = false;try {do {//循環(huán)調(diào)用int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();}

首先注意到這里有一個(gè)do..while()循環(huán),我們看下doReadMessage(readBuf)做了什么

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}return 0;}

很簡(jiǎn)單明了,就是調(diào)用jdk底層的accept()方法獲取Channel,然后Channel封裝成NioSocketChannel全部添加到readBuf集合里(注意這里的this是NioServerSocketChannel)

回到read()方法,接下來就是遍歷readBuf拿到各個(gè)channel,并執(zhí)行pipeline.fireChannelRead(channel) 這里就是通過fireChannelRead來回調(diào)ServerBootstrapAcceptor的chanelRead();最后調(diào)用一個(gè)pipeline的channelReadComplete事件通知

我們?cè)倏聪耂erverBootstrapAcceptor的源碼

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}} }

這里主要就是做兩件事:

1.調(diào)用pipeline.addLast(childHandler)方法 添加執(zhí)行器

.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉(zhuǎn)化對(duì)應(yīng)的傳輸數(shù)據(jù) 從字節(jié)流到目標(biāo)對(duì)象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關(guān)的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})

先將對(duì)應(yīng)的ChannelInitializer添加到Pipeline,真正執(zhí)行initChannel方法在后面

2.?childGroup.register(child) 將channel和workGroup里的eventLoop進(jìn)行綁定

這里的childGroup就是構(gòu)造方法里傳入的workGroup,child 是NioSocketChannel對(duì)象(doReadMessages()方法里創(chuàng)建的)

跟下去會(huì)執(zhí)行MultithreadEventLoopGroup的如下方法:

@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

這里的next()是EventExecutorChooser事件執(zhí)行器選擇器的方法,其作用就是從我們的workGroup線程組中選擇一個(gè)線程與channel綁定(實(shí)際就是從一個(gè)線程數(shù)組選擇下一個(gè)線程來綁定,具體的實(shí)現(xiàn)在EventLoopGroup的博客里有講到)

剩下的register調(diào)用大家都很熟悉了吧,就是調(diào)用

unsafe.register()-->AbstractChannel.AbstractUnsafe.register()-->register0()-->AbstractNioChannel.doRegister()

private void register0(ChannelPromise promise) {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered(); if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}} }

先是調(diào)用doRegister(),完成注冊(cè)過程,如下

protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}

將該channel綁定到一個(gè)eventLoop線程的selector上,后續(xù)該channel的事件輪詢,以及事件處理,異步task執(zhí)行都是由此eventLoop(reactor)線程來負(fù)責(zé)

執(zhí)行完doRegister()方法之后我們又看到了熟悉的pipeline.invokeHandlerAddedIfNeeded(); 同樣的,也是回調(diào)ChannelInitializer的handlerAdded方法從而執(zhí)行對(duì)應(yīng)的initChannel,此時(shí)才是真正將用戶自己定義的各種handler添加到Pipeline。這樣在后續(xù)的事件通知中會(huì)通過回調(diào)方法執(zhí)行各個(gè)handler里的業(yè)務(wù)邏輯

服務(wù)端啟動(dòng)總結(jié):

  • 設(shè)置啟動(dòng)類參數(shù),最重要的就是設(shè)置channel
  • 創(chuàng)建服務(wù)端對(duì)應(yīng)的channel和各大組件,包括ChannelConfig,ChannelPipeline,Unsafe等
  • 初始化Channel,設(shè)置一些attr,option,以及設(shè)置子channel的attr,option,向pipeline添加接入器ServerBootstrapAcceptor,觸發(fā)用戶自定義ChannelHandler的添加和register操作
  • 調(diào)用到j(luò)dk底層bind方法實(shí)現(xiàn)端口綁定,并觸發(fā)channelActive事件,做事件注冊(cè)

總結(jié)

以上是生活随笔為你收集整理的Netty学习笔记(二)Netty服务端流程启动分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。