ThreadPoolExecutor(三)——Worker
1.Worker
Worker是ThreadPoolExecutor的一個(gè)內(nèi)部類,實(shí)現(xiàn)了AbstractQueuedSynchronizer抽象類。
/*** Class Worker mainly maintains interrupt control state for* threads running tasks, along with other minor bookkeeping.* This class opportunistically extends AbstractQueuedSynchronizer* to simplify acquiring and releasing a lock surrounding each* task execution. This protects against interrupts that are* intended to wake up a worker thread waiting for a task from* instead interrupting a task being run. We implement a simple* non-reentrant mutual exclusion lock rather than use ReentrantLock* because we do not want worker tasks to be able to reacquire the* lock when they invoke pool control methods like setCorePoolSize.*/看注釋,該類主要是控制在線程執(zhí)行任務(wù)時(shí)的interrupt操作的。它繼承了AbstractQueuedSynchronizer類,實(shí)現(xiàn)了一個(gè)非重入的鎖。該鎖會(huì)保護(hù)一個(gè)正在等待任務(wù)被執(zhí)行的Worker不被interrupt操作打斷。
為什么不用ReentrantLock,要用非重入的鎖?因?yàn)樽髡卟幌胱屵@個(gè)Worker task在setCorePoolSize這種線程池控制方法調(diào)用時(shí)能重新獲取到鎖。
2.構(gòu)造方法
/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}用自己作為task構(gòu)造一個(gè)線程,同時(shí)把外層任務(wù)賦值給自己的task成員變量,相當(dāng)于對(duì)task做了一個(gè)包裝。
3.run方法
run方法調(diào)用了ThreadPoolExecutor的runWorker方法,
4.runWorker方法
/*** Main worker run loop. Repeatedly gets tasks from queue and* executes them, while coping with a number of issues:** 1. We may start out with an initial task, in which case we* don't need to get the first one. Otherwise, as long as pool is* running, we get tasks from getTask. If it returns null then the* worker exits due to changed pool state or configuration* parameters. Other exits result from exception throws in* external code, in which case completedAbruptly holds, which* usually leads processWorkerExit to replace this thread.** 2. Before running any task, the lock is acquired to prevent* other pool interrupts while the task is executing, and* clearInterruptsForTaskRun called to ensure that unless pool is* stopping, this thread does not have its interrupt set.** 3. Each task run is preceded by a call to beforeExecute, which* might throw an exception, in which case we cause thread to die* (breaking loop with completedAbruptly true) without processing* the task.** 4. Assuming beforeExecute completes normally, we run the task,* gathering any of its thrown exceptions to send to* afterExecute. We separately handle RuntimeException, Error* (both of which the specs guarantee that we trap) and arbitrary* Throwables. Because we cannot rethrow Throwables within* Runnable.run, we wrap them within Errors on the way out (to the* thread's UncaughtExceptionHandler). Any thrown exception also* conservatively causes thread to die.** 5. After task.run completes, we call afterExecute, which may* also throw an exception, which will also cause thread to* die. According to JLS Sec 14.20, this exception is the one that* will be in effect even if task.run throws.** The net effect of the exception mechanics is that afterExecute* and the thread's UncaughtExceptionHandler have as accurate* information as we can provide about any problems encountered by* user code.** @param w the worker*/final void runWorker(Worker w) {Runnable task = w.firstTask;w.firstTask = null;boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();clearInterruptsForTaskRun();try {beforeExecute(w.thread, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}這是執(zhí)行task的主流程。
上面說過,addWorker會(huì)用當(dāng)前task創(chuàng)建一個(gè)Worker對(duì)象,相當(dāng)于對(duì)task的包裝,然后用Worker對(duì)象作為task創(chuàng)建一個(gè)Thread,該Thread保存在Worker的thread成員變量中。在addWorker中啟動(dòng)了這個(gè)線程,線程中執(zhí)行runWorker方法。
先看注釋:
1.首先取傳入的task執(zhí)行,如果task是null,只要該線程池處于運(yùn)行狀態(tài),就會(huì)通過getTask方法從workQueue中取任務(wù)。ThreadPoolExecutor的execute方法會(huì)在無法產(chǎn)生core線程的時(shí)候向workQueue隊(duì)列中offer任務(wù)。
getTask方法從隊(duì)列中取task的時(shí)候會(huì)根據(jù)相關(guān)配置決定是否阻塞和阻塞多久。如果getTask方法結(jié)束,返回的是null,runWorker循環(huán)結(jié)束,執(zhí)行processWorkerExit方法。
至此,該線程結(jié)束自己的使命,從線程池中“消失”。
2.在開始執(zhí)行任務(wù)之前,會(huì)調(diào)用Worker的lock方法,目的是阻止task正在被執(zhí)行的時(shí)候被interrupt,通過調(diào)用clearInterruptsForTaskRun方法來保證的(后面可以看一下這個(gè)方法),該線程沒有自己的interrupt set了。
3.beforeExecute和afterExecute方法用于在執(zhí)行任務(wù)前后執(zhí)行一些自定義的操作,這兩個(gè)方法是空的,留給繼承類去填充功能。
我們可以在beforeExecute方法中拋出異常,這樣task不會(huì)被執(zhí)行,而且在跳出該循環(huán)的時(shí)候completedAbruptly的值是true,表示the worker died due to user exception,會(huì)用decrementWorkerCount調(diào)整wc。
4.因?yàn)镽unnable的run方法不能拋出Throwables異常,所以這里重新包裝異常然后拋出,拋出的異常會(huì)使當(dāng)當(dāng)前線程死掉,可以在afterExecute中對(duì)異常做一些處理。
5.afterExecute方法也可能拋出異常,也可能使當(dāng)前線程死掉。
總結(jié)
以上是生活随笔為你收集整理的ThreadPoolExecutor(三)——Worker的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设计模式(二)简单工厂模式
- 下一篇: 2021年缆索式起重机司机考试内容及缆索