日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

线程池invokeAll方法详解

發(fā)布時(shí)間:2023/12/15 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 线程池invokeAll方法详解 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

線程池invokeAll方法詳解

      • 問(wèn)題起源與抽象
      • 問(wèn)題排查與猜測(cè)
        • 猜測(cè)一:invokeAll 在異步執(zhí)行后會(huì)不會(huì)同步等待線程執(zhí)行完畢獲取最終結(jié)果
        • 猜測(cè)二:隊(duì)列里面可能存在第一次調(diào)用 invokeAll 執(zhí)行了但沒(méi)有刪掉的任務(wù),所以才會(huì)導(dǎo)致第二次放入隊(duì)列失敗
      • 兩次猜測(cè)失敗后的總結(jié)
      • 復(fù)查源碼,真相大白
      • 問(wèn)題解決方案
      • 參考

線上真實(shí)案例,多次調(diào)用線程池 ThreadPoolExecutor 的 invokeAll() 方法進(jìn)行數(shù)據(jù)統(tǒng)計(jì)時(shí)任務(wù)被拒絕,故事從此開始。

本文重在講述問(wèn)題的產(chǎn)生、抽象、尋找解決方法的過(guò)程,并結(jié)合源碼對(duì)原因進(jìn)行抽絲剝繭般的分析。bug 千千萬(wàn)萬(wàn),唯有合理的邏輯推理思維才能讓這些 bug 顯露原形。

問(wèn)題起源與抽象

先來(lái)看一段簡(jiǎn)單的代碼,定義一個(gè)核心線程數(shù)5、有界隊(duì)列5的線程池,然后創(chuàng)建10個(gè)任務(wù)丟進(jìn)去執(zhí)行2次。

按照以前對(duì)線程池執(zhí)行邏輯的理解,創(chuàng)建的10個(gè)線程,會(huì)先交給核心線程去執(zhí)行,5個(gè)核心線程滿了之后,存放到隊(duì)列中,剛好存儲(chǔ)剩下的5個(gè),按理說(shuō)10個(gè)任務(wù)都會(huì)正常執(zhí)行完畢。本次只測(cè)試固定大小的線程池。

public class InvokeAllTest {private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,60 * 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),new MyThreadFactory());public static void main(String[] args) {List<Callable<Void>> tasks = new ArrayList<>();for (int i = 0; i < 10; i++) {tasks.add(new InvokeAllThread());}System.out.println("第一次任務(wù)執(zhí)行前的executor: " + executor);try {executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一次任務(wù)執(zhí)行完畢后的executor: " + executor);System.out.println("==============第一次任務(wù)執(zhí)行完畢,開始第二次任務(wù)============");try {Thread.sleep(1000);executor.invokeAll(tasks);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二次任務(wù)執(zhí)行完畢后的executor:" + executor);}// 任務(wù)執(zhí)行線程。通過(guò)打印線程名稱,觀察提交的任務(wù)被哪個(gè)線程執(zhí)行static class InvokeAllThread implements Callable<Void> {@Overridepublic Void call() throws Exception {System.out.println(Thread.currentThread().getName());return null;}}// 給工作線程自定義名字,方便觀察提交的任務(wù)被哪個(gè)線程執(zhí)行static class MyThreadFactory implements ThreadFactory {private AtomicInteger threadNum = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, String.valueOf(threadNum.getAndIncrement()));if (thread.getPriority() != Thread.NORM_PRIORITY) {thread.setPriority(Thread.NORM_PRIORITY);}return thread;}}

運(yùn)行程序后發(fā)現(xiàn),第一次調(diào)用 invokeAll 正常執(zhí)行,第二次調(diào)用報(bào)錯(cuò)。多次執(zhí)行結(jié)果相同。

第一次任務(wù)執(zhí)行前的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 1 2 3 4 4 5 3 2 3 3 第一次任務(wù)執(zhí)行完畢后的executorjava.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10] ==============第一次任務(wù)執(zhí)行完畢,開始第二次任務(wù)============ 2 4 5 2 1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3a71f4dd rejected from java.util.concurrent.ThreadPoolExecutor@30f39991[Running, pool size = 5, active threads = 2, queued tasks = 0, completed tasks = 13]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:238)at com.aaron.hp.thread.pool.InvokeAllTest.main(InvokeAllTest.java:36)

問(wèn)題排查與猜測(cè)

既然程序出現(xiàn)異常,就該調(diào)用 debug 模式進(jìn)行排查,并遵循"大膽猜測(cè),小心求證"的態(tài)度,去解決這個(gè)問(wèn)題。

猜測(cè)一:invokeAll 在異步執(zhí)行后會(huì)不會(huì)同步等待線程執(zhí)行完畢獲取最終結(jié)果

由于 invokeAll 封裝的太好,之前只知道最后會(huì)同步等待才能獲取返回值。那么現(xiàn)在就需要去證實(shí)這個(gè)概念。

進(jìn)入 invokeAll 方法后,發(fā)現(xiàn)調(diào)用了f.get(),那么毫無(wú)疑問(wèn),這個(gè)猜測(cè)可以排除掉了。

其實(shí)從執(zhí)行過(guò)程的輸出內(nèi)容也可以看出,兩次調(diào)用 invokeAll 的執(zhí)行順序和界限(打印語(yǔ)句) 非常明顯。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());boolean done = false;try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);// 任務(wù)被添加后的具體執(zhí)行execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try {// 此處同步等待f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}done = true;return futures;} finally {if (!done)for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true);} }

猜測(cè)二:隊(duì)列里面可能存在第一次調(diào)用 invokeAll 執(zhí)行了但沒(méi)有刪掉的任務(wù),所以才會(huì)導(dǎo)致第二次放入隊(duì)列失敗

由于未閱讀源碼,猜測(cè)只有當(dāng)創(chuàng)建的任務(wù)執(zhí)行完畢并且銷毀之后,才會(huì)從隊(duì)列中真正移除。

那么就需要查看入隊(duì)列和出隊(duì)列的時(shí)機(jī)。查看 invokeAll 方法中的 execute(f) 方法。

查看 ThreadPoolExecutor 類下的 execute 方法源碼:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 判斷工作線程數(shù)是否小于核心線程數(shù),如果是則創(chuàng)建 Worker 工作線并返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 判斷主線程是否在運(yùn)行,并判斷是否入隊(duì)列成功if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 否則重新創(chuàng)建 Worker 線程,創(chuàng)建失敗則拋出拒絕策略else if (!addWorker(command, false))reject(command); }

此時(shí)就會(huì)發(fā)現(xiàn)入隊(duì)列的操作在workQueue.offer(command)處完成,而我們提交的任務(wù)是由一個(gè)叫 Worker 類的實(shí)例來(lái)執(zhí)行,addWorker(command, true)創(chuàng)建 Worker 實(shí)例。

那么我們就分別進(jìn)去這兩個(gè)方法來(lái)看下源碼:

矮油黑人問(wèn)號(hào)臉。。沒(méi)想到這個(gè) ThreadPoolExecutor 類的 addWorker 這么長(zhǎng),給核心代碼寫個(gè)注釋重點(diǎn)關(guān)注,掃一眼然后去看 offer 方法(英文注釋是源碼中自帶的)。前面都是校驗(yàn),創(chuàng)建核心線程處為new Worker(firstTask):

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 上面一堆都是校驗(yàn),此處才是 Worker 被創(chuàng)建的地方,注意被傳入的 firstTaskw = new Worker(firstTask);// 此處發(fā)現(xiàn) Worker 里面居然還有個(gè) therad 線程,不過(guò)想想也是,沒(méi)有線程怎么異步執(zhí)行呢。點(diǎn)進(jìn) Worker 的構(gòu)造方法看一眼就會(huì)發(fā)現(xiàn),這個(gè)線程就是由我們自定義的 threadFactory 來(lái)創(chuàng)建的,所以核心線程名稱就是我們之前設(shè)定好的名字。this.thread = getThreadFactory().newThread(this);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 實(shí)例成功創(chuàng)建后,讓它啟動(dòng)起來(lái)t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted; }

接著是 ArrayBlockingQueue 類的 offer 方法,在 enqueue(e)處進(jìn)入隊(duì)列:

public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {// 進(jìn)入隊(duì)列enqueue(e);return true;}} finally {lock.unlock();} }

此時(shí)我們先來(lái)調(diào)試一波,看看入隊(duì)列時(shí)這些方法的執(zhí)行情況,在三個(gè) if 處分別設(shè)置斷點(diǎn),在 addWorker 和 offer 方法靠前的未知打斷點(diǎn),確定是否會(huì)進(jìn)入。

第一次調(diào)用 invokeAll:addWorker 進(jìn)入5次,offer 方法進(jìn)入5次。

第二次調(diào)用 invokeAll:addWorker 進(jìn)入0次,offer 方法進(jìn)入10次(可能是5-10次)。

那么發(fā)現(xiàn)了新的問(wèn)題:程序居然沒(méi)報(bào)錯(cuò)!正常執(zhí)行完成!這不科學(xué)!

帶著疑惑,重新 debug,居然還沒(méi)報(bào)錯(cuò)!難道之前的異常是偶然嗎?

以最快速度連按 F9 debug了幾次,有時(shí)候報(bào)錯(cuò)。。

重新運(yùn)行 run 了幾次,次次報(bào)錯(cuò)。。

懷疑人生了。。

此時(shí)墨菲定律在我頭腦中回響,“偶然事件存在必然的因素”。那么大膽猜測(cè),這個(gè)原因極有可能是隊(duì)列消費(fèi)速度較慢導(dǎo)致的,去查看消費(fèi)部分的源碼。由于 worker 也是一個(gè)線程,那么肯定有類似的 run 方法:

查看 ThreadPoolExecutor 類 的 Worker 這個(gè)內(nèi)部類,找到 run() 方法:

public void run() {runWorker(this); }

而 run 方法調(diào)用的是 ThreadPoolExecutor 類里的 runWorker(this):

final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 此處注意,將 worker 里存入的 firstTask 取出來(lái),交給下面的 while 去執(zhí)行Runnable task = w.firstTask;// 將 worker 里的 firstTask 屬性置空w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 如果 task 不為空,即取出的 firstTask 不為空,則執(zhí)行;否則調(diào)用 getTask() 方法獲取 task 再執(zhí)行while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 此處為空實(shí)現(xiàn),可自定義beforeExecute(wt, task);Throwable thrown = null;try {// 調(diào)用 task 的 run 方法執(zhí)行任務(wù)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);} }

查看 ThreadPoolExecutor 類下的 getTask() 方法:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 此處為出隊(duì)列操作,poll 和 take 的區(qū)別在于,poll 會(huì)等待指定時(shí)間,而 take 是阻塞的,會(huì)一直等待Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }

看到這里,猜測(cè)二也就不攻自破,出隊(duì)列后任務(wù)才會(huì)被執(zhí)行,所以某個(gè)任務(wù)出隊(duì)列后,執(zhí)行成功與否與隊(duì)列再無(wú)瓜葛。(注意這個(gè)說(shuō)法只針對(duì)默認(rèn)代碼,如果自定義了拒絕策略是可以將被 interrupt 的線程重新塞回隊(duì)列里的)

兩次猜測(cè)失敗后的總結(jié)

  • 隊(duì)列是異步消費(fèi)的,但入隊(duì)是同步進(jìn)行的,如果隊(duì)列的容量不足以承載要存入隊(duì)列的任務(wù)數(shù),就會(huì)被拒絕。(雖然是 ArrayBlockQueue 的特性,但這是通過(guò) debug 以及 run 后觀察到的)

  • 第一次 addWorker 方法執(zhí)行了5次,offer 執(zhí)行了5次;第二次則是 0 次,10 次。剛才忽略了這個(gè)細(xì)節(jié),那么需要重新找到相應(yīng)的源碼閱讀。

  • 任務(wù)從隊(duì)列中移除與任務(wù)是否執(zhí)行完畢無(wú)關(guān),先移除,后執(zhí)行。

  • 我們創(chuàng)建的任務(wù),是由 worker 核心線程去調(diào)用任務(wù)的 run 方法來(lái)同步執(zhí)行的,而不是調(diào)用任務(wù)實(shí)例的 start 去異步執(zhí)行,這也就是為什么 invokeAll 可以獲取到返回值的原因所在。

    **備注:**這里有點(diǎn)繞,任務(wù)實(shí)例指的是我們最開始在 for 循環(huán)中創(chuàng)建的10個(gè)tasks new InvokeAllThread(),為什么繼承了 Callable 明明改寫的是 call()方法,但卻有 run()方法可以被調(diào)用呢?這是因?yàn)樵?invokeAll()方法執(zhí)行execute()方法前,通過(guò)RunnableFuture<T> f = newTaskFor(t);進(jìn)行了包裝。

  • 復(fù)查源碼,真相大白

    查看 ThreadPoolExecutor 類下的 execute() 方法,創(chuàng)建 worker 前的判斷如下:

    if (workerCountOf(c) < corePoolSize) { ...}

    第一次調(diào)用 invokeAll 時(shí),線程池中的核心線程 worker 數(shù)為0,小于 corePoolSize,所以前5次會(huì)創(chuàng)建 worker 核心線程并返回,此時(shí)隨著 worker 的創(chuàng)建,我們創(chuàng)建的10個(gè)任務(wù)中的5個(gè)也會(huì)隨著 worker 的創(chuàng)建作為 firstTask 屬性被傳進(jìn)去。后5個(gè)任務(wù)則被放入 queue 中。

    第二次調(diào)用 invokeAll 時(shí),線程池中的核心數(shù)已經(jīng)是5,所以10個(gè)任務(wù)都會(huì)被放入 queue 中異步消費(fèi),但是我們的 queue 的容量為5。如果消費(fèi)速度快于入隊(duì)速度(debug),那么10個(gè)任務(wù)會(huì)正常執(zhí)行。但是入隊(duì)速度太快的話(run),前5個(gè)肯定可以入隊(duì),后面的5個(gè)幾乎都會(huì)被拒絕。

    問(wèn)題解決方案

  • 對(duì)于固定大小的線程池,我們要按照實(shí)際情況設(shè)置 queue 和 worker 的數(shù)量。根據(jù)任務(wù)類型(IO/CPU)以及機(jī)器配置(CPU 核數(shù)等)設(shè)置 worker 核心線程數(shù);而根據(jù)我們的任務(wù)多少來(lái)設(shè)定 queue 的大小,而不是 queue + worker 的總數(shù)。
  • 重寫拒絕策略,將被丟棄的任務(wù)重新 put 回隊(duì)列中去,put 是阻塞的。
  • 參考

    ThreadPoolExecutor源碼分析及阻塞提交任務(wù)方法

    Thread的中斷機(jī)制(interrupt)

    總結(jié)

    以上是生活随笔為你收集整理的线程池invokeAll方法详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。