线程池invokeAll方法详解
線程池invokeAll方法詳解
- 問(wèn)題起源與抽象
- 問(wèn)題排查與猜測(cè)
- 猜測(cè)一:invokeAll 在異步執(zhí)行后會(huì)不會(huì)同步等待線程執(zhí)行完畢獲取最終結(jié)果
- 猜測(cè)二:隊(duì)列里面可能存在第一次調(diào)用 invokeAll 執(zhí)行了但沒(méi)有刪掉的任務(wù),所以才會(huì)導(dǎo)致第二次放入隊(duì)列失敗
- 兩次猜測(cè)失敗后的總結(jié)
- 復(fù)查源碼,真相大白
- 問(wèn)題解決方案
- 參考
線上真實(shí)案例,多次調(diào)用線程池 ThreadPoolExecutor 的 invokeAll() 方法進(jìn)行數(shù)據(jù)統(tǒng)計(jì)時(shí)任務(wù)被拒絕,故事從此開始。
本文重在講述問(wèn)題的產(chǎn)生、抽象、尋找解決方法的過(guò)程,并結(jié)合源碼對(duì)原因進(jìn)行抽絲剝繭般的分析。bug 千千萬(wàn)萬(wàn),唯有合理的邏輯推理思維才能讓這些 bug 顯露原形。
問(wèn)題起源與抽象
先來(lái)看一段簡(jiǎn)單的代碼,定義一個(gè)核心線程數(shù)5、有界隊(duì)列5的線程池,然后創(chuàng)建10個(gè)任務(wù)丟進(jìn)去執(zhí)行2次。
按照以前對(duì)線程池執(zhí)行邏輯的理解,創(chuàng)建的10個(gè)線程,會(huì)先交給核心線程去執(zhí)行,5個(gè)核心線程滿了之后,存放到隊(duì)列中,剛好存儲(chǔ)剩下的5個(gè),按理說(shuō)10個(gè)任務(wù)都會(huì)正常執(zhí)行完畢。本次只測(cè)試固定大小的線程池。
public class InvokeAllTest {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,60 * 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),new MyThreadFactory());public static void main(String[] args) {List<Callable<Void>> tasks = new ArrayList<>();for (int i = 0; i < 10; i++) {tasks.add(new InvokeAllThread());}System.out.println("第一次任務(wù)執(zhí)行前的executor: " + executor);try {executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一次任務(wù)執(zhí)行完畢后的executor: " + executor);System.out.println("==============第一次任務(wù)執(zhí)行完畢,開始第二次任務(wù)============");try {Thread.sleep(1000);executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次任務(wù)執(zhí)行完畢后的executor:" + executor);}// 任務(wù)執(zhí)行線程。通過(guò)打印線程名稱,觀察提交的任務(wù)被哪個(gè)線程執(zhí)行static class InvokeAllThread implements Callable<Void> {@Overridepublic Void call() throws Exception {System.out.println(Thread.currentThread().getName());return null;}}// 給工作線程自定義名字,方便觀察提交的任務(wù)被哪個(gè)線程執(zhí)行static class MyThreadFactory implements ThreadFactory {private AtomicInteger threadNum = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, String.valueOf(threadNum.getAndIncrement()));if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}運(yùn)行程序后發(fā)現(xiàn),第一次調(diào)用 invokeAll 正常執(zhí)行,第二次調(diào)用報(bào)錯(cuò)。多次執(zhí)行結(jié)果相同。
第一次任務(wù)執(zhí)行前的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 1 2 3 4 4 5 3 2 3 3 第一次任務(wù)執(zhí)行完畢后的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10] ==============第一次任務(wù)執(zhí)行完畢,開始第二次任務(wù)============ 2 4 5 2 1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3a71f4dd rejected from java.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 13]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:238)at com.aaron.hp.thread.pool.InvokeAllTest.main(InvokeAllTest.java:36)問(wèn)題排查與猜測(cè)
既然程序出現(xiàn)異常,就該調(diào)用 debug 模式進(jìn)行排查,并遵循"大膽猜測(cè),小心求證"的態(tài)度,去解決這個(gè)問(wèn)題。
猜測(cè)一:invokeAll 在異步執(zhí)行后會(huì)不會(huì)同步等待線程執(zhí)行完畢獲取最終結(jié)果
由于 invokeAll 封裝的太好,之前只知道最后會(huì)同步等待才能獲取返回值。那么現(xiàn)在就需要去證實(shí)這個(gè)概念。
進(jìn)入 invokeAll 方法后,發(fā)現(xiàn)調(diào)用了f.get(),那么毫無(wú)疑問(wèn),這個(gè)猜測(cè)可以排除掉了。
其實(shí)從執(zhí)行過(guò)程的輸出內(nèi)容也可以看出,兩次調(diào)用 invokeAll 的執(zhí)行順序和界限(打印語(yǔ)句) 非常明顯。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 任務(wù)被添加后的具體執(zhí)行execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {// 此處同步等待f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);} }猜測(cè)二:隊(duì)列里面可能存在第一次調(diào)用 invokeAll 執(zhí)行了但沒(méi)有刪掉的任務(wù),所以才會(huì)導(dǎo)致第二次放入隊(duì)列失敗
由于未閱讀源碼,猜測(cè)只有當(dāng)創(chuàng)建的任務(wù)執(zhí)行完畢并且銷毀之后,才會(huì)從隊(duì)列中真正移除。
那么就需要查看入隊(duì)列和出隊(duì)列的時(shí)機(jī)。查看 invokeAll 方法中的 execute(f) 方法。
查看 ThreadPoolExecutor 類下的 execute 方法源碼:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 判斷工作線程數(shù)是否小于核心線程數(shù),如果是則創(chuàng)建 Worker 工作線并返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 判斷主線程是否在運(yùn)行,并判斷是否入隊(duì)列成功if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 否則重新創(chuàng)建 Worker 線程,創(chuàng)建失敗則拋出拒絕策略else if (!addWorker(command, false))reject(command); }此時(shí)就會(huì)發(fā)現(xiàn)入隊(duì)列的操作在workQueue.offer(command)處完成,而我們提交的任務(wù)是由一個(gè)叫 Worker 類的實(shí)例來(lái)執(zhí)行,addWorker(command, true)創(chuàng)建 Worker 實(shí)例。
那么我們就分別進(jìn)去這兩個(gè)方法來(lái)看下源碼:
矮油黑人問(wèn)號(hào)臉。。沒(méi)想到這個(gè) ThreadPoolExecutor 類的 addWorker 這么長(zhǎng),給核心代碼寫個(gè)注釋重點(diǎn)關(guān)注,掃一眼然后去看 offer 方法(英文注釋是源碼中自帶的)。前面都是校驗(yàn),創(chuàng)建核心線程處為new Worker(firstTask):
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 上面一堆都是校驗(yàn),此處才是 Worker 被創(chuàng)建的地方,注意被傳入的 firstTaskw = new Worker(firstTask);// 此處發(fā)現(xiàn) Worker 里面居然還有個(gè) therad 線程,不過(guò)想想也是,沒(méi)有線程怎么異步執(zhí)行呢。點(diǎn)進(jìn) Worker 的構(gòu)造方法看一眼就會(huì)發(fā)現(xiàn),這個(gè)線程就是由我們自定義的 threadFactory 來(lái)創(chuàng)建的,所以核心線程名稱就是我們之前設(shè)定好的名字。this.thread = getThreadFactory().newThread(this);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 實(shí)例成功創(chuàng)建后,讓它啟動(dòng)起來(lái)t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted; }接著是 ArrayBlockingQueue 類的 offer 方法,在 enqueue(e)處進(jìn)入隊(duì)列:
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {// 進(jìn)入隊(duì)列enqueue(e);return true;}} finally {lock.unlock();} }此時(shí)我們先來(lái)調(diào)試一波,看看入隊(duì)列時(shí)這些方法的執(zhí)行情況,在三個(gè) if 處分別設(shè)置斷點(diǎn),在 addWorker 和 offer 方法靠前的未知打斷點(diǎn),確定是否會(huì)進(jìn)入。
第一次調(diào)用 invokeAll:addWorker 進(jìn)入5次,offer 方法進(jìn)入5次。
第二次調(diào)用 invokeAll:addWorker 進(jìn)入0次,offer 方法進(jìn)入10次(可能是5-10次)。
那么發(fā)現(xiàn)了新的問(wèn)題:程序居然沒(méi)報(bào)錯(cuò)!正常執(zhí)行完成!這不科學(xué)!
帶著疑惑,重新 debug,居然還沒(méi)報(bào)錯(cuò)!難道之前的異常是偶然嗎?
以最快速度連按 F9 debug了幾次,有時(shí)候報(bào)錯(cuò)。。
重新運(yùn)行 run 了幾次,次次報(bào)錯(cuò)。。
懷疑人生了。。
此時(shí)墨菲定律在我頭腦中回響,“偶然事件存在必然的因素”。那么大膽猜測(cè),這個(gè)原因極有可能是隊(duì)列消費(fèi)速度較慢導(dǎo)致的,去查看消費(fèi)部分的源碼。由于 worker 也是一個(gè)線程,那么肯定有類似的 run 方法:
查看 ThreadPoolExecutor 類 的 Worker 這個(gè)內(nèi)部類,找到 run() 方法:
public void run() {runWorker(this); }而 run 方法調(diào)用的是 ThreadPoolExecutor 類里的 runWorker(this):
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 此處注意,將 worker 里存入的 firstTask 取出來(lái),交給下面的 while 去執(zhí)行Runnable task = w.firstTask;// 將 worker 里的 firstTask 屬性置空w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 不為空,即取出的 firstTask 不為空,則執(zhí)行;否則調(diào)用 getTask() 方法獲取 task 再執(zhí)行while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 此處為空實(shí)現(xiàn),可自定義beforeExecute(wt, task);Throwable thrown = null;try {// 調(diào)用 task 的 run 方法執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);} }查看 ThreadPoolExecutor 類下的 getTask() 方法:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 此處為出隊(duì)列操作,poll 和 take 的區(qū)別在于,poll 會(huì)等待指定時(shí)間,而 take 是阻塞的,會(huì)一直等待Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }看到這里,猜測(cè)二也就不攻自破,出隊(duì)列后任務(wù)才會(huì)被執(zhí)行,所以某個(gè)任務(wù)出隊(duì)列后,執(zhí)行成功與否與隊(duì)列再無(wú)瓜葛。(注意這個(gè)說(shuō)法只針對(duì)默認(rèn)代碼,如果自定義了拒絕策略是可以將被 interrupt 的線程重新塞回隊(duì)列里的)
兩次猜測(cè)失敗后的總結(jié)
隊(duì)列是異步消費(fèi)的,但入隊(duì)是同步進(jìn)行的,如果隊(duì)列的容量不足以承載要存入隊(duì)列的任務(wù)數(shù),就會(huì)被拒絕。(雖然是 ArrayBlockQueue 的特性,但這是通過(guò) debug 以及 run 后觀察到的)
第一次 addWorker 方法執(zhí)行了5次,offer 執(zhí)行了5次;第二次則是 0 次,10 次。剛才忽略了這個(gè)細(xì)節(jié),那么需要重新找到相應(yīng)的源碼閱讀。
任務(wù)從隊(duì)列中移除與任務(wù)是否執(zhí)行完畢無(wú)關(guān),先移除,后執(zhí)行。
我們創(chuàng)建的任務(wù),是由 worker 核心線程去調(diào)用任務(wù)的 run 方法來(lái)同步執(zhí)行的,而不是調(diào)用任務(wù)實(shí)例的 start 去異步執(zhí)行,這也就是為什么 invokeAll 可以獲取到返回值的原因所在。
**備注:**這里有點(diǎn)繞,任務(wù)實(shí)例指的是我們最開始在 for 循環(huán)中創(chuàng)建的10個(gè)tasks new InvokeAllThread(),為什么繼承了 Callable 明明改寫的是 call()方法,但卻有 run()方法可以被調(diào)用呢?這是因?yàn)樵?invokeAll()方法執(zhí)行execute()方法前,通過(guò)RunnableFuture<T> f = newTaskFor(t);進(jìn)行了包裝。
復(fù)查源碼,真相大白
查看 ThreadPoolExecutor 類下的 execute() 方法,創(chuàng)建 worker 前的判斷如下:
if (workerCountOf(c) < corePoolSize) { ...}第一次調(diào)用 invokeAll 時(shí),線程池中的核心線程 worker 數(shù)為0,小于 corePoolSize,所以前5次會(huì)創(chuàng)建 worker 核心線程并返回,此時(shí)隨著 worker 的創(chuàng)建,我們創(chuàng)建的10個(gè)任務(wù)中的5個(gè)也會(huì)隨著 worker 的創(chuàng)建作為 firstTask 屬性被傳進(jìn)去。后5個(gè)任務(wù)則被放入 queue 中。
第二次調(diào)用 invokeAll 時(shí),線程池中的核心數(shù)已經(jīng)是5,所以10個(gè)任務(wù)都會(huì)被放入 queue 中異步消費(fèi),但是我們的 queue 的容量為5。如果消費(fèi)速度快于入隊(duì)速度(debug),那么10個(gè)任務(wù)會(huì)正常執(zhí)行。但是入隊(duì)速度太快的話(run),前5個(gè)肯定可以入隊(duì),后面的5個(gè)幾乎都會(huì)被拒絕。
問(wèn)題解決方案
參考
ThreadPoolExecutor源碼分析及阻塞提交任務(wù)方法
Thread的中斷機(jī)制(interrupt)
總結(jié)
以上是生活随笔為你收集整理的线程池invokeAll方法详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 网络工程师知识点整理—第五章:无线通信网
- 下一篇: 【《Multimodal Transfo