创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动
前言
前三篇文章分別分析了 Netty 服務(wù)端 channel 的初始化、注冊以及綁定過程的源碼,理論上這篇文章應(yīng)該開始分析新連接接入過程的源碼了,但是在看源碼的過程中,發(fā)現(xiàn)有一個非常重要的組件:NioEventLoop,出現(xiàn)得非常頻繁,以至于影響到了后面源碼的閱讀,因此決定先分析下NioEventLoop的源碼,再分析新連接接入的源碼。關(guān)于NioEventLoop這個組件的源碼分析,將會寫兩篇文章來分享。第一篇文章將主要分析NioEventLoop 的創(chuàng)建與啟動,第二篇將主要分析NioEventLoop 的執(zhí)行流程。
在開始之前,先來思考一下一下兩個問題。
功能說明
NioEventLoop 從功能上,可以把它當(dāng)做一個線程來理解,當(dāng)它啟動以后,它就會不停地循環(huán)處理三種任務(wù)(從類名上也能體現(xiàn)出循環(huán)處理的思想:Loop)。這三種任務(wù)分別是哪三種任務(wù)呢?
NioEventLoop 類的繼承關(guān)系特別復(fù)雜,它的 UML 圖如下。
從圖中可以看到,它實(shí)現(xiàn)了ScheduledExecutorService接口,因此它可以實(shí)現(xiàn)定時任務(wù)相關(guān)的功能;同時它還繼承了SingleThreadEventExecutor類,從類名看,這是一個單線程的線程執(zhí)行器。
創(chuàng)建流程
在 netty 中,我們通過NioEventLoopGroup來創(chuàng)建NioEventLoop,入口就是下面這一行代碼。
EventLoopGroup workerGroup = new NioEventLoopGroup()當(dāng)使用NioEventLoopGroup的無參構(gòu)造器時,netty 會默認(rèn)創(chuàng)建2 倍 CPU 核數(shù)數(shù)量的 NioEventLoop;當(dāng)使用 NioEventLoopGroup 的有參構(gòu)造方法時,向構(gòu)造方法中傳入一個 int 值,就表示創(chuàng)建指定個數(shù)的 NioEventLoop。無論是使用 NioEventLoopGroup 有參構(gòu)造方法,還是無參構(gòu)造方法,最終都會調(diào)用到 NioEventLoopGroup 類中的如下構(gòu)造方法。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {// 調(diào)用父類super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }這個構(gòu)造方法有很多參數(shù),此時每個參數(shù)的解釋如下。
- nThreads:要創(chuàng)建的線程的數(shù)量,如果前面使用的是 NioEventLoopGroup 無參構(gòu)造器,此時 nThreads 的值為 0,如果使用的是 NioEventLoopGroup 的有參構(gòu)造方法,nThreads 的值為構(gòu)造方法中傳入的值。
- executor:線程執(zhí)行器,默認(rèn)是 null,這個屬性的值會在后面創(chuàng)建 NioEventLoop 時,進(jìn)行初始化。用戶可以自定義實(shí)現(xiàn) executor,如果用戶自定義了,那么此時 executor 就不為 null,后面就不會再進(jìn)行初始化。
- selectorProvider:SelectorProvider 類型,它是通過SelectorProvider.provider() 創(chuàng)建出來的,這是 JDK 中 NIO 相關(guān)的 API,會創(chuàng)建出一個 SelectorProvider 對象,這個對象的作用就是創(chuàng)建多路復(fù)用器 Selector 和服務(wù)端 channel。
- selectStrategyFactory:選擇策略工廠,通過 DefaultSelectStrategyFactory.INSTANCE 創(chuàng)建,INSTANCE這個常量的值又是通過new DefaultSelectStrategyFactory() 來創(chuàng)建的。
- RejectedExecutionHandlers.reject():返回的是一個拒絕策略,當(dāng)向線程池中添加任務(wù)時,如果線程池任務(wù)隊(duì)列已滿,這個時候任務(wù)就會被拒絕,此時線程池就會執(zhí)行拒絕策略。
接著又會調(diào)用父類的構(gòu)造方法,NioEventLoopGroup直接繼承了MultithreadEventLoopGroup類,此時會調(diào)用到MultithreadEventLoopGroup的如下構(gòu)造方法。
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }可以看到,構(gòu)造方法的參數(shù)中,有一個 args 參數(shù),是一個 Object 類型的可變數(shù)組。因此當(dāng)在 NioEventLoopGroup 的構(gòu)造方法調(diào)用到父類中時,selectorProvider、selectStrategyFactory、RejectedExecutionHandlers.reject() 都變成了 args 這個可變數(shù)組中的元素了。另外,我們從代碼中可以知道,如果前面?zhèn)鬟f過來的 nThread 為 0,那么就令 nThread 的值等于DEFAULT_EVENT_LOOP_THREADS,而DEFAULT_EVENT_LOOP_THREADS這個常量的值就是 2 倍的 CPU 核數(shù);如果前面?zhèn)鬟f過來的 nThread 不為 0,就使用傳遞過來的 nThread。
接著繼續(xù)向上調(diào)用父類的構(gòu)造器,MultithreadEventLoopGroup 繼承了MultithreadEventExecutorGroup類,因此會調(diào)用到MultithreadEventExecutorGroup的如下構(gòu)造方法。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }其中nThreads、executor、args這些參數(shù)就是前面?zhèn)鬟^來,這里就不多說了。然后通過DefaultEventExecutorChooserFactory.INSTANCE創(chuàng)建的是一個事件執(zhí)行選擇工廠,INSTANCE 常量的值是通過new DefaultEventExecutorChooserFactory() 創(chuàng)建出來的對象。 接著又通過 this 調(diào)用了 MultithreadEventExecutorGroup 類中的另一個構(gòu)造方法,接下來這個構(gòu)造方法就是核心代碼了。該構(gòu)造方法的代碼很長,為了方便閱讀,我進(jìn)行了精簡,精簡后的源碼如下。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {/*** 創(chuàng)建線程執(zhí)行器:ThreadPerTaskExecutor* newDefaultThreadFactory()會創(chuàng)建一個線程工廠,該線程工廠的作用就是用來創(chuàng)建線程,同時給線程設(shè)置名稱:nioEventLoop-1-XX*/executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}// 根據(jù)傳進(jìn)來的線程數(shù),來創(chuàng)建指定大小的數(shù)組大小,這個數(shù)組就是用來存放NioEventLoop對象實(shí)例children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {//出現(xiàn)異常標(biāo)識boolean success = false;try {//創(chuàng)建nThreads個nioEventLoop保存到children數(shù)組中children[i] = newChild(executor, args);success = true;} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);} finally {// 異常處理...}}// 通過線程執(zhí)行器選擇工廠來創(chuàng)建一個線程執(zhí)行器chooser = chooserFactory.newChooser(children);// 省略部分代碼... }這個方法中,有三處主要的邏輯。第一處邏輯:當(dāng) executor 為空時,創(chuàng)建一個ThreadPerTaskExecutor類型的線程執(zhí)行器;第二處邏輯:通過newChild(executor, args) 來創(chuàng)建NIoEventLoop;第三處:通過chooserFactory.newChooser(children) 來創(chuàng)建一個線程執(zhí)行器的選擇器。下面將逐步詳細(xì)分析這三處邏輯。
創(chuàng)建線程執(zhí)行器
if (executor == null) {/*** 創(chuàng)建線程執(zhí)行器:ThreadPerTaskExecutor* newDefaultThreadFactory()會創(chuàng)建一個線程工廠,該線程工廠的作用就是用來創(chuàng)建線程,同時給線程設(shè)置名稱:nioEventLoop-1-XX*/executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }在第一處核心邏輯處,首先判斷 executor 是否為空,如果用戶沒有自己指定,默認(rèn)情況下,executor 是 null,因此就會通過 new 關(guān)鍵字來創(chuàng)建一個ThreadPerTaskExecutor類型的線程執(zhí)行器。
在調(diào)用ThreadPerTaskExecutor的構(gòu)造方法之前,先通過new DefaultThreadFactory() 創(chuàng)建了一個線程工廠,該線程工廠是DefaultThreadFactory類型,它實(shí)現(xiàn)了 ThreadFactory 接口,它的作用就是:當(dāng)調(diào)用 threadFactory 的 newThread()方法時,就會創(chuàng)建出一個線程,同時給線程取一個有意義的名稱,名稱生成規(guī)則為:nioEventLoop-xx-xx。第一個 xx 的含義表示的 NiEventLoopGroup 的組號,在 netty 中可能同時創(chuàng)建 bossGroup 和 workerGroup 兩個線程組,所以第一個 xx 表示線程組的序號。第二個 xx 表示的是線程在線程組中的序號。如:nioEventLoop-1-1 表示的是該線程是第一個 NioEventLoopGroup 線程組的第一個線程。
ThreadPerTaskExecutor類的源碼比較簡單,它實(shí)現(xiàn)了Executor接口,重寫了execute() 方法,當(dāng)每次調(diào)用ThreadPerTaskExecutor類的execute() 方法時,會創(chuàng)建一個線程,并啟動線程。這里可能會有一個疑問:每次調(diào)用execute() 方法,都會創(chuàng)建一個線程,豈不是意味著會創(chuàng)建很多線程?實(shí)際上,在每個NioEventLoop中,只會調(diào)用一次ThreadPerTaskExecutor的execute() 方法,因此對于每個NioEventLoop而言,只會創(chuàng)建一個線程,且當(dāng)線程啟動后,就不會再調(diào)用ThreadPerTaskExecutor的execute() 方法了,也就不會造成在系統(tǒng)中創(chuàng)建多個線程。
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {// threadFactory就是前面創(chuàng)建的DefaultThreadFactory// 通過線程工廠的newThread()方法來創(chuàng)建一個線程,并啟動線程threadFactory.newThread(command).start();} }創(chuàng)建 NioEventLoop
// 根據(jù)傳進(jìn)來的線程數(shù),來創(chuàng)建指定大小的數(shù)組大小,這個數(shù)組就是用來存放NioEventLoop對象實(shí)例 children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {//出現(xiàn)異常標(biāo)識boolean success = false;try {//創(chuàng)建nThreads個nioEventLoop保存到children數(shù)組中children[i] = newChild(executor, args);success = true;} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);} finally {// 異常處理...} }在執(zhí)行第二處核心邏輯之前,先創(chuàng)建了一個EventExecutor類型的數(shù)組,數(shù)組的大小就是前面?zhèn)鬟M(jìn)來的線程個數(shù),然后將數(shù)組賦值給children屬性,這個屬性是 NioEventLoopGroup 的屬性,NioEventLoopGroup 包含一組 NioEventLoop 線程,children 屬性就是用來存放這一組 NioEventLoop 線程的。此時只是創(chuàng)建出了數(shù)組,但是數(shù)組中的元素都是 null,所以接下來通過 for 循環(huán)來為數(shù)組填充元素,通過newChild(executor, args) 創(chuàng)建出一個 NioEventLoop 對象,然后將對象賦值給數(shù)組中的元素。
當(dāng)調(diào)用newChild(executor, args) 方法時,第一個參數(shù) executor 就是上一步創(chuàng)建出來的ThreadPerTaskExecutor對象,第二個參數(shù)是一個可變數(shù)組,它的每一個元素是什么,有什么作用,在前面已經(jīng)解釋過了。newChild(executor, args) 定義在 NioEventLoopGroup 類中,源碼如下。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {/*** executor: ThreadPerTaskExecutor* args: args是一個可變數(shù)組的參數(shù),實(shí)際上它包含三個元素,也就是前面?zhèn)鬟f過來的三個參數(shù),如下:* SelectorProvider.provider()是JDK中NIO相關(guān)的API,會創(chuàng)建出一個SelectorProvider,它的作用就是在后面創(chuàng)建多路復(fù)用器Selector和服務(wù)端channel* DefaultSelectStrategyFactory.INSTANCE 是一個默認(rèn)選擇策略工廠,new DefaultSelectStrategyFactory()* RejectedExecutionHandlers.reject()返回的是一個拒絕策略,當(dāng)向線程池中添加任務(wù)時,如果線程池任務(wù)隊(duì)列已滿,這個時候任務(wù)就會被拒絕,然后執(zhí)行拒絕策略**/return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }可以看見,在newChild() 中直接調(diào)用了 NioEventLoop 的構(gòu)造方法。NioEventLoop 的構(gòu)造方法源碼如下。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {/*** executor: ThreadPerTaskExecutor* args: args是一個可變數(shù)組的參數(shù),實(shí)際上它包含三個元素,也就是前面?zhèn)鬟f過來的三個參數(shù),如下:* SelectorProvider.provider()是JDK中NIO相關(guān)的API,會創(chuàng)建出一個SelectorProvider,它的作用就是在后面創(chuàng)建多路復(fù)用器Selector和服務(wù)端channel* DefaultSelectStrategyFactory.INSTANCE 是一個默認(rèn)選擇策略工廠,new DefaultSelectStrategyFactory()* RejectedExecutionHandlers.reject()返回的是一個拒絕策略,當(dāng)向線程池中添加任務(wù)時,如果線程池任務(wù)隊(duì)列已滿,這個時候任務(wù)就會被拒絕,然后執(zhí)行拒絕策略**/super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;// openSelector()方法會創(chuàng)建一個多路復(fù)用器,但是這個多路復(fù)用器的selectedKey的底層數(shù)據(jù)接口被替換了final SelectorTuple selectorTuple = openSelector();//替換了數(shù)據(jù)結(jié)構(gòu)selectedKeys publicSelectedKeys的原生selectorselector = selectorTuple.selector;//子類包裝的selector 底層數(shù)據(jù)結(jié)構(gòu)也是被替換了的unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy; }在 NioEventLoop 的構(gòu)造方法中主要干了兩件事,一是繼續(xù)向上調(diào)用父類的構(gòu)造方法,二是調(diào)用openSelector() 方法。在父類的構(gòu)造方法中,會初始化兩個任務(wù)隊(duì)列:tailTasks 和 taskQueue,最終兩個屬性創(chuàng)建出來的都是MpscQueue類型的隊(duì)列,同時還將傳入的 executor 進(jìn)行了一次包裝,通過 ThreadExecutorMap 將其包裝成了一個匿名類。(MpscQueue 是個什么東西呢?它是many producer single consumer的簡寫,意思就是同一時刻可以有多個生產(chǎn)者往隊(duì)列中存東西,但是同一時刻只允許一個線程從隊(duì)列中取東西。)
接著是調(diào)用openSelector() 來創(chuàng)建多路復(fù)用器 Selector,然后將多路復(fù)用器保存到 NioEventLoop 當(dāng)中,在這一步 Netty 對多路復(fù)用器進(jìn)行了優(yōu)化。原生的 Selector 底層存放 SelectionKey 的數(shù)據(jù)結(jié)構(gòu)是HashSet,HashSet 在極端情況下,添加操作的時間復(fù)雜度是 O(n) ,Netty 則將 HashSet 類型替換成了數(shù)組類型,這樣添加操作的時間復(fù)雜度始終是 O(1) 。openSelector()方法的源碼很長,下面以圖片的方式貼出其源碼,你也可以直接跳過源碼,看我后面的總結(jié)。
openSelector()方法的源碼很長,經(jīng)過整理后,可以總結(jié)為如下幾個步驟:
- 先調(diào)用 JDK 的 API 創(chuàng)建多路復(fù)用器 Selector:provider.openSelector();
- 通過DISABLE_KEY_SET_OPTIMIZATION屬性判斷是否禁用優(yōu)化,如果為 true,則表示不進(jìn)行底層數(shù)據(jù)結(jié)構(gòu)的替換,即不優(yōu)化,直接返回原生的 Selector。DISABLE_KEY_SET_OPTIMIZATION常量的含義是是否禁用優(yōu)化:即是否禁止替換底層數(shù)據(jù)結(jié)構(gòu),默認(rèn)為 false,不禁止優(yōu)化。可以通過 io.netty.noKeySetOptimization 來配置。
- 通過反射加載 SelectorImpl:Class.forName("sun.nio.ch.SelectorImpl",false,PlatformDependent.getSystemClassLoader()) ;
- 通過反射獲取原生 SelectorImpl 中的selectedKeys、publicSelectedKeys屬性(這兩個屬性的數(shù)據(jù)類型是HashSet 類型),然后再將這兩個屬性的訪問權(quán)限設(shè)置為 true,接著再通過反射,將selectedKeys、publicSelectedKeys這連個屬性的類型替換為 Netty 中自定義的數(shù)據(jù)類型:SelectedSelectionKeySet。該類型的底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組類型;
- 最后將SelectedSelectionKeySet封裝到 netty 自定義的多路復(fù)用器SelectedSelectionKeySetSelector中,然后將 JDK 原生的 Selector 和 Netty 自定義的 Selector 封裝到SelectorTuple中,再將SelectorTuple返回。注意:這里原生的 selector 的底層數(shù)據(jù)結(jié)構(gòu)在返回時已經(jīng)被替換成了數(shù)組。
至此,NioEventLoop 的創(chuàng)建已經(jīng)完成了,總結(jié)一下創(chuàng)建 NioEventLoop 的創(chuàng)建過程干了哪些事。
- 初始化了兩個隊(duì)列:taskQueue 和 tailQueue,類型均為 MpscQueue。taskQueue 隊(duì)列是用來存放任務(wù)的隊(duì)列,后面 NioEventLoop 啟動后,就會循環(huán)的從這個隊(duì)列中取出任務(wù)執(zhí)行;tailQueue 是用來存放一些收尾工作的隊(duì)列。
- 將前面?zhèn)魅氲腡hreadPerTaskExecutor通過ThreadExecutorMap將其包裝成了一個匿名類,然后保存到 NioEventLoop 的 executor 屬性中,后面就能通過 NioEventLoop 來獲取到線程執(zhí)行器,然后執(zhí)行任務(wù)了。
- 將拒絕策略:RejectedExecutionHandlers.reject()和選擇策略工廠 DefaultSelectStrategyFactory.INSTANCE 保存到 NioEventLoop 中,方便后面從 NioEventLoop 中獲取。
- 將 JDK 原生的多路復(fù)用器 Selector 保存到 NioEventLoop 的unwrappedSelector屬性中,將 Netty 自定義的多路復(fù)用器 SelectedSelectionKeySetSelector 保存到 NioEventLoop 的selector屬性中。unwrappedSelector 和 selector 底層的數(shù)據(jù)類型都是數(shù)組類型。
線程執(zhí)行器選擇工廠
當(dāng) NioEventLoop 全部創(chuàng)建完成后,就會接著執(zhí)行第三處核心邏輯,這一步做的工作是通過一個選擇工廠來創(chuàng)建一個線程執(zhí)行器的選擇器,即給 chooser 屬性賦值。看到這兒,可能有點(diǎn)懵,什么意思呢?為什么要創(chuàng)建這個選擇器呢?
Netty 的 NioEventLoopGroup 包含了一組線程,即一組 NioEventLoop,當(dāng)有新的連接接入到服務(wù)端后,后面需要對這個新連接來進(jìn)行 IO 事件的讀寫,那這個時候需要使用一個 NioEventLoop 來和這個新連接綁定,也就是和客戶端 channel 綁定,后續(xù)對這個客戶端 channel 的數(shù)據(jù)讀寫都是基于綁定的這個 NioEventLoop 來進(jìn)行的。既然有多個 NioEventLoop 線程,那么這個時候應(yīng)該從線程組中選擇哪一個 NioEventLoop 來和客戶端 channel 綁定呢?
Netty 的做法是:輪詢,第一個客戶端 channel 來了后,取線程組中的第一個線程,即 children 數(shù)組中的第一個元素;然后當(dāng)?shù)诙€線程來時,取數(shù)組中的第二個元素,以此類推,循環(huán)的從 children 數(shù)組中取 NioEventLoop。這個算法很簡單,如何實(shí)現(xiàn)呢?就是每來一個客戶端 channel,先獲取計(jì)數(shù)器的值,然后用計(jì)數(shù)器的值對數(shù)組取模,然后再將計(jì)數(shù)器加一。
由于取模運(yùn)算相對于位運(yùn)算而言,是一個相對耗時的過程,因此 netty 對此進(jìn)行了優(yōu)化。當(dāng)線程數(shù)是 2 的整數(shù)次方時,netty 就采用位運(yùn)算的方式來進(jìn)行取模運(yùn)算;當(dāng)線程數(shù)不是 2 的整數(shù)次方時,netty 就還是采用取模的方法去進(jìn)行計(jì)算。這兩種計(jì)算方法分別是由兩個類來實(shí)現(xiàn)的:PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,這兩個類都是 EventExecutorChooser 類型,翻譯過來就是事件執(zhí)行器的選擇器。
而 chooser 就是這兩個選擇器的實(shí)例,究竟是PowerOfTwoEventExecutorChooser類型的實(shí)例還是GenericEventExecutorChooser類型的實(shí)例呢,這取決于 nThread 的數(shù)量。newChooser(EventExecutor[] executors) 方法的源碼如下。
public EventExecutorChooser newChooser(EventExecutor[] executors) {// executors是 new NioEventLoop() 的對象數(shù)組,// executors.length的值就是前面nThread參數(shù)的值if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);} }isPowerOfTwo(int val) 方法就是判斷傳入的值是否是 2 的整數(shù)次方。如何判斷呢?又是通過位運(yùn)算。下面代碼可能不太直觀,舉個栗子:比如傳入的參數(shù)是 8,那么 8 和-8 用二進(jìn)制表示就是:
8: 00000000000000000000000000001000 -8: 11111111111111111111111111111000將 8 和-8 進(jìn)行與運(yùn)算,結(jié)果還是 8,與原數(shù)值相等,因此 8 是 2 的整數(shù)次方。
private static boolean isPowerOfTwo(int val) {return (val & -val) == val; }至此,NioEventLoopGroup 的創(chuàng)建過程就結(jié)束了,那么 NioEventLoop 的創(chuàng)建過程也就跟著結(jié)束了。那么問題來了,我們說 NioEventLoop 實(shí)際上就是一個線程,既然是線程,它就必須先啟動,才能輪詢地執(zhí)行任務(wù),而在整個創(chuàng)建過程的源碼中,我們都沒有看到 NioEventLoop 線程啟動相關(guān)的代碼,那么 NioEventLoop 是什么時候啟動的呢?
啟動
NioEventLoop 啟動的觸發(fā)時機(jī)有兩個,一是在服務(wù)端啟動的過程中觸發(fā),另一個是在新連接接入的時候。下面以服務(wù)端啟動的過程為例子,進(jìn)行分析。在服務(wù)端啟動過程中,會執(zhí)行如下一行代碼。
public ChannelFuture register(Channel channel) {return next().register(channel); }next() 就是 chooser 的一個方法,chooser 有兩種不同的實(shí)現(xiàn):PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,這兩種不同的實(shí)現(xiàn)對next() 方法有不同的實(shí)現(xiàn)邏輯,區(qū)別就是:是用位運(yùn)算從 children 數(shù)組中取出一個 NioEventLoop,還是通過取模的方式從 children 數(shù)組中取出一個 NioEventLoop,但是最終都是返回一個 NioEventLoop。
所以這兒實(shí)際上是執(zhí)行NioEventLoop 的 register(channel)方法,這個方法一直向下執(zhí)行,最終會執(zhí)行到如下代碼:
eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);} });在這兒會調(diào)用 NioEventLoop 的 execute()方法,NioEventLoop 繼承了 SingleThreadEventExecutor,execute(task)定義在 SingleThreadEventExecutor 類中,刪減后的源碼如下。
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判斷當(dāng)前線程是否和NioEventLoop中的線程是否相等,返回true表示相等boolean inEventLoop = inEventLoop();// 將任務(wù)加入線程隊(duì)列addTask(task);if (!inEventLoop) {// 啟動線程startThread();// 省略部分代碼...}// 省略部分代碼 }可以看到,先判斷當(dāng)前線程是否和 NioEventLoop 中的線程是否相等,此時由于線程是 main 線程,inEventLoop() 會返回 false,所以會進(jìn)入到 if 邏輯塊中,并調(diào)用startThread() 方法來啟動的線程。
private void startThread() {// 處于為啟動狀態(tài),才會去嘗試啟動線程if (state == ST_NOT_STARTED) {// 嘗試將ST_NOT_STARTED設(shè)置為ST_STARTEDif (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {// 如果執(zhí)行doStartThread()出現(xiàn)異常 將STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED回滾if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}} }在doStart() 中,會先判斷 NioEventLoop 是否處于未啟動狀態(tài),只有處于未啟動狀態(tài)才會去嘗試啟動線程。在啟動線程之前,會先利用 CAS 方法,將狀態(tài)標(biāo)識為啟動狀態(tài),CAS 成功后,然后再調(diào)用doStartThread() 方法。doStartThread() 方法精簡后的源碼如下。
private void doStartThread() {assert thread == null;//真正的啟動線程executor.execute(new Runnable() {@Overridepublic void run() {// 將此線程保存起來thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {// 啟動NioEventLoopSingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 省略部分代碼....}}}); }可以發(fā)現(xiàn),在doStartThread() 方法中,調(diào)用的 executor 屬性的execute() 方法,注意,此時 executor 屬性值是什么?在創(chuàng)建 NioEventLoop 時,創(chuàng)建了一個ThreadPerTaskExecutor類型的對象,然后再通過ThreadExecutorMap將其包裝成了一個匿名類,最后將這個匿名類賦值給了 executor 屬性。所以此時會調(diào)用匿名類的execute(Runnable task) 方法,而這個匿名類最最終還是調(diào)用的是ThreadPerTaskExecutor的execute(Runnable task) 方法。在前面已經(jīng)簡單分析了ThreadPerTaskExecutor的execute(Runnable task) 方法,現(xiàn)在為了方便閱讀,再次貼出這部分代碼。
public void execute(Runnable command) {// threadFactory就是前面創(chuàng)建的DefaultThreadFactory// 通過線程工廠的newThread()方法來創(chuàng)建一個線程,并啟動線程threadFactory.newThread(command).start(); }可以看到,該execute() 方法,就是調(diào)用線程工廠的newThread(command) 方法來創(chuàng)建一個線程,然后調(diào)用線程的start() 的方法啟動線程。當(dāng)線程啟動后,就會回調(diào)傳入的 Runnable 任務(wù)的 run()方法,所以接著會回調(diào)到doStartThread() 方法中傳入的 Runnable 的 run()方法。從doStartThread() 的源碼中可以看到,在 run()方法中先將創(chuàng)建出來的線程保存了起來,然后會調(diào)用 SingleThreadEventExecutor.this.run() 。這一行代碼就是啟動 NioEventLoop 線程,該方法的源碼很長,整個 NioEventLoop 的核心都在這個方法上,它實(shí)際上就是在一個無限 for 循環(huán)中,不停的去處理事件和任務(wù)。關(guān)于這個方法的源碼會在下一篇文章詳細(xì)分析。
至此,NioEventLoop 中的 Thread 線程已經(jīng)啟動了,同時會連帶著 NioEventLoop 不停的在無限 for 循環(huán)中執(zhí)行,也就是 NioEventLoop 啟動起來了。
總結(jié)
- 本文以new NioEventLoopGroup() 為切入點(diǎn),通過分析NioEventLoopGroup的源碼,從而分析了NioEventLoop的創(chuàng)建過程,同時還介紹了 Netty 對 NIO 的優(yōu)化。接著以服務(wù)端 channel 啟動的流程為入口,分析了 NioEventLoop 是如何啟動的。
- 默認(rèn)情況下,netty 會創(chuàng)建 2 倍 CPU 核數(shù)數(shù)量的NioEventLoop線程,如果顯示指定了數(shù)量,則創(chuàng)建指定數(shù)量的NioEventLoop。
- 最后回答下文章開頭的兩個問題。
- 第一個問題:Netty 中的線程是何時啟動的?啟動時機(jī)有兩個,一個是在服務(wù)端啟動的過程中觸發(fā),另一個是在新連接接入的時候,但是最終都是調(diào)用ThreadPerTaskExecutor類的execute(Runnable command) 方法,通過線程工廠來創(chuàng)建一個線程,然后調(diào)用線程的start() 方法啟動線程,當(dāng)線程啟動后,又會回調(diào)傳入的 Runnable 任務(wù)的 run()方法,在任務(wù)的 run()方法中通過調(diào)用SingleThreadEventExecutor.this.run() 來調(diào)用 NioEventLoop 的 run()方法,這樣就啟動了 NioEventLoop。
- 第二個問題:Netty 中的線程是如何實(shí)現(xiàn)串行無鎖化的?從源碼中我們可以知道,每個 NioEventLoop 中只包含一個線程,而每個 channel 只會綁定在一個 NioEventLoop 上,一但綁定上了,后面這個 channel 的所有 IO 操作都會交由這個 NioEventLoop 線程來處理,因此不會出現(xiàn)多個 NioEventLoop 線程來爭奪處理 channel 的情況,因此說在 NioEventLoop 上,所有的操作都是串行處理的,不存在鎖的競爭,即串行無鎖化。可能有人會問,串行處理任務(wù),豈不是降低了系統(tǒng)的吞吐量?顯然不是的,因?yàn)?netty 中有多個 NioEventLoop 線程,多個 NioEventLoop 同時串行處理,這樣服務(wù)既是多線程并行運(yùn)行,各個線程間又不存在鎖的競爭,大大提高了服務(wù)性能。
總結(jié)
以上是生活随笔為你收集整理的创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我精心珍藏的Python代码技巧
- 下一篇: 文件夹_【教程】创建透明文件夹(非隐藏文