Netty学习笔记(二)Netty服务端流程启动分析
先貼下在NIO和Netty里啟動(dòng)服務(wù)端的代碼
public class NioServer { /*** 指定端口號(hào)啟動(dòng)服務(wù)* */public boolean startServer(int port){try {selector = Selector.open();//打開監(jiān)聽通道ServerSocketChannel server = ServerSocketChannel.open();//默認(rèn)configureBlocking為true,如果為 true,此通道將被置于阻塞模式;如果為 false.則此通道將被置于非阻塞模式server.configureBlocking(false);//監(jiān)聽客戶端連接請(qǐng)求server.register(selector, SelectionKey.OP_ACCEPT);//綁定端口server.bind(new InetSocketAddress(this.port));System.out.println("服務(wù)端啟動(dòng)成功,監(jiān)聽端口:" + port);}catch (Exception e){System.out.println("服務(wù)器啟動(dòng)失敗");return false;}return true;} } //定義主線程池EventLoopGroup bossGroup = new NioEventLoopGroup();//定義工作線程池EventLoopGroup workerGroup = new NioEventLoopGroup();//類似于ServerSocketServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class)//定義工作線程的處理函數(shù).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉(zhuǎn)化對(duì)應(yīng)的傳輸數(shù)據(jù) 從字節(jié)流到目標(biāo)對(duì)象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關(guān)的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})//boss線程池的最大線程數(shù).option(ChannelOption.SO_BACKLOG, 128)//工作線程保持長(zhǎng)連接.childOption(ChannelOption.SO_KEEPALIVE, true);//綁定端口啟動(dòng)netty服務(wù)端ChannelFuture future = serverBootstrap.bind(ZKConfig.SERVER_PORT).sync();和客戶端的EventLoopGroup不同,服務(wù)端需要指定兩個(gè)EventLoopGroup,這是因?yàn)榉?wù)端需要兩個(gè)線程池,bossGroup--用于處理客戶端的連接請(qǐng)求;另一個(gè)workerGroup,用于處理與各個(gè)客戶端連接的IO 操作(這與Reactor模型有關(guān))
前面的方法都是對(duì)各個(gè)對(duì)象進(jìn)行賦值,先從啟動(dòng)的方法開始看:
------AbstractBootStrap public ChannelFuture bind(int inetPort) {return this.bind(new InetSocketAddress(inetPort)); }public ChannelFuture bind(SocketAddress localAddress) {//檢驗(yàn)group,channelFactory等必須屬性是否有賦值this.validate();if (localAddress == null) {throw new NullPointerException("localAddress");} else {return this.doBind(localAddress);}}private ChannelFuture doBind(final SocketAddress localAddress) {//進(jìn)行channel的初始化和注冊(cè)final ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();...ChannelPromise promise = channel.newPromise();//執(zhí)行實(shí)際的端口綁定doBind0(regFuture, channel, localAddress, promise);...}這里有兩個(gè)重要的方法initAndRegister()和doBind0()
initAndRegister
先來看下initAndRegister方法,大致可以分成以下三個(gè)步驟:
- 創(chuàng)建channel
- 初始化channel
- 注冊(cè)channel到Selector
Channel創(chuàng)建
channel = this.channelFactory.newChannel();
調(diào)用了ReflectiveChannelFactory的newChannel方法,最后通過反射創(chuàng)建了channel對(duì)象
public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");} else {this.clazz = clazz;}}public T newChannel() {try {//調(diào)用clazz默認(rèn)的構(gòu)造方法返回channel對(duì)象,這里的clazz就是上面構(gòu)造函數(shù)里的clazzreturn (Channel)this.clazz.newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);}}這里創(chuàng)建的channel就是如下代碼設(shè)置的Class
源碼里的channel方法如下:
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");} else {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));} }NioServerSocketChannel的構(gòu)造方法如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel() {//先通過newSocket方法創(chuàng)建channel,然后調(diào)用重載的this方法this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException var2) {throw new ChannelException("Failed to open a server socket.", var2);}}//最后是調(diào)用了SelectorProvider的openServerSocketChannel方法 public ServerSocketChannel openServerSocketChannel() throws IOException {//調(diào)用了java nio包創(chuàng)建了一個(gè)ServerSocketChannel對(duì)象return new ServerSocketChannelImpl(this);}//創(chuàng)建一個(gè)NioServerSocketChannelConfig的配置對(duì)象public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {super((Channel)null, channel, 16);//這里this.javaChannel().socket()其實(shí)就是調(diào)用了Java底層的nio包的api拿到ServerSocketthis.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this,this.javaChannel().socket());}上面的super方法會(huì)調(diào)用NioServerSocketChannel父類的構(gòu)造方法直到AbstractChannel類(這是所有Channel的頂層父類)
初始化了id,unsafe.pipeline三個(gè)對(duì)象
初始化Channel
看下ServerBootStrap的實(shí)現(xiàn)
void init(Channel channel) throws Exception {//1.獲得options和attrsMap<ChannelOption<?>, Object> options = this.options0();synchronized(options) {channel.config().setOptions(options);}Map<AttributeKey<?>, Object> attrs = this.attrs0();synchronized(attrs) {Iterator i$ = attrs.entrySet().iterator();while(true) {if (!i$.hasNext()) {break;}Entry<AttributeKey<?>, Object> e = (Entry)i$.next();AttributeKey<Object> key = (AttributeKey)e.getKey();channel.attr(key).set(e.getValue());}}//2.創(chuàng)建pipeline并獲取childOptions和childAttrsChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;final Entry[] currentChildOptions;synchronized(this.childOptions) {currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));}final Entry[] currentChildAttrs;synchronized(this.childAttrs) {currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));}//3.為pipeline添加ChannelHandlerp.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});}主要可以分為四步:
1.獲取我們?cè)赟erverBootstrap里設(shè)置的options和attrs屬性 并給channel和config賦值
2.設(shè)置childOptions和childAttrs
3.為pipeline添加ChannelHandler
4.添加ChannelInitializer執(zhí)行器,為添加ServerBootstrapAcceptor連接器做準(zhǔn)備
強(qiáng)調(diào)一下這里是先添加了一個(gè)ChannelInitializer的handler,在它的initChannel方法里添加ServerBootstrapAcceptor
ServerBootstrapAcceptor連接器的真正添加是在register完成之后的回調(diào)里進(jìn)行的,我們就先簡(jiǎn)單看下連接器里有哪些屬性和主要行為
在連接器里把channel和對(duì)應(yīng)的workGroup的eventLoop進(jìn)行了綁定,在ServerBootStrap的channelRead方法里
ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);...try {//這里的childGroup是從構(gòu)造方法傳入的workerGroupchildGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {...}});} catch (Throwable t) {forceClose(child, t);}}ServerBootstrapAcceptor 中的 childGroup 是在構(gòu)造方法里傳入的 currentChildGroup,也就是 workerGroup 對(duì)象。
而這里的 Channel 是NioSocketChannel 的實(shí)例,因此這里的childGroup 的 register()方法就是將 workerGroup 中的
某個(gè) EventLoop 和 NioSocketChannel 關(guān)聯(lián)上了。(至于這里的channelRead怎么調(diào)用到的放到最后做一個(gè)分析,最好看完EventLoopGroup 和 Pipeline相關(guān)的源碼有個(gè)基本了解后再看)。
在channel上進(jìn)行注冊(cè)register
執(zhí)行? ? ? ?
ChannelFuture regFuture = this.config().group().register(channel);
最后還是會(huì)調(diào)用AbstractChannel里的register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");} else if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {//把channel和當(dāng)前的EventLoop進(jìn)行綁定,這里的eventLoop是來自于前面定義的BossGroupAbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {//核心方法this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}} private void register0(ChannelPromise promise) {try {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;//這里的invokeHandlerAddedIfNeeded會(huì)回調(diào)ChannelInitializer的handlerAdded反方,從而將ServerBootstrapAcceptor真正添加到Pipeline中pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();//這里的isActive()方法會(huì)返回false,底層調(diào)用NioServerSocketChannel的isActive判斷,此時(shí)bind操作還沒完成if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {}}-----NioServerSocketChannel@Overridepublic boolean isActive() {return javaChannel().socket().isBound();}最終調(diào)用了AbstractNioChannel里的doRegister方法 來進(jìn)行事件注冊(cè)
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) { eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}完成事件注冊(cè)之后,會(huì)調(diào)用pipeline.invokeHandlerAddedIfNeeded(); 從而調(diào)用ChannelInitializer的initChannel方法(這里的執(zhí)行流程在Netty學(xué)習(xí)筆記(五)Pipeline?一文里有介紹,就不詳細(xì)講了),也就是執(zhí)行如下代碼,真正將ServerBootstrapAcceptor添加到Pipeline
public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {//4.添加ServerBootstrapAcceptorpipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}}});doBind()端口綁定
調(diào)用的時(shí)序圖如下:
這里的調(diào)用鏈其實(shí)就是Pipeline的一個(gè)傳播過程,感興趣的可以看下:Netty學(xué)習(xí)筆記(五)Pipeline,Netty學(xué)習(xí)筆記(六)Pipeline的傳播機(jī)制
最后調(diào)用的AbstractChannel bind方法源碼如下:
public final void bind(SocketAddress localAddress, ChannelPromise promise) {this.assertEventLoop();if (promise.setUncancellable() && this.ensureOpen(promise)) {...//wasActive=falseboolean wasActive = AbstractChannel.this.isActive();try {AbstractChannel.this.doBind(localAddress);} catch (Throwable var5) {this.safeSetFailure(promise, var5);this.closeIfClosed();return;}//執(zhí)行完上述的doBind方法后,isActive()方法返回true,會(huì)調(diào)用fireChannelActive進(jìn)行回調(diào)通知if (!wasActive && AbstractChannel.this.isActive()) {this.invokeLater(new Runnable() {public void run() {AbstractChannel.this.pipeline.fireChannelActive();}});}this.safeSetSuccess(promise);}}然后調(diào)用了NioServerSocketChannel的doBind0方法,在這里調(diào)用了java 底層的nio 包
private void doBind0(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {this.javaChannel().bind(localAddress);} else {this.javaChannel().socket().bind(localAddress);}}主要做了兩件事
1.調(diào)用jdk底層接口進(jìn)行端口綁定
2.調(diào)用pipeline.fireChannelActive(); 進(jìn)行回調(diào)通知
ServerBootstrapAcceptor分析
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();if (!ch.isOpen()) {return;}}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}當(dāng)服務(wù)端綁定端口啟動(dòng)后,會(huì)啟動(dòng)reactor線程(也就是NioEventLoop),reactor不斷檢測(cè)是否有新的事件發(fā)生,直到檢測(cè)出有accept事件發(fā)生,會(huì)調(diào)用unsafe.read()方法
進(jìn)入到對(duì)應(yīng)的實(shí)現(xiàn)類(AbstractNioMessageChannel.NioMessageUnsafe)
private final List<Object> readBuf = new ArrayList<Object>(); public void read() {assert eventLoop().inEventLoop();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();boolean closed = false;try {do {//循環(huán)調(diào)用int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();}首先注意到這里有一個(gè)do..while()循環(huán),我們看下doReadMessage(readBuf)做了什么
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}return 0;}很簡(jiǎn)單明了,就是調(diào)用jdk底層的accept()方法獲取Channel,然后Channel封裝成NioSocketChannel全部添加到readBuf集合里(注意這里的this是NioServerSocketChannel)
回到read()方法,接下來就是遍歷readBuf拿到各個(gè)channel,并執(zhí)行pipeline.fireChannelRead(channel) 這里就是通過fireChannelRead來回調(diào)ServerBootstrapAcceptor的chanelRead();最后調(diào)用一個(gè)pipeline的channelReadComplete事件通知
我們?cè)倏聪耂erverBootstrapAcceptor的源碼
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;}@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}} }這里主要就是做兩件事:
1.調(diào)用pipeline.addLast(childHandler)方法 添加執(zhí)行器
.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉(zhuǎn)化對(duì)應(yīng)的傳輸數(shù)據(jù) 從字節(jié)流到目標(biāo)對(duì)象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義相關(guān)的編/解碼器等pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))....addLast(new RpcServerHandler(beanMappings));}})先將對(duì)應(yīng)的ChannelInitializer添加到Pipeline,真正執(zhí)行initChannel方法在后面
2.?childGroup.register(child) 將channel和workGroup里的eventLoop進(jìn)行綁定
這里的childGroup就是構(gòu)造方法里傳入的workGroup,child 是NioSocketChannel對(duì)象(doReadMessages()方法里創(chuàng)建的)
跟下去會(huì)執(zhí)行MultithreadEventLoopGroup的如下方法:
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}這里的next()是EventExecutorChooser事件執(zhí)行器選擇器的方法,其作用就是從我們的workGroup線程組中選擇一個(gè)線程與channel綁定(實(shí)際就是從一個(gè)線程數(shù)組選擇下一個(gè)線程來綁定,具體的實(shí)現(xiàn)在EventLoopGroup的博客里有講到)
剩下的register調(diào)用大家都很熟悉了吧,就是調(diào)用
unsafe.register()-->AbstractChannel.AbstractUnsafe.register()-->register0()-->AbstractNioChannel.doRegister()
private void register0(ChannelPromise promise) {boolean firstRegistration = neverRegistered;doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered(); if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}} }先是調(diào)用doRegister(),完成注冊(cè)過程,如下
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}將該channel綁定到一個(gè)eventLoop線程的selector上,后續(xù)該channel的事件輪詢,以及事件處理,異步task執(zhí)行都是由此eventLoop(reactor)線程來負(fù)責(zé)
執(zhí)行完doRegister()方法之后我們又看到了熟悉的pipeline.invokeHandlerAddedIfNeeded(); 同樣的,也是回調(diào)ChannelInitializer的handlerAdded方法從而執(zhí)行對(duì)應(yīng)的initChannel,此時(shí)才是真正將用戶自己定義的各種handler添加到Pipeline。這樣在后續(xù)的事件通知中會(huì)通過回調(diào)方法執(zhí)行各個(gè)handler里的業(yè)務(wù)邏輯
服務(wù)端啟動(dòng)總結(jié):
- 設(shè)置啟動(dòng)類參數(shù),最重要的就是設(shè)置channel
- 創(chuàng)建服務(wù)端對(duì)應(yīng)的channel和各大組件,包括ChannelConfig,ChannelPipeline,Unsafe等
- 初始化Channel,設(shè)置一些attr,option,以及設(shè)置子channel的attr,option,向pipeline添加接入器ServerBootstrapAcceptor,觸發(fā)用戶自定義ChannelHandler的添加和register操作
- 調(diào)用到j(luò)dk底層bind方法實(shí)現(xiàn)端口綁定,并觸發(fā)channelActive事件,做事件注冊(cè)
總結(jié)
以上是生活随笔為你收集整理的Netty学习笔记(二)Netty服务端流程启动分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手写带注册中心的rpc框架(Netty版
- 下一篇: Netty学习笔记(一)Netty客户端