日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析

發(fā)布時(shí)間:2024/1/17 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

ThreadPoolExecutor是Executor執(zhí)行框架最重要的一個(gè)實(shí)現(xiàn)類,提供了線程池管理和任務(wù)管理是兩個(gè)最基本的能力。這篇通過(guò)分析ThreadPoolExecutor的源碼來(lái)看看如何設(shè)計(jì)和實(shí)現(xiàn)一個(gè)基于生產(chǎn)者消費(fèi)者模型的執(zhí)行器。

?

生產(chǎn)者消費(fèi)者模型

生產(chǎn)者消費(fèi)者模型包含三個(gè)角色:生產(chǎn)者,工作隊(duì)列,消費(fèi)者。對(duì)于ThreadPoolExecutor來(lái)說(shuō),

1. 生產(chǎn)者是任務(wù)的提交者,是外部調(diào)用ThreadPoolExecutor的線程

2. 工作隊(duì)列是一個(gè)阻塞隊(duì)列的接口,具體的實(shí)現(xiàn)類可以有很多種。BlockingQueue<Runnable> workQueue;

3. 消費(fèi)者是封裝了線程的Worker類的集合。HashSet<Worker> workers = new HashSet<Worker>();

?

?

主要屬性

明確了ThreadPoolExecutor的基本執(zhí)行模型之后,來(lái)看下它的幾個(gè)主要屬性:

1.?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));??? 一個(gè)32位的原子整形作為線程池的狀態(tài)控制描述符。低29位作為工作者線程的數(shù)量。所以工作者線程最多有2^29 -1個(gè)。高3位來(lái)保持線程池的狀態(tài)。ThreadPoolExecutor總共有5種狀態(tài):

???? *?? RUNNING:? 可以接受新任務(wù)并執(zhí)行
???? *?? SHUTDOWN: 不再接受新任務(wù),但是仍然執(zhí)行工作隊(duì)列中的任務(wù)
???? *?? STOP:???? 不再接受新任務(wù),不執(zhí)行工作隊(duì)列中的任務(wù),并且中斷正在執(zhí)行的任務(wù)
???? *?? TIDYING:? 所有任務(wù)被終止,工作線程的數(shù)量為0,會(huì)去執(zhí)行terminated()鉤子方法
???? *?? TERMINATED: terminated()執(zhí)行結(jié)束

?

下面是一系列ctl這個(gè)變量定義和工具方法

?

?
  • private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  • private static final int COUNT_BITS = Integer.SIZE - 3;

  • private static final int CAPACITY = (1 << COUNT_BITS) - 1;

  • ?
  • // runState is stored in the high-order bits

  • private static final int RUNNING = -1 << COUNT_BITS;

  • private static final int SHUTDOWN = 0 << COUNT_BITS;

  • private static final int STOP = 1 << COUNT_BITS;

  • private static final int TIDYING = 2 << COUNT_BITS;

  • private static final int TERMINATED = 3 << COUNT_BITS;

  • ?
  • // Packing and unpacking ctl

  • private static int runStateOf(int c) { return c & ~CAPACITY; }

  • private static int workerCountOf(int c) { return c & CAPACITY; }

  • private static int ctlOf(int rs, int wc) { return rs | wc; }

  • ?
  • private static boolean runStateLessThan(int c, int s) {

  • return c < s;

  • }

  • ?
  • private static boolean runStateAtLeast(int c, int s) {

  • return c >= s;

  • }

  • ?
  • private static boolean isRunning(int c) {

  • return c < SHUTDOWN;

  • }

  • ?
  • private boolean compareAndIncrementWorkerCount(int expect) {

  • return ctl.compareAndSet(expect, expect + 1);

  • }

  • ?
  • private boolean compareAndDecrementWorkerCount(int expect) {

  • return ctl.compareAndSet(expect, expect - 1);

  • }

  • ?
  • private void decrementWorkerCount() {

  • do {} while (! compareAndDecrementWorkerCount(ctl.get()));

  • }


  • 2. private final BlockingQueue<Runnable> workQueue; 工作隊(duì)列,采用了BlockingQueue阻塞隊(duì)列的接口,具體實(shí)現(xiàn)類可以按照不同的策略來(lái)選擇,比如有邊界的ArrayBlockingQueue,無(wú)邊界的LinkedBlockingQueue。

    ?

    3. private final ReentrantLock mainLock = new ReentrantLock();? 控制ThreadPoolExecutor的全局可重入鎖,所有需要同步的操作都要被這個(gè)鎖保護(hù)

    4. private final Condition termination = mainLock.newCondition(); mainLock的條件隊(duì)列,來(lái)進(jìn)行wait()和notify()等條件操作

    5. private final HashSet<Worker> workers = new HashSet<Worker>();? 工作線程集合

    6. private volatile ThreadFactory threadFactory; 創(chuàng)建線程的工廠,可以自定義線程創(chuàng)建的邏輯

    7. private volatile RejectedExecutionHandler handler;? 拒絕執(zhí)行任務(wù)的處理器,可以自定義拒絕的策略

    8. private volatile long keepAliveTime;?? 空閑線程的存活時(shí)間。可以根據(jù)這個(gè)存活時(shí)間來(lái)判斷空閑線程是否等待超時(shí),然后采取相應(yīng)的線程回收操作

    9. private volatile boolean allowCoreThreadTimeOut;? 是否允許coreThread線程超時(shí)回收

    10. private volatile int corePoolSize;? 可存活的線程的最小值。如果設(shè)置了allowCoreThreadTimeOut, 那么corePoolSize的值可以為0。

    11. private volatile int maximumPoolSize;? 可存活的線程的最大值

    ?

    工作線程創(chuàng)建和回收策略

    ThreadPoolExecutor通過(guò)corePoolSize,maximumPoolSize, allowCoreThreadTimeOut,keepAliveTime等幾個(gè)參數(shù)提供一個(gè)靈活的工作線程創(chuàng)建和回收的策略。

    創(chuàng)建策略:

    1. 當(dāng)工作線程數(shù)量小于corePoolSize時(shí),不管其他線程是否空閑,都創(chuàng)建新的工作線程來(lái)處理新加入的任務(wù)

    2. 當(dāng)工作線程數(shù)量大于corePoolSize,小于maximumPoolSize時(shí),只有當(dāng)工作隊(duì)列滿了,才會(huì)創(chuàng)建新的工作線程來(lái)處理新加入的任務(wù)。當(dāng)工作隊(duì)列有空余時(shí),只把新任務(wù)加入隊(duì)列

    3. 把corePoolSize和maximumPoolSize 設(shè)置成相同的值時(shí),線程池就是一個(gè)固定(fixed)工作線程數(shù)的線程。

    回收策略:

    1. keepAliveTime變量設(shè)置了空閑工作線程超時(shí)的時(shí)間,當(dāng)工作線程數(shù)量超過(guò)了corePoolSize后,空閑的工作線程等待超過(guò)了keepAliveTime后,會(huì)被回收。后面會(huì)說(shuō)怎么確定一個(gè)工作線程是否“空閑”。

    2. 如果設(shè)置了allowCoreThreadTimeOut,那么core Thread也可以被回收,即當(dāng)core thread也空閑時(shí),也可以被回收,直到工作線程集合為0。

    工作隊(duì)列策略

    ?

    工作隊(duì)列BlockingQueue<Runnable> workQueue 是用來(lái)存放提交的任務(wù)的。它有4個(gè)基本的策略,并且根據(jù)不同的阻塞隊(duì)列的實(shí)現(xiàn)類可以引入更多的工作隊(duì)列的策略。

    4個(gè)基本策略:

    1. 當(dāng)工作線程數(shù)量小于corePoolSize時(shí),新提交的任務(wù)總是會(huì)由新創(chuàng)建的工作線程執(zhí)行,不入隊(duì)列

    2. 當(dāng)工作線程數(shù)量大于corePoolSize,如果工作隊(duì)列沒(méi)滿,新提交的任務(wù)就入隊(duì)列

    3. 當(dāng)工作線程數(shù)量大于corePoolSize,小于MaximumPoolSize時(shí),如果工作隊(duì)列滿了,新提交的任務(wù)就交給新創(chuàng)建的工作線程,不入隊(duì)列

    4. 當(dāng)工作線程數(shù)量大于MaximumPoolSize,并且工作隊(duì)列滿了,那么新提交的任務(wù)會(huì)被拒絕執(zhí)行。具體看采用何種拒絕策略

    根據(jù)不同的阻塞隊(duì)列的實(shí)現(xiàn)類,又有幾種額外的策略

    1. 采用SynchronousQueue直接將任務(wù)傳遞給空閑的線程執(zhí)行,不額外存儲(chǔ)任務(wù)。這種方式需要無(wú)限制的MaximumPoolSize,可以創(chuàng)建無(wú)限制的工作線程來(lái)處理提交的任務(wù)。這種方式的好處是任務(wù)可以很快被執(zhí)行,適用于任務(wù)到達(dá)時(shí)間大于任務(wù)處理時(shí)間的情況。缺點(diǎn)是當(dāng)任務(wù)量很大時(shí),會(huì)占用大量線程

    2. 采用無(wú)邊界的工作隊(duì)列LinkedBlockingQueue。這種情況下,由于工作隊(duì)列永遠(yuǎn)不會(huì)滿,那么工作線程的數(shù)量最大就是corePoolSize,因?yàn)楫?dāng)工作線程數(shù)量達(dá)到corePoolSize時(shí),只有工作隊(duì)列滿的時(shí)候才會(huì)創(chuàng)建新的工作線程。這種方式好處是使用的線程數(shù)量是穩(wěn)定的,當(dāng)內(nèi)存足夠大時(shí),可以處理足夠多的請(qǐng)求。缺點(diǎn)是如果任務(wù)直接有依賴,很有可能形成死鎖,因?yàn)楫?dāng)工作線程被消耗完時(shí),不會(huì)創(chuàng)建新的工作現(xiàn)場(chǎng),只會(huì)把任務(wù)加入工作隊(duì)列。并且可能由于內(nèi)存耗盡引發(fā)內(nèi)存溢出OOM

    3. 采用有界的工作隊(duì)列AraayBlockingQueue。這種情況下對(duì)于內(nèi)存資源是可控的,但是需要合理調(diào)節(jié)MaximumPoolSize和工作隊(duì)列的長(zhǎng)度,這兩個(gè)值是相互影響的。當(dāng)工作隊(duì)列長(zhǎng)度比較小的時(shí),必定會(huì)創(chuàng)建更多的線程。而更多的線程會(huì)引起上下文切換等額外的消耗。當(dāng)工作隊(duì)列大,MaximumPoolSize小的時(shí)候,會(huì)影響吞吐量,并且會(huì)觸發(fā)拒絕機(jī)制。

    ?

    拒絕執(zhí)行策略

    當(dāng)Executor處于shutdown狀態(tài)或者工作線程超過(guò)MaximumPoolSize并且工作隊(duì)列滿了之后,新提交的任務(wù)將會(huì)被拒絕執(zhí)行。RejectedExecutionHandler接口定義了拒絕執(zhí)行的策略。具體的策略有

    CallerRunsPolicy:由調(diào)用者線程來(lái)執(zhí)行被拒絕的任務(wù),屬于同步執(zhí)行

    AbortPolicy:中止執(zhí)行,拋出RejectedExecutionException異常

    DiscardPolicy:丟棄任務(wù)

    DiscardOldestPolicy:丟棄最老的任務(wù)

    ?

    ?
  • public static class CallerRunsPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates a {@code CallerRunsPolicy}.

  • */

  • public CallerRunsPolicy() { }

  • ?
  • /**

  • * Executes task r in the caller's thread, unless the executor

  • * has been shut down, in which case the task is discarded.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • if (!e.isShutdown()) {

  • r.run();

  • }

  • }

  • }

  • ?
  • /**

  • * A handler for rejected tasks that throws a

  • * {@code RejectedExecutionException}.

  • */

  • public static class AbortPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates an {@code AbortPolicy}.

  • */

  • public AbortPolicy() { }

  • ?
  • /**

  • * Always throws RejectedExecutionException.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • * @throws RejectedExecutionException always.

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • throw new RejectedExecutionException("Task " + r.toString() +

  • " rejected from " +

  • e.toString());

  • }

  • }

  • ?
  • /**

  • * A handler for rejected tasks that silently discards the

  • * rejected task.

  • */

  • public static class DiscardPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates a {@code DiscardPolicy}.

  • */

  • public DiscardPolicy() { }

  • ?
  • /**

  • * Does nothing, which has the effect of discarding task r.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • }

  • }

  • ?
  • /**

  • * A handler for rejected tasks that discards the oldest unhandled

  • * request and then retries {@code execute}, unless the executor

  • * is shut down, in which case the task is discarded.

  • */

  • public static class DiscardOldestPolicy implements RejectedExecutionHandler {

  • /**

  • * Creates a {@code DiscardOldestPolicy} for the given executor.

  • */

  • public DiscardOldestPolicy() { }

  • ?
  • /**

  • * Obtains and ignores the next task that the executor

  • * would otherwise execute, if one is immediately available,

  • * and then retries execution of task r, unless the executor

  • * is shut down, in which case task r is instead discarded.

  • *

  • * @param r the runnable task requested to be executed

  • * @param e the executor attempting to execute this task

  • */

  • public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

  • if (!e.isShutdown()) {

  • e.getQueue().poll();

  • e.execute(r);

  • }

  • }

  • }

  • ?

    工作線程Worker的設(shè)計(jì)

    工作線程沒(méi)有直接使用Thread,而是采用了Worker類封裝了Thread,目的是更好地進(jìn)行中斷控制。Worker直接繼承了AbstractQueuedSynchronizer來(lái)進(jìn)行同步操作,它實(shí)現(xiàn)了一個(gè)不可重入的互斥結(jié)構(gòu)。當(dāng)它的state屬性為0時(shí)表示unlock,state為1時(shí)表示lock。任務(wù)執(zhí)行時(shí)必須在lock狀態(tài)的保護(hù)下,防止出現(xiàn)同步問(wèn)題。因此當(dāng)Worker處于lock狀態(tài)時(shí),表示它正在運(yùn)行,當(dāng)它處于unlock狀態(tài)時(shí),表示它“空閑”。當(dāng)它空閑超過(guò)keepAliveTime時(shí),就有可能被回收。

    Worker還實(shí)現(xiàn)了Runnable接口, 執(zhí)行它的線程是Worker包含的Thread對(duì)象,在Worker的構(gòu)造函數(shù)可以看到Thread創(chuàng)建時(shí),把Worker對(duì)象傳遞給了它。

    ?

    ?
  • private final class Worker

  • extends AbstractQueuedSynchronizer

  • implements Runnable

  • {

  • ?
  • /** Thread this worker is running in. Null if factory fails. */

  • final Thread thread;

  • /** Initial task to run. Possibly null. */

  • Runnable firstTask;

  • /** Per-thread task counter */

  • volatile long completedTasks;

  • ?
  • Worker(Runnable firstTask) {

  • setState(-1); // inhibit interrupts until runWorker

  • this.firstTask = firstTask;

  • ?
  • // 把Worker對(duì)象作為Runnable的實(shí)例傳遞給了新創(chuàng)建Thread對(duì)象

  • ?this.thread = getThreadFactory().newThread(this);

  • }

  • ?
  • public void run() {

  • runWorker(this);

  • }

  • ?
  • // Lock methods

  • //

  • // The value 0 represents the unlocked state.

  • // The value 1 represents the locked state.

  • ?
  • protected boolean isHeldExclusively() {

  • return getState() != 0;

  • }

  • ?
  • protected boolean tryAcquire(int unused) {

  • if (compareAndSetState(0, 1)) {

  • setExclusiveOwnerThread(Thread.currentThread());

  • return true;

  • }

  • return false;

  • }

  • ?
  • protected boolean tryRelease(int unused) {

  • setExclusiveOwnerThread(null);

  • setState(0);

  • return true;

  • }

  • ?
  • public void lock() { acquire(1); }

  • public boolean tryLock() { return tryAcquire(1); }

  • public void unlock() { release(1); }

  • public boolean isLocked() { return isHeldExclusively(); }

  • ?
  • void interruptIfStarted() {

  • Thread t;

  • if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

  • try {

  • t.interrupt();

  • } catch (SecurityException ignore) {

  • }

  • }

  • }

  • }


  • Worker被它的線程執(zhí)行時(shí),run方法調(diào)用了ThreadPoolExecutor的runWorker方法。

    1. wt指向當(dāng)前執(zhí)行Worker的run方法的線程,也就是指向了Worker包含的工作線程對(duì)象

    2. task指向Worker包含的firstTask對(duì)象,表示當(dāng)前要執(zhí)行的任務(wù)

    3. 當(dāng)task不為null或者從工作隊(duì)列中取到了新任務(wù),那么先加鎖w.lock表示正在運(yùn)行任務(wù)。在真正開始執(zhí)行task.run()之前,先判斷線程池的狀態(tài)是否已經(jīng)STOP,如果是,就中斷Worker的線程。

    4. 一旦判斷當(dāng)前線程不是STOP并且工作線程沒(méi)有中斷。那么就開始執(zhí)行task.run()了。Worker的interruptIfStarted方法可以中斷這個(gè)Worker的線程,從而中斷正在執(zhí)行任務(wù)。

    5.?beforeExecute(wt, task)和afterExecute(wt,task)是兩個(gè)鉤子方法,支持在任務(wù)真正開始執(zhí)行前就行擴(kuò)展。

    ?

    ?
  • final void runWorker(Worker w) {

  • Thread wt = Thread.currentThread();

  • Runnable task = w.firstTask;

  • w.firstTask = null;

  • w.unlock(); // allow interrupts

  • boolean completedAbruptly = true;

  • try {

  • while (task != null || (task = getTask()) != null) {

  • w.lock();

  • // If pool is stopping, ensure thread is interrupted;

  • // if not, ensure thread is not interrupted. This

  • // requires a recheck in second case to deal with

  • // shutdownNow race while clearing interrupt

  • if ((runStateAtLeast(ctl.get(), STOP) ||

  • (Thread.interrupted() &&

  • runStateAtLeast(ctl.get(), STOP))) &&

  • !wt.isInterrupted())

  • wt.interrupt();

  • try {

  • beforeExecute(wt, task);

  • Throwable thrown = null;

  • try {

  • task.run();

  • } catch (RuntimeException x) {

  • thrown = x; throw x;

  • } catch (Error x) {

  • thrown = x; throw x;

  • } catch (Throwable x) {

  • thrown = x; throw new Error(x);

  • } finally {

  • afterExecute(task, thrown);

  • }

  • } finally {

  • task = null;

  • w.completedTasks++;

  • w.unlock();

  • }

  • }

  • completedAbruptly = false;

  • } finally {

  • processWorkerExit(w, completedAbruptly);

  • }

  • }


  • 工作線程Worker創(chuàng)建和回收的源碼

    首先看一下ThreadPoolExecutor的execute方法,這個(gè)方式是任務(wù)提交的入口。可以看到它的邏輯符合之前說(shuō)的工作線程創(chuàng)建的基本策略

    1. 當(dāng)工作線程數(shù)量小于corePoolSize時(shí),通過(guò)addWorker(command,true)來(lái)新建工作線程處理新建的任務(wù),不入工作隊(duì)列

    2. 當(dāng)工作線程數(shù)量大于等于corePoolSize時(shí),先入隊(duì)列,使用的是BlockingQueue的offer方法。當(dāng)工作線程數(shù)量為0時(shí),還會(huì)通過(guò)addWorker(null, false)添加一個(gè)新的工作線程

    3. 當(dāng)工作隊(duì)列滿了并且工作線程數(shù)量在corePoolSize和MaximumPoolSize之間,就創(chuàng)建新的工作線程去執(zhí)行新添加的任務(wù)。當(dāng)工作線程數(shù)量超過(guò)了MaximumPoolSize,就拒絕任務(wù)。

    ?

    ?
  • public void execute(Runnable command) {

  • if (command == null)

  • throw new NullPointerException();

  • ?
  • int c = ctl.get();

  • if (workerCountOf(c) < corePoolSize) {

  • if (addWorker(command, true))

  • return;

  • c = ctl.get();

  • }

  • if (isRunning(c) && workQueue.offer(command)) {

  • int recheck = ctl.get();

  • if (! isRunning(recheck) && remove(command))

  • reject(command);

  • else if (workerCountOf(recheck) == 0)

  • addWorker(null, false);

  • }

  • else if (!addWorker(command, false))

  • reject(command);

  • }


  • 可以看到addWorker方法是創(chuàng)建Worker工作線程的所在。

    1. retry這個(gè)循環(huán)判斷線程池的狀態(tài)和當(dāng)前工作線程數(shù)量的邊界。如果允許創(chuàng)建工作現(xiàn)場(chǎng),首先修改ctl變量表示的工作線程的數(shù)量

    2. 把工作線程添加到workers集合中的操作要在mainLock這個(gè)鎖的保護(hù)下進(jìn)行。所有和ThreadPoolExecutor狀態(tài)相關(guān)的操作都要在mainLock鎖的保護(hù)下進(jìn)行

    3. w = new Worker(firstTask); 創(chuàng)建Worker實(shí)例,把firstTask作為它當(dāng)前的任務(wù)。firstTask為null時(shí)表示先只創(chuàng)建Worker線程,然后去工作隊(duì)列中取任務(wù)執(zhí)行

    4. 把新創(chuàng)建的Worker實(shí)例加入到workers集合,修改相關(guān)統(tǒng)計(jì)變量。

    5. 當(dāng)加入集合成功后,開始啟動(dòng)這個(gè)Worker實(shí)例。啟動(dòng)的方法是調(diào)用Worker封裝的Thread的start()方法。之前說(shuō)了,這個(gè)Thread對(duì)應(yīng)的Runnable是Worker本身,會(huì)去調(diào)用Worker的run方法,然后調(diào)用ThreadPoolExecutor的runWorker方法。在runWorker方法中真正去執(zhí)行任務(wù)。

    ?

    ?
  • private boolean addWorker(Runnable firstTask, boolean core) {

  • retry:

  • for (;;) {

  • int c = ctl.get();

  • int rs = runStateOf(c);

  • ?
  • // Check if queue empty only if necessary.

  • if (rs >= SHUTDOWN &&

  • ! (rs == SHUTDOWN &&

  • firstTask == null &&

  • ! workQueue.isEmpty()))

  • return false;

  • ?
  • for (;;) {

  • int wc = workerCountOf(c);

  • if (wc >= CAPACITY ||

  • wc >= (core ? corePoolSize : maximumPoolSize))

  • return false;

  • if (compareAndIncrementWorkerCount(c))

  • break retry;

  • c = ctl.get(); // Re-read ctl

  • if (runStateOf(c) != rs)

  • continue retry;

  • // else CAS failed due to workerCount change; retry inner loop

  • }

  • }

  • ?
  • boolean workerStarted = false;

  • boolean workerAdded = false;

  • Worker w = null;

  • try {

  • final ReentrantLock mainLock = this.mainLock;

  • w = new Worker(firstTask);

  • final Thread t = w.thread;

  • if (t != null) {

  • mainLock.lock();

  • try {

  • // Recheck while holding lock.

  • // Back out on ThreadFactory failure or if

  • // shut down before lock acquired.

  • int c = ctl.get();

  • int rs = runStateOf(c);

  • ?
  • if (rs < SHUTDOWN ||

  • (rs == SHUTDOWN && firstTask == null)) {

  • if (t.isAlive()) // precheck that t is startable

  • throw new IllegalThreadStateException();

  • workers.add(w);

  • int s = workers.size();

  • if (s > largestPoolSize)

  • largestPoolSize = s;

  • workerAdded = true;

  • }

  • } finally {

  • mainLock.unlock();

  • }

  • if (workerAdded) {

  • t.start();

  • workerStarted = true;

  • }

  • }

  • } finally {

  • if (! workerStarted)

  • addWorkerFailed(w);

  • }

  • return workerStarted;

  • }


  • 工作線程回收的方法是processWorkerExit(),它在runWorker方法執(zhí)行結(jié)束的時(shí)候被調(diào)用。之前說(shuō)了空閑的工作線程可能會(huì)在keepAliveTime時(shí)間之后被回收。這個(gè)邏輯隱含在runWorker方法和getTask方法中,會(huì)在下面說(shuō)如何從工作隊(duì)列取任務(wù)時(shí)說(shuō)明。processWorkerExit方法單純只是處理工作線程的回收。

    1. 結(jié)合runWorker方法看,如果Worker執(zhí)行task.run()的時(shí)候拋出了異常,那么completedAbruptly為true,需要從workers集合中把這個(gè)工作線程移除掉。

    2. 如果是completedAbruptly為true,并且線程池不是STOP狀態(tài),那么就創(chuàng)建一個(gè)新的Worker工作線程

    3. 如果是completedAbruptly為false,并且線程池不是STOP狀態(tài),首先檢查是否allowCoreThreadTimeout,如果運(yùn)行,那么最少線程數(shù)可以為0,否則是corePoolSize。如果最少線程數(shù)為0,并且工作隊(duì)列不為空,那么最小值為1。最后檢查當(dāng)前的工作線程數(shù)量,如果小于最小值,就創(chuàng)建新的工作線程。

    ?

    ?
  • private void processWorkerExit(Worker w, boolean completedAbruptly) {

  • if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

  • decrementWorkerCount();

  • ?
  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • completedTaskCount += w.completedTasks;

  • workers.remove(w);

  • } finally {

  • mainLock.unlock();

  • }

  • ?
  • tryTerminate();

  • ?
  • int c = ctl.get();

  • if (runStateLessThan(c, STOP)) {

  • if (!completedAbruptly) {

  • int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

  • if (min == 0 && ! workQueue.isEmpty())

  • min = 1;

  • if (workerCountOf(c) >= min)

  • return; // replacement not needed

  • }

  • addWorker(null, false);

  • }

  • }


  • 任務(wù)的獲取

    工作線程從工作隊(duì)列中取任務(wù)的代碼在getTask方法中

    1. timed變量表示是否要計(jì)時(shí),當(dāng)計(jì)時(shí)超過(guò)keepAliveTime后還沒(méi)取到任務(wù),就返回null。結(jié)合runWorker方法可以知道,當(dāng)getTask返回null時(shí),該Worker線程會(huì)被回收,這就是如何回收空閑工作線程的方法。

    timed變量當(dāng)allowCoreThreadTimeout為true或者當(dāng)工作線程數(shù)大于corePoolSize時(shí)為true。

    2. 如果timed為true,就用BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法來(lái)計(jì)時(shí)從隊(duì)頭取任務(wù),否則直接用take()方法從隊(duì)頭取任務(wù)

    ?

    ?
  • private Runnable getTask() {

  • boolean timedOut = false; // Did the last poll() time out?

  • ?
  • retry:

  • for (;;) {

  • int c = ctl.get();

  • int rs = runStateOf(c);

  • ?
  • // Check if queue empty only if necessary.

  • if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

  • decrementWorkerCount();

  • return null;

  • }

  • ?
  • boolean timed; // Are workers subject to culling?

  • ?
  • for (;;) {

  • int wc = workerCountOf(c);

  • timed = allowCoreThreadTimeOut || wc > corePoolSize;

  • ?
  • if (wc <= maximumPoolSize && ! (timedOut && timed))

  • break;

  • if (compareAndDecrementWorkerCount(c))

  • return null;

  • c = ctl.get(); // Re-read ctl

  • if (runStateOf(c) != rs)

  • continue retry;

  • // else CAS failed due to workerCount change; retry inner loop

  • }

  • ?
  • try {

  • Runnable r = timed ?

  • workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

  • workQueue.take();

  • if (r != null)

  • return r;

  • timedOut = true;

  • } catch (InterruptedException retry) {

  • timedOut = false;

  • }

  • }

  • }


  • 線程池的關(guān)閉

    線程池有SHUTDOWN, STOP, TIDYING, TERMINATED這幾個(gè)狀態(tài)和線程池關(guān)閉相關(guān)。通常我們把關(guān)閉分為優(yōu)雅的關(guān)閉和強(qiáng)制立刻關(guān)閉。

    所謂優(yōu)雅的關(guān)閉就是調(diào)用shutdown()方法,線程池進(jìn)入SHUTDOWN狀態(tài),不在接收新的任務(wù),會(huì)把工作隊(duì)列的任務(wù)執(zhí)行完畢后再結(jié)束。

    強(qiáng)制立刻關(guān)閉就是調(diào)用shutdownNow()方法,線程池直接進(jìn)入STOP狀態(tài),會(huì)中斷正在執(zhí)行的工作線程,清空工作隊(duì)列。

    1. 在shutdown方法中,先設(shè)置線程池狀態(tài)為SHUTDOWN,然后先去中斷空閑的工作線程,再調(diào)用onShutdown鉤子方法。最后tryTerminate()

    2. 在shutdownNow方法中,先設(shè)置線程池狀態(tài)為STOP,然后先中斷所有的工作線程,再清空工作隊(duì)列。最后tryTerminate()。這個(gè)方法會(huì)把工作隊(duì)列中的任務(wù)返回給調(diào)用者處理。

    ?

    ?
  • public void shutdown() {

  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • checkShutdownAccess();

  • advanceRunState(SHUTDOWN);

  • interruptIdleWorkers();

  • onShutdown(); // hook for ScheduledThreadPoolExecutor

  • } finally {

  • mainLock.unlock();

  • }

  • tryTerminate();

  • }

  • ?
  • ? public List<Runnable> shutdownNow() {

  • ??????? List<Runnable> tasks;

  • ??????? final ReentrantLock mainLock = this.mainLock;

  • ??????? mainLock.lock();

  • ??????? try {

  • ??????????? checkShutdownAccess();

  • ??????????? advanceRunState(STOP);

  • ??????????? interruptWorkers();

  • ??????????? tasks = drainQueue();

  • ??????? } finally {

  • ??????????? mainLock.unlock();

  • ??????? }

  • ??????? tryTerminate();

  • ??????? return tasks;

  • ??? }


  • interruptIdleWorkers方法會(huì)去中斷空閑的工作線程,所謂空閑的工作線程即沒(méi)有上鎖的Worker。

    而interruptWorkers方法直接去中斷所有的Worker,調(diào)用Worker.interruptIfStarted()方法

    ?

    ?
  • private void interruptIdleWorkers(boolean onlyOne) {

  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • for (Worker w : workers) {

  • Thread t = w.thread;

  • if (!t.isInterrupted() && w.tryLock()) {

  • try {

  • t.interrupt();

  • } catch (SecurityException ignore) {

  • } finally {

  • w.unlock();

  • }

  • }

  • if (onlyOne)

  • break;

  • }

  • } finally {

  • mainLock.unlock();

  • }

  • }

  • ?
  • ?private void interruptWorkers() {

  • ??????? final ReentrantLock mainLock = this.mainLock;

  • ??????? mainLock.lock();

  • ??????? try {

  • ??????????? for (Worker w : workers)

  • ??????????????? w.interruptIfStarted();

  • ??????? } finally {

  • ??????????? mainLock.unlock();

  • ??????? }

  • ??? }

  • ?
  • ? void interruptIfStarted() {

  • ??????????? Thread t;

  • ??????????? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

  • ??????????????? try {

  • ??????????????????? t.interrupt();

  • ??????????????? } catch (SecurityException ignore) {

  • ??????????????? }

  • ??????????? }

  • ??????? }


  • tryTerminate方法會(huì)嘗試終止線程池,根據(jù)線程池的狀態(tài),在相應(yīng)狀態(tài)會(huì)中斷空閑工作線程,調(diào)用terminated()鉤子方法,設(shè)置狀態(tài)為TERMINATED。

    ?

    ?
  • final void tryTerminate() {

  • for (;;) {

  • int c = ctl.get();

  • if (isRunning(c) ||

  • runStateAtLeast(c, TIDYING) ||

  • (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

  • return;

  • if (workerCountOf(c) != 0) { // Eligible to terminate

  • interruptIdleWorkers(ONLY_ONE);

  • return;

  • }

  • ?
  • final ReentrantLock mainLock = this.mainLock;

  • mainLock.lock();

  • try {

  • if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

  • try {

  • terminated();

  • } finally {

  • ctl.set(ctlOf(TERMINATED, 0));

  • termination.signalAll();

  • }

  • return;

  • }

  • } finally {

  • mainLock.unlock();

  • }

  • // else retry on failed CAS

  • }

  • }


  • 最后說(shuō)明一下,JVM的守護(hù)進(jìn)程只有當(dāng)所有派生出來(lái)的線程都結(jié)束后才會(huì)退出,使用ThreadPoolExecutor線程池時(shí),如果有的任務(wù)一直執(zhí)行,并且不響應(yīng)中斷,那么會(huì)一直占用線程,那么JVM也會(huì)一直工作,不會(huì)退出。

    總結(jié)

    以上是生活随笔為你收集整理的聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。