Netty学习笔记(二)Netty服务端流程启动分析
先貼下在NIO和Netty里啟動服務端的代碼
public class NioServer { /*** 指定端口號啟動服務* */public boolean startServer(int port){try {selector = Selector.open();//打開監聽通道ServerSocketChannel server = ServerSocketChannel.open();//默認configureBlocking為true,如果為 true,此通道將被置于阻塞模式;如果為 false.則此通道將被置于非阻塞模式server.configureBlocking(false);//監聽客戶端連接請求server.register(selector, SelectionKey.OP_ACCEPT);//綁定端口server.bind(new InetSocketAddress(this.port));System.out.println("服務端啟動成功,監聽端口:" + port);}catch (Exception e){System.out.println("服務器啟動失敗");return false;}return true;} } //定義主線程池EventLoopGroup bossGroup = new NioEventLoopGroup();//定義工作線程池EventLoopGroup workerGroup = new NioEventLoopGroup();//類似于ServerSocketServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class)//定義工作線程的處理函數.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉化對應的傳輸數據 從字節流到目標對象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})//boss線程池的最大線程數.option(ChannelOption.SO_BACKLOG, 128)//工作線程保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true);//綁定端口啟動netty服務端ChannelFuture future = serverBootstrap.bind(ZKConfig.SERVER_PORT).sync();和客戶端的EventLoopGroup不同,服務端需要指定兩個EventLoopGroup,這是因為服務端需要兩個線程池,bossGroup--用于處理客戶端的連接請求;另一個workerGroup,用于處理與各個客戶端連接的IO 操作(這與Reactor模型有關)
前面的方法都是對各個對象進行賦值,先從啟動的方法開始看:
------AbstractBootStrap public ChannelFuture bind(int inetPort) {return this.bind(new InetSocketAddress(inetPort)); }public ChannelFuture bind(SocketAddress localAddress) {//檢驗group,channelFactory等必須屬性是否有賦值this.validate();if (localAddress == null) {throw new NullPointerException("localAddress");} else {return this.doBind(localAddress);}}private ChannelFuture doBind(final SocketAddress localAddress) {//進行channel的初始化和注冊final ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();...ChannelPromise promise = channel.newPromise();//執行實際的端口綁定doBind0(regFuture, channel, localAddress, promise);...}這里有兩個重要的方法initAndRegister()和doBind0()
initAndRegister
先來看下initAndRegister方法,大致可以分成以下三個步驟:
- 創建channel
- 初始化channel
- 注冊channel到Selector
Channel創建
channel = this.channelFactory.newChannel();
調用了ReflectiveChannelFactory的newChannel方法,最后通過反射創建了channel對象
public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");} else {this.clazz = clazz;}}public T newChannel() {try {//調用clazz默認的構造方法返回channel對象,這里的clazz就是上面構造函數里的clazzreturn (Channel)this.clazz.newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);}}這里創建的channel就是如下代碼設置的Class
源碼里的channel方法如下:
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");} else {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));} }NioServerSocketChannel的構造方法如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {//先通過newSocket方法創建channel,然后調用重載的this方法this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException var2) {throw new ChannelException("Failed to open a server socket.", var2);}}//最后是調用了SelectorProvider的openServerSocketChannel方法 public ServerSocketChannel openServerSocketChannel() throws IOException {//調用了java nio包創建了一個ServerSocketChannel對象return new ServerSocketChannelImpl(this);}//創建一個NioServerSocketChannelConfig的配置對象public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {super((Channel)null, channel, 16);//這里this.javaChannel().socket()其實就是調用了Java底層的nio包的api拿到ServerSocketthis.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this,this.javaChannel().socket());}上面的super方法會調用NioServerSocketChannel父類的構造方法直到AbstractChannel類(這是所有Channel的頂層父類)
初始化了id,unsafe.pipeline三個對象
初始化Channel
看下ServerBootStrap的實現
void init(Channel channel) throws Exception {//1.獲得options和attrsMap<ChannelOption<?>, Object> options = this.options0();synchronized(options) {channel.config().setOptions(options);}Map<AttributeKey<?>, Object> attrs = this.attrs0();synchronized(attrs) {Iterator i$ = attrs.entrySet().iterator();while(true) {if (!i$.hasNext()) {break;}Entry<AttributeKey<?>, Object> e = (Entry)i$.next();AttributeKey<Object> key = (AttributeKey)e.getKey();channel.attr(key).set(e.getValue());}}//2.創建pipeline并獲取childOptions和childAttrsChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;final Entry[] currentChildOptions;synchronized(this.childOptions) {currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));}final Entry[] currentChildAttrs;synchronized(this.childAttrs) {currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));}//3.為pipeline添加ChannelHandlerp.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}主要可以分為四步:
1.獲取我們在ServerBootstrap里設置的options和attrs屬性 并給channel和config賦值
2.設置childOptions和childAttrs
3.為pipeline添加ChannelHandler
4.添加ChannelInitializer執行器,為添加ServerBootstrapAcceptor連接器做準備
強調一下這里是先添加了一個ChannelInitializer的handler,在它的initChannel方法里添加ServerBootstrapAcceptor
ServerBootstrapAcceptor連接器的真正添加是在register完成之后的回調里進行的,我們就先簡單看下連接器里有哪些屬性和主要行為
在連接器里把channel和對應的workGroup的eventLoop進行了綁定,在ServerBootStrap的channelRead方法里
ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);...try {//這里的childGroup是從構造方法傳入的workerGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {...}});} catch (Throwable t) {forceClose(child, t);}}ServerBootstrapAcceptor 中的 childGroup 是在構造方法里傳入的 currentChildGroup,也就是 workerGroup 對象。
而這里的 Channel 是NioSocketChannel 的實例,因此這里的childGroup 的 register()方法就是將 workerGroup 中的
某個 EventLoop 和 NioSocketChannel 關聯上了。(至于這里的channelRead怎么調用到的放到最后做一個分析,最好看完EventLoopGroup 和 Pipeline相關的源碼有個基本了解后再看)。
在channel上進行注冊register
執行? ? ? ?
ChannelFuture regFuture = this.config().group().register(channel);
最后還是會調用AbstractChannel里的register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");} else if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {//把channel和當前的EventLoop進行綁定,這里的eventLoop是來自于前面定義的BossGroupAbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {//核心方法this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}} private void register0(ChannelPromise promise) {try {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;//這里的invokeHandlerAddedIfNeeded會回調ChannelInitializer的handlerAdded反方,從而將ServerBootstrapAcceptor真正添加到Pipeline中pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();//這里的isActive()方法會返回false,底層調用NioServerSocketChannel的isActive判斷,此時bind操作還沒完成if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}-----NioServerSocketChannel@Overridepublic boolean isActive() {return javaChannel().socket().isBound();}最終調用了AbstractNioChannel里的doRegister方法 來進行事件注冊
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) { eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}完成事件注冊之后,會調用pipeline.invokeHandlerAddedIfNeeded(); 從而調用ChannelInitializer的initChannel方法(這里的執行流程在Netty學習筆記(五)Pipeline?一文里有介紹,就不詳細講了),也就是執行如下代碼,真正將ServerBootstrapAcceptor添加到Pipeline
public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});doBind()端口綁定
調用的時序圖如下:
這里的調用鏈其實就是Pipeline的一個傳播過程,感興趣的可以看下:Netty學習筆記(五)Pipeline,Netty學習筆記(六)Pipeline的傳播機制
最后調用的AbstractChannel bind方法源碼如下:
public final void bind(SocketAddress localAddress, ChannelPromise promise) {this.assertEventLoop();if (promise.setUncancellable() && this.ensureOpen(promise)) {...//wasActive=falseboolean wasActive = AbstractChannel.this.isActive();try {AbstractChannel.this.doBind(localAddress);} catch (Throwable var5) {this.safeSetFailure(promise, var5);this.closeIfClosed();return;}//執行完上述的doBind方法后,isActive()方法返回true,會調用fireChannelActive進行回調通知if (!wasActive && AbstractChannel.this.isActive()) {this.invokeLater(new Runnable() {public void run() {AbstractChannel.this.pipeline.fireChannelActive();}});}this.safeSetSuccess(promise);}}然后調用了NioServerSocketChannel的doBind0方法,在這里調用了java 底層的nio 包
private void doBind0(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {this.javaChannel().bind(localAddress);} else {this.javaChannel().socket().bind(localAddress);}}主要做了兩件事
1.調用jdk底層接口進行端口綁定
2.調用pipeline.fireChannelActive(); 進行回調通知
ServerBootstrapAcceptor分析
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}當服務端綁定端口啟動后,會啟動reactor線程(也就是NioEventLoop),reactor不斷檢測是否有新的事件發生,直到檢測出有accept事件發生,會調用unsafe.read()方法
進入到對應的實現類(AbstractNioMessageChannel.NioMessageUnsafe)
private final List<Object> readBuf = new ArrayList<Object>(); public void read() {assert eventLoop().inEventLoop();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();boolean closed = false;try {do {//循環調用int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();}首先注意到這里有一個do..while()循環,我們看下doReadMessage(readBuf)做了什么
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}return 0;}很簡單明了,就是調用jdk底層的accept()方法獲取Channel,然后Channel封裝成NioSocketChannel全部添加到readBuf集合里(注意這里的this是NioServerSocketChannel)
回到read()方法,接下來就是遍歷readBuf拿到各個channel,并執行pipeline.fireChannelRead(channel) 這里就是通過fireChannelRead來回調ServerBootstrapAcceptor的chanelRead();最后調用一個pipeline的channelReadComplete事件通知
我們再看下ServerBootstrapAcceptor的源碼
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}} }這里主要就是做兩件事:
1.調用pipeline.addLast(childHandler)方法 添加執行器
.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉化對應的傳輸數據 從字節流到目標對象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})先將對應的ChannelInitializer添加到Pipeline,真正執行initChannel方法在后面
2.?childGroup.register(child) 將channel和workGroup里的eventLoop進行綁定
這里的childGroup就是構造方法里傳入的workGroup,child 是NioSocketChannel對象(doReadMessages()方法里創建的)
跟下去會執行MultithreadEventLoopGroup的如下方法:
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}這里的next()是EventExecutorChooser事件執行器選擇器的方法,其作用就是從我們的workGroup線程組中選擇一個線程與channel綁定(實際就是從一個線程數組選擇下一個線程來綁定,具體的實現在EventLoopGroup的博客里有講到)
剩下的register調用大家都很熟悉了吧,就是調用
unsafe.register()-->AbstractChannel.AbstractUnsafe.register()-->register0()-->AbstractNioChannel.doRegister()
private void register0(ChannelPromise promise) {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered(); if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}} }先是調用doRegister(),完成注冊過程,如下
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}將該channel綁定到一個eventLoop線程的selector上,后續該channel的事件輪詢,以及事件處理,異步task執行都是由此eventLoop(reactor)線程來負責
執行完doRegister()方法之后我們又看到了熟悉的pipeline.invokeHandlerAddedIfNeeded(); 同樣的,也是回調ChannelInitializer的handlerAdded方法從而執行對應的initChannel,此時才是真正將用戶自己定義的各種handler添加到Pipeline。這樣在后續的事件通知中會通過回調方法執行各個handler里的業務邏輯
服務端啟動總結:
- 設置啟動類參數,最重要的就是設置channel
- 創建服務端對應的channel和各大組件,包括ChannelConfig,ChannelPipeline,Unsafe等
- 初始化Channel,設置一些attr,option,以及設置子channel的attr,option,向pipeline添加接入器ServerBootstrapAcceptor,觸發用戶自定義ChannelHandler的添加和register操作
- 調用到jdk底層bind方法實現端口綁定,并觸發channelActive事件,做事件注冊
總結
以上是生活随笔為你收集整理的Netty学习笔记(二)Netty服务端流程启动分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手写带注册中心的rpc框架(Netty版
- 下一篇: Netty学习笔记(一)Netty客户端