JAVA线程池(ThreadPoolExecutor)源码分析
生活随笔
收集整理的這篇文章主要介紹了
JAVA线程池(ThreadPoolExecutor)源码分析
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
JAVA5提供了多種類型的線程池,如果你對(duì)這些線程池的特點(diǎn)以及類型不太熟悉或者非常熟悉,請(qǐng)幫忙看看這篇文章(順便幫忙解決里面存在的問(wèn)題,謝謝!):
????http://xtu-xiaoxin.iteye.com/admin/blogs/647580
????
??? 如果對(duì)ThreadPoolExecutor還不是很熟悉,可以看看一篇對(duì)ThreadPoolExecutor的介紹的博文:
http://blog.csdn.net/waterbig/archive/2009/11/10/4794244.aspx
??? 首先,JAVA 中使用ThreadPoolExecutor的常用方式:
??? 實(shí)例代碼1?
Java代碼 ? Runnable?runnable?=?new?CountService(intArr);?? ???????ThreadPoolExecutor?execute?=?(ThreadPoolExecutor)Executors.newFixedThreadPool(10);?? ???????//或者使用:ThreadPoolExecutor?execute?=?(ThreadPoolExecutor)Executors.newCachedThreadPool();?? ???????execute.submit(runnable);??
???? 在分析ThreadPoolExecutor源碼前,先了解下面兩個(gè)概念:
???? 1.核心線程(任務(wù)):我們定義的線程,即實(shí)現(xiàn)了Runnable接口的類,是我們將要放到線程池中執(zhí)行的類,如實(shí)例代碼中的CountService類
???? 2.工作線程:由線程池中創(chuàng)建的線程,是用來(lái)獲得核心線程并執(zhí)行核心線程的線程(比較拗口哦,具體看代碼就知道是什么東東了)。
??? Executors是一個(gè)線程池工廠,各種類型的線程池都是通過(guò)它來(lái)創(chuàng)建的,注意把它和Executor分開,感覺(jué)這個(gè)線程池工廠命名有點(diǎn)問(wèn)題。
??? 我們主要分析下我們提交任務(wù)的處理邏輯,即’execute.submit(runnable)’的實(shí)現(xiàn)。
Submit()方法是在ThreadPoolExecutor繼承的抽象類AbstractExecutorService中實(shí)現(xiàn)的,具體代碼如下:
?? Java代碼 ? public?Future<?>?submit(Runnable?task)?{?? ????????if?(task?==?null)?throw?new?NullPointerException();?? ???????//對(duì)核心線程的一個(gè)包裝,RunnableFuture還是一個(gè)Runnable?? ????????RunnableFuture<Object>?ftask?=?newTaskFor(task,?null);?? ???????//核心線程執(zhí)行邏輯?? ????????execute(ftask);?? ????????return?ftask;?? ????}??
???? 從代碼中可以看出,線程的執(zhí)行邏輯通過(guò)execute()完成,而execute是在AbstractExecutorService的子類ThreadPoolExecutor中實(shí)現(xiàn)的。看,一個(gè)典型的模板模式!廢話少說(shuō),下面看ThreadPoolExecutor中execute()方法中代碼:
???
??? Java代碼 ? public?void?execute(Runnable?command)?{?? ????????if?(command?==?null)?? ????????????throw?new?NullPointerException();?? ????????/*? ?????????*?command線程運(yùn)行的整個(gè)邏輯在?addIfUnderCorePoolSize(command)方法中實(shí)現(xiàn)? ?????????*?一般適用于FixedThreadPool? ?????????*/?? ????????if?(poolSize?>=?corePoolSize?||?!addIfUnderCorePoolSize(command))?{?? ????????????/*? ?????????????*?poolSize?>=?corePoolSize條件成立情景:當(dāng)創(chuàng)建的為CacheThreadPool時(shí),條件? ?????????????*?就能成立? ?????????????*/?? ????????????if?(runState?==?RUNNING?&&?workQueue.offer(command))?{?? ????????????????if?(runState?!=?RUNNING?||?poolSize?==?0)?? ????????????????????//兩種情況下執(zhí)行該方法:1.線程池shutdown??2.CacheThreadPool中第一個(gè)核心線程的執(zhí)行?? ????????????????????ensureQueuedTaskHandled(command);?? ????????????}?? ????????????//CacheThreadPool中線程的執(zhí)行邏輯?? ????????????else?if?(!addIfUnderMaximumPoolSize(command))?? ????????????????reject(command);?//?is?shutdown?or?saturated?? ????????}?? ????}??
???? 注意:CachedThreadPool和FixedThreadPool的邏輯實(shí)現(xiàn)都是在ThreadPoolExecutor中實(shí)現(xiàn)的。它兩的主要區(qū)別就是屬性corePoolSize以及workQueue的初始值的不同。具體可自己查看工程類Executors的newFixedThreadPool()和newCachedThreadPool方法。由于這些初始值的不同,所以實(shí)現(xiàn)的邏輯也不同,具體的我在代碼中已經(jīng)注釋了。
??? command線程運(yùn)行的整個(gè)邏輯在 addIfUnderCorePoolSize(command)方法中實(shí)現(xiàn)的,
詳細(xì)請(qǐng)看addIfUnderCorePoolSize(command)源碼:
? Java代碼 ? private?boolean?addIfUnderCorePoolSize(Runnable?firstTask)?{?? ???????Thread?t?=?null;?? ???????final?ReentrantLock?mainLock?=?this.mainLock;?? ???????mainLock.lock();?? ???????try?{?? ????????//poolSize?<?corePoolSize?即當(dāng)前工作線程的數(shù)量一定要小于你設(shè)置的線程最大數(shù)量?? ????????//CachedThreadPool永遠(yuǎn)也不會(huì)進(jìn)入該方法,因?yàn)樗腸orePoolSize初始為0?? ???????????if?(poolSize?<?corePoolSize?&&?runState?==?RUNNING)?? ???????????????t?=?addThread(firstTask);?? ???????}?finally?{?? ???????????mainLock.unlock();?? ???????}?? ???????if?(t?==?null)?? ???????????return?false;?? ???????t.start();???//線程執(zhí)行了?? ???????return?true;?? ???}??
???? 看’t.start()’,這表示工作線程啟動(dòng)了,工作線程t啟動(dòng)的前提條件是’t = addThread(firstTask); ‘返回值t必須不為null。好了,現(xiàn)在想看看java線程池中工作線程是怎么樣的嗎?請(qǐng)看addThread方法:
??? Java代碼 ? private?Thread?addThread(Runnable?firstTask)?{?? ????//Worker就是典型的工作線程,所以的核心線程都在工作線程中執(zhí)行?? ???????Worker?w?=?new?Worker(firstTask);?? ???????//采用默認(rèn)的線程工廠生產(chǎn)出一線程。注意就是設(shè)置一些線程的默認(rèn)屬性,如優(yōu)先級(jí)、是否為后臺(tái)線程等?? ???????Thread?t?=?threadFactory.newThread(w);??? ???????if?(t?!=?null)?{?? ???????????w.thread?=?t;?? ???????????workers.add(w);?? ?????????//沒(méi)生成一個(gè)工作線程?poolSize加1,但poolSize等于最大線程數(shù)corePoolSize時(shí),則不能再生成工作線程?? ???????????int?nt?=?++poolSize;???? ???????????if?(nt?>?largestPoolSize)?? ???????????????largestPoolSize?=?nt;?? ???????}?? ???????return?t;?? ???}??
??? 看見沒(méi),Worker就是工作線程類,它是ThreadPoolExecutor中的一個(gè)內(nèi)部類。下面,我們主要分析Worker類,如了解了Worker類,那基本就了解了java線程池的整個(gè)原理了。不用怕,Worker類的邏輯很簡(jiǎn)單,它其實(shí)就是一個(gè)線程,實(shí)現(xiàn)了Runnable接口的,所以,我們先從run方法入手,run方法源碼如下:
? Java代碼 ? public?void?run()?{?? ????????????try?{?? ????????????????Runnable?task?=?firstTask;?? ????????????????firstTask?=?null;?? ????????????????/**? ?????????????????*?注意這段while循環(huán)的執(zhí)行邏輯,沒(méi)執(zhí)行完一個(gè)核心線程后,就會(huì)去線程池? ?????????????????*?隊(duì)列中取下一個(gè)核心線程,如取出的核心線程為null,則當(dāng)前工作線程終止? ?????????????????*/?? ????????????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{?? ????????????????????runTask(task);??//你所提交的核心線程(任務(wù))的運(yùn)行邏輯?? ????????????????????task?=?null;?? ????????????????}?? ????????????}?finally?{?? ????????????????workerDone(this);?//?當(dāng)前工作線程退出?? ????????????}?? ????????}?? ????}??
???? 從源碼中可看出,我們所提交的核心線程(任務(wù))的邏輯是在Worker中的runTask()方法中實(shí)現(xiàn)的。這個(gè)方法很簡(jiǎn)單,自己可以打開看看。這里要注意一點(diǎn),在runTask()方法中執(zhí)行核心線程時(shí)是調(diào)用核心線程的run()方法,這是一個(gè)尋常方法的調(diào)用,千萬(wàn)別與線程的啟動(dòng)(start())混合了。這里還有一個(gè)比較重要的方法,那就是上述代碼中while循環(huán)中的getTask()方法,它是一個(gè)從池隊(duì)列中取的核心線程(任務(wù))的方法。具體代碼如下:
??? Java代碼 ? Runnable?getTask()?{?? ????????for?(;;)?{?? ????????????try?{?? ????????????????int?state?=?runState;?? ????????????????if?(state?>?SHUTDOWN)???? ????????????????????return?null;?? ????????????????Runnable?r;?? ????????????????if?(state?==?SHUTDOWN)??//幫助清空隊(duì)列?? ????????????????????r?=?workQueue.poll();?? ???????????????/*? ????????????????*?對(duì)于條件1,如果可以超時(shí),則在等待keepAliveTime時(shí)間后,則返回一null對(duì)象,這時(shí)就? ????????????????*??銷毀該工作線程,這就是CachedThreadPool為什么能回收空閑線程的原因了。? ????????????????*?注意以下幾點(diǎn):1.這種功能情況一般不可能在fixedThreadPool中出現(xiàn)? ????????????????*????????????2.在使用CachedThreadPool時(shí),條件1一般總是成立,因?yàn)镃achedThreadPool的corePoolSize? ????????????????*??????????????初始為0? ????????????????*/?? ????????????????else?if?(poolSize?>?corePoolSize?||?allowCoreThreadTimeOut)??//------------------條件1?? ????????????????????r?=?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS);???? ????????????????else?? ????????????????????r?=?workQueue.take();???????//如果隊(duì)列不存在任何元素?則一直等待。?FiexedThreadPool典型模式----------條件2?? ????????????????if?(r?!=?null)?? ????????????????????return?r;?? ????????????????if?(workerCanExit())?{???????//--------------------------條件3?? ????????????????????if?(runState?>=?SHUTDOWN)?//?Wake?up?others?? ????????????????????????interruptIdleWorkers();?? ????????????????????return?null;?? ????????????????}?? ????????????????//?Else?retry?? ????????????}?catch?(InterruptedException?ie)?{?? ????????????????//?On?interruption,?re-check?runState?? ????????????}?? ????????}?? ????}??
???? 從這個(gè)方法中,我們需要了解一下幾點(diǎn):
??? 1.CachedThreadPool獲得任務(wù)邏輯是條件1,條件1的處理邏輯請(qǐng)看注釋,CachedThreadPool執(zhí)行條件1的原因是:CachedThreadPool的corePoolSize時(shí)刻為0。
??? 2.FixedThreadPool執(zhí)行的邏輯為條件2,從’workQueue.take()’中我們就明白了為什么FixedThreadPool不會(huì)釋放工作線程的原因了(除非你關(guān)閉線程池)。
??? 最后,我們了解下Worker(工作線程)終止時(shí)的處理吧,這個(gè)對(duì)理解CachedThreadPool有幫助,具體代碼如下:
??? Java代碼 ? /**? ????*?工作線程退出要處理的邏輯? ????*?@param?w? ????*/?? ???void?workerDone(Worker?w)?{?? ???????final?ReentrantLock?mainLock?=?this.mainLock;?? ???????mainLock.lock();?? ???????try?{?? ???????????completedTaskCount?+=?w.completedTasks;??? ???????????workers.remove(w);??//從工作線程緩存中刪除?? ???????????if?(--poolSize?==?0)?//poolSize減一,這時(shí)其實(shí)又可以創(chuàng)建工作線程了?? ???????????????tryTerminate();?//嘗試終止?? ???????}?finally?{?? ???????????mainLock.unlock();?? ???????}?? ???}??
???? 注意workDone()方法中的tyrTerminate()方法,它是你以后理解線程池中shuDown()以及CachedThreadPool原理的關(guān)鍵,具體代碼如下: ???
??? Java代碼 ? private?void?tryTerminate()?{?? ????//終止的前提條件就是線程池里已經(jīng)沒(méi)有工作線程(Worker)了?? ???????if?(poolSize?==?0)?{?? ???????????int?state?=?runState;?? ???????????/**? ????????????*?如果當(dāng)前已經(jīng)沒(méi)有了工作線程(Worker),但是線程隊(duì)列里還有等待的線程任務(wù),則創(chuàng)建一個(gè)? ????????????*?工作線程來(lái)執(zhí)行線程隊(duì)列中等待的任務(wù)? ????????????*/?? ???????????if?(state?<?STOP?&&?!workQueue.isEmpty())?{?????? ???????????????state?=?RUNNING;?//?disable?termination?check?below?? ???????????????Thread?t?=?addThread(null);?? ???????????????if?(t?!=?null)?? ???????????????????t.start();?? ???????????}?? ???????????//設(shè)置池狀態(tài)為終止?fàn)顟B(tài)?? ???????????if?(state?==?STOP?||?state?==?SHUTDOWN)?{?? ???????????????runState?=?TERMINATED;?? ???????????????termination.signalAll();??? ???????????????terminated();??? ???????????}?? ???????}?? ???}??
???? 第一次寫這么長(zhǎng)的博文,還是躲著項(xiàng)目經(jīng)理寫的,真不容易,希望能對(duì)想了解java線程池原理的朋友們有一點(diǎn)幫助。
????http://xtu-xiaoxin.iteye.com/admin/blogs/647580
????
??? 如果對(duì)ThreadPoolExecutor還不是很熟悉,可以看看一篇對(duì)ThreadPoolExecutor的介紹的博文:
http://blog.csdn.net/waterbig/archive/2009/11/10/4794244.aspx
??? 首先,JAVA 中使用ThreadPoolExecutor的常用方式:
??? 實(shí)例代碼1?
Java代碼 ?
???? 在分析ThreadPoolExecutor源碼前,先了解下面兩個(gè)概念:
???? 1.核心線程(任務(wù)):我們定義的線程,即實(shí)現(xiàn)了Runnable接口的類,是我們將要放到線程池中執(zhí)行的類,如實(shí)例代碼中的CountService類
???? 2.工作線程:由線程池中創(chuàng)建的線程,是用來(lái)獲得核心線程并執(zhí)行核心線程的線程(比較拗口哦,具體看代碼就知道是什么東東了)。
??? Executors是一個(gè)線程池工廠,各種類型的線程池都是通過(guò)它來(lái)創(chuàng)建的,注意把它和Executor分開,感覺(jué)這個(gè)線程池工廠命名有點(diǎn)問(wèn)題。
??? 我們主要分析下我們提交任務(wù)的處理邏輯,即’execute.submit(runnable)’的實(shí)現(xiàn)。
Submit()方法是在ThreadPoolExecutor繼承的抽象類AbstractExecutorService中實(shí)現(xiàn)的,具體代碼如下:
?? Java代碼 ?
???? 從代碼中可以看出,線程的執(zhí)行邏輯通過(guò)execute()完成,而execute是在AbstractExecutorService的子類ThreadPoolExecutor中實(shí)現(xiàn)的。看,一個(gè)典型的模板模式!廢話少說(shuō),下面看ThreadPoolExecutor中execute()方法中代碼:
???
??? Java代碼 ?
???? 注意:CachedThreadPool和FixedThreadPool的邏輯實(shí)現(xiàn)都是在ThreadPoolExecutor中實(shí)現(xiàn)的。它兩的主要區(qū)別就是屬性corePoolSize以及workQueue的初始值的不同。具體可自己查看工程類Executors的newFixedThreadPool()和newCachedThreadPool方法。由于這些初始值的不同,所以實(shí)現(xiàn)的邏輯也不同,具體的我在代碼中已經(jīng)注釋了。
??? command線程運(yùn)行的整個(gè)邏輯在 addIfUnderCorePoolSize(command)方法中實(shí)現(xiàn)的,
詳細(xì)請(qǐng)看addIfUnderCorePoolSize(command)源碼:
? Java代碼 ?
???? 看’t.start()’,這表示工作線程啟動(dòng)了,工作線程t啟動(dòng)的前提條件是’t = addThread(firstTask); ‘返回值t必須不為null。好了,現(xiàn)在想看看java線程池中工作線程是怎么樣的嗎?請(qǐng)看addThread方法:
??? Java代碼 ?
??? 看見沒(méi),Worker就是工作線程類,它是ThreadPoolExecutor中的一個(gè)內(nèi)部類。下面,我們主要分析Worker類,如了解了Worker類,那基本就了解了java線程池的整個(gè)原理了。不用怕,Worker類的邏輯很簡(jiǎn)單,它其實(shí)就是一個(gè)線程,實(shí)現(xiàn)了Runnable接口的,所以,我們先從run方法入手,run方法源碼如下:
? Java代碼 ?
???? 從源碼中可看出,我們所提交的核心線程(任務(wù))的邏輯是在Worker中的runTask()方法中實(shí)現(xiàn)的。這個(gè)方法很簡(jiǎn)單,自己可以打開看看。這里要注意一點(diǎn),在runTask()方法中執(zhí)行核心線程時(shí)是調(diào)用核心線程的run()方法,這是一個(gè)尋常方法的調(diào)用,千萬(wàn)別與線程的啟動(dòng)(start())混合了。這里還有一個(gè)比較重要的方法,那就是上述代碼中while循環(huán)中的getTask()方法,它是一個(gè)從池隊(duì)列中取的核心線程(任務(wù))的方法。具體代碼如下:
??? Java代碼 ?
???? 從這個(gè)方法中,我們需要了解一下幾點(diǎn):
??? 1.CachedThreadPool獲得任務(wù)邏輯是條件1,條件1的處理邏輯請(qǐng)看注釋,CachedThreadPool執(zhí)行條件1的原因是:CachedThreadPool的corePoolSize時(shí)刻為0。
??? 2.FixedThreadPool執(zhí)行的邏輯為條件2,從’workQueue.take()’中我們就明白了為什么FixedThreadPool不會(huì)釋放工作線程的原因了(除非你關(guān)閉線程池)。
??? 最后,我們了解下Worker(工作線程)終止時(shí)的處理吧,這個(gè)對(duì)理解CachedThreadPool有幫助,具體代碼如下:
??? Java代碼 ?
???? 注意workDone()方法中的tyrTerminate()方法,它是你以后理解線程池中shuDown()以及CachedThreadPool原理的關(guān)鍵,具體代碼如下: ???
??? Java代碼 ?
???? 第一次寫這么長(zhǎng)的博文,還是躲著項(xiàng)目經(jīng)理寫的,真不容易,希望能對(duì)想了解java線程池原理的朋友們有一點(diǎn)幫助。
總結(jié)
以上是生活随笔為你收集整理的JAVA线程池(ThreadPoolExecutor)源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 《Java编程思想》学习笔记9——集合容
- 下一篇: java集合框架类源代码阅读体会