超详细的线程池原理解析
說明
線程池作為常用的并發工具重要性不言而喻,本文針對線程池進行了抽絲剝繭般的深入解析,希望大家看后會有幫助。
1 ThreadPoolExecutor關系
2 結構
public ThreadPoolExecutor( int corePoolSize,//核心線程數量int maximumPoolSize,//最大線程數long keepAliveTime,//超時時間,默認超出核心線程數量以外的線程空余存活時間TimeUnit unit,//存活時間單位BlockingQueue<Runnable> workQueue,//保存執行任務的隊列ThreadFactory threadFactory,//創建新線程使用的工廠RejectedExecutionHandler handler//當任務無法執行的時候的處理方式) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }其實在線程池初始化時是不創建線程的,當執行任務會創建核心線程,當任務執行完后,核心線程會掛起等待,當再次有任務執行時,這些核心線程就會執行新的任務,從而實現了線程復用的效果。
3 線程池執行過程
3.1 執行流程圖
3.2 運行狀態
| RUNNING | 能夠處理新提交的任務,也能處理阻塞隊列中的任務 |
| SHUTDOWN | 不能處理新提交的任務,但能處理阻塞隊列中的任務 |
| STOP | 不能處理新提交任務,也不處理隊列中任務,會中斷正在處理任務的線程 |
| TIDYING | 所有任務都終止了,workerCount(有效線程為0) |
| TERMINATED | 在terminated()方法執行完后進入該狀態 |
線程池使用了一個ctl變量來維護運行狀態(runState)和worker數量 (workerCount)兩個值。高3位保存runState,低29位保存workerCount。
通過閱讀線程池源代碼也可以發現,經常出現要同時判斷線程池運行狀態和線程數量的情況。線程池也提供了若干方法去供用戶獲得線程池當前的運行狀態、線程個數。這里都使用的是位運算的方式,相比于基本運算,速度也會快很多。
// 1. `ctl`,可以看做一個int類型的數字,高3位表示線程池狀態,低29位表示worker數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));關于內部封裝的獲取生命周期狀態、獲取線程池線程數量的計算方法如以下代碼所示:
// `runStateOf()`,獲取線程池狀態,通過按位與操作,低29位將全部變成0 private static int runStateOf(int c) { return c & ~CAPACITY; } // `workerCountOf()`,獲取線程池worker數量,通過按位與操作,高3位將全部變成0 private static int workerCountOf(int c) { return c & CAPACITY; } // `ctlOf()`,根據線程池狀態和線程池worker數量,生成ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; }3.3 狀態的轉化
3.4 阻塞隊列成員
3.5 拒絕策略
任務拒絕模塊是線程池的保護部分,線程池有一個最大的容量,當線程池的任務緩存隊列已滿,并且線程池中的線程數目達到maximumPoolSize時,就需要拒絕掉該任務,采取任務拒絕策略,保護線程池。
拒絕策略是一個接口,其設計如下:
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }用戶可以通過實現這個接口去定制拒絕策略,也可以選擇JDK提供的四種已有拒絕策略
4 源碼解析
4.1 常用變量的解釋
// 1. `ctl`,可以看做一個int類型的數字,高3位表示線程池狀態,低29位表示worker數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 2. `COUNT_BITS`,`Integer.SIZE`為32,所以`COUNT_BITS`為29 private static final int COUNT_BITS = Integer.SIZE - 3; // 3. `CAPACITY`,線程池允許的最大線程數。1左移29位,然后減1,即為 2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits // 4. 線程池有5種狀態,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctl // 5. `runStateOf()`,獲取線程池狀態,通過按位與操作,低29位將全部變成0 private static int runStateOf(int c) { return c & ~CAPACITY; } // 6. `workerCountOf()`,獲取線程池worker數量,通過按位與操作,高3位將全部變成0 private static int workerCountOf(int c) { return c & CAPACITY; } // 7. `ctlOf()`,根據線程池狀態和線程池worker數量,生成ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; }/** Bit field accessors that don't require unpacking ctl.* These depend on the bit layout and on workerCount being never negative.*/ // 8. `runStateLessThan()`,線程池狀態小于xx private static boolean runStateLessThan(int c, int s) {return c < s; } // 9. `runStateAtLeast()`,線程池狀態大于等于xx private static boolean runStateAtLeast(int c, int s) {return c >= s; }4.2 提交任務的過程
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// worker數量比核心線程數小,直接創建worker執行任務if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// worker數量超過核心線程數,但任務隊列未滿,任務直接進入隊列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 線程池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作。// 這兒為什么需要recheck,是因為任務入隊列前后,線程池的狀態可能會發生變化。if (! isRunning(recheck) && remove(command))//如果線程池處于非運行狀態,并且把當前的任務從任務隊列中移除成功,則拒絕該任務reject(command);// 這兒為什么需要判斷0值,主要是在線程池構造方法中,核心線程數允許為0// 如果之前的線程已被銷毀完,新建一個線程else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果線程池不是運行狀態,或者任務進入隊列失敗,則嘗試創建worker執行任務。// 這兒有3點需要注意:// 1. 線程池不是運行狀態時,addWorker內部會判斷線程池狀態// 2. addWorker第2個參數表示是否創建核心線程// 3. addWorker返回false,則說明任務執行失敗,需要執行reject操作else if (!addWorker(command, false))reject(command); }4.3 addworker解析
private boolean addWorker(Runnable firstTask, boolean core) {//goto 語句,避免死循環retry:// 外層自旋for (;;) {int c = ctl.get();int rs = runStateOf(c);// 這個條件寫得比較難懂,我對其進行了調整,和下面的條件等價// (rs > SHUTDOWN) || // (rs == SHUTDOWN && firstTask != null) || // (rs == SHUTDOWN && workQueue.isEmpty())// 1. 線程池狀態大于SHUTDOWN時,直接返回false// 2. 線程池狀態等于SHUTDOWN,且firstTask不為null,直接返回false// 3. 線程池狀態等于SHUTDOWN,且隊列為空,直接返回false//通俗的解釋//(1). 線程池已經 shutdown 后,還要添加新的任務,拒絕//(2).(第二個判斷)SHUTDOWN 狀態不接受新任務,但仍然會執行已經加入任務隊列的任//務,所以當進入SHUTDOWN 狀態,而傳進來的任務為空,并且任務隊列不為空的時候,//是允許添加新線程的,如果把這個條件取反,就表示不允許添加 workerif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 內層自旋for (;;) {//獲得 Worker 的工作線程數int wc = workerCountOf(c);// worker數量超過容量,直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 使用CAS的方式增加worker數量。如果 cas 失敗,則直接重試// 若增加成功,則直接跳出外層循環進入到第二部分if (compareAndIncrementWorkerCount(c))break retry;//再次獲取 ctl 的值c = ctl.get();// //這里如果不想等,說明線程的狀態發生了變化,繼續重試if (runStateOf(c) != rs)continue retry;// 其他情況,直接內層循環進行自旋即可} }//上面這段代碼主要是對 worker 數量做原子+1 操作,下面的邏輯才是正式構建一個 worker//工作線程是否啟動的標識boolean workerStarted = false;//工作線程是否已經添加成功的標識boolean workerAdded = false;Worker w = null;try {//構建一個 Worker,這個 worker 是什么呢?我們可以看到構造方法里面傳入了一個 Runnable 對象w = new Worker(firstTask);//從 worker 對象中取出線程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// worker的添加必須是串行的,因此需要加鎖mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 這兒需要重新檢查線程池狀態int rs = runStateOf(ctl.get());//只有當前線程池是正在運行狀態,[或是 SHUTDOWN 且 firstTask 為空],才能添加到 workers集合中if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// worker已經調用過了start()方法,則不再創建workerif (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// worker創建并添加到workers成功workers.add(w);// 更新`largestPoolSize`變量int s = workers.size();//如果集合中的工作線程數大于最大線程數,這個最大線程數表示線程池曾經出現過的最大線程數if (s > largestPoolSize)//更新線程池出現過的最大線程數largestPoolSize = s;//表示工作線程創建成功了workerAdded = true;}} finally {//釋放鎖mainLock.unlock();}//如果 worker 添加成功if (workerAdded) {//啟動worker線程t.start();workerStarted = true;}}} finally {//如果添加失敗,就需要做一件事,就是遞減實際工作線程數(還記得我們最開始的時候增加了工作線程數嗎)if (! workerStarted)addWorkerFailed(w);}return workerStarted; }4.4 addWorkerFailed的解析
此方法是在添加Worker和啟動失敗而做的處理工作。主要做了這幾個邏輯:
4.5 Work類的說明
可以看到在addWorker方法種,最核心的是構建了一個Worker類,然后將任務firstTask放入到Work中。添加成功的話則執行Worker的run方法
Worker類實現了繼承了AbstractQueuedSynchronizer(AQS)類,實現了Runnable接口。
其中有兩個成員屬性尤為重要。
這兩個屬性是在通過Worker的構造方法中傳入的
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); }firstTask用它來保存傳入的任務; thread是在調用構造方法時通過 ThreadFactory 來創建的
線程,是用來處理任務的線程。
在調用構造方法時,需要傳入任務,這里通過 getThreadFactory().newThread(this);來新建
一個線程,newThread方法傳入的參數是 this,因為Worker本身繼承了Runnable接口,
也就是一個線程,所以在之前的addWorker方法中如果work添加到workers結合成功的話就會調用Worke類中的run方法。
Worker 繼承了 AQS,使用 AQS 來實現獨占鎖的功能。為什么不使用 ReentrantLock 來實
現呢?可以看到 tryAcquire 方法,它是不允許重入的,而 ReentrantLock 是允許重入的:
lock 方法一旦獲取了獨占鎖,表示當前線程正在執行任務中; 那么它會有以下幾個作用
4.6 Worker類的解析
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {//真正執行task任務的線程,可知是在構造函數中由ThreadFactory創建的final Thread thread;//需要執行的任務taskRunnable firstTask;//完成的任務數,用于線程池統計volatile long completedTasks;Worker(Runnable firstTask) {//初始狀態 -1,防止在調用 runWorker(),也就是真正執行 task前中斷 thread。setState(-1); //提交的任務this.firstTask = firstTask;// 這兒是Worker的關鍵所在,使用了線程工廠創建了一個線程。傳入的參數為當前workerthis.thread = getThreadFactory().newThread(this);}//執行任務public void run() {runWorker(this);}// 省略... }4.7 runWorker方法解析
由以上可知Worker類中的run方法,也就是runWorker方法,是實現執行任務的真正邏輯。主要做了這幾種邏輯。
4.8 getTask方法解析
每個worker都會執行getTask從阻塞隊列中拿取任務,這是一個典型的消費者的角色。根據線程池中的線程數來判斷是拿取任務方法是poll有超時,還是take一直阻塞,從而實現核心線程的復用和最大線程的回收。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//進行自循環for (;;) {int c = ctl.get();int rs = runStateOf(c);//對線程池狀態的判斷,兩種情況會 workerCount-1,并且返回 null//1. 線程池狀態為 shutdown,且 workQueue 為空(反映了 shutdown 狀態的線程池還是//要執行 workQueue 中剩余的任務的)//2. 線程池狀態為 stop(shutdownNow()會導致變成 STOP)(此時不用考慮 workQueue的情況)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//將工作線程數減1decrementWorkerCount();//返回null,此線程就會結束return null;}//獲取工作線程數 int wc = workerCountOf(c);//timed變量的含義是,//如果我們自己設置了allowCoreThreadTimeOut為true(默認false),//或者工作線程池大于了核心線程數.//那么就會進行超時的控制boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//線程數量超過 maximumPoolSize 可能是線程池在運行時被調用了 setMaximumPoolSize()//被改變了大小,否則已經 addWorker()成功不會超過 maximumPoolSize//timed && timedOut 如果為 true,表示當前操作需要進行超時控制,并且上次從阻塞隊列中//獲取任務發生了超時.其實就是體現了空閑線程的存活時間if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())){if (compareAndDecrementWorkerCount(c))return null;continue;}try {//根據 timed 來判斷,如果為 true,則通過阻塞隊列 poll 方法進行超時控制,如果在//keepaliveTime 時間內沒有獲取到任務,則返回 null.//如果為 false,則通過 take 方法阻塞式獲取隊列中的任務Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//如果任務存在不為空,則返回給上一個addWorker方法進行執行。if (r != null)return r;//如果為true,說明超時了也沒去到任務,在下次循環時就會返回null結束。timedOut = true;} catch (InterruptedException retry) {// 如果獲取任務時當前線程發生了中斷,則設置 timedOut 為false 并返回循環重試timedOut = false;}} }由上述分析可知此方法的核心邏輯在于控制線程池的線程數量。
在執行 execute 方法時,如果當前線程池的線程數量超過了 corePoolSize 且小于maximumPoolSize,并且 workQueue 已滿時,則可以增加工作線程,但這時如果超時沒有獲取到任務,也就是 timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了當前線程池中不需要那么多線程來執行任務了,可以把多于 corePoolSize 數量的線程銷毀掉,保持線程數量在 corePoolSize 即可。
什么時候會銷毀?當然是 runWorker 方法執行完之后,也就是 Worker 中的 run 方法執行
完,由 JVM 自動回收。
getTask 方法返回 null 時,在 runWorker 方法中會跳出 while 循環,然后會執行
processWorkerExit 方法。
processWorkerExit方法解析
在runWorker方法中,while中的task或者getTask方法 為null后會執行processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {//completedAbruptly為true,說明task任務執行時也就是run方法發生了異常,則需要將工作線程數減1。//那么如果task任務正常執行的呢,那么在getTask方法中已經做了減1操作了。if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//將worker中的任務完成數量匯總到線程池中的完成任務數量completedTaskCount += w.completedTasks;//將Set集合移除此workerworkers.remove(w);} finally {mainLock.unlock();}//嘗試終止線程池,主要判斷線程池是否滿足終止狀態條件,如果滿足但還有線程,嘗試進行中斷。//沒有線程的話 tidying狀態改為terminated狀態tryTerminate();int c = ctl.get();//如果狀態是running、shutdown,即tryTerminate()沒有成功終止線程池,嘗試再添加一個workerif (runStateLessThan(c, STOP)) {//不是突然完成的,即沒有task任務可以獲取而完成的,計算min,并根據當前worker數量判斷是否需要addWorker()if (!completedAbruptly) {//allowCoreThreadTimeOut默認為false,即min默認為corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果min為0,即不需要維持核心線程數量,且workQueue不為空,至少保持一個線程if (min == 0 && ! workQueue.isEmpty())min = 1;//如果線程數量大于最少數量,直接返回,否則下面至少要addWorker一個if (workerCountOf(c) >= min)return; // replacement not needed}//添加一個沒有firstTask的worker//只要worker是completedAbruptly突然終止的,或者線程數量小于要維護的數量,//就新添一個worker線程,即使是shutdown狀態addWorker(null, false);} }流程:
worker數量-1
A、如果是突然終止,說明是task執行時異常情況導致,即run()方法執行時發生了異常,那么正在工作的worker線程數量需要-1
B、如果不是突然終止,說明是worker線程沒有task可執行了,不用-1,因為已經在getTask()方法中-1了
從Workers Set中移除worker,刪除時需要上鎖mainlock
tryTerminate():在對線程池有負效益的操作時,都需要“嘗試終止”線程池,大概邏輯:
判斷線程池是否滿足終止的狀態
A、如果狀態滿足,但還有線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程
B、沒有線程了,更新狀態為tidying->terminated
是否需要增加worker線程,如果線程池還沒有完全終止,仍需要保持一定數量的線程線程池狀態是running 或 shutdown
A、如果當前線程是突然終止的,addWorker()
B、如果當前線程不是突然終止的,但當前線程數量 < 要維護的線程數量,addWorker(),故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然后再逐漸銷毀這corePoolSize個線程
4.9 reject(Runnable command)解析
回到execute(Runnable command)方法,發現當隊列中的任務已滿,線程也超過最大線程數時,會執行異常策略也就是reject(Runnable command)方法
final void reject(Runnable command) {handler.rejectedExecution(command, this); }邏輯很簡單,執行RejectedExecutionHandler接口中的rejectedExecution方法。詳細拒絕策略已在上文做介紹。
5 Executors提供的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize); }public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); }以上是Executors提供的常用的幾種封裝好的線程池,這幾種需要注意
所以這幾種在使用時,要注意。 阿里開發手冊不建議使用這幾種線程池,最好是自己定制。
參考資料
- https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
總結
以上是生活随笔為你收集整理的超详细的线程池原理解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 智能家居中物联网技术的应用
- 下一篇: 农业物联网的六大应用场景