日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

腾讯面试题Java 并发包之线程池综述

發(fā)布時間:2025/3/21 java 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 腾讯面试题Java 并发包之线程池综述 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Java 并發(fā)包之線程池綜述
■ 線程池的創(chuàng)建

在Java中,您可以通過調(diào)整-Xss參數(shù)來調(diào)節(jié)每個線程棧的大小(64bit系統(tǒng)默認1024KB),當減小該值時意味著可以創(chuàng)建更多的線程數(shù),但問題是JVM資源是有限的,線程不能無限創(chuàng)建!從筆者開發(fā)經(jīng)驗來看,線程池應該是并發(fā)包中使用頻率和運用場景最多的并發(fā)框架,幾乎所有并發(fā)/異步執(zhí)行任務的需求都需要用到線程池,線程復用,以內(nèi)部線程池的形式對外提供管理任務執(zhí)行,線程調(diào)度,線程池管理等等服務。合理的使用線程池可以帶來如下三個好處:1.降低資源消耗:通過重用已創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的消耗2.提高響應速度:任務到達時不需要等待線程創(chuàng)建就可以立即執(zhí)行3.提高線程的可管理性:線程池可以統(tǒng)一管理、分配、調(diào)優(yōu)和監(jiān)控

■ ThreadPoolExecutor —— 線程池最核心的類

- 類定義: 實現(xiàn)了 AbstractExecutorService 類,ExecutorService,Executor 接口
  - 構(gòu)造器:通過觀察每個構(gòu)造器的源碼具體實現(xiàn),發(fā)現(xiàn)前面三個構(gòu)造器都是調(diào)用的第四個構(gòu)造器進行的初始化工作

public class ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService,Executor { /*** 線程工廠默認為DefaultThreadFactory* 飽和策略默認為AbortPolicy*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); } /*** 線程工廠可配置* 飽和策略默認為AbortPolicy*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler); } /*** 線程工廠默認為DefaultThreadFactory* 飽和策略可配置*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } /*** 線程工廠可配置* 飽和策略可配置*/ public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null : AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }
  • 重要變量
//線程池控制器 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //任務隊列 private final BlockingQueue<Runnable> workQueue; //全局鎖 private final ReentrantLock mainLock = new ReentrantLock(); //工作線程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); //終止條件 - 用于等待任務完成后才終止線程池 private final Condition termination = mainLock.newCondition(); //曾創(chuàng)建過的最大線程數(shù) private int largestPoolSize; //線程池已完成總?cè)蝿諗?shù) private long completedTaskCount; //工作線程創(chuàng)建工廠 private volatile ThreadFactory threadFactory; //飽和拒絕策略執(zhí)行器 private volatile RejectedExecutionHandler handler; //工作線程活動保持時間(超時后會被回收) - 納秒 private volatile long keepAliveTime; /*** 允許核心工作線程響應超時回收* false:核心工作線程即使空閑超時依舊存活* true:核心工作線程一旦超過keepAliveTime仍然空閑就被回收*/ private volatile boolean allowCoreThreadTimeOut; //核心工作線程數(shù) private volatile int corePoolSize; //最大工作線程數(shù) private volatile int maximumPoolSize; //默認飽和策略執(zhí)行器 - AbortPolicy -> 直接拋出異常 private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();

■ ThreadPoolExecutor 的使用

- 創(chuàng)建線城池實際上就是實例化一個線程池對象,這里我們使用最完整的構(gòu)造器來描述最完整的創(chuàng)建過程:1. corePoolSize(核心工作線程數(shù)):無任務時,線程池允許(維護)的最小空閑線程池數(shù);當一個任務被提交到線程池就新建一個工作線程來執(zhí)行任務(即使此時有空閑的核心工作線程)直到(實際工作線程數(shù) >= 核心工作線程數(shù))為止;調(diào)用 prestartAllCoreThreads()方法會提前創(chuàng)建并啟動所有核心工作線程2. maximumPoolSize(最大工作線程數(shù)):線程池允許創(chuàng)建的最大工作線程數(shù);當(隊列已滿 && 實際工作線程數(shù) < 最大工作線程數(shù))時,線程池會創(chuàng)建新的工作線程(即使此時仍有空閑的工作線程)執(zhí)行任務直到最大工作線程數(shù)為止;設(shè)置無界隊列時該參數(shù)其實無效3. keepAliveTime(工作線程最大空閑時間):單位納秒,滿足超時條件且空閑的工作線程會被回收;超時的非核心工作線程會被回收,核心工作線程不會被回收;當allowCoreThreadTimeOut=true 時,則超時的核心工作線程也會被回收;若該值沒有設(shè)置則線程會永遠存活;建議當場景為任務短而多時,可以調(diào)高時間以提高線程利用率4. unit(線程活動保持時間單位): 線程活動保持時間單位,可選的包括NANOSECONDS納秒、MICROSECONDS微秒、MILLISECONDS毫秒、SECONDS秒、MINUTES分、HOURS時、DAYS天5. workQueue(任務隊列): 用來保存等待執(zhí)行的任務的阻塞隊列;當 (實際工作線程數(shù) >= 核心工作線程數(shù)) && (任務數(shù) < 任務隊列長度)時,任務會offer()入隊等待;關(guān)于任務隊列詳見下文的任務隊列與排隊策略6. threadFactory(線程創(chuàng)建工廠): 顧名思義,就是用于創(chuàng)建線程的工廠,允許自定義創(chuàng)建工廠,可以線程進行初始化配置,比如名字、守護線程、異常處理等等7. handler(飽和策略執(zhí)行器): 當線程池和隊列都已滿,此時說明線程已無力再接收更多的任務,即任務數(shù)飽和,沒法接單了;此時需要使用一種飽和策略處理新提交的任務,默認是Abort(直拋Reject異常),還包括Discard(LIFO規(guī)則丟棄)、DiscardOldest(LRU規(guī)則丟棄) 以及 CallerRuns(調(diào)用者線程執(zhí)行),允許自定義執(zhí)行器從上面給出的 ThreadPoolExecutor 類的代碼可以知道,ThreadPoolExecutor 繼承了 AbstractExecutorService,我們來看一下 AbstractExecutorService 的實現(xiàn): public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };public Future<?> submit(Runnable task) {};public <T> Future<T> submit(Runnable task, T result) { };public <T> Future<T> submit(Callable<T> task) { };private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {}; } AbstractExecutorService 是一個抽象類,它實現(xiàn)了ExecutorService 接口: public interface ExecutorService extends Executor {void shutdown();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; } 而ExecutorService 又是繼承了Executor 頂層接口: 而ExecutorService 又是繼承了Executor 頂層接口:```javascript public interface Executor {void execute(Runnable command); }
  • 提交、執(zhí)行和關(guān)閉任務 (重要方法)

1. execute(): 適用于提交無須返回值的任務
   - 該方法是無法判斷任務是否被線程池執(zhí)行成功

2. submit(): 適用于提交需要返回值的任務
   -可以通過返回的Future對象得知任務是否已經(jīng)執(zhí)行成功

-get() 方法會阻塞當前線程直到任務完成,但要注意防范無限阻塞!!!

-使用 get(long timeout,TimeUnit unit) 方法會阻塞當前線程直到任務完成或超時,不會有無限阻塞的發(fā)生但需要注意超時后任務可能還沒完成!!!

3. shutdown() : 有序地關(guān)閉線程池,已提交的任務會被執(zhí)行(包含正在執(zhí)行和任務隊列中的),但會拒絕新任務shutdownNow(): 立即(嘗試)停止執(zhí)行所有任務(包含正在執(zhí)行和任務隊列中的),并返回待執(zhí)行任務列表

■ ThreadPoolExecutor 實現(xiàn)原理

  • 線程池的狀態(tài)

線程狀態(tài)的流轉(zhuǎn)遵循如下順序,即由小到大順序排列:
  RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED

* 補充:數(shù)值的變遷感覺就好比我們的年齡,越大離上帝就越近 = =

//線程池狀態(tài)控制器,用于保證線程池狀態(tài)和工作線程數(shù) ps:低29位為工作線程數(shù)量,高3位為線程池狀態(tài) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //設(shè)定偏移量 Integer.SIZE = 32 -> 即COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; //確定最大的容量2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //獲取線程池狀態(tài),取高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取工作線程數(shù)量,取低29位 private static int workerCountOf(int c) { return c & CAPACITY; } /** * 獲取線程池狀態(tài)控制器* @param rs 表示runState 線程池狀態(tài)* @param wc 表示workerCount 工作線程數(shù)量*/ private static int ctlOf(int rs, int wc) { return rs | wc; }

這里補充一點二進制運算符基礎(chǔ)知識方便忘卻的讀者理解一下:
   &:與運算符,同位都為1才為1,否則為0

|:或運算符,同位有一個為1即為1,否則為0

~:非運算符,0和1互換,即若是0變成1,1則變成0

^:異或運算符,同位相同則為0,不同則為1

- 工人生產(chǎn)(生產(chǎn)者與消費者模式)

之前每個變量的作用都已經(jīng)標明出來了,這里通過實例展示其作用:

/**
  假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。
  因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;

當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
  如果說新任務數(shù)目增長的速度遠遠大于工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
  然后就將任務也分配給這4個臨時工人做;

如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
  當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
**/

那么我們知道其實線程就相當于工人,所以我們來看下線程池的內(nèi)部類 Worker:

繼承AQS類: 實現(xiàn)簡單的不可重入互斥鎖,以提供便捷的鎖操作,目的用于處理中斷情況
實現(xiàn)Runnable接口: "投機取巧"的設(shè)計,主要是借用Runnable接口的統(tǒng)一寫法,好處是不用重新寫一個同功能接口
工作線程: Worker會通過thread變量綁定一個真正執(zhí)行任務的工作線程(一對一),初始化時就由線程工廠分配好,它會反復地獲取和執(zhí)行任務
任務: Worker每次都會將新任務賦值給firstTask變量,工作線程每次通過該變量處理新獲取到的任務(初始化時該值允許為null,有特殊作用,下文會詳述)

/**  Worker類封裝了 ( 鎖 + 線程 + 任務 ) 這三個部分,從而成為了一個多面手的存在*/ private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/** 實際上真正的工作線程 - 幕后大佬,但可能因線程工廠創(chuàng)建失敗而為null */final Thread thread;/** 待執(zhí)行任務,可能為null */Runnable firstTask;/** 該工作線程已完成的任務數(shù) -- 論KPI的重要性 */volatile long completedTasks; Worker(Runnable firstTask) {//設(shè)置鎖狀態(tài)為-1,目的是為了阻止在runWorker()之前被中斷setState(-1); /*** 新任務,任務來源有兩個:* 1.調(diào)用addWorker()方法新建線程時傳入的第一個任務* 2.調(diào)用runWorker()方法時內(nèi)部循環(huán)調(diào)用getTask() -- 這就是線程復用的具現(xiàn)*/this.firstTask = firstTask;/*** 創(chuàng)建一個新的線程 -> 這個是真正的工作線程* 注意Worker本身就是個Runnable對象* 因此newThread(this)中的this也是個Runnable對象*/this.thread = getThreadFactory().newThread(this);} }
  • 執(zhí)行任務
// An highlighted block /** * 工作線程運行 * runWorker方法內(nèi)部會通過輪詢的方式 * 不停地獲取任務和執(zhí)行任務直到線程被回收 */ public void run() {runWorker(this); } (重點) 這里簡單介紹一下線程在線程池執(zhí)行任務的工作流程:

1.工作線程開始執(zhí)行前,需先對worker加鎖,任務完成解鎖

2.任務執(zhí)行前后分別執(zhí)行beforeExecute()和afterExecute()方法

3.執(zhí)行中遇到異常會向外拋出,線程是否死亡取決于您對于異常的處理

4.每個任務執(zhí)行完后,當前工作線程任務完成數(shù)自增,同時會循環(huán)調(diào)用getTask()從任務隊列中反復獲取任務并執(zhí)行,無任務可執(zhí)行時線程會阻塞在該方法上

5.當工作線程因各種理由退出時,會執(zhí)行processWorkerExit()回收線程(核心是將該worker從workers集合中移除,注意之前worker已經(jīng)退出任務循環(huán),因此已經(jīng)不再做工了,從集合移除后就方便gc了)

- 鎖方法 // Lock methods // The value 0 represents the unlocked state. 0表示未鎖定 // The value 1 represents the locked state. 1表示已鎖定 protected boolean isHeldExclusively() {return getState() != 0; } protected boolean tryAcquire(int unused) {//鎖狀態(tài)非0即1,即不可重入//特殊情況:只有初始化時才為-1,目的是防止線程初始化階段被中斷if (compareAndSetState(0, 1)) {//當前線程占有鎖setExclusiveOwnerThread(Thread.currentThread());return true;}return false; } protected boolean tryRelease(int unused) {//釋放鎖setExclusiveOwnerThread(null);//狀態(tài)恢復成未鎖定狀態(tài)setState(0);return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()){try {t.interrupt();} catch (SecurityException ignore) {}} } - 動態(tài)控制 /*** 設(shè)置核心工作線程數(shù)* 1.若新值<當前值時,將調(diào)用interruptIdleWorkers()處理超出部分線程* 2.若新值>當前值時,新創(chuàng)建的線程(若有必要)直接會處理隊列中的任務*/ public void setCorePoolSize(int corePoolSize) /*** 設(shè)置是否響應核心工作線程超時處理* 1.設(shè)置false時,核心工作線程不會因為任務數(shù)不足(空閑)而被終止* 2.設(shè)置true時,核心工作線程和非核心工作線程待遇一樣,會因為超時而終止* 注意:為了禁止出現(xiàn)持續(xù)性的線程替換,當設(shè)置true時,超時時間必須>0* 注意:該方法通常應在線程池被使用之前調(diào)用*/ public void allowCoreThreadTimeOut(boolean value) /*** 設(shè)置最大工作線程數(shù)* 1.若新值<當前值時,將調(diào)用interruptIdleWorkers()處理超出部分線程* 注意:當新值>當前值時是無需做任何處理的,跟設(shè)置核心工作線程數(shù)不一樣*/ public void setMaximumPoolSize(int maximumPoolSize) /** * 設(shè)置超時時間,超時后工作線程將被終止* 注意:若實際工作線程數(shù)只剩一個,除非線程池被終止,否則無須響應超時*/ public void setKeepAliveTime(long time, TimeUnit unit)

■ 任務提交與執(zhí)行

- execute() - 提交任務 /*** 在未來的某個時刻執(zhí)行給定的任務* 這個任務由一個新線程執(zhí)行,或者用一個線程池中已經(jīng)存在的線程執(zhí)行* 如果任務無法被提交執(zhí)行,要么是因為這個Executor已經(jīng)被shutdown關(guān)閉* 要么是已經(jīng)達到其容量上限,任務會被當前的RejectedExecutionHandler處理*/ public void execute(Runnable command) {//新任務不允許為空,空則拋出NPEif (command == null)throw new NullPointerException();/*** 1.若實際工作線程數(shù) < 核心工作線程數(shù),會嘗試創(chuàng)建一個工作線程去執(zhí)行該* 任務,即該command會作為該線程的第一個任務,即第一個firstTask* * 2.若任務入隊成功,仍需要執(zhí)行雙重校驗,原因有兩點:* - 第一個是去確認是否需要新建一個工作線程,因為可能存在* 在上次檢查后已經(jīng)死亡died的工作線程* - 第二個是可能在進入該方法后線程池被關(guān)閉了,* 比如執(zhí)行shutdown()* 因此需要再次檢查state狀態(tài),并分別處理以上兩種情況:* - 若線程池中已無可用工作線程了,則需要新建一個工作線程 * - 若線程池已被關(guān)閉,則需要回滾入隊列(若有必要)* * 3.若任務入隊失敗(比如隊列已滿),則需要新建一個工作線程; * 若新建線程失敗,說明線程池已停止或者已飽和,必須執(zhí)行拒絕策略*/int c = ctl.get();/*** 情況一:當實際工作線程數(shù) < 核心工作線程數(shù)時* 執(zhí)行方案:會創(chuàng)建一個新的工作線程去執(zhí)行該任務* 注意:此時即使有其他空閑的工作線程也還是會新增工作線程,* 直到達到核心工作線程數(shù)為止*/if (workerCountOf(c) < corePoolSize) {/*** 新增工作線程,true表示要對比的是核心工作線程數(shù)* 一旦新增成功就開始執(zhí)行當前任務* 期間也會通過自旋獲取隊列任務進行執(zhí)行*/if (addWorker(command, true))return;/*** 需要重新獲取控制器狀態(tài),說明新增線程失敗* 線程失敗的原因可能有兩種:* - 1.線程池已被關(guān)閉,非RUNNING狀態(tài)的線程池是不允許接收新任務的* - 2.并發(fā)時,假如都通過了workerCountOf(c) < corePoolSize校驗,但其他線程* 可能會在addWorker先創(chuàng)建出線程,導致workerCountOf(c) >= corePoolSize,* 即實際工作線程數(shù) >= 核心工作線程數(shù),此時需要進入情況二*/c = ctl.get();}/*** 情況二:當實際工作線程數(shù)>=核心線程數(shù)時,新提交任務需要入隊* 執(zhí)行方案:一旦入隊成功,仍需要處理線程池狀態(tài)突變和工作線程死亡的情況*/if (isRunning(c) && workQueue.offer(command)) {//雙重校驗int recheck = ctl.get();/*** recheck的目的是為了防止線程池狀態(tài)的突變 - 即被關(guān)閉* 一旦線程池非RUNNING狀態(tài)時,除了從隊列中移除該任務(回滾)外* 還需要執(zhí)行任務拒絕策略處理新提交的任務*/if (!isRunning(recheck) && remove(command))//執(zhí)行任務拒絕策略reject(command);/*** 若線程池還是RUNNING狀態(tài) 或 隊列移除失敗(可能正好被一個工作線程拿到處理了)* 此時需要確保至少有一個工作線程還可以干活* 補充一句:之所有無須與核心工作線程數(shù)或最大線程數(shù)相比,而只是比較0的原因是* 只要保證有一個工作線程可以干活就行,它會自動去獲取任務*/else if (workerCountOf(recheck) == 0)/*** 若工作線程都已死亡,需要新增一個工作線程去干活* 死亡原因可能是線程超時或者異常等等復雜情況** 第一個參數(shù)為null指的是傳入一個空任務,* 目的是創(chuàng)建一個新工作線程去處理隊列中的剩余任務* 第二個參數(shù)為false目的是提示可以擴容到最大工作線程數(shù)*/addWorker(null, false);}/*** 情況三:一旦線程池被關(guān)閉 或者 新任務入隊失敗(隊列已滿)* 執(zhí)行方案:會嘗試創(chuàng)建一個新的工作線程,并允許擴容到最大工作線程數(shù)* 注意:一旦創(chuàng)建失敗,比如超過最大工作線程數(shù),需要執(zhí)行任務拒絕策略*/else if (!addWorker(command, false))//執(zhí)行任務拒絕策略reject(command);}
  • addWorker() - 新增工作線程
/*** 新增工作線程需要遵守線程池控制狀態(tài)規(guī)定和邊界限制** @param core core為true時允許擴容到核心工作線程數(shù),否則為最大工作線程數(shù)* @return 新增成功返回true,失敗返回false*/ private boolean addWorker(Runnable firstTask, boolean core) {//重試標簽retry:/**** 外部自旋 -> 目的是確認是否能夠新增工作線程* 允許新增線程的條件有兩個:* 1.滿足線程池狀態(tài)條件 -> 條件一* 2.實際工作線程滿足數(shù)量邊界條件 -> 條件二* 不滿足條件時會直接返回false,表示新增工作線程失敗*/for (;;) {//讀取原子控制量 - 包含workerCount(實際工作線程數(shù))和runState(線程池狀態(tài))int c = ctl.get();//讀取線程池狀態(tài)int rs = runStateOf(c);/*** 條件一.判斷是否滿足線程池狀態(tài)條件* 1.只有兩種情況允許新增線程:* 1.1 線程池狀態(tài)==RUNNING* 1.2 線程池狀態(tài)==SHUTDOWN且firstTask為null同時隊列非空** 2.線程池狀態(tài)>=SHUTDOWN時不允許接收新任務,具體如下:* 2.1 線程池狀態(tài)>SHUTDOWN,即為STOP、TIDYING、TERMINATED* 2.2 線程池狀態(tài)==SHUTDOWN,但firstTask非空* 2.3 線程池狀態(tài)==SHUTDOWN且firstTask為空,但隊列為空* 補充:針對1.2、2.2、2.3的情況具體請參加后面的"小問答"環(huán)節(jié)*/if (rs >= SHUTDOWN &&!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;/**** 內(nèi)部自旋 -> 條件二.判斷實際工作線程數(shù)是否滿足數(shù)量邊界條件* -數(shù)量邊界條件滿足會對嘗試workerCount實現(xiàn)CAS自增,否則新增失敗* -當CAS失敗時會再次重新判斷是否滿足新增條件:* 1.若此期間線程池狀態(tài)突變(被關(guān)閉),重新判斷線程池狀態(tài)條件和數(shù)量邊界條件* 2.若此期間線程池狀態(tài)一致,則只需重新判斷數(shù)量邊界條件*/for (;;) {//讀取實際工作線程數(shù)int wc = workerCountOf(c);/*** 新增工作線程會因兩種實際工作線程數(shù)超標情況而失敗:* 1.實際工作線程數(shù) >= 最大容量* 2.實際工作線程數(shù) > 工作線程比較邊界數(shù)(當前最大擴容數(shù))* -若core = true,比較邊界數(shù) = 核心工作線程數(shù)* -若core = false,比較邊界數(shù) = 最大工作線程數(shù)*/if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;/*** 實際工作線程計數(shù)CAS自增:* 1.一旦成功直接退出整個retry循環(huán),表明新增條件都滿足* 2.因并發(fā)競爭導致CAS更新失敗的原因有三種: * 2.1 線程池剛好已新增一個工作線程* -> 計數(shù)增加,只需重新判斷數(shù)量邊界條件* 2.2 剛好其他工作線程運行期發(fā)生錯誤或因超時被回收* -> 計數(shù)減少,只需重新判斷數(shù)量邊界條件* 2.3 剛好線程池被關(guān)閉 * -> 計數(shù)減少,工作線程被回收,* 需重新判斷線程池狀態(tài)條件和數(shù)量邊界條件*/if (compareAndIncrementWorkerCount(c))break retry;//重新讀取原子控制量 -> 原因是在此期間可能線程池被關(guān)閉了c = ctl.get();/*** 快速檢測是否發(fā)生線程池狀態(tài)突變* 1.若狀態(tài)突變,重新判斷線程池狀態(tài)條件和數(shù)量邊界條件* 2.若狀態(tài)一致,則只需重新判斷數(shù)量邊界條件*/if (runStateOf(c) != rs)continue retry;}}/*** 這里是addWorker方法的一個分割線* 前面的代碼的作用是決定了線程池接受還是拒絕新增工作線程* 后面的代碼的作用是真正開始新增工作線程并封裝成Worker接著執(zhí)行后續(xù)操作* PS:雖然筆者覺得這個方法其實可以拆分成兩個方法的(在break retry的位置)*///記錄新增的工作線程是否開始工作boolean workerStarted = false;//記錄新增的worker是否成功添加到workers集合中boolean workerAdded = false;Worker w = null;try {//將新提交的任務和當前線程封裝成一個Workerw = new Worker(firstTask);//獲取新創(chuàng)建的實際工作線程final Thread t = w.thread;/*** 檢測是否有可執(zhí)行任務的線程,即是否成功創(chuàng)建了新的工作線程* 1.若存在,則選擇執(zhí)行任務* 2.若不存在,則需要執(zhí)行addWorkerFailed()方法*/if (t != null) {/*** 新增工作線程需要加全局鎖* 目的是為了確保安全更新workers集合和largestPoolSize*/final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {/*** 獲得全局鎖后,需再次檢測當前線程池狀態(tài)* 原因在于預防兩種非法情況:* 1.線程工廠創(chuàng)建線程失敗* 2.在鎖被獲取之前,線程池就被關(guān)閉了*/int rs = runStateOf(ctl.get());/*** 只有兩種情況是允許添加work進入works集合的* 也只有進入workers集合后才是真正的工作線程,并開始執(zhí)行任務* 1.線程池狀態(tài)為RUNNING(即rs<SHUTDOWN)* 2.線程池狀態(tài)為SHUTDOWN且傳入一個空任務* (理由參見:小問答之快速檢測線程池狀態(tài)?) */if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {/*** 若線程處于活動狀態(tài)時,說明線程已啟動,需要立即拋出"線程狀態(tài)非法異常"* 原因是線程是在后面才被start的,已被start的不允許再被添加到workers集合中* 換句話說該方法新增線程時,而線程是新的,本身應該是初始狀態(tài)(new)* 可能出現(xiàn)的場景:自定義線程工廠newThread有可能會提前啟動線程*/if (t.isAlive())throw new IllegalThreadStateException();//由于加鎖,所以可以放心的加入集合workers.add(w);int s = workers.size();//更新最大工作線程數(shù),由于持有鎖,所以無需CASif (s > largestPoolSize)largestPoolSize = s;//確認新建的worker已被添加到workers集合中 workerAdded = true;}} finally {//千萬不要忘記主動解鎖mainLock.unlock();}/*** 一旦新建工作線程被加入工作線程集合中,就意味著其可以開始干活了* 有心的您肯定發(fā)現(xiàn)在線程start之前已經(jīng)釋放鎖了* 原因在于一旦workerAdded為true時,說明鎖的目的已經(jīng)達到* 根據(jù)最小化鎖作用域的原則,線程執(zhí)行任務無須加鎖,這是種優(yōu)化* 也希望您在使用鎖時盡量保證鎖的作用域最小化*/if (workerAdded) {/*** 啟動線程,開始干活啦* 若您看過筆者的"并發(fā)番@Thread一文通"肯定知道start()后,* 一旦線程初始化完成便會立即調(diào)用run()方法*/t.start();//確認該工作線程開始干活了workerStarted = true;}}} finally {//若新建工作線程失敗或新建工作線程后沒有成功執(zhí)行,需要做新增失敗處理if (!workerStarted)addWorkerFailed(w);}//返回結(jié)果表明新建的工作線程是否已啟動執(zhí)行return workerStarted; }

結(jié)論之啟動調(diào)用會經(jīng)歷一下過程:

(1) worker = new Worker(Runnable) --> (2) thread = newThread(worker) --> (3) thread.start() --> (4) thread.run()[JVM自動調(diào)用] --> (5) worker.run() --> (6) threadPoolExecuter.runWorker(worker)

- runWorker() - 執(zhí)行任務 final void runWorker(Worker w) {//讀取當前線程 -即調(diào)用execute()方法的線程(一般是主線程)Thread wt = Thread.currentThread();//讀取待執(zhí)行任務Runnable task = w.firstTask;//清空任務 -> 目的是用來接收下一個任務w.firstTask = null;/*** 注意Worker本身也是一把不可重入的互斥鎖!* 由于Worker初始化時state=-1,因此此處的解鎖的目的是:* 將state-1變成0,因為只有state>=0時才允許中斷;* 同時也側(cè)面說明在worker調(diào)用runWorker()之前是不允許被中斷的,* 即運行前不允許被中斷*/w.unlock();//記錄是否因異常/錯誤突然完成,默認有異常/錯誤發(fā)生boolean completedAbruptly = true;try {/*** 獲取任務并執(zhí)行任務,取任務分兩種情況:* 1.初始任務:Worker被初始化時賦予的第一個任務(firstTask)* 2.隊列任務:當firstTask任務執(zhí)行好后,線程不會被回收,而是之后自動自旋從任務隊列中取任務(getTask)* 此時即體現(xiàn)了線程的復用*/while (task != null || (task = getTask()) != null) {/*** Worker加鎖的目的是為了在shutdown()時不要立即終止正在運行的worker,* 因為需要先持有鎖才能終止,而不是為了處理并發(fā)情況(注意不是全局鎖)* 在shutdownNow()時會立即終止worker,因為其無須持有鎖就能終止* 關(guān)于關(guān)閉線程池下文會再具體詳述*/w.lock();/*** 當線程池被關(guān)閉且主線程非中斷狀態(tài)時,需要重新中斷它* 由于調(diào)用線程一般是主線程,因此這里是主線程代指調(diào)用線程*/if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {/*** 每個任務執(zhí)行前都會調(diào)用"前置方法",* 在"前置方法"可能會拋出異常,* 結(jié)果是退出循環(huán)且completedAbruptly=true,* 從而線程死亡,任務未執(zhí)行(并被丟棄)*/beforeExecute(wt, task);Throwable thrown = null;try {//執(zhí)行任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {/*** 任務執(zhí)行結(jié)束后,會調(diào)用"后置方法"* 該方法也可能拋異常從而導致線程死亡* 但值得注意的是任務已經(jīng)執(zhí)行完畢*/afterExecute(task, thrown);}} finally {//清空任務 help gctask = null;//無論成功失敗任務數(shù)都要+1,由于持有鎖所以無須CASw.completedTasks++;//必須要主動釋放鎖w.unlock();}}//無異常時需要清除異常狀態(tài)completedAbruptly = false;} finally {/*** 工作線程退出循環(huán)的原因有兩個:* 1.因意外的錯誤/異常退出* 2.getTask()返回空 -> 原因有四種,下文會詳述* 工作線程退出循環(huán)后,需要執(zhí)行相對應的回收處理*/processWorkerExit(w, completedAbruptly);} } - getTask() - 獲取任務造成getTask()方法返回null的原因有5種:

1.線程池被關(guān)閉,狀態(tài)為(STOP || TIDYING || TERMINATED)

2.線程池被關(guān)閉,狀態(tài)為SHUTDOWN且任務隊列為空

3.實際工作線程數(shù)超過最大工作線程數(shù)

4.工作線程滿足超時條件后,同時符合下述的任意一種情況:
   4.1 線程池中還存在至少一個其他可用的工作線程

4.2 線程池中已沒有其他可用的工作線程但任務隊列為空

private Runnable getTask() {// 記錄任務隊列的poll()是否超時,默認未超時boolean timedOut = false; //自旋獲取任務for (;;) {/*** 線程池會依次判斷五種情況,滿足任意一種就返回null:* 1.線程池被關(guān)閉,狀態(tài)為(STOP || TIDYING || TERMINATED)* 2.線程池被關(guān)閉,狀態(tài)為SHUTDOWN且任務隊列為空* 3.實際工作線程數(shù)超過最大工作線程數(shù)* 4.工作線程滿足超時條件后,同時符合下述的任意一種情況:* 4.1 線程池中還存在至少一個其他可用的工作線程* 4.2 線程池中已沒有其他可用的工作線程但任務隊列為空*/int c = ctl.get();int rs = runStateOf(c);/*** 判斷線程池狀態(tài)條件,有兩種情況直接返回null* 1.線程池狀態(tài)大于SHUTDOWN(STOP||TIDYING||TERMINATED),說明不允許再執(zhí)行任務* - 因為>=STOP以上狀態(tài)時不允許接收新任務同時會中斷正在執(zhí)行中的任務,任務隊列的任務也不執(zhí)行了 * * 2.線程池狀態(tài)為SHUTDOWN且任務隊列為空,說明已經(jīng)無任務可執(zhí)行* - 因為SHUTDOWN時還需要執(zhí)行任務隊列的剩余任務,只有當無任務才可退出*/if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {/*** 減少一個工作線程數(shù)* 值得注意的是工作線程的回收是放在processWorkerExit()中進行的* decrementWorkerCount()方法是內(nèi)部不斷循環(huán)執(zhí)行CAS的,保證最終一定會成功* 補充:因線程池被關(guān)閉而計數(shù)減少可能與addWorker()的* 計數(shù)CAS自增發(fā)生并發(fā)競爭*/decrementWorkerCount();return null;}//讀取實際工作線程數(shù)int wc = workerCountOf(c);/*** 判斷是否需要處理超時:* 1.allowCoreThreadTimeOut = true 表示需要回收空閑超時的核心工作線程* 2.wc > corePoolSize 表示存在空閑超時的非核心工作線程需要回收*/boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;/*** 有三種情況會實際工作線程計數(shù)-1且直接返回null** 1.實際工作線程數(shù)超過最大線程數(shù)* 2.該工作線程滿足空閑超時條件需要被回收:* 2.1 當線程池中還存在至少一個其他可用的工作線程* 2.2 線程池中已沒有其他可用的工作線程但任務隊列為空* * 結(jié)合2.1和2.2我們可以推導出:** 1.當任務隊列非空時,線程池至少需要維護一個可用的工作線程,* 因此此時即使該工作線程超時也不會被回收掉而是繼續(xù)獲取任務** 2.當實際工作線程數(shù)超標或獲取任務超時時,線程池會因為* 一直沒有新任務可執(zhí)行,而逐漸減少線程直到核心線程數(shù)為止;* 若設(shè)置allowCoreThreadTimeOut為true,則減少到1為止;** 提示:由于wc > maximumPoolSize時必定wc > 1,因此無須比較* (wc > maximumPoolSize && workQueue.isEmpty()) 這種情況*/if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {/*** CAS失敗的原因還是出現(xiàn)并發(fā)競爭,具體參考上文* 當CAS失敗后,說明實際工作線程數(shù)已經(jīng)發(fā)生變化,* 必須重新判斷實際工作線程數(shù)和超時情況* 因此需要countinue*/if (compareAndDecrementWorkerCount(c))return null;/** */ continue;}//若滿足獲取任務條件,根據(jù)是否需要超時獲取會調(diào)用不同方法try {/*** 從任務隊列中取任務分兩種:* 1.timed=true 表明需要處理超時情況* -> 調(diào)用poll(),超過keepAliveTime返回null* 2.timed=fasle 表明無須處理超時情況* -> 調(diào)用take(),無任務則掛起等待*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//一旦獲取到任務就返回該任務并退出循環(huán)if (r != null)return r;//當任務為空時說明poll超時timedOut = true;/*** 關(guān)于中斷異常獲取簡單講一些超出本章范疇的內(nèi)容* take()和poll(long timeout, TimeUnit unit)都會throws InterruptedException* 原因在LockSupport.park(this)不會拋出異常但會響應中斷;* 但ConditionObject的await()會通過reportInterruptAfterWait()響應中斷* 具體內(nèi)容筆者會在阻塞隊列相關(guān)番中進一步介紹*/} catch (InterruptedException retry) {/*** 一旦該工作線程被中斷,需要清除超時標記* 這表明當工作線程在獲取隊列任務時被中斷,* 若您不對中斷異常做任務處理,線程池就默認* 您希望線程繼續(xù)執(zhí)行,這樣就會重置之前的超時標記*/timedOut = false;}} }

■ 關(guān)閉線程池

  • 使用shutdown()關(guān)閉線程池最主要執(zhí)行5個操作:
      1.獲取全局鎖

2.CAS自旋變更線程池狀態(tài)為SHUTDOWN

3.中斷所有空閑工作線程(設(shè)置中斷標記) -> 注意是空閑

4.釋放全局鎖

5.嘗試終止線程池

/*** 有序關(guān)閉線程池* 在關(guān)閉過程中,之前已提交的任務將被執(zhí)行(包括正在和隊列中的),* 但新提交的任務會被拒絕* 如果線程池已經(jīng)被關(guān)閉,調(diào)用該方法不會有任何附加效果*/ public void shutdown() {//1.獲取全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//2.CAS自旋變更線程池狀態(tài)為SHUTDOWNadvanceRunState(SHUTDOWN);//3.中斷所有空閑工作線程interruptIdleWorkers();//專門提供給ScheduledThreadPoolExecutor的鉤子方法onShutdown();} finally {//4.釋放全局鎖mainLock.unlock();}/*** 5.嘗試終止線程池,此時線程池滿足兩個條件:* 1.線程池狀態(tài)為SHUTDOWN* 2.所有空閑工作線程已被中斷*/tryTerminate(); }
  • 使用shutdownNow()關(guān)閉線程池最主要執(zhí)行六個操作:
      1.獲取全局鎖

2.CAS自旋變更線程池狀態(tài)為SHUTDOWN

3.中斷所有工作線程(設(shè)置中斷標記)

4.將剩余任務重新放入一個list中并清空任務隊列

5.釋放全局鎖

6.嘗試終止線程池

/*** 嘗試中斷所有工作線程,并返回待處理任務列表集合(從任務隊列中移除)** 1.若想等待執(zhí)行中的線程完成任務,可使用awaitTermination()* 2.由于取消任務操作是通過Thread#interrupt實現(xiàn),因此* 響應中斷失敗的任務可能永遠都不會被終止(謹慎使用!!!)* 響應中斷失敗指的是您選擇捕獲但不處理該中斷異常*/ public List<Runnable> shutdownNow() {List<Runnable> tasks;//1.獲取全局鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//2.CAS自旋更新線程池狀態(tài)為STOPadvanceRunState(STOP);//3.中斷所有工作線程interruptWorkers();//4.將剩余任務重新放入一個list中并清空任務隊列tasks = drainQueue();} finally {//5.釋放全局鎖mainLock.unlock();}/*** 6.嘗試終止線程池,此時線程池滿足兩個條件:* 1.線程池狀態(tài)為STOP* 2.任務隊列為空* 注意:此時不一定所有工作線程都被中斷回收,詳述見* 7.3 tryTerminate*/tryTerminate();//5.返回待處理任務列表集合return tasks; }

■ 飽和拒絕策略

線程池的飽和拒絕策略主要用于拒絕任務(但這并不意味著該任務不會被執(zhí)行),線程池原生提供了四種飽和拒絕策略,基本涵蓋常見的飽和處理場景:

AbortPolicy:默認策略,直接拋出異常

CallerRunsPolicy:只用調(diào)用線程執(zhí)行該任務

DiscardPolicy:直接丟棄任務

DiscardOldestPolicy:丟棄隊尾任務并用線程池重新嘗試執(zhí)行該任務

所有的拒絕策略都需要實現(xiàn)該拒絕處理器接口,以統(tǒng)一口徑: /*** 用于拒絕線程池任務的處理器*/ public interface RejectedExecutionHandler {/*** 該方法用于拒絕接受線程池任務* * 有三種情況可能調(diào)用該方法:* 1.沒有更多的工作線程可用* 2.任務隊列已滿* 3.關(guān)閉線程池** 當沒有其他處理選擇時,該方法會選擇拋出RejectedExecutionException異常* 該異常會向上拋出直到execute()的調(diào)用者*/void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
  • CallerRunsPolicy

處理規(guī)則:新提交任務由調(diào)用者線程直接執(zhí)行

推薦:拒絕策略推薦使用CallerRunsPolicy,理由是該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調(diào)用者線程中執(zhí)行

/*** 不會直接丟棄,而是直接用調(diào)用execute()方法的線程執(zhí)行該方法* 當然一旦線程池已經(jīng)被關(guān)閉,還是要丟棄的** 補充:值得注意的是所有策略類都是public的靜態(tài)內(nèi)部類,* 其目的應該是告知使用者 -> 該類與線程池相關(guān)但無需線程池實例便可直接使用*/ public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }/*** 直接使用調(diào)用該方法的線程執(zhí)行任務* 除非線程池被關(guān)閉時才會丟棄該任務*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {//一旦線程池被關(guān)閉,丟棄該任務if (!e.isShutdown()) {//注意此時不是線程池執(zhí)行該任務r.run();}} } - DiscardPolicy處理規(guī)則:根據(jù)LIFO(后進先出)規(guī)則直接丟棄最新提交的任務 /*** 直接丟棄任務* 這個太狠了,連個案底都沒有,慎用啊*/ public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }/*** 無作為即為丟棄*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} } - DiscardOldestPolicy

處理規(guī)則:根據(jù)LRU(最近最少使用)規(guī)則丟棄最后一個任務,然后嘗試執(zhí)行新提交的任務

/*** 比起直接丟棄,該類會丟棄隊列里最后一個但仍未被處理的任務,* 然后會重新調(diào)用execute()方法處理當前任務* 除非線程池被關(guān)閉時才會丟棄該任務* 此類充分證明了"來得早不如來的巧"*/ public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }/*** 丟棄隊列里最近的一個任務,并執(zhí)行當前任務* 除非線程池被關(guān)閉時才會丟棄該任務* 原因是隊列是遵循先進先出FIFO原則,poll()會彈出隊尾元素*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {//一旦線程池被關(guān)閉,直接丟棄if (!e.isShutdown()) {//彈出隊尾元素e.getQueue().poll();//直接用線程池執(zhí)行當前任務e.execute(r);}} }

總結(jié)

以上是生活随笔為你收集整理的腾讯面试题Java 并发包之线程池综述的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。