ThreadPoolExecutor源码解析(二)
1.ThreadPoolExcuter運(yùn)行實(shí)例
首先我們先看如何新建一個(gè)ThreadPoolExecutor去運(yùn)行線程。然后深入到源碼中去看ThreadPoolExecutor里面使如何運(yùn)作的。
public class Test {public static void main(String[] args){/*** 新建一個(gè)線程池* corePoolSize:2* maximumPoolSize:10* keepAliveTime:20* unit:TimeUnit.SECONDS(秒)* workQueue:new ArrayBlockingQueue(10)* threadFactory:默認(rèn)* RejectedExecutionHandler默認(rèn)*/ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,10,20, TimeUnit.SECONDS,new ArrayBlockingQueue(10));//用execute添加一個(gè)線程threadPool.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}});} }2.ThreadPoolExecute.execute方法
可以發(fā)現(xiàn),其實(shí)使用線程池就是使用這個(gè)方法,然后我們看這個(gè)方法具體的代碼。
/*** 在后面執(zhí)行給定任務(wù)。任務(wù)在一個(gè)新的線程中或一個(gè)存在的worker的線程池中執(zhí)行。* 如果一個(gè)線程不能提交到excution,可能是因?yàn)檫@個(gè)excutor已經(jīng)shundown或者因?yàn)槠淙萘恳呀?jīng)是最大,* 此時(shí)任務(wù)將會(huì)被RejectedExecutionHandler處理**/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:* 有以下3個(gè)步驟** 1.如果少于corePoolSize的線程在運(yùn)行,那么試著啟動(dòng)一個(gè)新線程,其中用給定指令作為first task。* 這會(huì)調(diào)用addWorker去原子性得檢查runState和workerCoune,因此可以防止錯(cuò)誤報(bào)警,在錯(cuò)誤報(bào)警不應(yīng)該時(shí)通過返回false來添加線程* 2.如果任務(wù)被成功排隊(duì),我們?nèi)稳粦?yīng)該第二次檢查是否添加一個(gè)新線程(因?yàn)榭赡艽嬖谠谧詈笠淮螜z查后掛掉的情況)* 或者在進(jìn)入這個(gè)方法期間線程池shutdown。所以我們?cè)俅螜z查狀態(tài),如果已關(guān)閉和有必要?jiǎng)t退出隊(duì)列,或者如果沒有的話就開始一個(gè)新的線程。* 3.如果我們無法將task入隊(duì),那么我們?cè)噲D添加新線程。如果失敗,那么知道我們shutdown或者是飽和的并拒絕task。*/int c = ctl.get();//判斷是否小于corePoolSizeif (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果pool在運(yùn)行并且能提交到隊(duì)列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//這里進(jìn)行再次檢查,如果線程池沒在運(yùn)行并且成功刪除task后,使用拒絕策略拒絕該taskif (! isRunning(recheck) && remove(command))reject(command);//如果已經(jīng)將task添加到隊(duì)列中,而此時(shí)沒有worker的話,那么新建一個(gè)worker。稍后這個(gè)空閑的worker就會(huì)自動(dòng)去隊(duì)列里面取任務(wù)來執(zhí)行else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果無法提交那么按照拒絕策略拒絕taskelse if (!addWorker(command, false))reject(command);}線程池的基礎(chǔ)和Worker基本介紹在前一節(jié)已經(jīng)有說過,可以點(diǎn)這里查看。可以看到這個(gè)方法的主要流程,其實(shí)都在注釋里面說明了。可以發(fā)現(xiàn)里面主要調(diào)用了一個(gè)方法,addWorker()。?那么這個(gè)addWorker()又是什么東西呢。其實(shí)看方法名就很清楚了,就是新建一個(gè)Worker來執(zhí)行你添加進(jìn)來的task。
3.ThreadPoolExecute.addWorker()方法
/*** 檢查當(dāng)前的線程池狀態(tài)和容量,是否可以讓一個(gè)新的worker加入。如果可以,worker計(jì)數(shù)將會(huì)被調(diào)整,并且* 如果可能,一個(gè)新的woker將會(huì)被創(chuàng)建和開始,將它當(dāng)作第一個(gè)任務(wù)來運(yùn)行。當(dāng)線程池是stopped或shutdown狀態(tài)時(shí),* 將返回false。當(dāng)線程工廠創(chuàng)建失敗而返回null或者拋出exception(比如典型的OOM)時(shí),它也會(huì)返回fails。* firstTask:新線程應(yīng)該第一個(gè)運(yùn)行的任務(wù)。當(dāng)線程數(shù)少于corePoolSize時(shí)或是隊(duì)列滿時(shí),workers使用一個(gè)初始化的* first task來創(chuàng)建,用來進(jìn)行分流。初始化空閑線程通常使用prestartCoreThread。* core:為true,如果使用有界的corePoolSize,否則時(shí)maxPoolSize* @return true if successful* 添加Worker*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();//狀態(tài)值int rs = runStateOf(c);// Check if queue empty only if necessary.//關(guān)于狀態(tài)值的檢測(cè)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//關(guān)于容量的檢測(cè)if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//用到了原子CAS方法比較,使用CAS增加worker計(jì)數(shù)器成功,才能進(jìn)入下一步if (compareAndIncrementWorkerCount(c))break retry;//重新獲取ctlc = ctl.get(); // Re-read ctl//這里表示執(zhí)行到這里的時(shí)候線程池的運(yùn)行狀態(tài)改變,需要重新跳到retry處執(zhí)行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//使用firstTask初始化Worker,first可能為null,那么則表示該worker為空閑w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.//持有鎖之后再次檢查,確保一致性int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();//largestPoolSize為跟蹤的目前最大線程數(shù),因?yàn)橹耙呀?jīng)做過判斷,所以不會(huì)越界問題if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//workerAdded是在上面最后才設(shè)置的,確保這個(gè)變量能準(zhǔn)確表示是否添加worker成功if (workerAdded) {t.start();workerStarted = true;}}} finally {//再次檢查if (! workerStarted)addWorkerFailed(w);}return workerStarted;}addWorker本事只是為線程池添加一個(gè)Worker,其本身所做的事情其實(shí)很簡單,但難就難在要確保安全有效得添加一個(gè)Worker。為此addWorker()方法做了很多額外的工作。比如判斷線程池的運(yùn)行狀態(tài),當(dāng)前Worker數(shù)量是否已經(jīng)飽和等等。可以發(fā)現(xiàn)在這個(gè)方法,或者說整個(gè)ThreadPoolExecutor中,很多時(shí)候都是使用雙重檢查的方式來對(duì)線程池狀態(tài)進(jìn)行檢查。其實(shí)這都是為了效率,最簡單不過直接使用Synchronized或ReentranLock進(jìn)行同步,但這樣效率會(huì)低很多,所以在這里,只有在萬不得已的情況下,才會(huì)使用悲觀的ReentranLock。
addWorker的最后直接調(diào)用了t.start,這里的t其實(shí)就是Worker它自己。接下來再看Worker是如何運(yùn)行的。
4.ThreadPoolExecute.runWorker()方法
/*** 主要的Worker運(yùn)行的循環(huán)。重復(fù)得獲取從任務(wù)隊(duì)列中取出task并執(zhí)行它。*/final void runWorker(Worker w) {Thread wt = Thread.currentThread();//取出firstTask,再將worker中的值-設(shè)置為nullRunnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//不斷循環(huán)取出線程運(yùn)行while (task != null || (task = getTask()) != null) {w.lock();//鎖住線程// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//如果當(dāng)前線程是stop,那么將確認(rèn)其為interruptedif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//調(diào)用鉤子beforeExecute(wt, task);Throwable thrown = null;try {//運(yùn)行task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}從源碼中可以看出,一個(gè)Worker的工作其實(shí)就是不斷使用getTask()方法從隊(duì)列中獲取新的任務(wù)來執(zhí)行。值得一提的是,初始化參數(shù)里面的時(shí)間戳參數(shù)就是在這個(gè)方法里面運(yùn)用的。在循環(huán)體中每次都使用鎖以保證當(dāng)前worker在運(yùn)行task過程中不會(huì)被中斷。同時(shí)運(yùn)行時(shí)還會(huì)去調(diào)用兩個(gè)內(nèi)置的鉤子:beforeExecute()和afterExecute(),這兩個(gè)方法默認(rèn)實(shí)現(xiàn)時(shí)空的。
同時(shí)在運(yùn)行的循環(huán)中每次都關(guān)注著ThreadPoolExecutor的運(yùn)行狀態(tài),當(dāng)線程池處于中斷狀態(tài)時(shí),循環(huán)Worker的當(dāng)前線程也會(huì)中斷。
?
總結(jié):說到這里就差不多把線程池運(yùn)行task的流程說完了,當(dāng)然其中忽略了很多的細(xì)節(jié)。但總而言之,ThreadPoolExecutor其實(shí)就是對(duì)worker進(jìn)行管理,然后使用這些worker來執(zhí)行用戶提交的task。對(duì)用戶提交的task的數(shù)量也進(jìn)行一定的控制管理,比如超過一定數(shù)量時(shí)放入一個(gè)任務(wù)隊(duì)列中等等。然后對(duì)線程池規(guī)定一些狀態(tài)量,根據(jù)這些狀態(tài)量對(duì)線程池進(jìn)行控制。
from:?https://www.cnblogs.com/listenfwind/p/9102532.html
總結(jié)
以上是生活随笔為你收集整理的ThreadPoolExecutor源码解析(二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: apache commons常用工具类
- 下一篇: 常用集合类的使用