http://uule.iteye.com/blog/1123185
線程池還具有提高系統性能的優點,因為創建線程和清除線程的開銷比較大。?
有兩種不同類型的線程池:一是固定線程數量的線程池;二是可變數量的線程池。?
?
對于固定數量的線程池,可以使用Executors的靜態方法 newFixedThreadPool 來創建 ExecutorService;或者利用 newSingleThreadPool來創建。?
? ? ? 而 ExecutorService 實現了 Executor 接口,這個接口中有一個方法:Execute(Runnable command),也就是執行線程。?
? ? ? 對于固定數量的線程池而言,如果需要執行的線程數量多于構造的數量,那么只能并發構造時的數量,剩下的線程就進入線程池的等待隊列。?
? ? ? 如果不需要使用該線程池了,則使用 ExecutorService 中的 shutDown 方法,此時,該線程池就不會接受執行新的線程任務了。
?
對于可變數量的線程池,可用Executors的靜態方法 newCachedThreadPool 來創建 ExecutorService,該線程池的大小是不定的,當執行任務時,會先選取緩存中的空閑線程來執行,如果沒有空閑線程,則創建一個新的線程,而如果空閑線程的空閑狀態超過60秒,則線程池刪除該線程。
?
還有一種線程池:延遲線程池?
該線程池的創建有兩個方法: Executors.newScheduledThreadPool(int corePoolSize);
??????????????????????????????????????? Executors.newSingleScheduledExecutor();?
創建之后,會獲得一個 ScheduledExecutorService。?
該對象的一個重要的方法就是: schedule(Runnable command, long delay, TimeUnit unit)
?該方法返回了一個 ScheduledFuture。
?
?
?
JDK1.5中加入了許多對并發特性的支持,例如:線程池。
一、簡介
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:
Java代碼??
ThreadPoolExecutor(int?corePoolSize,?int?maximumPoolSize,long?keepAliveTime,?TimeUnit?unit,BlockingQueue<Runnable>?workQueue,RejectedExecutionHandler?handler)?? ?
corePoolSize:??????? 線程池維護線程的最少數量 (core : 核心)
maximumPoolSize:線程池維護線程的最大數量?
keepAliveTime:???? 線程池維護線程所允許的空閑時間
unit:?????????? 線程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊列
handler:????? 線程池對拒絕任務的處理策略
一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。?
當一個任務通過execute(Runnable)方法欲添加到線程池時:
如果線程池中運行的線程?小于corePoolSize?,即使線程池中的線程都處于空閑狀態,也要?創建新的線程?來處理被添加的任務。
如果線程池中運行的線程大于等于corePoolSize,但是緩沖隊列 workQueue未滿?,那么任務被放入緩沖隊列?。
如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿(即無法將請求加入隊列?),并且線程池中的數量小于maximumPoolSize,建新的線程?來處理被添加的任務。
如果此時線程池中的數量大于corePoolSize,緩沖隊列workQueue滿,并且線程池中的數量等于maximumPoolSize?,那么通過?handler?所指定的策略來處理此任務。
當線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止?。這樣,線程池可以動態的調整池中的線程數。
也就是:處理任務的優先級為:
corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。?
當然,如果用的是無界的緩沖隊列,那么當線程等于corePoolSIze,小于maximumPoolSize,任務就會不停的添加到隊列中,也不會創建新線程,也不會丟棄。
Java代碼??
unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:?? NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。?? ?? workQueue常用的是:?? ???????java.util.concurrent.ArrayBlockingQueue?? ?? handler有四個選擇:?? ThreadPoolExecutor.AbortPolicy() ?//默認的處理方式 ???????? ThreadPoolExecutor.CallerRunsPolicy()?? ??????? ThreadPoolExecutor.DiscardOldestPolicy()?? ??????? ThreadPoolExecutor.DiscardPolicy() ?//推薦的處理方式 ?????? 一個例子:?
Java代碼??
package?test;?? import?java.util.concurrent.ArrayBlockingQueue;?? import?java.util.concurrent.ThreadPoolExecutor;?? import?java.util.concurrent.TimeUnit;?? ?? public?class?TestThreadPool?{?? ?? ????private?static?int?produceTaskSleepTime?=?2;?? ????private?static?int?produceTaskMaxNumber?=?10;?? ?? ????? ? ?? ????public?static?class?ThreadPoolTask?implements?Runnable{?? ?????????? ????????private?Object?threadPoolTaskData;?? ?? ????????ThreadPoolTask(Object?tasks)?{?? ????????????this.threadPoolTaskData?=?tasks;?? ????????}?? ?? ????????public?void?run()?{?? ?????????????? ????????????System.out.println("start?.."?+?threadPoolTaskData);?? ????????????try?{?? ?????????????????? ????????????????Thread.sleep(2000);?? ????????????}?catch?(Exception?e)?{?? ????????????????e.printStackTrace();?? ????????????}?? ????????????threadPoolTaskData?=?null;?? ????????}?? ?? ????????public?Object?getTask()?{?? ????????????return?this.threadPoolTaskData;?? ????????}?? ????}????? ?????? ????public?static?void?main(String[]?args)?{?? ?????????? ????????ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(2,?4,?3,?? ????????????????TimeUnit.SECONDS,?new?ArrayBlockingQueue<Runnable>(3),?? ????????????????new?ThreadPoolExecutor.DiscardOldestPolicy());?? ?? ????????for?(int?i?=?1;?i?<=?produceTaskMaxNumber;?i++)?{?? ????????????try?{?? ?????????????????? ????????????????String?task?=?"task@?"?+?i;?? ????????????????System.out.println("put?"?+?task);?? ?????????????????? ????????????????threadPool.execute(new?ThreadPoolTask(task));?? ?? ?????????????????? ????????????????Thread.sleep(produceTaskSleepTime);?? ????????????}?catch?(Exception?e)?{?? ????????????????e.printStackTrace();?? ????????????}?? ????????}?? ????}?? ?????? }?? ? 說明:
1、在這段程序中,一個任務就是一個Runnable類型的對象,也就是一個ThreadPoolTask類型的對象。
2、一般來說任務除了處理方式外,還需要處理的數據,處理的數據通過構造方法傳給任務。
3、在這段程序中,main()方法相當于一個殘忍的領導,他派發出許多任務,丟給一個叫 threadPool的任勞任怨的小組來做。
4、這個小組里面隊員至少有兩個,如果他們兩個忙不過來,任務就被放到任務列表里面。
如果積壓的任務過多,多到任務列表都裝不下(超過3個)的時候,就雇傭新的隊員來幫忙。但是基于成本的考慮,不能雇傭太多的隊員,至多只能雇傭 4個。
5、如果四個隊員都在忙時,再有新的任務,這個小組就處理不了了,任務就會被通過一種策略來處理,我們的處理方式是不停的派發,直到接受這個任務為止(更殘忍!呵呵)。
因為隊員工作是需要成本的,如果工作很閑,閑到 3 秒都沒有新的任務了,那么有的隊員就會被解雇了,但是,為了小組的正常運轉,即使工作再閑,小組的隊員也不能少于兩個。
本例來源:http://blog.csdn.net/senton/article/details/3528720
?
Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor并不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。
線程池的類體系結構:?
ExecutorService: ??? ??? 真正的線程池接口。
ScheduledExecutorService ??? 能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。
ThreadPoolExecutor ??? ??? ExecutorService的默認實現。
ScheduledThreadPoolExecutor ??? 繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現。
?
Executors 工廠方法:?
Java代碼??
newSingleThreadExecutor:?? ?????? 創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。?? ?? newFixedThreadPool:?? ?????? 創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。?? ?? newCachedThreadPool:??? ?????? 創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。?? ?? newScheduledThreadPool:?? ?????? ThreadPoolExecutor是Executors類的底層實現:
ExecutorService newFixedThreadPool(int nThreads) 固定大小線程池
Java代碼??
public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{?? ????????return?new?ThreadPoolExecutor(nThreads,?nThreads,?? ??????????????????????????????????????0L,?TimeUnit.MILLISECONDS,?? ??????????????????????????????????????new?LinkedBlockingQueue<Runnable>());?? ????}?? ?corePoolSize和maximumPoolSize的大小是一樣的(實際上,如果使用無界queue的話maximumPoolSize參數是沒有意義的),BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。
一個例子:
Java代碼??
public?class?ThreadPool?{?? ?? ????private?static?void?doSomething(int?id)?{?? ????????System.out.println("start?do?"?+?id?+?"?task?…");?? ????????try?{?? ????????????Thread.sleep(1000?*?2);?? ????????}?catch?(InterruptedException?e)?{?? ????????????e.printStackTrace();?? ????????}?? ????????System.out.println("start?do?"?+?id?+?"?finished.");?? ????}?? ?????? ????public?static?void?main(String[]?args)?{?? ????????ExecutorService?executorService?=?Executors.newFixedThreadPool(2);?? ?????????????? ?? ?????????? ????????executorService.submit(new?Runnable()?{?? ????????????public?void?run()?{?? ????????????????doSomething(1);?? ????????????}?? ????????});?? ?? ????????executorService.execute(new?Runnable()?{?? ????????????public?void?run()?{?? ????????????????doSomething(2);?? ????????????}?? ????????});?? ?? ????????executorService.shutdown();?? ????????? ????????System.out.println(">>main?thread?end.");??? ????}?? }?? ?
ExecutorService newSingleThreadExecutor() 單線程
Java代碼??
public?static?ExecutorService?newSingleThreadExecutor()?{?? ????????return?new?FinalizableDelegatedExecutorService?? ????????????(new?ThreadPoolExecutor(1,?1,?? ????????????????????????????????????0L,?TimeUnit.MILLISECONDS,?? ????????????????????????????????????new?LinkedBlockingQueue<Runnable>()));?? ????}?? ?最大和最小都為1
?
ExecutorService newCachedThreadPool() 無界線程池
Java代碼??
public?static?ExecutorService?newCachedThreadPool()?{?? ????????return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,?? ??????????????????????????????????????60L,?TimeUnit.SECONDS,?? ??????????????????????????????????????new?SynchronousQueue<Runnable>());?? ????}?? 這個實現就有意思了。首先是無界的線程池,所以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用?SynchronousQueue
。可能對于該BlockingQueue有些陌生,簡單說:該QUEUE中,?每個插入操作必須等待另一個
線程的對應移除操作。?比如,我先添加一個元素,接下來如果繼續想嘗試添加則會阻塞,直到另一個線程取走一個元素,反之亦然。(想到什么?就是緩沖區為1的生產者消費者模式^_^)
?
以下為重要分析:
到此如果有很多疑問,那是必然了(除非你也很了解了)
?
先從BlockingQueue?<Runnable?>?workQueue這個入參開始說起。在JDK中,其實已經說得很清楚了,一共有三種類型的queue。以下為引用:(我會稍微修改一下,并用紅色突出顯示)
??
所有?BlockingQueue?都可用于傳輸和保持提交的任務??梢允褂么岁犃信c池大小進行交互: - 如果運行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(什么意思?如果當前運行的線程小于corePoolSize?,則任務根本不會存放,添加到queue中?,而是直接?抄家伙(thread)開始運行?)
- 如果運行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊列?,而不添加新的線程?。
- 如果無法將請求加入隊列,則創建新的線程?,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
先不著急舉例子,因為首先需要知道隊列的三種類型?。
排隊有三種通用策略: 直接提交。?工作隊列的默認選項是?SynchronousQueue?,它將任務直接提交給線程而不保持它們?。在此,如果不存在可用于立即運行任務的線程?,則試圖把任務加入隊列將失敗,因此會構造一個新的線程?。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務?。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。無界隊列。?使用無界隊列(例如,不具有預定義容量的?LinkedBlockingQueue?)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize?。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用于處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。有界隊列。?當使用有限的 maximumPoolSizes 時,有界隊列(如?ArrayBlockingQueue?)有助于防止資源耗盡?,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。 ? ?
到這里,該了解的理論已經夠多了,可以調節的就是corePoolSize和maximumPoolSizes 這對參數還有就是BlockingQueue的選擇。
?
例子一:使用直接提交策略,也即SynchronousQueue。
?
首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由于該Queue本身的特性?,在某次添加元素后必須等待其他線程取走后才能繼續添加?。在這里不是核心線程便是新創建的線程,但是我們試想一樣下,下面的場景。
?
我們使用一下參數構造ThreadPoolExecutor:
Java代碼?? new?ThreadPoolExecutor(???? ????????????2,?3,?30,?TimeUnit.SECONDS,????? ????????????new?<span?style="white-space:?normal;">SynchronousQueue</span><Runnable>(),????? ????????????new?RecorderThreadFactory("CookieRecorderPool"),????? ????????????new?ThreadPoolExecutor.CallerRunsPolicy());???? ? Java代碼?? new?ThreadPoolExecutor(?? ????????????2,?3,?30,?TimeUnit.SECONDS,??? ????????????new?<span?style="white-space:?normal;">SynchronousQueue</span>?? ?? ?? ?? ?? ?? <Runnable>(),??? ????????????new?RecorderThreadFactory("CookieRecorderPool"),??? ????????????new?ThreadPoolExecutor.CallerRunsPolicy());?? ?當核心線程已經有2個正在運行.
?
此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊列?,而不添加新的線程??!?所以A被添加到queue中。又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由于使用的SynchronousQueue,所以一定無法加入進去。此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程?,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕?!?#xff0c;所以必然會新建一個線程來運行這個任務。暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,后一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。 所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界隊列)。對于使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖?。
什么意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那么先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1么有被執行前,A2不可能添加入queue中
例子二:使用無界隊列策略,即LinkedBlockingQueue
這個就拿newFixedThreadPool?來說,根據前文提到的規則: ?寫道 如果運行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。 ?那么當任務繼續增加,會發生什么呢? ?寫道 ? 如果運行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。 ?OK,此時任務變加入隊列之中了,那什么時候才會添加新線程呢?
?
?寫道 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。 這里就很有意思了,可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對于無界隊列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的線程!?corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。所以要防止任務瘋長,比如任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,如果任務內存大一些,不一會兒就爆了,呵呵。
?
可以仔細想想哈。
?
例子三:有界隊列,使用ArrayBlockingQueue。
?
這個是最為復雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。
?
舉例來說,請看如下構造方法:
?
Java代碼?? new?ThreadPoolExecutor(???? ????????????2,?4,?30,?TimeUnit.SECONDS,????? ????????????new?ArrayBlockingQueue<Runnable>(2),????? ????????????new?RecorderThreadFactory("CookieRecorderPool"),????? ????????????new?ThreadPoolExecutor.CallerRunsPolicy());??? ? Java代碼?? new?ThreadPoolExecutor(?? ????????????2,?4,?30,?TimeUnit.SECONDS,??? ????????????new?ArrayBlockingQueue<Runnable>(2),??? ????????????new?RecorderThreadFactory("CookieRecorderPool"),??? ????????????new?ThreadPoolExecutor.CallerRunsPolicy());?? 假設,所有的任務都永遠無法執行完。
?
對于首先來的A,B來說直接運行,接下來,如果來了C,D,他們會被放到queu中,如果接下來再來E,F,則增加線程運行E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限制了,所以就會使用拒絕策略來處理。
?
總結:
ThreadPoolExecutor的使用還是很有技巧的。使用無界queue可能會耗盡系統資源。使用有界queue可能不能很好的滿足性能,需要調節線程數和queue大小線程數自然也有開銷,所以需要根據不同應用進行調節。 通常來說對于靜態任務可以歸為: 數量大,但是執行時間很短數量小,但是執行時間較長數量又大執行時間又長除了以上特點外,任務間還有些內在關系 看完這篇問文章后,希望能夠可以選擇合適的類型了
http://rdc.gleasy.com/java-executorservice-%E9%87%8D%E8%A6%81bug%E4%B8%A4%E4%BE%8B.html
static class Task1 implements Runnable {int id;public Task1(int id) {this.id = id;}@Overridepublic void run() {try {System.out.println(Thread.currentThread() +" task " + id + " begin");Thread.sleep(5000);} catch (InterruptedException e) {}if (id < 5) {throw new RuntimeException(Thread.currentThread() + " task " +id+ " exception");}}}static ExecutorService pool = Executors.newFixedThreadPool(5);public void test() throws Exception {}public static void main(String[] args) throws Exception{for (int i = 0; i < 10; ++i) {pool.execute(new Task1(i));}ThreadPoolExecutor threads =(ThreadPoolExecutor) pool;for (int i= 0; i < 40; i++) {System.out.println("pool size = " + threads.getPoolSize() + " Largest = " + threads.getLargestPoolSize() + " Max = " + threads.getMaximumPoolSize());Thread.sleep(1000);}pool.shutdown();
// ToyClient client = new ToyClient();
// client.test();}起始線程池里有5個線程,前五個線程都拋出異常,這五個線程因為異常而死掉,線程池會再創建五個新線程
所以建議用submit提交任務,如果異常在線程中發生,當前線程不會處理這個異常,當前線程也不會死掉
如果任務實現的是callable接口,會在future.get();結果返回時拋出異常
與50位技術專家面對面20年技術見證,附贈技術全景圖
總結
以上是生活随笔為你收集整理的线程池java.util.concurrent.ThreadPoolExecutor总结的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。