日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

netty源码解解析(4.0)-3 Channel的抽象实现

發(fā)布時(shí)間:2023/12/18 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 netty源码解解析(4.0)-3 Channel的抽象实现 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
AbstractChannel和AbstractUnsafe抽象類 io.netty.channel.AbstractChannel 從本章開(kāi)始,會(huì)有大量的篇幅涉及到代碼分析。為了能夠清晰簡(jiǎn)潔的地說(shuō)明代碼的結(jié)構(gòu)和功能,我會(huì)用代碼注釋+獨(dú)立段落的方式加以呈現(xiàn)。 所以,為你能更好地理解代碼,請(qǐng)不要忽略代碼中黑體字注釋。 AbstractChannel和AbstractUnsafe之間的關(guān)系 AbstractChannel實(shí)現(xiàn)了Channel接口,AbstractUnsafe實(shí)現(xiàn)了Unsafe。這兩個(gè)類是抽象類,他們實(shí)現(xiàn)了Channel和Unsafe的絕大部分接口。在AbstractChannel的實(shí)現(xiàn)中,每個(gè)方法都會(huì)直接或間接調(diào)用Unsafe對(duì)應(yīng)的同名方法。所有的inbound和outbound方法都是通過(guò)pipeline間接調(diào)用,其他的輔助方法直接使用unsafe實(shí)例調(diào)用。pipline和unsafe實(shí)例在AbstractChannel的構(gòu)造方法創(chuàng)建: protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); //AbstractChannel沒(méi)有實(shí)現(xiàn)這個(gè)方法 pipeline = newChannelPipeline(); // newChannelPipline的實(shí)現(xiàn) return new DefaultChannelPipeline(this); } 直接調(diào)用的例子: @Override public SocketAddress localAddres) { SocketAddress localAddress = this.localAddress; if (localAddress == null) { try { //這里直接調(diào)用了Unsafe的localAddress()方法 this.localAddress = localAddress = unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return localAddress; } 間接調(diào)用的例子 @Override public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); //通過(guò)pipline間接調(diào)用Unsafe的bind方法 } 關(guān)于pipline是怎樣調(diào)用Unsafe方法的,會(huì)在后面的Pipline相關(guān)章節(jié)詳細(xì)分析,這里只需記住。pipeline所有方法調(diào)用最終都會(huì)(如果沒(méi)有改變ChannelContextHandler的默認(rèn)實(shí)現(xiàn))通過(guò)使用newUnsafe創(chuàng)建的Unsafe實(shí)例調(diào)用Unsafe的同名方法(如果有的話)。 netty給出這一對(duì)Abstract實(shí)現(xiàn)有兩個(gè)目的:
  • 進(jìn)一步明確接口的語(yǔ)意。
  • 簡(jiǎn)化Channel接口的實(shí)現(xiàn)。
下面來(lái)具體看一下AbstractUnsafe的主要方法實(shí)現(xiàn)。 AbstractUnsafe的重要實(shí)現(xiàn) register實(shí)現(xiàn) @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { //檢查是否已經(jīng)注冊(cè), 避免重復(fù)只需注冊(cè)動(dòng)作。 promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) {//檢查eventloop是否滿足Channel的要求,由子類實(shí)現(xiàn) promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //設(shè)置Channel的EventLoop實(shí)例 AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { //檢查是否在當(dāng)前線程中,如果是,直接調(diào)用 register0(promise); } else { //如果不是,把register0包裝到runnable中放到eventloop中調(diào)用。 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } 這個(gè)方法的實(shí)現(xiàn)為我們展示了netty使用I/O線程的一般套路 if(eventLoop.inEventLoop()){ doSomething(); }else{ eventLoop.execute(new Runnable(){ @Override public void run() { doSomething(); } }); } 對(duì)于某個(gè)需要放到I/O線性中執(zhí)行的方法,先檢查當(dāng)前線程是不是I/O線程,是就直接執(zhí)行,不是就把它包裝到Ruannable中放到eventLoop中執(zhí)行。 register的功能總結(jié)一句話就是調(diào)用register0, 下面看看register0的實(shí)現(xiàn)。 private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop //確保promise沒(méi)有被取消同時(shí)Channel沒(méi)有被關(guān)閉才能執(zhí)行后面的動(dòng)作 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); //執(zhí)行真正的register操作,留改子類實(shí)現(xiàn) neverRegistered = false; registered = true; //設(shè)置Channel已經(jīng)處于registed狀態(tài) // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. //觸發(fā)handlerAdded事件 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); //觸發(fā)channelRegistered事件 // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) {//確保Channel只有在第一次register 的時(shí)候被觸發(fā) pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 //對(duì)于設(shè)置了autoRead的Channel執(zhí)行beginRead(); beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } register語(yǔ)義:
  • 把channel和eventLoop綁定,eventLoop線程就是I/O線程。
  • 確保真正的register操作在I/O線程中執(zhí)行。
  • 確保每個(gè)channel的register操作只執(zhí)行一次。
  • 真正的register操作執(zhí)行成功后, 觸發(fā)channelRegistered事件,如果channel此時(shí)仍處于active狀態(tài),觸發(fā)channelActive事件,并確保這些事件只觸發(fā)一次。
  • 真正的register操作執(zhí)行成功后, 如果channel此時(shí)仍處于active狀態(tài),并且channel的配置支持autoRead, 則執(zhí)行beginRead操作,讓eventLoop可以自動(dòng)觸發(fā)channel的read事件。
  • bind實(shí)現(xiàn) @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } //先保存是否active的狀態(tài) boolean wasActive = isActive(); try { doBind(localAddress); //調(diào)用doBind, 需要子類實(shí)現(xiàn)這個(gè)方法完成真正的bind操作 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { //如果執(zhí)行完doBind后從非active狀態(tài)變成active裝,則觸發(fā)channelActive事件 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } bind語(yǔ)義:
    • 調(diào)用抽象方法doBind, 它需要子類實(shí)現(xiàn)。
    • 如果channel的狀態(tài)從非active變成active狀態(tài),則觸發(fā)channelActive事件
    disconnect實(shí)現(xiàn) disconnect和bind的實(shí)現(xiàn)類型,不同的是他調(diào)用的是doDisconnect方法,這個(gè)方法同樣是抽象方法需要子類實(shí)現(xiàn)。當(dāng)channel的狀態(tài)從非active變成active狀態(tài)時(shí),調(diào)用pipeline.fireChannelInactive()觸發(fā)channelInactive事件。 close實(shí)現(xiàn) @Override public final void close(final ChannelPromise promise) { assertEventLoop(); close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); } private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; } if (closeInitiated) { //這段代碼的作用就是防止多次執(zhí)行close操作 if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise. // This means close() was called before so we just register a listener and return closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.setSuccess(); } }); } return; } closeInitiated = true; final boolean wasActive = isActive(); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //把outboundBuffer置空,在這之后無(wú)法進(jìn)行write或flush操作 this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = prepareToClose(); //這個(gè)方法默認(rèn)實(shí)現(xiàn)是return null. 如果有些可以在子類中覆蓋這個(gè)方法添加關(guān)閉前的準(zhǔn)備代 //下面的if..else執(zhí)行的是相同的操作,不同的是如果closeExecutor可以用,就在這個(gè)executor中執(zhí)行,否則在當(dāng)前線程總執(zhí)行 if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. doClose0(promise); //執(zhí)行close操作 } finally { // Call invokeLater so closeAndDeregister is executed in the EventLoop again! // close完成之后的操作, 在eventLoop中執(zhí)行 invokeLater(new Runnable() { @Override public void run() { if (outboundBuffer != null) { // Fail all the queued messages //對(duì)outboundBuffer中的數(shù)據(jù)進(jìn)行錯(cuò)誤處理 outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } //執(zhí)行deregister操作, 如果channel由active變成非active狀態(tài)就觸發(fā)channelInactive事件 fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { try { // Close the channel and fail the queued messages in all cases. doClose0(promise); } finally { if (outboundBuffer != null) { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } } if (inFlush0) { //如果正在執(zhí)行flush操作,把deregister操作放在eventLoop中執(zhí)行 invokeLater(new Runnable() { @Override public void run() { fireChannelInactiveAndDeregister(wasActive); } }); } else { fireChannelInactiveAndDeregister(wasActive); } } } private void doClose0(ChannelPromise promise) { try { doClose(); //調(diào)用doClose執(zhí)行真正的close操作,它是一個(gè)抽象方法,需要在子類中實(shí)現(xiàn)。 closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } } close實(shí)現(xiàn)的代碼雖然比較多,但做的事情比較簡(jiǎn)單:首先執(zhí)行close操作,然后實(shí)現(xiàn)deregister操作,觸發(fā)channelInactive事件。 在close的實(shí)現(xiàn)中,先調(diào)用assertEventLoop方法確保當(dāng)前方法是在eventLoop中執(zhí)行,然后多次使用invokeLater方法吧一系列操作放在放在Runnable中執(zhí)行,這樣做的目的是事為了保證接下來(lái)的操作一定在當(dāng)前操作完成之后才會(huì)執(zhí)行,這一點(diǎn)是有eventLoop來(lái)保證的,eventLoop執(zhí)行Runnable的順序和調(diào)用execute的順序一致,相關(guān)實(shí)現(xiàn)會(huì)在后面eventLoop章節(jié)具體討論。 deregister實(shí)現(xiàn) @Override public final void deregister(final ChannelPromise promise) { assertEventLoop(); deregister(promise, false); } private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) { return; } if (!registered) { //避免多次執(zhí)行deregister操作 safeSetSuccess(promise); return; } // As a user may call deregister() from within any method while doing processing in the ChannelPipeline, // we need to ensure we do the actual deregister operation later. This is needed as for example, // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay, // the deregister operation this could lead to have a handler invoked by different EventLoop and so // threads. // // See: // https://github.com/netty/netty/issues/4435 invokeLater(new Runnable() { @Override public void run() { try { doDeregister(); //執(zhí)行真正的deregister操作,這方法默認(rèn)沒(méi)做任何事情,子類可以根據(jù)需要覆蓋實(shí)現(xiàn) } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { if (fireChannelInactive) { pipeline.fireChannelInactive(); // 觸發(fā)channelInactive事件 } // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, // close() calls deregister() again - no need to fire channelUnregistered, so check // if it was registered. if (registered) { registered = false; pipeline.fireChannelUnregistered(); //觸發(fā)channelUnregistered事件 } safeSetSuccess(promise); } } }); } 語(yǔ)義:
    • 調(diào)用doDeregister執(zhí)行真正的deregister操作
    • 根據(jù)參數(shù)可能需要觸發(fā)channelInactive事件
    • 觸發(fā)channelUnregistered事件
    write實(shí)現(xiàn) @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { //如果outboundBuffer是null, 意味著這個(gè)channel已經(jīng)被close掉了,需要使用promise返回錯(cuò)誤,然后釋放掉msg // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); //過(guò)濾msg, 默認(rèn)實(shí)現(xiàn)中沒(méi)有做任何操作,把msg原樣返回, 資料可以根據(jù)需要覆蓋實(shí)現(xiàn)。 size = pipeline.estimatorHandle().size(msg); //計(jì)算msg序列化之后的長(zhǎng)度 if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); //把msg放入outboundBuffer中 } write的操作比較簡(jiǎn)單,他只是把消息放到outboundBuffer中,并沒(méi)有做實(shí)際的寫(xiě)操作。 flush實(shí)現(xiàn) @Override public final void flush() { assertEventLoop(); //確保在eventLoop中執(zhí)行 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); //如果outboundBuffer不是null才可以進(jìn)入真正的write階段 flush0(); } protected void flush0() { if (inFlush0) { //確保不被多個(gè)線程同時(shí)執(zhí)行 // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { //確保outboundBuffer有數(shù)據(jù)是才執(zhí)行下面的步驟 return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { //如果channel不是active狀態(tài),返回錯(cuò)誤 try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); //執(zhí)行真正的寫(xiě)操作,這是一個(gè)抽象方法,需要子類實(shí)現(xiàn)。 } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ //如是I/O異常,并且channel配置允許自動(dòng)關(guān)閉,則關(guān)閉channel close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { shutdownOutput(voidPromise(), t); //關(guān)閉output通道,不允許執(zhí)行write操作。 } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } } finally { inFlush0 = false; } } 語(yǔ)義:
    • 調(diào)用doWrite方法執(zhí)行真正的寫(xiě)操作
    • 如果寫(xiě)操作失敗,調(diào)用close或者shutdownOutput進(jìn)行善后。
    至此,已經(jīng)分析完了AbstractChannel和AbstractUnsafe的所有重要的實(shí)現(xiàn),回頭總結(jié)一下,這個(gè)類主要做了這么幾件事: 1. 明確了AbstractChannel和AbstractUnsafe方法之間的調(diào)用關(guān)系,或通過(guò)unsafe實(shí)例直接調(diào)用,或通過(guò)pipleline間接調(diào)用。 2. 規(guī)定了Unsafe方法的執(zhí)行線程,有些必須在eventLoop中執(zhí)行,這樣的方法第一行就調(diào)用assertEventLoop來(lái)確保當(dāng)前方法是在eventLoop線性中,有些不需要一定在eventLoop中執(zhí)行的則沒(méi)有這個(gè)調(diào)用 3. 確保多線程多線程環(huán)境下的執(zhí)行順序,這一點(diǎn)通過(guò)把一系列操作包裝成Runnable放入eventLoop中來(lái)保證,invokeLater方法就是一個(gè)典型的例子。 4. 定義了事件的觸發(fā)條件,在前面的代碼分析中,頻繁地出現(xiàn)pipeline.fireXXX()的調(diào)用,這些調(diào)用就是在觸發(fā)特定的事件,大部分情況下用戶不要自己去觸發(fā)事件。 5. 優(yōu)化多線程環(huán)境下的數(shù)據(jù)同步性能,使用volatile減少synchronized和Lock的使用, 典型的用法如下所示: private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { ...... return; } .... doWrite(outboundBuffer); AbstractUnsafe的擴(kuò)展點(diǎn) 前面說(shuō)過(guò),AbstractUnsafe做了很多事,但把臨門一腳的工作交給子類完成,這樣讓子類的實(shí)現(xiàn)變得簡(jiǎn)單很多。AbstractUsafe把這些工作定義成形如doXXX的抽象方法或是沒(méi)有干任何事的空方法。下面是這些方法的列表:
    方法 說(shuō)明
    protected abstract SocketAddress localAddress0() 被localAddress調(diào)用,執(zhí)行真正的獲取本地地址的操作。
    protected abstract SocketAddress remoteAddress0() 被remoteAddress調(diào)用,是真正的獲取遠(yuǎn)程地址的操作。
    protected abstract boolean isCompatible(EventLoop loop) 檢查eventLoop是是否和這個(gè)Channel兼容。
    protected void doRegister() 調(diào)用鏈register->register0->doRegister, 真正的注冊(cè)操作。
    protected abstract void doBind(SocketAddress localAddress) 被bind調(diào)用,執(zhí)行真正綁定本地地址的操作。
    protected abstract void doDisconnect() 被disconnect調(diào)用,執(zhí)行真正的斷開(kāi)連接操作。
    protected abstract void doClose() 被close掉,執(zhí)行真正的關(guān)閉channel操作。
    protected void doShutdownOutput() 被shutdownOutput調(diào)用,用來(lái)關(guān)閉output通道,使Channel不能write。它的的默認(rèn)實(shí)現(xiàn)是調(diào)用doClose
    protected void doDeregister() 被deregister調(diào)用,是真正的注銷操作,雖然不是抽象方法,然而只有一個(gè){}, 還是要等你來(lái)搞定。
    protected abstract void doBeginRead() 調(diào)用鏈register->register0->beginRead->doBeginRead, 實(shí)現(xiàn)讓eventLoop可以自動(dòng)觸發(fā)read事件。
    protected abstract void doWrite(ChannelOutboundBuffer in) 調(diào)用鏈flush->flush0->doWrite, 執(zhí)行真正的寫(xiě)操作。
    protected Object filterOutboundMessage(Object msg) 被write調(diào)用,在消息被放到outboundBuffer之前對(duì)消息進(jìn)行處理,默認(rèn)啥事都沒(méi)干,就是把你傳進(jìn)去的msg還給你。

    轉(zhuǎn)載于:https://www.cnblogs.com/brandonli/p/9949730.html

    總結(jié)

    以上是生活随笔為你收集整理的netty源码解解析(4.0)-3 Channel的抽象实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。