ThreadPoolExecutor(二)——execute
?1.execute方法
/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}看方法注釋:
執(zhí)行給定的任務(wù)在將來(lái)的某個(gè)時(shí)間。該任務(wù)可能會(huì)用一個(gè)新的線程來(lái)執(zhí)行,也有可能用線程池中一個(gè)已有的線程來(lái)執(zhí)行。
如果該executor已經(jīng)被shutdown了,或者因?yàn)槿萘恳褲M,任務(wù)不會(huì)被執(zhí)行,通過(guò)RejectedExecutionHandler來(lái)處理剩下流程。
將來(lái)的某個(gè)時(shí)間執(zhí)行:說(shuō)的是任務(wù)會(huì)入隊(duì)列,然后根據(jù)線程池目前的各項(xiàng)指標(biāo)狀況來(lái)決定何時(shí)執(zhí)行。
新的線程或已有線程:根據(jù)線程池的各項(xiàng)指標(biāo)狀況來(lái)決定是喚醒線程池中一個(gè)已有的阻塞線程來(lái)執(zhí)行還是new一個(gè)Thread來(lái)執(zhí)行任務(wù)。
2.execute方法的三個(gè)步驟
看方法內(nèi)部注釋:
1.如果當(dāng)前正在run的線程數(shù)小于corePoolSize,那么就調(diào)用addWorker方法來(lái)new一個(gè)線程用來(lái)執(zhí)行傳入的任務(wù)。
對(duì)應(yīng)代碼片:
int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}2.如果addWorker方法執(zhí)行失敗了,任務(wù)要入隊(duì)列,如果成功入隊(duì)列了,需要做double check來(lái)處理一些極端情況,比如線程池是否shutdown了等等。
對(duì)應(yīng)代碼片:
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}3.如果任務(wù)無(wú)法入隊(duì)列,再次嘗試addWorker,這次是用正在run的線程數(shù)和maximumPoolSize比,如果超過(guò)了maximumPoolSize則reject任務(wù),說(shuō)明線程池已經(jīng)飽和了。
對(duì)應(yīng)代碼片:
else if (!addWorker(command, false))reject(command);3.addWorker
/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked, which requires a* backout of workerCount, and a recheck for termination, in case* the existence of this worker was holding up termination.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}Worker w = new Worker(firstTask);Thread t = w.thread;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;} finally {mainLock.unlock();}t.start();// It is possible (but unlikely) for a thread to have been// added to workers, but not yet started, during transition to// STOP, which could result in a rare missed interrupt,// because Thread.interrupt is not guaranteed to have any effect// on a non-yet-started Thread (see Thread#interrupt).if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())t.interrupt();return true;}先看注釋:
檢查根據(jù)當(dāng)前線程池的狀態(tài)是否允許添加一個(gè)新的Worker,如果可以,調(diào)整wc(workerCount,以后都用wc表示),代碼塊:
if (compareAndIncrementWorkerCount(c))break retry;如果線程被stop了或者可以shutdown,addWorker方法返回false。
如果thread工廠創(chuàng)建線程失敗,需要a?backout of workerCount(這是個(gè)啥?!),代碼塊:
// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}3.1.addWorker局部分析(一)
// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;這個(gè)代碼塊的判斷,如果是STOP,TIDYING和TERMINATED這三種狀態(tài),都會(huì)返回false。
如果是SHUTDOWN,還要判斷firstTask和workQueue的狀況,如果firstTask不是null,返回false。
如果firstTask是null,判斷workQueue的狀況,workQueue是空的時(shí)候,返回false。
這個(gè)還要再看看,SHUTDOWN狀態(tài)下為什么要判斷firstTask和隊(duì)列,是要保證在SHUTDOWN的時(shí)候,新添加進(jìn)來(lái)和隊(duì)列中剩余的task要正常執(zhí)行完嗎?
3.2.addWorker局部分析(二)
for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}這個(gè)代碼塊比上一個(gè)代碼塊更容易理解,是正常流程,當(dāng)線程池沒(méi)有處于RUNNING之外的幾種狀態(tài)的時(shí)候。
這時(shí)候的處理流程就是線程池是否創(chuàng)建線程的正常語(yǔ)義,依次進(jìn)行下列比較:
1.和CAPACITY比較
如果當(dāng)前wc超過(guò)CAPACITY(這個(gè)基本上不可能),返回false。
2.入?yún)ore為true,表示
轉(zhuǎn)存失敗重新上傳取消addWorker的時(shí)候,wc還沒(méi)到達(dá)corePoolSize,和corePoolSize比較
如果超過(guò)corePoolSize,返回false。否則原子操作compareAndIncrementWorkerCount修改wc值。
3.入?yún)ore為false,表示addWorker的時(shí)候隊(duì)列已滿,wc和maximumPoolSize比較
如果超過(guò)maximumPoolSize,返回false,否則原子操作compareAndIncrementWorkerCount修改wc值。
3.3.addWorker局部分析(三)
final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;} finally {mainLock.unlock();}這個(gè)代碼塊,首先加鎖,整個(gè)類用到這個(gè)鎖的地方,除了獲取該線程池的一些關(guān)鍵參數(shù)之外,就是shutdown和terminate等相關(guān)操作。
注釋里說(shuō),Back out on ThreadFactory failure or if?shut down before lock acquired,需要再看看。
如果這個(gè)if判斷沒(méi)有走,用該task構(gòu)建的Worker就可以正常添加到workers這個(gè)HashSet中。
4.ctl
ctl是控制線程池狀態(tài)的變量,由兩部分組成,runState(高位)和workerCount(低28位),
private static int CAPACITY = (1 << COUNT_BITS) - 1;CAPACITY是1左移COUNT_BITS,然后減一。
?
COUNT_BITS是Integer的位數(shù)減去3,即29。
所以CAPACITY是100000000000000000000000000000-1=11111111111111111111111111111,表示低28位。這28位是表示運(yùn)行的worker個(gè)數(shù)的。
private static final int RUNNING = -1 << COUNT_BITS;二進(jìn)制,1111111111111111111111111111111111100000000000000000000000000000,和CAPACITY互補(bǔ),所以~CAPACITY也是這個(gè)值。
?
二進(jìn)制,100000000000000000000000000000000
?
二進(jìn)制,1000000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;二進(jìn)制,1100000000000000000000000000000000
放在一起對(duì)比看,狀態(tài)位置看標(biāo)紅三位:
1111111111111111111111111111111111100000000000000000000000000000,RUNNING
0000000000000000000000000000000000000000000000000000000000000000,SHUTDOWN
0000000000000000000000000000000100000000000000000000000000000000,STOP
0000000000000000000000000000001000000000000000000000000000000000,TIDYING
0000000000000000000000000000001100000000000000000000000000000000,TERMINATED
5.和ctl相關(guān)的幾組方法
private static int runStateOf(int c) { return c & ~CAPACITY; }把低28位都清掉了,只拿高位的運(yùn)行狀態(tài)。
private static int workerCountOf(int c) { return c & CAPACITY; }只取低28位,即拿workerCount。
?
?
總結(jié)
以上是生活随笔為你收集整理的ThreadPoolExecutor(二)——execute的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 左神---基础提升笔记
- 下一篇: Dear小弟×××,给你们的一封信「社区