日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ServerBootstrap的启动流程

發布時間:2025/3/19 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ServerBootstrap的启动流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、ServerBootstrap的啟動示例代碼?

EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("bossThread",false));EventLoopGroup workEventLoopGroup = new NioEventLoopGroup(new NamedThreadFactory("workThread",false));ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossEventLoopGroup,workEventLoopGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,128).handler(new LoggingHandler(LogLevel.INFO)).childHandler( new PojoServerIntitlizer());try {log.debug(" will start server");ChannelFuture closeFuture = bootstrap.bind(port).sync();log.debug(" server is closing");closeFuture.channel().closeFuture().sync();log.debug(" server is closed");} catch (InterruptedException e) {e.printStackTrace();}finally {bossEventLoopGroup.shutdownGracefully();workEventLoopGroup.shutdownGracefully();log.debug(" release event loop group");}

二、初始化流程

1.ServerBootstrap和bootStrap繼承于AbstractBootStrap,都使用模板方式。AbstractBootStrap類有幾個比較重要的成員變量。

group:ServerSocketChannel的EventLoopGroup,即reactor-accept的BOSS事件循環線程池組。

channelFactory:創建channel的工廠類,如創建ServerSocketChannel,SocketChannel,

handler:reactor-accept的事件處理類。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {volatile EventLoopGroup group;@SuppressWarnings("deprecation")private volatile ChannelFactory<? extends C> channelFactory;private volatile SocketAddress localAddress;private volatile ChannelHandler handler;

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;

? 2.設置參數流程。

bootstrap.group(bossEventLoopGroup,workEventLoopGroup) bossEventLoopGroup設置AbstractBootStrap的group,workEventLoopGroup設置為ServerBootstrap的childGroup,

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;} channel(NioServerSocketChannel.class) 生成一個根據指定SocketChannel類的反射CHANNEL工廠類,就是根據傳入的類反射生成對象。這是設置AbstractBootStrap的類的channelFactory public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));} .handler(new LoggingHandler(LogLevel.INFO)) 這是設置AbstractBootStrap的類的handler public B handler(ChannelHandler handler) {this.handler = ObjectUtil.checkNotNull(handler, "handler");return self();} .childHandler( new PojoServerIntitlizer());這是設置ServerBootstrap子類的childHandler public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");return this;}

三、綁定流程

1.ChannelFuture closeFuture = bootstrap.bind(port).sync();這為起點,會調用AbstractBootStrap的doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;}

2.這里面首先調用initAndRegister,會先創建ServerSocketChannel,并且將channel注冊到NioEventLoop的selector中,也就是前文的channel.register(selector,0,eventLoop)

final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {}ChannelFuture regFuture = config().group().register(channel);return regFuture;}

3.newChannel就是通過前面生成的ReflectChannelFactory調用反射生成一個NioServerSocketChannel對象,這里看一下類的繼承圖,

?AbstractChannel 有兩個成員變量unsafe和pipline,unsafe為操作底層網絡IO的接口。pipline也就是我們的管道事件流。也就是說每個channel會自帶一個pipline

protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}

?3.創建完channel后,我們來看下初始化流程,這個會調用到子類(ServerBootStrap)的init方法,

這個會做兩個事件,

? ?(1).設置channel的連接選項和屬性。

? ?(2).添加新的handler

這里會為ServerSocketChannel的管道中新增一個handler,用來處理accept-reactor的IO流讀取事件,也就是新連接事件。ServerBootstrapAcceptor這個類就是來處理新連接,并注冊accept的socketChannel并注冊到childGroup的work事件循環以及內部eventLoop的selector中,并且設置子socketChannel的handler為childHandler.這個我們留在接收新連接的流程中講解。

void init(Channel channel) {setChannelOptions(channel, this.newOptionsArray(), logger);setAttributes(channel, (Entry[])this.attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}

4.現在回到第2步,初始化完成了,現在開始將serverSocketChannel注冊到bossEventLoop的selector中。ChannelFuture regFuture = config().group().register(channel);這個會調用MultithreadEventLoopGroup.register,也就是從事件循環組的子eventLoop中取下一個NioEventLoop進行注冊分配。也就是work-reactor的channel注冊機制。

@Overridepublic EventLoop next() {return (EventLoop) super.next();}@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

?5.我們的NioEventLoop為SingleThreadEventLoop,這個會將channel,當前的eventLoop對象封裝成一個promise,進行注冊。

public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); }

6.接著找到 ServerSocketChannel的內部的unsafe對象進行注冊。這個unsafe就是NioMessageUnsafe

public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();} }

7.由于NioMessageUnsafe繼承于AbstractUnsafe,所以調用此類的register,這個方法就是首先調用Channel.register,然后觸發管道的handlerAdd,channelRegister,channelActive方法。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} }private void register0(ChannelPromise promise) {try {boolean firstRegistration = this.neverRegistered;AbstractChannel.this.doRegister();this.neverRegistered = false;AbstractChannel.this.registered = true;AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();this.safeSetSuccess(promise);AbstractChannel.this.pipeline.fireChannelRegistered();if (AbstractChannel.this.isActive()) {if (firstRegistration) {AbstractChannel.this.pipeline.fireChannelActive();} else if (AbstractChannel.this.config().isAutoRead()) {this.beginRead();}}}}

8.AbstractChannel.this.doRegister();這個就是把ServerSocketChannel注冊到NioEventLoop的selector中去。這時還沒有監聽事件。

AbstractNioChannelprotected void doRegister() throws Exception {boolean selected = false;while(true) {try {this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);return;} }}

生成的selectKey如下:?

9.invokeHandlerAddedIfNeeded會調用到父類CHANNEL管道的channelHandler,也就是在我們初始化時第三步加入的handler

?在這里才會真正觸發handler的initChannel方法,生成第三步的ServerBootstrapAcceptor,初始化處理新的子連接的channelReader的處理器。

?10.現在初始化和注冊完成了,我們再回到第一步,進行doBind0(regFuture, channel, localAddress, promise)

AbstractBootstrap private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

11.channel.bind,會調用內部管道的bind,然后到tail的BIND,再到AbstractChannelHandlerContext的bind,因為tail就是繼承于AbstractChannelHandlerContext,這里面的BIND就是從尾部向前找,找一個帶有bind標簽的HANDLER進行處理。最終找到了header.ChannelHandlerContext(DefaultChannelPipeline$HeadContext#0, [id: 0x159b9902])

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {ObjectUtil.checkNotNull(localAddress, "localAddress");if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();next.invokeBind(localAddress, promise);return promise;}

12.最終到了head的BIND,這里面調用了unsafe本地IO類的bind.

final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}

13.最終調用到AbstractChannel的內部類AbstractUnsafe的bind,

@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireChannelActive();}});}safeSetSuccess(promise);}

?14.接著調用到了AbstractUnsafe的外部類的繼承類的doBind方法,也就是NioServerSOcketChannel.doBind,這個就是調用原生的ServeSocketChannel進行bind.

protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}

15.綁定成功后會調用pipeline.fireChannelActive();這個是從頭節點一直向后傳遞事件。在頭節點的

channelActive方法中會觸發readIfIsAutoRead @Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelActive();} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelActive();}});}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();}

?16.readIfIsAutoRead就是channel如果開啟了自動讀,則開始設置讀關注事件到eventLoop的selector中。這個讀會從管道的讀-》tail的讀->一直向前找支持讀的handler進行處理。這個就是找到了head的handler

private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}}final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {public void read(ChannelHandlerContext ctx) {unsafe.beginRead();}

?17.headHandler的beginRead就調用了unsafe.beginRead方法,這個會調用外部channel.

doBeginRead--》AbstractNioChannel AbstractNioChannel protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}

這個就是將讀事件加到channel的selectKey的關注事件中。

?

?四、接收新連接流程

1.我們回到上一節的3,9步,為父類NioServerSocketChannel的管道中新增了一個handler,

ServerBootstrapAcceptor,用來處理父類的接收數據,也就是新連接請求。當有新連接進來時,會觸發到ServerBootstrapAcceptor.channelRead

2.ServerBootstrapAcceptor.channelRead 方法是怎么被調用的呢? 其實當一個 client 連接到 server 時, Java 底層的 NIO ServerSocketChannel 會有一個 SelectionKey.OP_ACCEPT 就緒事件, 接著就會調用到 NioServerSocketChannel.doReadMessages:

NioServerSocketChannel protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

在 doReadMessages 中, 通過 javaChannel().accept() 獲取到客戶端新連接的 SocketChannel, 接著就實例化一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this), 由此可知, 我們創建的這個 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 .

?3.ServerBootstrapAcceptor.channelRead方法,這個就是把子channel的管道添加子handler,并且把子channel注冊到子group的eventLoop的selector中。流程跟父類channel注冊一樣。

public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {childGroup.register(child)}

?4.還有個讀完成事件,里面跟上一節的第16步一樣,都是將當前channel的selectKey中加入讀關注事件,以便接收讀數據。

@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.fireChannelReadComplete();readIfIsAutoRead();}

?當然這個事件會反復調用,然后判斷加過讀事件,就不重復加了。

總結

以上是生活随笔為你收集整理的ServerBootstrap的启动流程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。