日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

45 张图深度解析 Netty 架构与原理

發布時間:2025/3/11 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 45 张图深度解析 Netty 架构与原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

作為一個學 Java 的,如果沒有研究過 Netty,那么你對 Java 語言的使用和理解僅僅停留在表面水平,會點 SSH 寫幾個 MVC,訪問數據庫和緩存,這些只是初等 Java 程序員干的事。如果你要進階,想了解 Java 服務器的深層高階知識,Netty 絕對是一個必須要過的門檻。

接下來我們會學習一個 Netty 系列教程,Netty 系列由「架構與原理」,「源碼」,「架構」三部分組成,今天我們先來看看第一部分:Netty 架構與原理初探,大綱如下:

  • 前言

  • 1. Netty 基礎

    • 1.4.1. 緩沖區(Buffer)

    • 1.4.2. 通道(Channel)

    • 1.4.3. 選擇器(Selector)

    • 1.1. Netty 是什么

    • 1.2. Netty 的應用場景

    • 1.3. Java 中的網絡 IO 模型

    • 1.4. Java NIO API 簡單回顧

    • 1.5. 零拷貝技術

  • 2. Netty 的架構與原理

    • 2.2.1. 單 Reactor 單線程模式

    • 2.2.2. 單 Reactor 多線程模式

    • 2.2.3. 主從 Reactor 多線程模式

    • 2.1. 為什么要制造 Netty

    • 2.2. 幾種 Reactor 線程模式

    • 2.3. Netty 的模樣

    • 2.4. 基于 Netty 的 TCP Server/Client 案例

    • 2.5. Netty 的 Handler 組件

    • 2.6. Netty 的 Pipeline 組件

    • 2.7. Netty 的 EventLoopGroup 組件

    • 2.8. Netty 的 TaskQueue

    • 2.9. Netty 的 Future 和 Promise

  • 3. 結束語

前言

讀者在閱讀本文前最好有 Java 的 IO 編程經驗(知道 Java 的各種 IO 流),以及 Java 網絡編程經驗(用 ServerSocket 和 Socket 寫過 demo),并對 Java NIO 有基本的認識(至少知道 Channel、Buffer、Selector 中的核心屬性和方法,以及三者如何配合使用的),以及 JUC 編程經驗(至少知道其中的 Future 異步處理機制),沒有也沒關系,文中多數會介紹,不影響整體的理解。

文中對于 Reactor 的講解使用了幾張來自網絡上的深灰色背景的示意圖,但未找到原始出處,文中已標注“圖片來源于網絡”。

Netty 的設計復雜,接口和類體系龐大,因此我會從不同的層次對有些 Netty 中的重要組件反復描述,以幫助讀者理解。

1. Netty 基礎

基礎好的同學,如果已經掌握了 Java NIO 并對 IO 多路復用的概念有一定的認知,可以跳過本章。

1.1. Netty 是什么

1)Netty 是 JBoss 開源項目,是異步的、基于事件驅動的網絡應用框架,它以高性能、高并發著稱。所謂基于事件驅動,說得簡單點就是 Netty 會根據客戶端事件(連接、讀、寫等)做出響應,關于這點,隨著文章的論述的展開,讀者自然會明白。

2)Netty 主要用于開發基于 TCP 協議的網絡 IO 程序(TCP/IP 是網絡通信的基石,當然也是 Netty 的基石,Netty 并沒有去改變這些底層的網絡基礎設施,而是在這之上提供更高層的網絡基礎設施),例如高性能服務器段/客戶端、P2P 程序等。

3)Netty 是基于 Java NIO 構建出來的,Java NIO 又是基于 Linux 提供的高性能 IO 接口/系統調用構建出來的。關于 Netty 在網絡中的地位,下圖可以很好地表達出來:

054

1.2. Netty 的應用場景

在互聯網領域,Netty 作為異步高并發的網絡組件,常常用于構建高性能 RPC 框架,以提升分布式服務群之間調用或者數據傳輸的并發度和速度。例如 Dubbo 的網絡層就可以(但并非一定)使用 Netty。

一些大數據基礎設施,比如 Hadoop,在處理海量數據的時候,數據在多個計算節點之中傳輸,為了提高傳輸性能,也采用 Netty 構建性能更高的網絡 IO 層。

在游戲行業,Netty 被用于構建高性能的游戲交互服務器,Netty 提供了 TCP/UDP、HTTP 協議棧,方便開發者基于 Netty 進行私有協議的開發。

……

Netty 作為成熟的高性能異步通信框架,無論是應用在互聯網分布式應用開發中,還是在大數據基礎設施構建中,亦或是用于實現應用層基于公私協議的服務器等等,都有出色的表現,是一個極好的輪子。

1.3. Java 中的網絡 IO 模型

Java 中的網絡 IO 模型有三種:BIO、NIO、AIO。

1)BIO:同步的、阻塞式 IO。在這種模型中,服務器上一個線程處理一次連接,即客戶端每發起一個請求,服務端都要開啟一個線程專門處理該請求。這種模型對線程量的耗費極大,且線程利用率低,難以承受請求的高并發。BIO 雖然可以使用線程池+等待隊列進行優化,避免使用過多的線程,但是依然無法解決線程利用率低的問題。

055

使用 BIO 構建 C/S 系統的 Java 編程組件是 ServerSocket 和 Socket。服務端示例代碼為:

public?static?void?main(String[]?args)?throws?IOException?{ExecutorService?threadPool?=?Executors.newCachedThreadPool();ServerSocket?serverSocket?=?new?ServerSocket(8080);while?(true)?{Socket?socket?=?serverSocket.accept();threadPool.execute(()?->?{handler(socket);});} }/***?處理客戶端請求*/ private?static?void?handler(Socket?socket)?throws?IOException?{byte[]?bytes?=?new?byte[1024];InputStream?inputStream?=?socket.getInputStream();socket.close();while?(true)?{int?read?=?inputStream.read(bytes);if?(read?!=?-1)?{System.out.println("msg?from?client:?"?+?new?String(bytes,?0,?read));}?else?{break;}} }

2)NIO:同步的、非阻塞式 IO。在這種模型中,服務器上一個線程處理多個連接,即多個客戶端請求都會被注冊到多路復用器(后文要講的 Selector)上,多路復用器會輪訓這些連接,輪訓到連接上有 IO 活動就進行處理。NIO 降低了線程的需求量,提高了線程的利用率。Netty 就是基于 NIO 的(這里有一個問題:前文大力宣揚 Netty 是一個異步高性能網絡應用框架,為何這里又說 Netty 是基于同步的 NIO 的?請讀者跟著文章的描述找尋答案)。

NIO 是面向緩沖區編程的,從緩沖區讀取數據的時候游標在緩沖區中是可以前后移動的,這就增加了數據處理的靈活性。這和面向流的 BIO 只能順序讀取流中數據有很大的不同。

Java NIO 的非阻塞模式,使得一個線程從某個通道讀取數據的時候,若當前有可用數據,則該線程進行處理,若當前無可用數據,則該線程不會保持阻塞等待狀態,而是可以去處理其他工作(比如處理其他通道的讀寫);同樣,一個線程向某個通道寫入數據的時候,一旦開始寫入,該線程無需等待寫完即可去處理其他工作(比如處理其他通道的讀寫)。這種特性使得一個線程能夠處理多個客戶端請求,而不是像 BIO 那樣,一個線程只能處理一個請求。

使用 NIO 構建 C/S 系統的 Java 編程組件是 Channel、Buffer、Selector。服務端示例代碼為:

public?static?void?main(String[]?args)?throws?IOException?{ServerSocketChannel?serverSocketChannel?=?ServerSocketChannel.open();Selector?selector?=?Selector.open();//?綁定端口serverSocketChannel.socket().bind(new?InetSocketAddress(8080));//?設置?serverSocketChannel?為非阻塞模式serverSocketChannel.configureBlocking(false);//?注冊?serverSocketChannel?到?selector,關注?OP_ACCEPT?事件serverSocketChannel.register(selector,?SelectionKey.OP_ACCEPT);while?(true)?{//?沒有事件發生if?(selector.select(1000)?==?0)?{continue;}//?有事件發生,找到發生事件的?Channel?對應的?SelectionKey?的集合Set<SelectionKey>?selectionKeys?=?selector.selectedKeys();Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();while?(iterator.hasNext())?{SelectionKey?selectionKey?=?iterator.next();//?發生?OP_ACCEPT?事件,處理連接請求if?(selectionKey.isAcceptable())?{SocketChannel?socketChannel?=?serverSocketChannel.accept();//?將?socketChannel?也注冊到?selector,關注?OP_READ//?事件,并給?socketChannel?關聯?BuffersocketChannel.register(selector,?SelectionKey.OP_READ,?ByteBuffer.allocate(1024));}//?發生?OP_READ?事件,讀客戶端數據if?(selectionKey.isReadable())?{SocketChannel?channel?=?(SocketChannel)?selectionKey.channel();ByteBuffer?buffer?=?(ByteBuffer)?selectionKey.attachment();channel.read(buffer);System.out.println("msg?form?client:?"?+?new?String(buffer.array()));}//?手動從集合中移除當前的?selectionKey,防止重復處理事件iterator.remove();}} }

3)AIO:異步非阻塞式 IO。在這種模型中,由操作系統完成與客戶端之間的 read/write,之后再由操作系統主動通知服務器線程去處理后面的工作,在這個過程中服務器線程不必同步等待 read/write 完成。由于不同的操作系統對 AIO 的支持程度不同,AIO 目前未得到廣泛應用。因此本文對 AIO 不做過多描述。

使用 Java NIO 構建的 IO 程序,它的工作模式是:主動輪訓 IO 事件,IO 事件發生后程序的線程主動處理 IO 工作,這種模式也叫做 Reactor 模式。使用 Java AIO 構建的 IO 程序,它的工作模式是:將 IO 事件的處理托管給操作系統,操作系統完成 IO 工作之后會通知程序的線程去處理后面的工作,這種模式也叫做 Proactor 模式。

本節最后,討論一下網路 IO 中阻塞、非阻塞、異步、同步這幾個術語的含義和關系:

  • 阻塞:如果線程調用 read/write 過程,但 read/write 過程沒有就緒或沒有完成,則調用 read/write 過程的線程會一直等待,這個過程叫做阻塞式讀寫。

  • 非阻塞:如果線程調用 read/write 過程,但 read/write 過程沒有就緒或沒有完成,調用 read/write 過程的線程并不會一直等待,而是去處理其他工作,等到 read/write 過程就緒或完成后再回來處理,這個過程叫做非阻塞式讀寫。

  • 異步:read/write 過程托管給操作系統來完成,完成后操作系統會通知(通過回調或者事件)應用網絡 IO 程序(其中的線程)來進行后續的處理。

  • 同步:read/write 過程由網絡 IO 程序(其中的線程)來完成。

基于以上含義,可以看出:異步 IO 一定是非阻塞 IO;同步 IO 既可以是阻塞 IO、也可以是非阻塞 IO。

1.4. Java NIO API 簡單回顧

BIO 以流的方式處理數據,而 NIO 以緩沖區(也被叫做塊)的方式處理數據,塊 IO 效率比流 IO 效率高很多。BIO 基于字符流或者字節流進行操作,而 NIO 基于 Channel 和 Buffer 進行操作,數據總是從通道讀取到緩沖區或者從緩沖區寫入到通道。Selector 用于監聽多個通道上的事件(比如收到連接請求、數據達到等等),因此使用單個線程就可以監聽多個客戶端通道。如下圖所示:

關于上圖,再進行幾點說明:

  • 一個 Selector 對應一個處理線程

  • 一個 Selector 上可以注冊多個 Channel

  • 每個 Channel 都會對應一個 Buffer(有時候一個 Channel 可以使用多個 Buffer,這時候程序要進行多個 Buffer 的分散和聚集操作),Buffer 的本質是一個內存塊,底層是一個數組

  • Selector 會根據不同的事件在各個 Channel 上切換

  • Buffer 是雙向的,既可以讀也可以寫,切換讀寫方向要調用 Buffer 的 flip()方法

  • 同樣,Channel 也是雙向的,數據既可以流入也可以流出

1.4.1. 緩沖區(Buffer)

緩沖區(Buffer)本質上是一個可讀可寫的內存塊,可以理解成一個容器對象,Channel 讀寫文件或者網絡都要經由 Buffer。在 Java NIO 中,Buffer 是一個頂層抽象類,它的常用子類有(前綴表示該 Buffer 可以存儲哪種類型的數據):

  • ByteBuffer

  • CharBuffer

  • ShortBuffer

  • IntBuffer

  • LongBuffer

  • DoubleBuffer

  • FloatBuffer

涵蓋了 Java 中除 boolean 之外的所有的基本數據類型。其中 ByteBuffer 支持類型化的數據存取,即可以往 ByteBuffer 中放 byte 類型數據、也可以放 char、int、long、double 等類型的數據,但讀取的時候要做好類型匹配處理,否則會拋出 BufferUnderflowException。

另外,Buffer 體系中還有一個重要的 MappedByteBuffer(ByteBuffer 的子類),可以讓文件內容直接在堆外內存中被修改,而如何同步到文件由 NIO 來完成。本文重點不在于此,有興趣的可以去探究一下 MappedByteBuffer 的底層原理。

1.4.2. 通道(Channel)

通道(Channel)是雙向的,可讀可寫。在 Java NIO 中,Buffer 是一個頂層接口,它的常用子類有:

  • FileChannel:用于文件讀寫

  • DatagramChannel:用于 UDP 數據包收發

  • ServerSocketChannel:用于服務端 TCP 數據包收發

  • SocketChannel:用于客戶端 TCP 數據包收發

1.4.3. 選擇器(Selector)

選擇器(Selector)是實現 IO 多路復用的關鍵,多個 Channel 注冊到某個 Selector 上,當 Channel 上有事件發生時,Selector 就會取得事件然后調用線程去處理事件。也就是說只有當連接上真正有讀寫等事件發生時,線程才會去進行讀寫等操作,這就不必為每個連接都創建一個線程,一個線程可以應對多個連接。這就是 IO 多路復用的要義。

Netty 的 IO 線程 NioEventLoop 聚合了 Selector,可以同時并發處理成百上千的客戶端連接,后文會展開描述。

在 Java NIO 中,Selector 是一個抽象類,它的常用方法有:

public?abstract?class?Selector?implements?Closeable?{....../***?得到一個選擇器對象*/public?static?Selector?open()?throws?IOException?{return?SelectorProvider.provider().openSelector();}....../***?返回所有發生事件的?Channel?對應的?SelectionKey?的集合,通過*?SelectionKey?可以找到對應的?Channel*/public?abstract?Set<SelectionKey>?selectedKeys();....../***?返回所有?Channel?對應的?SelectionKey?的集合,通過?SelectionKey*?可以找到對應的?Channel*/public?abstract?Set<SelectionKey>?keys();....../***?監控所有注冊的?Channel,當其中的?Channel?有?IO?操作可以進行時,*?將這些 Channel 對應的 SelectionKey 找到。參數用于設置超時時間*/public?abstract?int?select(long?timeout)?throws?IOException;/***?無超時時間的?select?過程,一直等待,直到發現有?Channel?可以進行*?IO?操作*/public?abstract?int?select()?throws?IOException;/***?立即返回的?select?過程*/public?abstract?int?selectNow()?throws?IOException;....../***?喚醒?Selector,對無超時時間的?select?過程起作用,終止其等待*/public?abstract?Selector?wakeup();...... }

在上文的使用 Java NIO 編寫的服務端示例代碼中,服務端的工作流程為:

1)當客戶端發起連接時,會通過 ServerSocketChannel 創建對應的 SocketChannel。

2)調用 SocketChannel 的注冊方法將 SocketChannel 注冊到 Selector 上,注冊方法返回一個 SelectionKey,該 SelectionKey 會被放入 Selector 內部的 SelectionKey 集合中。該 SelectionKey 和 Selector 關聯(即通過 SelectionKey 可以找到對應的 Selector),也和 SocketChannel 關聯(即通過 SelectionKey 可以找到對應的 SocketChannel)。

4)Selector 會調用 select()/select(timeout)/selectNow()方法對內部的 SelectionKey 集合關聯的 SocketChannel 集合進行監聽,找到有事件發生的 SocketChannel 對應的 SelectionKey。

5)通過 SelectionKey 找到有事件發生的 SocketChannel,完成數據處理。

以上過程的相關源碼為:

/** *?SocketChannel?繼承?AbstractSelectableChannel */ public?abstract?class?SocketChannelextends?AbstractSelectableChannelimplements?ByteChannel,?ScatteringByteChannel,?GatheringByteChannel,?NetworkChannel {...... }public?abstract?class?AbstractSelectableChannelextends?SelectableChannel {....../***?AbstractSelectableChannel?中包含注冊方法,SocketChannel?實例*?借助該注冊方法注冊到?Selector?實例上去,該方法返回?SelectionKey*/public?final?SelectionKey?register(//?指明注冊到哪個?Selector?實例Selector?sel,?//?ops?是事件代碼,告訴?Selector?應該關注該通道的什么事件int?ops,//?附加信息?attachmentObject?att)?throws?ClosedChannelException?{......}...... }public?abstract?class?SelectionKey?{....../***?獲取該?SelectionKey?對應的?Channel*/public?abstract?SelectableChannel?channel();/***?獲取該?SelectionKey?對應的?Selector*/public?abstract?Selector?selector();....../***?事件代碼,上面的?ops?參數取這里的值*/public?static?final?int?OP_READ?=?1?<<?0;public?static?final?int?OP_WRITE?=?1?<<?2;public?static?final?int?OP_CONNECT?=?1?<<?3;public?static?final?int?OP_ACCEPT?=?1?<<?4;....../***?檢查該?SelectionKey?對應的?Channel?是否可讀*/public?final?boolean?isReadable()?{return?(readyOps()?&?OP_READ)?!=?0;}/***?檢查該?SelectionKey?對應的?Channel?是否可寫*/public?final?boolean?isWritable()?{return?(readyOps()?&?OP_WRITE)?!=?0;}/***?檢查該?SelectionKey?對應的?Channel?是否已經建立起?socket?連接*/public?final?boolean?isConnectable()?{return?(readyOps()?&?OP_CONNECT)?!=?0;}/***?檢查該?SelectionKey?對應的?Channel?是否準備好接受一個新的?socket?連接*/public?final?boolean?isAcceptable()?{return?(readyOps()?&?OP_ACCEPT)?!=?0;}/***?添加附件(例如?Buffer)*/public?final?Object?attach(Object?ob)?{return?attachmentUpdater.getAndSet(this,?ob);}/***?獲取附件*/public?final?Object?attachment()?{return?attachment;}...... }

下圖用于輔助讀者理解上面的過程和源碼:

首先說明,本文以 Linux 系統為對象來研究文件 IO 模型和網絡 IO 模型。

1.5. 零拷貝技術

注:本節討論的是 Linux 系統下的 IO 過程。并且對于零拷貝技術的講解采用了一種淺顯易懂但能觸及其本質的方式,因為這個話題,展開來講實在是有太多的細節要關注。

在“將本地磁盤中文件發送到網絡中”這一場景中,零拷貝技術是提升 IO 效率的一個利器,為了對比出零拷貝技術的優越性,下面依次給出使用直接 IO 技術、內存映射文件技術、零拷貝技術實現將本地磁盤文件發送到網絡中的過程。

1)直接 IO 技術

使用直接 IO 技術實現文件傳輸的過程如下圖所示。

上圖中,內核緩沖區是 Linux 系統的 Page Cahe。為了加快磁盤的 IO,Linux 系統會把磁盤上的數據以 Page 為單位緩存在操作系統的內存里,這里的 Page 是 Linux 系統定義的一個邏輯概念,一個 Page 一般為 4K。

可以看出,整個過程有四次數據拷貝,讀進來兩次,寫回去又兩次:磁盤-->內核緩沖區-->Socket 緩沖區-->網絡。

直接 IO 過程使用的 Linux 系統 API 為:

ssize_t?read(int?filedes,?void?*buf,?size_t?nbytes); ssize_t?write(int?filedes,?void?*buf,?size_t?nbytes);

等函數。

2)內存映射文件技術

使用內存映射文件技術實現文件傳輸的過程如下圖所示。

可以看出,整個過程有三次數據拷貝,不再經過應用程序內存,直接在內核空間中從內核緩沖區拷貝到 Socket 緩沖區。

內存映射文件過程使用的 Linux 系統 API 為:

void?*mmap(void?*addr,?size_t?length,?int?prot,?int?flags,?int?fd,?off_t?offset);

3)零拷貝技術

使用零拷貝技術,連內核緩沖區到 Socket 緩沖區的拷貝也省略了,如下圖所示:

內核緩沖區到 Socket 緩沖區之間并沒有做數據的拷貝,只是一個地址的映射。底層的網卡驅動程序要讀取數據并發送到網絡上的時候,看似讀取的是 Socket 的緩沖區中的數據,其實直接讀的是內核緩沖區中的數據。

零拷貝中所謂的“零”指的是內存中數據拷貝的次數為 0。

零拷貝過程使用的 Linux 系統 API 為:

ssize_t?sendfile(int?out_fd,?int?in_fd,?off_t?*offset,?size_t?count);

在 JDK 中,提供的:

FileChannel.transderTo(long?position,?long?count,?WritableByteChannel?target);

方法實現了零拷貝過程,其中的第三個參數可以傳入 SocketChannel 實例。例如客戶端使用以上的零拷貝接口向服務器傳輸文件的代碼為:

public?static?void?main(String[]?args)?throws?IOException?{SocketChannel?socketChannel?=?SocketChannel.open();socketChannel.connect(new?InetSocketAddress("127.0.0.1",?8080));String?fileName?=?"test.zip";//?得到一個文件?channelFileChannel?fileChannel?=?new?FileInputStream(fileName).getChannel();//?使用零拷貝?IO?技術發送long?transferSize?=?fileChannel.transferTo(0,?fileChannel.size(),?socketChannel);System.out.println("file?transfer?done,?size:?"?+?transferSize);fileChannel.close(); }

以上部分為第一章,學習 Netty 需要的基礎知識。

2. Netty 的架構與原理

2.1. 為什么要制造 Netty

既然 Java 提供了 NIO,為什么還要制造一個 Netty,主要原因是 Java NIO 有以下幾個缺點:

1)Java NIO 的類庫和 API 龐大繁雜,使用起來很麻煩,開發工作量大。

2)使用 Java NIO,程序員需要具備高超的 Java 多線程編碼技能,以及非常熟悉網絡編程,比如要處理斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常流處理等一系列棘手的工作。

3)Java NIO 存在 Bug,例如 Epoll Bug 會導致 Selector 空輪訓,極大耗費 CPU 資源。

Netty 對于 JDK 自帶的 NIO 的 API 進行了封裝,解決了上述問題,提高了 IO 程序的開發效率和可靠性,同時 Netty:

1)設計優雅,提供阻塞和非阻塞的 Socket;提供靈活可拓展的事件模型;提供高度可定制的線程模型。

2)具備更高的性能和更大的吞吐量,使用零拷貝技術最小化不必要的內存復制,減少資源的消耗。

3)提供安全傳輸特性。

4)支持多種主流協議;預置多種編解碼功能,支持用戶開發私有協議。

**注:所謂支持 TCP、UDP、HTTP、WebSocket 等協議,就是說 Netty 提供了相關的編程類和接口,因此本文后面主要對基于 Netty 的 TCP Server/Client 開發案例進行講解,以展示 Netty 的核心原理,對于其他協議 Server/Client 開發不再給出示例,幫助讀者提升內力而非教授花招是我寫作的出發點 :-) **

下圖為 Netty 官網給出的 Netty 架構圖。

062

我們從其中的幾個關鍵詞就能看出 Netty 的強大之處:零拷貝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等協議;提供安全傳輸、壓縮、大文件傳輸、編解碼支持等等。

2.2. 幾種 Reactor 線程模式

傳統的 BIO 服務端編程采用“每線程每連接”的處理模型,弊端很明顯,就是面對大量的客戶端并發連接時,服務端的資源壓力很大;并且線程的利用率很低,如果當前線程沒有數據可讀,它會阻塞在 read 操作上。這個模型的基本形態如下圖所示(圖片來源于網絡)。

063

BIO 服務端編程采用的是 Reactor 模式(也叫做 Dispatcher 模式,分派模式),Reactor 模式有兩個要義:

1)基于 IO 多路復用技術,多個連接共用一個多路復用器,應用程序的線程無需阻塞等待所有連接,只需阻塞等待多路復用器即可。當某個連接上有新數據可以處理時,應用程序的線程從阻塞狀態返回,開始處理這個連接上的業務。

2)基于線程池技術復用線程資源,不必為每個連接創建專用的線程,應用程序將連接上的業務處理任務分配給線程池中的線程進行處理,一個線程可以處理多個連接的業務。

下圖反應了 Reactor 模式的基本形態(圖片來源于網絡):

064

Reactor 模式有兩個核心組成部分:

1)Reactor(圖中的 ServiceHandler):Reactor 在一個單獨的線程中運行,負責監聽和分發事件,分發給適當的處理線程來對 IO 事件做出反應。

2)Handlers(圖中的 EventHandler):處理線程執行處理方法來響應 I/O 事件,處理線程執行的是非阻塞操作。

Reactor 模式就是實現網絡 IO 程序高并發特性的關鍵。它又可以分為單 Reactor 單線程模式、單 Reactor 多線程模式、主從 Reactor 多線程模式。

2.2.1. 單 Reactor 單線程模式

單 Reactor 單線程模式的基本形態如下(圖片來源于網絡):

065

這種模式的基本工作流程為:

1)Reactor 通過 select 監聽客戶端請求事件,收到事件之后通過 dispatch 進行分發

2)如果事件是建立連接的請求事件,則由 Acceptor 通過 accept 處理連接請求,然后創建一個 Handler 對象處理連接建立后的后續業務處理。

3)如果事件不是建立連接的請求事件,則由 Reactor 對象分發給連接對應的 Handler 處理。

4)Handler 會完成 read-->業務處理-->send 的完整處理流程。

這種模式的優點是:模型簡單,沒有多線程、進程通信、競爭的問題,一個線程完成所有的事件響應和業務處理。當然缺點也很明顯:

1)存在性能問題,只有一個線程,無法完全發揮多核 CPU 的性能。Handler 在處理某個連接上的業務時,整個進程無法處理其他連接事件,很容易導致性能瓶頸。

2)存在可靠性問題,若線程意外終止,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障。

單 Reactor 單線程模式使用場景為:客戶端的數量有限,業務處理非常快速,比如 Redis 在業務處理的時間復雜度為 O(1)的情況。

2.2.2. 單 Reactor 多線程模式

單 Reactor 單線程模式的基本形態如下(圖片來源于網絡):

066

這種模式的基本工作流程為:

1)Reactor 對象通過 select 監聽客戶端請求事件,收到事件后通過 dispatch 進行分發。

2)如果事件是建立連接的請求事件,則由 Acceptor 通過 accept 處理連接請求,然后創建一個 Handler 對象處理連接建立后的后續業務處理。

3)如果事件不是建立連接的請求事件,則由 Reactor 對象分發給連接對應的 Handler 處理。Handler 只負責響應事件,不做具體的業務處理,Handler 通過 read 讀取到請求數據后,會分發給后面的 Worker 線程池來處理業務請求。

4)Worker 線程池會分配獨立線程來完成真正的業務處理,并將處理結果返回給 Handler。Handler 通過 send 向客戶端發送響應數據。

這種模式的優點是可以充分的利用多核 cpu 的處理能力,缺點是多線程數據共享和控制比較復雜,Reactor 處理所有的事件的監聽和響應,在單線程中運行,面對高并發場景還是容易出現性能瓶頸。

2.2.3. 主從 Reactor 多線程模式

主從 Reactor 多線程模式的基本形態如下(第一章圖片來源于網絡,第二章圖片是 JUC 作者 Doug Lea 老師在《Scalable IO in Java》中給出的示意圖,兩張圖表達的含義一樣):

068069

針對單 Reactor 多線程模型中,Reactor 在單個線程中運行,面對高并發的場景易成為性能瓶頸的缺陷,主從 Reactor 多線程模式讓 Reactor 在多個線程中運行(分成 MainReactor 線程與 SubReactor 線程)。這種模式的基本工作流程為:

1)Reactor 主線程 MainReactor 對象通過 select 監聽客戶端連接事件,收到事件后,通過 Acceptor 處理客戶端連接事件。

2)當 Acceptor 處理完客戶端連接事件之后(與客戶端建立好 Socket 連接),MainReactor 將連接分配給 SubReactor。(即:MainReactor 只負責監聽客戶端連接請求,和客戶端建立連接之后將連接交由 SubReactor 監聽后面的 IO 事件。)

3)SubReactor 將連接加入到自己的連接隊列進行監聽,并創建 Handler 對各種事件進行處理。

4)當連接上有新事件發生的時候,SubReactor 就會調用對應的 Handler 處理。

5)Handler 通過 read 從連接上讀取請求數據,將請求數據分發給 Worker 線程池進行業務處理。

6)Worker 線程池會分配獨立線程來完成真正的業務處理,并將處理結果返回給 Handler。Handler 通過 send 向客戶端發送響應數據。

7)一個 MainReactor 可以對應多個 SubReactor,即一個 MainReactor 線程可以對應多個 SubReactor 線程。

這種模式的優點是:

1)MainReactor 線程與 SubReactor 線程的數據交互簡單職責明確,MainReactor 線程只需要接收新連接,SubReactor 線程完成后續的業務處理。

2)MainReactor 線程與 SubReactor 線程的數據交互簡單, MainReactor 線程只需要把新連接傳給 SubReactor 線程,SubReactor 線程無需返回數據。

3)多個 SubReactor 線程能夠應對更高的并發請求。

這種模式的缺點是編程復雜度較高。但是由于其優點明顯,在許多項目中被廣泛使用,包括 Nginx、Memcached、Netty 等。

這種模式也被叫做服務器的 1+M+N 線程模式,即使用該模式開發的服務器包含一個(或多個,1 只是表示相對較少)連接建立線程+M 個 IO 線程+N 個業務處理線程。這是業界成熟的服務器程序設計模式。

2.3. Netty 的模樣

Netty 的設計主要基于主從 Reactor 多線程模式,并做了一定的改進。本節將使用一種漸進式的描述方式展示 Netty 的模樣,即先給出 Netty 的簡單版本,然后逐漸豐富其細節,直至展示出 Netty 的全貌。

簡單版本的 Netty 的模樣如下:

070

關于這張圖,作以下幾點說明:

1)BossGroup 線程維護 Selector,ServerSocketChannel 注冊到這個 Selector 上,只關注連接建立請求事件(相當于主 Reactor)。

2)當接收到來自客戶端的連接建立請求事件的時候,通過 ServerSocketChannel.accept 方法獲得對應的 SocketChannel,并封裝成 NioSocketChannel 注冊到 WorkerGroup 線程中的 Selector,每個 Selector 運行在一個線程中(相當于從 Reactor)。

3)當 WorkerGroup 線程中的 Selector 監聽到自己感興趣的 IO 事件后,就調用 Handler 進行處理。

我們給這簡單版的 Netty 添加一些細節:

關于這張圖,作以下幾點說明:

1)有兩組線程池:BossGroup 和 WorkerGroup,BossGroup 中的線程(可以有多個,圖中只畫了一個)專門負責和客戶端建立連接,WorkerGroup 中的線程專門負責處理連接上的讀寫。

2)BossGroup 和 WorkerGroup 含有多個不斷循環的執行事件處理的線程,每個線程都包含一個 Selector,用于監聽注冊在其上的 Channel。

3)每個 BossGroup 中的線程循環執行以下三個步驟:

3.1)輪訓注冊在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)

3.2)處理 accept 事件,與客戶端建立連接,生成一個 NioSocketChannel,并將其注冊到 WorkerGroup 中某個線程上的 Selector 上

3.3)再去以此循環處理任務隊列中的下一個事件

4)每個 WorkerGroup 中的線程循環執行以下三個步驟:

4.1)輪訓注冊在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)

4.2)在對應的 NioSocketChannel 上處理 read/write 事件

4.3)再去以此循環處理任務隊列中的下一個事件

我們再來看下終極版的 Netty 的模樣,如下圖所示(圖片來源于網絡):

關于這張圖,作以下幾點說明:

1)Netty 抽象出兩組線程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每個線程池中都有 NioEventLoop 線程。BossGroup 中的線程專門負責和客戶端建立連接,WorkerGroup 中的線程專門負責處理連接上的讀寫。BossGroup 和 WorkerGroup 的類型都是 NioEventLoopGroup。

2)NioEventLoopGroup 相當于一個事件循環組,這個組中含有多個事件循環,每個事件循環就是一個 NioEventLoop。

3)NioEventLoop 表示一個不斷循環的執行事件處理的線程,每個 NioEventLoop 都包含一個 Selector,用于監聽注冊在其上的 Socket 網絡連接(Channel)。

4)NioEventLoopGroup 可以含有多個線程,即可以含有多個 NioEventLoop。

5)每個 BossNioEventLoop 中循環執行以下三個步驟:

5.1)select:輪訓注冊在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)

5.2)processSelectedKeys:處理 accept 事件,與客戶端建立連接,生成一個 NioSocketChannel,并將其注冊到某個 WorkerNioEventLoop 上的 Selector 上

5.3)runAllTasks:再去以此循環處理任務隊列中的其他任務

6)每個 WorkerNioEventLoop 中循環執行以下三個步驟:

6.1)select:輪訓注冊在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)

6.2)processSelectedKeys:在對應的 NioSocketChannel 上處理 read/write 事件

6.3)runAllTasks:再去以此循環處理任務隊列中的其他任務

7)在以上兩個processSelectedKeys步驟中,會使用 Pipeline(管道),Pipeline 中引用了 Channel,即通過 Pipeline 可以獲取到對應的 Channel,Pipeline 中維護了很多的處理器(攔截處理器、過濾處理器、自定義處理器等)。這里暫時不詳細展開講解 Pipeline。

2.4. 基于 Netty 的 TCP Server/Client 案例

下面我們寫點代碼來加深理解 Netty 的模樣。下面兩段代碼分別是基于 Netty 的 TCP Server 和 TCP Client。

服務端代碼為:

/***?需要的依賴:*?<dependency>*?<groupId>io.netty</groupId>*?<artifactId>netty-all</artifactId>*?<version>4.1.52.Final</version>*?</dependency>*/ public?static?void?main(String[]?args)?throws?InterruptedException?{//?創建?BossGroup?和?WorkerGroup//?1.?bossGroup?只處理連接請求//?2.?業務處理由?workerGroup?來完成EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{//?創建服務器端的啟動對象ServerBootstrap?bootstrap?=?new?ServerBootstrap();//?配置參數bootstrap//?設置線程組.group(bossGroup,?workerGroup)//?說明服務器端通道的實現類(便于?Netty?做反射處理).channel(NioServerSocketChannel.class)//?設置等待連接的隊列的容量(當客戶端連接請求速率大//?于?NioServerSocketChannel?接收速率的時候,會使用//?該隊列做緩沖)//?option()方法用于給服務端的?ServerSocketChannel//?添加配置.option(ChannelOption.SO_BACKLOG,?128)//?設置連接保活//?childOption()方法用于給服務端?ServerSocketChannel//?接收到的?SocketChannel?添加配置.childOption(ChannelOption.SO_KEEPALIVE,?true)//?handler()方法用于給?BossGroup?設置業務處理器//?childHandler()方法用于給?WorkerGroup?設置業務處理器.childHandler(//?創建一個通道初始化對象new?ChannelInitializer<SocketChannel>()?{//?向?Pipeline?添加業務處理器@Overrideprotected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{socketChannel.pipeline().addLast(new?NettyServerHandler());//?可以繼續調用?socketChannel.pipeline().addLast()//?添加更多?Handler}});System.out.println("server?is?ready...");//?綁定端口,啟動服務器,生成一個?channelFuture?對象,//?ChannelFuture?涉及到?Netty?的異步模型,后面展開講ChannelFuture?channelFuture?=?bootstrap.bind(8080).sync();//?對通道關閉進行監聽channelFuture.channel().closeFuture().sync();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();} }/***?自定義一個?Handler,需要繼承?Netty?規定好的某個?HandlerAdapter(規范)*?InboundHandler?用于處理數據流入本端(服務端)的?IO?事件*?InboundHandler?用于處理數據流出本端(服務端)的?IO?事件*/ static?class?NettyServerHandler?extends?ChannelInboundHandlerAdapter?{/***?當通道有數據可讀時執行**?@param?ctx?上下文對象,可以從中取得相關聯的?Pipeline、Channel、客戶端地址等*?@param?msg?客戶端發送的數據*?@throws?Exception*/@Overridepublic?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)throws?Exception?{//?接收客戶端發來的數據System.out.println("client?address:?"+?ctx.channel().remoteAddress());//?ByteBuf?是?Netty?提供的類,比?NIO?的?ByteBuffer?性能更高ByteBuf?byteBuf?=?(ByteBuf)?msg;System.out.println("data?from?client:?"+?byteBuf.toString(CharsetUtil.UTF_8));}/***?數據讀取完畢后執行**?@param?ctx?上下文對象*?@throws?Exception*/@Overridepublic?void?channelReadComplete(ChannelHandlerContext?ctx)throws?Exception?{//?發送響應給客戶端ctx.writeAndFlush(//?Unpooled?類是?Netty?提供的專門操作緩沖區的工具//?類,copiedBuffer?方法返回的?ByteBuf?對象類似于//?NIO?中的?ByteBuffer,但性能更高Unpooled.copiedBuffer("hello?client!?i?have?got?your?data.",CharsetUtil.UTF_8));}/***?發生異常時執行**?@param?ctx???上下文對象*?@param?cause?異常對象*?@throws?Exception*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)throws?Exception?{//?關閉與客戶端的?Socket?連接ctx.channel().close();} }

客戶端端代碼為:

/***?需要的依賴:*?<dependency>*?<groupId>io.netty</groupId>*?<artifactId>netty-all</artifactId>*?<version>4.1.52.Final</version>*?</dependency>*/ public?static?void?main(String[]?args)?throws?InterruptedException?{//?客戶端只需要一個事件循環組,可以看做?BossGroupEventLoopGroup?eventLoopGroup?=?new?NioEventLoopGroup();try?{//?創建客戶端的啟動對象Bootstrap?bootstrap?=?new?Bootstrap();//?配置參數bootstrap//?設置線程組.group(eventLoopGroup)//?說明客戶端通道的實現類(便于?Netty?做反射處理).channel(NioSocketChannel.class)//?handler()方法用于給?BossGroup?設置業務處理器.handler(//?創建一個通道初始化對象new?ChannelInitializer<SocketChannel>()?{//?向?Pipeline?添加業務處理器@Overrideprotected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{socketChannel.pipeline().addLast(new?NettyClientHandler());//?可以繼續調用?socketChannel.pipeline().addLast()//?添加更多?Handler}});System.out.println("client?is?ready...");//?啟動客戶端去連接服務器端,ChannelFuture?涉及到?Netty?的異步模型,后面展開講ChannelFuture?channelFuture?=?bootstrap.connect("127.0.0.1",8080).sync();//?對通道關閉進行監聽channelFuture.channel().closeFuture().sync();}?finally?{eventLoopGroup.shutdownGracefully();} }/***?自定義一個?Handler,需要繼承?Netty?規定好的某個?HandlerAdapter(規范)*?InboundHandler?用于處理數據流入本端(客戶端)的?IO?事件*?InboundHandler?用于處理數據流出本端(客戶端)的?IO?事件*/ static?class?NettyClientHandler?extends?ChannelInboundHandlerAdapter?{/***?通道就緒時執行**?@param?ctx?上下文對象*?@throws?Exception*/@Overridepublic?void?channelActive(ChannelHandlerContext?ctx)throws?Exception?{//?向服務器發送數據ctx.writeAndFlush(//?Unpooled?類是?Netty?提供的專門操作緩沖區的工具//?類,copiedBuffer?方法返回的?ByteBuf?對象類似于//?NIO?中的?ByteBuffer,但性能更高Unpooled.copiedBuffer("hello?server!",CharsetUtil.UTF_8));}/***?當通道有數據可讀時執行**?@param?ctx?上下文對象*?@param?msg?服務器端發送的數據*?@throws?Exception*/@Overridepublic?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)throws?Exception?{//?接收服務器端發來的數據System.out.println("server?address:?"+?ctx.channel().remoteAddress());//?ByteBuf?是?Netty?提供的類,比?NIO?的?ByteBuffer?性能更高ByteBuf?byteBuf?=?(ByteBuf)?msg;System.out.println("data?from?server:?"+?byteBuf.toString(CharsetUtil.UTF_8));}/***?發生異常時執行**?@param?ctx???上下文對象*?@param?cause?異常對象*?@throws?Exception*/@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)throws?Exception?{//?關閉與服務器端的?Socket?連接ctx.channel().close();} }

什么?你覺得使用 Netty 編程難度和工作量更大了?不會吧不會吧,你要知道,你通過這么兩段簡短的代碼得到了一個基于主從 Reactor 多線程模式的服務器,一個高吞吐量和并發量的服務器,一個異步處理服務器……你還要怎樣?

對上面的兩段代碼,作以下簡單說明:

1)Bootstrap 和 ServerBootstrap 分別是客戶端和服務器端的引導類,一個 Netty 應用程序通常由一個引導類開始,主要是用來配置整個 Netty 程序、設置業務處理類(Handler)、綁定端口、發起連接等。

2)客戶端創建一個 NioSocketChannel 作為客戶端通道,去連接服務器。

3)服務端首先創建一個 NioServerSocketChannel 作為服務器端通道,每當接收一個客戶端連接就產生一個 NioSocketChannel 應對該客戶端。

4)使用 Channel 構建網絡 IO 程序的時候,不同的協議、不同的阻塞類型和 Netty 中不同的 Channel 對應,常用的 Channel 有:

  • NioSocketChannel:非阻塞的 TCP 客戶端 Channel(本案例的客戶端使用的 Channel)

  • NioServerSocketChannel:非阻塞的 TCP 服務器端 Channel(本案例的服務器端使用的 Channel)

  • NioDatagramChannel:非阻塞的 UDP Channel

  • NioSctpChannel:非阻塞的 SCTP 客戶端 Channel

  • NioSctpServerChannel:非阻塞的 SCTP 服務器端 Channel

    ......

啟動服務端和客戶端代碼,調試以上的服務端代碼,發現:

1)默認情況下 BossGroup 和 WorkerGroup 都包含 16 個線程(NioEventLoop),這是因為我的 PC 是 8 核的 NioEventLoop 的數量=coreNum*2。這 16 個線程相當于主 Reactor。

其實創建 BossGroup 和 WorkerGroup 的時候可以指定 NioEventLoop 數量,如下:

EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1); EventLoopGroup?workerGroup?=?new?NioEventLoopGroup(16);

這樣就能更好地分配線程資源。

2)每一個 NioEventLoop 包含如下的屬性(比如自己的 Selector、任務隊列、執行器等):

3)將代碼斷在服務端的 NettyServerHandler.channelRead 上:

可以看到 ctx 中包含的屬性如下:

可以看到:

  • 當前 ChannelHandlerContext ctx 是位于 ChannelHandlerContext 責任鏈中的一環,可以看到其 next、prev 屬性

  • 當前 ChannelHandlerContext ctx 包含一個 Handler

  • 當前 ChannelHandlerContext ctx 包含一個 Pipeline

  • Pipeline 本質上是一個雙向循環列表,可以看到其 tail、head 屬性

  • Pipeline 中包含一個 Channel,Channel 中又包含了該 Pipeline,兩者互相引用

    ……

從下一節開始,我將深入剖析以上兩段代碼,向讀者展示 Netty 的更多細節。

2.5. Netty 的 Handler 組件

無論是服務端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都繼承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 又繼承于 ChannelHandlerAdapter,ChannelHandlerAdapter 又實現了 ChannelHandler:

public?class?ChannelInboundHandlerAdapter?extends?ChannelHandlerAdapter?implements?ChannelInboundHandler?{...... public?abstract?class?ChannelHandlerAdapter?implements?ChannelHandler?{......

因此無論是服務端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都可以統稱為 ChannelHandler。

Netty 中的 ChannelHandler 的作用是,在當前 ChannelHandler 中處理 IO 事件,并將其傳遞給 ChannelPipeline 中下一個 ChannelHandler 處理,因此多個 ChannelHandler 形成一個責任鏈,責任鏈位于 ChannelPipeline 中。

數據在基于 Netty 的服務器或客戶端中的處理流程是:讀取數據-->解碼數據-->處理數據-->編碼數據-->發送數據。其中的每個過程都用得到 ChannelHandler 責任鏈。

Netty 中的 ChannelHandler 體系如下(第一張圖來源于網絡):

其中:

  • ChannelInboundHandler 用于處理入站 IO 事件

  • ChannelOutboundHandler 用于處理出站 IO 事件

  • ChannelInboundHandlerAdapter 用于處理入站 IO 事件

  • ChannelOutboundHandlerAdapter 用于處理出站 IO 事件

ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應用程序為例,如果事件的方向是從客戶端到服務器的,我們稱事件是出站的,那么客戶端發送給服務器的數據會通過 Pipeline 中的一系列 ChannelOutboundHandler 進行處理;如果事件的方向是從服務器到客戶端的,我們稱事件是入站的,那么服務器發送給客戶端的數據會通過 Pipeline 中的一系列 ChannelInboundHandler 進行處理。

無論是服務端代碼中自定義的 NettyServerHandler 還是客戶端代碼中自定義的 NettyClientHandler,都繼承于 ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter 提供的方法如下:

從方法名字可以看出,它們在不同的事件發生后被觸發,例如注冊 Channel 時執行 channelRegistred()、添加 ChannelHandler 時執行 handlerAdded()、收到入站數據時執行 channelRead()、入站數據讀取完畢后執行 channelReadComplete()等等。

2.6. Netty 的 Pipeline 組件

上一節說到,Netty 的 ChannelPipeline,它維護了一個 ChannelHandler 責任鏈,負責攔截或者處理 inbound(入站)和 outbound(出站)的事件和操作。這一節給出更深層次的描述。

ChannelPipeline 實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式,以及 Channel 中各個 ChannelHandler 如何相互交互。

每個 Netty Channel 包含了一個 ChannelPipeline(其實 Channel 和 ChannelPipeline 互相引用),而 ChannelPipeline 又維護了一個由 ChannelHandlerContext 構成的雙向循環列表,其中的每一個 ChannelHandlerContext 都包含一個 ChannelHandler。(前文描述的時候為了簡便,直接說 ChannelPipeline 包含了一個 ChannelHandler 責任鏈,這里給出完整的細節。)

如下圖所示(圖片來源于網絡):

還記得下面這張圖嗎?這是上文中基于 Netty 的 Server 程序的調試截圖,可以從中看到 ChannelHandlerContext 中包含了哪些成分:

ChannelHandlerContext 除了包含 ChannelHandler 之外,還關聯了對應的 Channel 和 Pipeline。可以這么來講:ChannelHandlerContext、ChannelHandler、Channel、ChannelPipeline 這幾個組件之間互相引用,互為各自的屬性,你中有我、我中有你。

在處理入站事件的時候,入站事件及數據會從 Pipeline 中的雙向鏈表的頭 ChannelHandlerContext 流向尾 ChannelHandlerContext,并依次在其中每個 ChannelInboundHandler(例如解碼 Handler)中得到處理;出站事件及數據會從 Pipeline 中的雙向鏈表的尾 ChannelHandlerContext 流向頭 ChannelHandlerContext,并依次在其中每個 ChannelOutboundHandler(例如編碼 Handler)中得到處理。

2.7. Netty 的 EventLoopGroup 組件

在基于 Netty 的 TCP Server 代碼中,包含了兩個 EventLoopGroup——bossGroup 和 workerGroup,EventLoopGroup 是一組 EventLoop 的抽象。

追蹤 Netty 的 EventLoop 的繼承鏈,可以發現 EventLoop 最終繼承于 JUC Executor,因此 EventLoop 本質就是一個 JUC Executor,即線程,JUC Executor 的源碼為:

public?interface?Executor?{/***?Executes?the?given?command?at?some?time?in?the?future.*/void?execute(Runnable?command); }

Netty 為了更好地利用多核 CPU 的性能,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護著一個 Selector 實例,Selector 實例監聽注冊其上的 Channel 的 IO 事件。

EventLoopGroup 含有一個 next 方法,它的作用是按照一定規則從 Group 中選取一個 EventLoop 處理 IO 事件。

在服務端,通常 Boss EventLoopGroup 只包含一個 Boss EventLoop(單線程),該 EventLoop 維護者一個注冊了 ServerSocketChannel 的 Selector 實例。該 EventLoop 不斷輪詢 Selector 得到 OP_ACCEPT 事件(客戶端連接事件),然后將接收到的 SocketChannel 交給 Worker EventLoopGroup,Worker EventLoopGroup 會通過 next()方法選取一個 Worker EventLoop 并將這個 SocketChannel 注冊到其中的 Selector 上,由這個 Worker EventLoop 負責該 SocketChannel 上后續的 IO 事件處理。整個過程如下圖所示:

2.8. Netty 的 TaskQueue

在 Netty 的每一個 NioEventLoop 中都有一個 TaskQueue,設計它的目的是在任務提交的速度大于線程的處理速度的時候起到緩沖作用。或者用于異步地處理 Selector 監聽到的 IO 事件。

Netty 中的任務隊列有三種使用場景:

1)處理用戶程序的自定義普通任務的時候

2)處理用戶程序的自定義定時任務的時候

3)非當前 Reactor 線程調用當前 Channel 的各種方法的時候。

對于第一種場景,舉個例子,2.4 節的基于 Netty 編寫的服務端的 Handler 中,假如 channelRead 方法中執行的過程很耗時,那么以下的阻塞式處理方式無疑會降低當前 NioEventLoop 的并發度:

/***?當通道有數據可讀時執行**?@param?ctx?上下文對象*?@param?msg?客戶端發送的數據*?@throws?Exception*/ @Override public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)throws?Exception?{//?借助休眠模擬耗時操作Thread.sleep(LONG_TIME);ByteBuf?byteBuf?=?(ByteBuf)?msg;System.out.println("data?from?client:?"+?byteBuf.toString(CharsetUtil.UTF_8)); }

改進方法就是借助任務隊列,代碼如下:

/***?當通道有數據可讀時執行**?@param?ctx?上下文對象*?@param?msg?客戶端發送的數據*?@throws?Exception*/ @Override public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)throws?Exception?{//?假如這里的處理非常耗時,那么就需要借助任務隊列異步執行final?Object?finalMsg?=?msg;//?通過?ctx.channel().eventLoop().execute()將耗時//?操作放入任務隊列異步執行ctx.channel().eventLoop().execute(new?Runnable()?{public?void?run()?{//?借助休眠模擬耗時操作try?{Thread.sleep(LONG_TIME);}?catch?(InterruptedException?e)?{e.printStackTrace();}ByteBuf?byteBuf?=?(ByteBuf)?finalMsg;System.out.println("data?from?client:?"+?byteBuf.toString(CharsetUtil.UTF_8));}});//?可以繼續調用?ctx.channel().eventLoop().execute()//?將更多操作放入隊列System.out.println("return?right?now."); }

斷點跟蹤這個函數的執行,可以發現該耗時任務確實被放入的當前 NioEventLoop 的 taskQueue 中了。

對于第二種場景,舉個例子,2.4 節的基于 Netty 編寫的服務端的 Handler 中,假如 channelRead 方法中執行的過程并不需要立即執行,而是要定時執行,那么代碼可以這樣寫:

/***?當通道有數據可讀時執行**?@param?ctx?上下文對象*?@param?msg?客戶端發送的數據*?@throws?Exception*/ @Override public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)throws?Exception?{final?Object?finalMsg?=?msg;//?通過?ctx.channel().eventLoop().schedule()將操作//?放入任務隊列定時執行(5min?之后才進行處理)ctx.channel().eventLoop().schedule(new?Runnable()?{public?void?run()?{ByteBuf?byteBuf?=?(ByteBuf)?finalMsg;System.out.println("data?from?client:?"+?byteBuf.toString(CharsetUtil.UTF_8));}},?5,?TimeUnit.MINUTES);//?可以繼續調用?ctx.channel().eventLoop().schedule()//?將更多操作放入隊列System.out.println("return?right?now."); }

斷點跟蹤這個函數的執行,可以發現該定時任務確實被放入的當前 NioEventLoop 的 scheduleTasjQueue 中了。

對于第三種場景,舉個例子,比如在基于 Netty 構建的推送系統的業務線程中,要根據用戶標識,找到對應的 SocketChannel 引用,然后調用 write 方法向該用戶推送消息,這時候就會將這一 write 任務放在任務隊列中,write 任務最終被異步消費。這種情形是對前兩種情形的應用,且涉及的業務內容太多,不再給出示例代碼,讀者有興趣可以自行完成,這里給出以下提示:

2.9. Netty 的 Future 和 Promise

Netty**對使用者提供的多數 IO 接口(即 Netty Channel 中的 IO 方法)**是異步的(即都立即返回一個 Netty Future,而 IO 過程異步進行),因此,調用者調用 IO 操作后是不能直接拿到調用結果的。要想得到 IO 操作結果,可以借助 Netty 的 Future(上面代碼中的 ChannelFuture 就繼承了 Netty Future,Netty Future 又繼承了 JUC Future)查詢執行狀態、等待執行結果、獲取執行結果等,使用過 JUC Future 接口的同學會非常熟悉這個機制,這里不再展開描述了。也可以通過 Netty Future 的 addListener()添加一個回調方法來異步處理 IO 結果,如下:

//?啟動客戶端去連接服務器端 //?由于?bootstrap.connect()是一個異步操作,因此用.sync()等待 //?這個異步操作完成 final?ChannelFuture?channelFuture?=?bootstrap.connect("127.0.0.1",8080).sync();channelFuture.addListener(new?ChannelFutureListener()?{/***?回調方法,上面的?bootstrap.connect()操作執行完之后觸發*/public?void?operationComplete(ChannelFuture?future)throws?Exception?{if?(channelFuture.isSuccess())?{System.out.println("client?has?connected?to?server!");//?TODO?其他處理}?else?{System.out.println("connect?to?serverfail!");//?TODO?其他處理}} });

Netty Future 提供的接口有:

注:會有一些資料給出這樣的描述:“Netty 中所有的 IO 操作都是異步的”,這顯然是錯誤的。Netty 基于 Java NIO,Java NIO 是同步非阻塞 IO。Netty 基于 Java NIO 做了封裝,向使用者提供了異步特性的接口,因此本文說 Netty**對使用者提供的多數 IO 接口(即 Netty Channel 中的 IO 方法)**是異步的。例如在 io.netty.channel.ChannelOutboundInvoker(Netty Channel 的 IO 方法多繼承于此)提供的多數 IO 接口都返回 Netty Future:

Promise 是可寫的 Future,Future 自身并沒有寫操作相關的接口,Netty 通過 Promise 對 Future 進行擴展,用于設置 IO 操作的結果。Future 繼承了 Future,相關的接口定義如下圖所示,相比于上圖 Future 的接口,它多出了一些 setXXX 方法:

Netty 發起 IO 寫操作的時候,會創建一個新的 Promise 對象,例如調用 ChannelHandlerContext 的 write(Object object)方法時,會創建一個新的 ChannelPromise,相關代碼如下:

@Override public?ChannelFuture?write(Object?msg)?{return?write(msg,?newPromise()); } ...... @Override public?ChannelPromise?newPromise()?{return?new?DefaultChannelPromise(channel(),?executor()); } ......

當 IO 操作發生異常或者完成時,通過 Promise.setSuccess()或者 Promise.setFailure()設置結果,并通知所有 Listener。關于 Netty 的 Future/Promise 的工作原理,我將在下一篇文章中進行源碼級的解析。

3. 結束語

我想,到此為止,讀者再次看到這幅 Netty 的架構圖會有不一樣的感覺。它變得簡潔、生動、優雅,因為你已經熟知了它的細節和運作流程。



參考資料:

  • Netty官網文檔,https://netty.io/wiki/all-documents.html

  • 《Netty權威指南(第一版)》,李林鋒

  • 《Netty in Action》,Norman Maurer

  • 《Scalable IO in Java》,Doug Lea

  • 尚硅谷Netty系列教程,韓順平主講

  • 最后歡迎大家關注我的公號,加我好友:「GG_Stone」,一起交流,共同進步!

    往期推薦

    千萬不要這樣寫代碼!9種常見的OOM場景演示

    2020-11-30

    求求你,不要再使用!=null判空了!

    2020-12-01

    這8種常見的SQL錯誤用法,你還在用嗎?

    2020-11-27

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的45 张图深度解析 Netty 架构与原理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。