每日一博 - Review线程池
文章目錄
- Pre
- 核心設計與實現
- 運行機制
- 線程池的生命周期
- ctl 解讀
- ctl的相關方法
- 線程池的狀態
- 任務執行機制
- 任務調度
- 任務緩沖
- 任務申請
- 任務拒絕
- Worker線程管理
- Worker線程
- 增加Worker線程
- 回收Worker線程
- Worker線程執行任務
Pre
線程池(Thread Pool)是一種基于池化思想管理線程的工具.
使用線程池可以帶來一系列好處:
- 降低資源消耗
- 提高響應速度
- 提高線程的可管理性
- …
線程池解決的核心問題就是資源管理問題。
在并發環境下,業務的不確定性,導致資源的依賴不確定。這種不確定性將帶來以下若干問題:
- 頻繁申請/銷毀資源和調度資源,將帶來額外的消耗,可能會非常巨大
- 對資源無限申請缺少抑制手段,易引發系統資源耗盡的風險
- 系統無法合理管理內部的資源分布,會降低系統的穩定性
為解決資源分配這個問題,線程池采用了“池化”(Pooling)思想。
核心設計與實現
在Java中的體現是ThreadPoolExecutor類 。 這里我們主要回顧 J.U.C提供的線程池 ThreadPoolExecutor類 。
ThreadPoolExecutor實現的頂層接口是Executor 。
- 頂層接口Executor提供了一種思想:將任務提交和任務執行進行解耦。用戶無需關注如何創建線程,如何調度線程來執行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執行器(Executor)中,由Executor框架完成線程的調配和任務的執行部分
- ExecutorService接口增加了一些能力:比如擴充執行任務的能力,補充可以為一個或一批異步任務生成Future的方法;提供了管控線程池的方法,比如停止線程池的運行等等。
- AbstractExecutorService則是上層的抽象類,將執行任務的流程串聯了起來,保證下層的實現只需關注一個執行任務的方法即可
- 最下層的實現類ThreadPoolExecutor實現最復雜的運行部分,ThreadPoolExecutor將會一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結合從而執行并行任務
運行機制
運行機制如下圖所示
-
線程池在內部實際上構建了一個生產者消費者模型,將線程和任務兩者解耦,并不直接關聯,從而良好的緩沖任務,復用線程。
-
線程池的運行主要分成兩部分:任務管理、線程管理。
任務管理部分充當生產者的角色,當任務提交后,線程池會判斷該任務后續的流轉:(1)直接申請線程執行該任務;(2)緩沖到隊列中等待線程執行;(3)拒絕該任務。
線程管理部分是消費者,它們被統一維護在線程池內,根據任務請求進行線程的分配,當線程執行完任務后則會繼續獲取新的任務去執行,最終當線程獲取不到任務的時候,線程就會被回收。
線程池的生命周期
線程池運行的狀態,隨著線程池的運行,由內部來維護。
線程池內部使用一個變量維護兩個值:
- 運行狀態(runState)
- 線程數量 (workerCount)
線程池將運行狀態(runState)、線程數量 (workerCount)兩個關鍵參數的維護通過ctl這個參數放在了一起
ctl 解讀
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));AtomicInteger 類型, 該變量是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它同時包含兩部分的信息:線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount) .
高3位保存runState,低29位保存workerCount,兩個變量之間互不干擾
用一個變量去存儲兩個值,可避免在做相關決策時,出現不一致的情況,不必為了維護兩者的一致,而占用鎖資源。
線程池也提供了若干方法去供用戶獲得線程池當前的運行狀態、線程個數。這里都使用的是位運算的方式,相比于基本運算,速度也會快很多。
ctl的相關方法
// 獲取運行狀態; private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取活動線程數; private static int workerCountOf(int c) { return c & CAPACITY; } // 獲取運行狀態和活動線程數的值。 private static int ctlOf(int rs, int wc) { return rs | wc; }線程池的狀態
線程池存在5種狀態
1、RUNNING
(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(02) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態,并且線程池中的任務數為0!
2、 SHUTDOWN
(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。
(2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空并且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5、 TERMINATED
(1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED。
進入TERMINATED的條件如下:
- 線程池不是RUNNING狀態;
- 線程池狀態不是TIDYING狀態或TERMINATED狀態;
- 如果線程池狀態是SHUTDOWN并且workerQueue為空;
- workerCount為0;
- 設置TIDYING狀態成功。
任務執行機制
任務調度
任務調度是線程池的主要入口,當用戶提交了一個任務,接下來這個任務將如何執行都是由這個階段決定的。這就是線程池的核心運行機制。
首先,所有任務的調度都是由execute方法完成的,這部分完成的工作是:檢查現在線程池的運行狀態、運行線程數、運行策略,決定接下來執行的流程,是直接申請線程執行,或是緩沖到隊列中執行,亦或是直接拒絕該任務。
首先檢測線程池運行狀態,如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態下執行任務。
如果workerCount < corePoolSize,則創建并啟動一個線程來執行新提交的任務。
如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中。
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建并啟動一個線程來執行新提交的任務。
如果workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
流程圖如下
任務緩沖
任務緩沖模塊是線程池能夠管理任務的核心部分。
線程池的本質是對任務和線程的管理,而做到這一點關鍵在于將任務和線程兩者解耦,不讓兩者直接關聯,才可以做后續的分配工作。
線程池中是以生產者消費者模式,通過一個阻塞隊列來實現的。阻塞隊列緩存任務,工作線程從阻塞隊列中獲取任務。
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。
- 在隊列為空時,獲取元素的線程會等待隊列變為非空。
- 當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
【線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素】
使用不同的隊列可以實現不一樣的任務存取策略。
任務申請
任務的執行有兩種可能:
-
一種是任務直接由新創建的線程執行。
-
另一種是線程從任務隊列中獲取任務然后執行,執行完任務的空閑線程會再次去從隊列中申請任務再去執行。
第一種情況僅出現在線程初始創建的時候,第二種是線程獲取任務絕大多數的情況。
線程需要從任務緩存模塊中不斷地取任務執行,幫助線程從阻塞隊列中獲取任務,實現線程管理模塊和任務管理模塊之間的通信。這部分策略由getTask方法實現。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}getTask 進行了多次判斷,為的是控制線程的數量,使其符合線程池的狀態。如果線程池現在不應該持有那么多線程,則會返回null值。工作線程Worker會不斷接收新任務去執行,而當工作線程Worker接收不到任務的時候,就會開始被回收。
任務拒絕
任務拒絕模塊是線程池的保護部分,線程池有一個最大的容量,當線程池的任務緩存隊列已滿,并且線程池中的線程數目達到maximumPoolSize時,就需要拒絕掉該任務,采取任務拒絕策略,保護線程池。
可以通過實現這個接口去定制拒絕策略,也可以選擇JDK提供的四種已有拒絕策略
Worker線程管理
Worker線程
線程池為了掌握線程的狀態并維護線程的生命周期,設計了線程池內的工作線程Worker
Worker這個工作線程,實現了Runnable接口,并持有一個線程thread,一個初始化的任務firstTask。
thread是在調用構造方法時通過ThreadFactory來創建的線程,可以用來執行任務;
firstTask用它來保存傳入的第一個任務,這個任務可以有也可以為null。
如果這個值是非空的,那么線程就會在啟動初期立即執行這個任務,也就對應核心線程創建時的情況;如果這個值是null,那么就需要創建一個線程去執行任務列表(workQueue)中的任務,也就是非核心線程的創建。
線程池需要管理線程的生命周期,需要在線程長時間不運行的時候進行回收。線程池使用一張Hash表去持有線程的引用,這樣可以通過添加引用、移除引用這樣的操作來控制線程的生命周期。這個時候重要的就是如何判斷線程是否在運行。
Worker是通過繼承AQS,使用AQS來實現獨占鎖這個功能。沒有使用可重入鎖ReentrantLock,而是使用AQS,為的就是實現不可重入的特性去反應線程現在的執行狀態。
- 1.lock方法一旦獲取了獨占鎖,表示當前線程正在執行任務中。
- 2.如果正在執行任務,則不應該中斷線程。
- 3.如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該線程進行中斷。
- 4.線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態;如果線程是空閑狀態則可以安全回收。
在線程回收過程中就使用到了這種特性,回收過程如下圖所示:
增加Worker線程
增加線程是通過線程池中的addWorker方法,該方法的功能就是增加一個線程,該方法不考慮線程池是在哪個階段增加的該線程,這個分配線程的策略是在上個步驟完成的,該步驟僅僅完成增加線程,并使它運行,最后返回是否成功這個結果。
addWorker方法有兩個參數:firstTask、core。
- firstTask參數用于指定新增的線程執行的第一個任務,該參數可以為空;
- core參數為true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程數是否少于maximumPoolSize 。
其執行流程如下圖所示:
回收Worker線程
線程池中線程的銷毀依賴JVM自動的回收,線程池做的工作是根據當前線程池的狀態維護一定數量的線程引用,防止這部分線程被JVM回收,當線程池決定哪些線程需要回收時,只需要將其引用消除即可。
Worker被創建出來后,就會不斷地進行輪詢,然后獲取任務去執行,核心線程可以無限等待獲取任務,非核心線程要限時獲取任務。
當Worker無法獲取到任務,也就是獲取的任務為空時,循環會結束,Worker會主動消除自身在線程池內的引用。
線程回收的工作是在processWorkerExit方法完成的。
事實上,在這個方法中,將線程引用移出線程池就已經結束了線程銷毀的部分。但由于引起線程銷毀的可能性有很多,線程池還要判斷是什么引發了這次銷毀,是否要改變線程池的現階段狀態,是否要根據新狀態,重新分配線程。
Worker線程執行任務
在Worker類中的run方法調用了runWorker方法來執行任務,runWorker方法的執行過程如下:
- 1.while循環不斷地通過getTask()方法獲取任務。
- 2.getTask()方法從阻塞隊列中取任務。
- 3.如果線程池正在停止,那么要保證當前線程是中斷狀態,否則要保證當前線程不是中斷狀態。
- 4.執行任務。
- 5.如果getTask結果為null則跳出循環,執行processWorkerExit()方法,銷毀線程。
參考: Java線程池實現原理及其在美團業務中的實踐
總結
以上是生活随笔為你收集整理的每日一博 - Review线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每日一博 - ThreadLocal V
- 下一篇: 每日一博 - Review线程池_02