netty 5 alph1源码分析(服务端创建过程)
研究了netty的服務(wù)端創(chuàng)建過程。至于netty的優(yōu)勢,可以參照網(wǎng)絡(luò)其他文章。《Netty系列之Netty 服務(wù)端創(chuàng)建》是?李林鋒撰寫的netty源碼分析的一篇好文,絕對是技術(shù)干貨。但拋開技術(shù)來說,也存在一些瑕疵。
缺點如下
代碼銜接不連貫,上下不連貫。
代碼片段是截圖,對閱讀代理不便(可能和閱讀習(xí)慣有關(guān))
本篇主要內(nèi)容,參照《Netty系列之Netty 服務(wù)端創(chuàng)建》,梳理出自己喜歡的閱讀風(fēng)格。
1.整體邏輯圖
整體將服務(wù)端創(chuàng)建分為2部分:(1)綁定端口,提供服務(wù)過程;(2)輪詢網(wǎng)絡(luò)請求
1.1 綁定端口序列圖
1.2 類圖
類圖僅僅涵蓋了綁定過程中比較重要的幾個組件
1.3 代碼分析
step 2 doBind?綁定本地端口,啟動服務(wù)
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | private?ChannelFuture?doBind(final?SocketAddress?localAddress)?{ ????final?ChannelFuture?regFuture?=?initAndRegister();//1 ????final?Channel?channel?=?regFuture.channel(); ????if?(regFuture.cause()?!=?null)?{ ????????return?regFuture; ????} ????final?ChannelPromise?promise; ????if?(regFuture.isDone())?{ ????????promise?=?channel.newPromise(); ????????doBind0(regFuture,?channel,?localAddress,?promise);//2 ????}?else?{ ????????//?Registration?future?is?almost?always?fulfilled?already,?but?just?in?case?it's?not. ????????promise?=?new?DefaultChannelPromise(channel,?GlobalEventExecutor.INSTANCE); ????????regFuture.addListener(new?ChannelFutureListener()?{ ????????????@Override ????????????public?void?operationComplete(ChannelFuture?future)?throws?Exception?{ ????????????????doBind0(regFuture,?channel,?localAddress,?promise);//2 ????????????} ????????}); ????} ????return?promise; } |
step3 initAndRegister
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | final?ChannelFuture?initAndRegister()?{ ????Channel?channel; ????try?{ ????????channel?=?createChannel(); ????}?catch?(Throwable?t)?{ ????????return?VoidChannel.INSTANCE.newFailedFuture(t); ????} ????try?{ ????????init(channel); ????}?catch?(Throwable?t)?{ ????????channel.unsafe().closeForcibly(); ????????return?channel.newFailedFuture(t); ????} //注冊NioServerSocketChannel到Reactor線程的多路復(fù)用器上 ????ChannelPromise?regFuture?=?channel.newPromise(); ????channel.unsafe().register(regFuture); ????if?(regFuture.cause()?!=?null)?{ ????????if?(channel.isRegistered())?{ ????????????channel.close(); ????????}?else?{ ????????????channel.unsafe().closeForcibly(); ????????} ????} ????return?regFuture; } |
createChannel由子類ServerBootstrap實現(xiàn),創(chuàng)建新的NioServerSocketChannel,并完成Channel初始化,以及注冊。
4.ServerBootstrap.createChannel
| 1 2 3 4 | Channel?createChannel()?{ ????EventLoop?eventLoop?=?group().next(); ????return?channelFactory().newChannel(eventLoop,?childGroup); } |
它有兩個參數(shù),參數(shù)1是從父類的NIO線程池中順序獲取一個NioEventLoop,它就是服務(wù)端用于監(jiān)聽和接收客戶端連接的Reactor線程。第二個參數(shù)就是所謂的workerGroup線程池,它就是處理IO讀寫的Reactor線程組。
5.ServerBootstrap.init
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | void?init(Channel?channel)?throws?Exception?{ //設(shè)置Socket參數(shù)和NioServerSocketChannel的附加屬性 ????final?Map<ChannelOption<?>,?Object>?options?=?options(); ????synchronized?(options)?{ ????????channel.config().setOptions(options); ????} ????final?Map<AttributeKey<?>,?Object>?attrs?=?attrs(); ????synchronized?(attrs)?{ ????????for?(Entry<AttributeKey<?>,?Object>?e:?attrs.entrySet())?{ ????????????@SuppressWarnings("unchecked") ????????????AttributeKey<Object>?key?=?(AttributeKey<Object>)?e.getKey(); ????????????channel.attr(key).set(e.getValue()); ????????} ????} //將AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中 ????ChannelPipeline?p?=?channel.pipeline(); ????if?(handler()?!=?null)?{ ????????p.addLast(handler()); ????} ????final?ChannelHandler?currentChildHandler?=?childHandler; ????final?Entry<ChannelOption<?>,?Object>[]?currentChildOptions; ????final?Entry<AttributeKey<?>,?Object>[]?currentChildAttrs; ????synchronized?(childOptions)?{ ????????currentChildOptions?=?childOptions.entrySet().toArray(newOptionArray(childOptions.size())); ????} ????synchronized?(childAttrs)?{ ????????currentChildAttrs?=?childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); ????} //將用于服務(wù)端注冊的Handler?ServerBootstrapAcceptor添加到ChannelPipeline中 ????p.addLast(new?ChannelInitializer<Channel>()?{ ????????@Override ????????public?void?initChannel(Channel?ch)?throws?Exception?{ ????????????ch.pipeline().addLast(new?ServerBootstrapAcceptor(currentChildHandler,?currentChildOptions, ????????????????????currentChildAttrs)); ????????} ????}); } |
到此處,Netty服務(wù)端監(jiān)聽的相關(guān)資源已經(jīng)初始化完畢。
6.AbstractChannel.AbstractUnsafe.register
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public?final?void?register(final?ChannelPromise?promise)?{ //首先判斷是否是NioEventLoop自身發(fā)起的操作,如果是,則不存在并發(fā)操作,直接執(zhí)行Channel注冊; ????if?(eventLoop.inEventLoop())?{ ????????register0(promise); ????}?else?{//如果由其它線程發(fā)起,則封裝成一個Task放入消息隊列中異步執(zhí)行。 ????????try?{ ????????????eventLoop.execute(new?Runnable()?{ ????????????????@Override ????????????????public?void?run()?{ ????????????????????register0(promise); ????????????????} ????????????}); ????????}?catch?(Throwable?t)?{ ????????????logger.warn( ????????????????????"Force-closing?a?channel?whose?registration?task?was?not?accepted?by?an?event?loop:?{}", ????????????????????AbstractChannel.this,?t); ????????????closeForcibly(); ????????????closeFuture.setClosed(); ????????????promise.setFailure(t); ????????} ????} } |
7.register0
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | private?void?register0(ChannelPromise?promise)?{ ????try?{ ????????//?check?if?the?channel?is?still?open?as?it?could?be?closed?in?the?mean?time?when?the?register ????????//?call?was?outside?of?the?eventLoop ????????if?(!ensureOpen(promise))?{ ????????????return; ????????} ????????doRegister(); ????????registered?=?true; ????????promise.setSuccess(); ????????pipeline.fireChannelRegistered(); ????????? ????????if?(isActive())?{//完成綁定時,不會調(diào)用該代碼段 ????????????pipeline.fireChannelActive(); ????????} ????}?catch?(Throwable?t)?{ ????????//?Close?the?channel?directly?to?avoid?FD?leak. ????????closeForcibly(); ????????closeFuture.setClosed(); ????????if?(!promise.tryFailure(t))?{ ????????????logger.warn( ????????????????????"Tried?to?fail?the?registration?promise,?but?it?is?complete?already.?"?+ ????????????????????????????"Swallowing?the?cause?of?the?registration?failure:",?t); ????????} ????} } |
觸發(fā)事件
8.doRegister
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | protected?void?doRegister()?throws?Exception?{ ????boolean?selected?=?false; ????for?(;;)?{ ????????try?{ ????????//將NioServerSocketChannel注冊到NioEventLoop的Selector上 ????????????selectionKey?=?javaChannel().register(eventLoop().selector,?0,?this); ????????????return; ????????}?catch?(CancelledKeyException?e)?{ ????????????if?(!selected)?{ ????????????????//?Force?the?Selector?to?select?now?as?the?"canceled"?SelectionKey?may?still?be ????????????????//?cached?and?not?removed?because?no?Select.select(..)?operation?was?called?yet. ????????????????eventLoop().selectNow(); ????????????????selected?=?true; ????????????}?else?{ ????????????????//?We?forced?a?select?operation?on?the?selector?before?but?the?SelectionKey?is?still?cached ????????????????//?for?whatever?reason.?JDK?bug?? ????????????????throw?e; ????????????} ????????} ????} } |
大伙兒可能會很詫異,應(yīng)該注冊O(shè)P_ACCEPT(16)到多路復(fù)用器上,怎么注冊0呢?0表示只注冊,不監(jiān)聽任何網(wǎng)絡(luò)操作。這樣做的原因如下:
注冊方法是多態(tài)的,它既可以被NioServerSocketChannel用來監(jiān)聽客戶端的連接接入,也可以用來注冊SocketChannel,用來監(jiān)聽網(wǎng)絡(luò)讀或者寫操作;
通過SelectionKey的interestOps(int ops)方法可以方便的修改監(jiān)聽操作位。所以,此處注冊需要獲取SelectionKey并給AbstractNioChannel的成員變量selectionKey賦值。
本文轉(zhuǎn)自 randy_shandong 51CTO博客,原文鏈接:http://blog.51cto.com/dba10g/1863497,如需轉(zhuǎn)載請自行聯(lián)系原作者
總結(jié)
以上是生活随笔為你收集整理的netty 5 alph1源码分析(服务端创建过程)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 制作liveusb实现centos6.2
- 下一篇: redis演练(5) redis持久化