日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Netty学习笔记(二)Netty服务端流程启动分析

發布時間:2024/4/11 58 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty学习笔记(二)Netty服务端流程启动分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

先貼下在NIO和Netty里啟動服務端的代碼

public class NioServer { /*** 指定端口號啟動服務* */public boolean startServer(int port){try {selector = Selector.open();//打開監聽通道ServerSocketChannel server = ServerSocketChannel.open();//默認configureBlocking為true,如果為 true,此通道將被置于阻塞模式;如果為 false.則此通道將被置于非阻塞模式server.configureBlocking(false);//監聽客戶端連接請求server.register(selector, SelectionKey.OP_ACCEPT);//綁定端口server.bind(new InetSocketAddress(this.port));System.out.println("服務端啟動成功,監聽端口:" + port);}catch (Exception e){System.out.println("服務器啟動失敗");return false;}return true;} } //定義主線程池EventLoopGroup bossGroup = new NioEventLoopGroup();//定義工作線程池EventLoopGroup workerGroup = new NioEventLoopGroup();//類似于ServerSocketServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class)//定義工作線程的處理函數.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉化對應的傳輸數據 從字節流到目標對象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})//boss線程池的最大線程數.option(ChannelOption.SO_BACKLOG, 128)//工作線程保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true);//綁定端口啟動netty服務端ChannelFuture future = serverBootstrap.bind(ZKConfig.SERVER_PORT).sync();

和客戶端的EventLoopGroup不同,服務端需要指定兩個EventLoopGroup,這是因為服務端需要兩個線程池,bossGroup--用于處理客戶端的連接請求;另一個workerGroup,用于處理與各個客戶端連接的IO 操作(這與Reactor模型有關)

前面的方法都是對各個對象進行賦值,先從啟動的方法開始看:

------AbstractBootStrap public ChannelFuture bind(int inetPort) {return this.bind(new InetSocketAddress(inetPort)); }public ChannelFuture bind(SocketAddress localAddress) {//檢驗group,channelFactory等必須屬性是否有賦值this.validate();if (localAddress == null) {throw new NullPointerException("localAddress");} else {return this.doBind(localAddress);}}private ChannelFuture doBind(final SocketAddress localAddress) {//進行channel的初始化和注冊final ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();...ChannelPromise promise = channel.newPromise();//執行實際的端口綁定doBind0(regFuture, channel, localAddress, promise);...}

這里有兩個重要的方法initAndRegister()和doBind0()

initAndRegister

先來看下initAndRegister方法,大致可以分成以下三個步驟:

  • 創建channel
  • 初始化channel
  • 注冊channel到Selector
final ChannelFuture initAndRegister() {Channel channel = null;try {//通過channelFactory創建Channelchannel = this.channelFactory.newChannel();//初始化channel并預設參數this.init(channel);} catch (Throwable var3) {...}//注冊channel到SelectorChannelFuture regFuture = this.config().group().register(channel);...return regFuture;}

Channel創建

channel = this.channelFactory.newChannel();

調用了ReflectiveChannelFactory的newChannel方法,最后通過反射創建了channel對象

public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");} else {this.clazz = clazz;}}public T newChannel() {try {//調用clazz默認的構造方法返回channel對象,這里的clazz就是上面構造函數里的clazzreturn (Channel)this.clazz.newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);}}

這里創建的channel就是如下代碼設置的Class

源碼里的channel方法如下:

public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");} else {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));} }

NioServerSocketChannel的構造方法如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {//先通過newSocket方法創建channel,然后調用重載的this方法this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException var2) {throw new ChannelException("Failed to open a server socket.", var2);}}//最后是調用了SelectorProvider的openServerSocketChannel方法 public ServerSocketChannel openServerSocketChannel() throws IOException {//調用了java nio包創建了一個ServerSocketChannel對象return new ServerSocketChannelImpl(this);}//創建一個NioServerSocketChannelConfig的配置對象public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {super((Channel)null, channel, 16);//這里this.javaChannel().socket()其實就是調用了Java底層的nio包的api拿到ServerSocketthis.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this,this.javaChannel().socket());}

上面的super方法會調用NioServerSocketChannel父類的構造方法直到AbstractChannel類(這是所有Channel的頂層父類)
初始化了id,unsafe.pipeline三個對象

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { //調用父類--AbstractChannel的構造方法,這里的parent=null//readInterestOp是SelectionKey.OP_ACCEPT事件super(parent);this.ch = ch;this.readInterestOp = readInterestOp;try {//設置channel的阻塞模式 使用非阻塞模式 你可以在nio的demo也看到這樣的代碼ch.configureBlocking(false);} catch (IOException var7) {try {ch.close();} catch (IOException var6) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", var6);}}throw new ChannelException("Failed to enter non-blocking mode.", var7);}} protected AbstractChannel(Channel parent) {this.parent = parent;this.id = this.newId();this.unsafe = this.newUnsafe();this.pipeline = this.newChannelPipeline();}

初始化Channel

看下ServerBootStrap的實現

void init(Channel channel) throws Exception {//1.獲得options和attrsMap<ChannelOption<?>, Object> options = this.options0();synchronized(options) {channel.config().setOptions(options);}Map<AttributeKey<?>, Object> attrs = this.attrs0();synchronized(attrs) {Iterator i$ = attrs.entrySet().iterator();while(true) {if (!i$.hasNext()) {break;}Entry<AttributeKey<?>, Object> e = (Entry)i$.next();AttributeKey<Object> key = (AttributeKey)e.getKey();channel.attr(key).set(e.getValue());}}//2.創建pipeline并獲取childOptions和childAttrsChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;final Entry[] currentChildOptions;synchronized(this.childOptions) {currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));}final Entry[] currentChildAttrs;synchronized(this.childAttrs) {currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));}//3.為pipeline添加ChannelHandlerp.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}

主要可以分為四步:

1.獲取我們在ServerBootstrap里設置的options和attrs屬性 并給channel和config賦值
2.設置childOptions和childAttrs
3.為pipeline添加ChannelHandler
4.添加ChannelInitializer執行器,為添加ServerBootstrapAcceptor連接器做準備

強調一下這里是先添加了一個ChannelInitializer的handler,在它的initChannel方法里添加ServerBootstrapAcceptor
ServerBootstrapAcceptor連接器的真正添加是在register完成之后的回調里進行的,我們就先簡單看下連接器里有哪些屬性和主要行為

在連接器里把channel和對應的workGroup的eventLoop進行了綁定,在ServerBootStrap的channelRead方法里

ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);...try {//這里的childGroup是從構造方法傳入的workerGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {...}});} catch (Throwable t) {forceClose(child, t);}}

ServerBootstrapAcceptor 中的 childGroup 是在構造方法里傳入的 currentChildGroup,也就是 workerGroup 對象。
而這里的 Channel 是NioSocketChannel 的實例,因此這里的childGroup 的 register()方法就是將 workerGroup 中的
某個 EventLoop 和 NioSocketChannel 關聯上了。(至于這里的channelRead怎么調用到的放到最后做一個分析,最好看完EventLoopGroup 和 Pipeline相關的源碼有個基本了解后再看)。

在channel上進行注冊register

執行? ? ? ?

ChannelFuture regFuture = this.config().group().register(channel);

最后還是會調用AbstractChannel里的register方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");} else if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {//把channel和當前的EventLoop進行綁定,這里的eventLoop是來自于前面定義的BossGroupAbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {//核心方法this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}} private void register0(ChannelPromise promise) {try {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;//這里的invokeHandlerAddedIfNeeded會回調ChannelInitializer的handlerAdded反方,從而將ServerBootstrapAcceptor真正添加到Pipeline中pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();//這里的isActive()方法會返回false,底層調用NioServerSocketChannel的isActive判斷,此時bind操作還沒完成if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}-----NioServerSocketChannel@Overridepublic boolean isActive() {return javaChannel().socket().isBound();}

最終調用了AbstractNioChannel里的doRegister方法 來進行事件注冊

protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) { eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}

完成事件注冊之后,會調用pipeline.invokeHandlerAddedIfNeeded(); 從而調用ChannelInitializer的initChannel方法(這里的執行流程在Netty學習筆記(五)Pipeline?一文里有介紹,就不詳細講了),也就是執行如下代碼,真正將ServerBootstrapAcceptor添加到Pipeline

public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});

doBind()端口綁定

調用的時序圖如下:

這里的調用鏈其實就是Pipeline的一個傳播過程,感興趣的可以看下:Netty學習筆記(五)Pipeline,Netty學習筆記(六)Pipeline的傳播機制

最后調用的AbstractChannel bind方法源碼如下:

public final void bind(SocketAddress localAddress, ChannelPromise promise) {this.assertEventLoop();if (promise.setUncancellable() && this.ensureOpen(promise)) {...//wasActive=falseboolean wasActive = AbstractChannel.this.isActive();try {AbstractChannel.this.doBind(localAddress);} catch (Throwable var5) {this.safeSetFailure(promise, var5);this.closeIfClosed();return;}//執行完上述的doBind方法后,isActive()方法返回true,會調用fireChannelActive進行回調通知if (!wasActive && AbstractChannel.this.isActive()) {this.invokeLater(new Runnable() {public void run() {AbstractChannel.this.pipeline.fireChannelActive();}});}this.safeSetSuccess(promise);}}

然后調用了NioServerSocketChannel的doBind0方法,在這里調用了java 底層的nio 包

private void doBind0(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {this.javaChannel().bind(localAddress);} else {this.javaChannel().socket().bind(localAddress);}}

主要做了兩件事

1.調用jdk底層接口進行端口綁定

2.調用pipeline.fireChannelActive(); 進行回調通知

ServerBootstrapAcceptor分析

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

當服務端綁定端口啟動后,會啟動reactor線程(也就是NioEventLoop),reactor不斷檢測是否有新的事件發生,直到檢測出有accept事件發生,會調用unsafe.read()方法

進入到對應的實現類(AbstractNioMessageChannel.NioMessageUnsafe)

private final List<Object> readBuf = new ArrayList<Object>(); public void read() {assert eventLoop().inEventLoop();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();boolean closed = false;try {do {//循環調用int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();}

首先注意到這里有一個do..while()循環,我們看下doReadMessage(readBuf)做了什么

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}return 0;}

很簡單明了,就是調用jdk底層的accept()方法獲取Channel,然后Channel封裝成NioSocketChannel全部添加到readBuf集合里(注意這里的this是NioServerSocketChannel)

回到read()方法,接下來就是遍歷readBuf拿到各個channel,并執行pipeline.fireChannelRead(channel) 這里就是通過fireChannelRead來回調ServerBootstrapAcceptor的chanelRead();最后調用一個pipeline的channelReadComplete事件通知

我們再看下ServerBootstrapAcceptor的源碼

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}} }

這里主要就是做兩件事:

1.調用pipeline.addLast(childHandler)方法 添加執行器

.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉化對應的傳輸數據 從字節流到目標對象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})

先將對應的ChannelInitializer添加到Pipeline,真正執行initChannel方法在后面

2.?childGroup.register(child) 將channel和workGroup里的eventLoop進行綁定

這里的childGroup就是構造方法里傳入的workGroup,child 是NioSocketChannel對象(doReadMessages()方法里創建的)

跟下去會執行MultithreadEventLoopGroup的如下方法:

@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}

這里的next()是EventExecutorChooser事件執行器選擇器的方法,其作用就是從我們的workGroup線程組中選擇一個線程與channel綁定(實際就是從一個線程數組選擇下一個線程來綁定,具體的實現在EventLoopGroup的博客里有講到)

剩下的register調用大家都很熟悉了吧,就是調用

unsafe.register()-->AbstractChannel.AbstractUnsafe.register()-->register0()-->AbstractNioChannel.doRegister()

private void register0(ChannelPromise promise) {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered(); if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}} }

先是調用doRegister(),完成注冊過程,如下

protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}

將該channel綁定到一個eventLoop線程的selector上,后續該channel的事件輪詢,以及事件處理,異步task執行都是由此eventLoop(reactor)線程來負責

執行完doRegister()方法之后我們又看到了熟悉的pipeline.invokeHandlerAddedIfNeeded(); 同樣的,也是回調ChannelInitializer的handlerAdded方法從而執行對應的initChannel,此時才是真正將用戶自己定義的各種handler添加到Pipeline。這樣在后續的事件通知中會通過回調方法執行各個handler里的業務邏輯

服務端啟動總結:

  • 設置啟動類參數,最重要的就是設置channel
  • 創建服務端對應的channel和各大組件,包括ChannelConfig,ChannelPipeline,Unsafe等
  • 初始化Channel,設置一些attr,option,以及設置子channel的attr,option,向pipeline添加接入器ServerBootstrapAcceptor,觸發用戶自定義ChannelHandler的添加和register操作
  • 調用到jdk底層bind方法實現端口綁定,并觸發channelActive事件,做事件注冊

總結

以上是生活随笔為你收集整理的Netty学习笔记(二)Netty服务端流程启动分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。