Java-Java中的线程池原理分析及使用
文章目錄
- 概述
- 線程池的優(yōu)點
- 線程池的實現(xiàn)原理
- 線程池的使用
- 創(chuàng)建線程池
- 向線程池中提交任務(wù)
- 關(guān)閉線程池
- 合理的配置線程池
- 線程池的監(jiān)控
概述
我們在上篇博文 Java-多線程框架Executor解讀 可以看到 Executors 工廠方法中的幾個靜態(tài)工廠方法中的內(nèi)部實現(xiàn)都是 ThreadPoolExecutor。
比如:
JDK中的線程池均由ThreadPoolExecutor類實現(xiàn)。
ThreadPoolExecutor 是JDK中線程池的具體實現(xiàn) , ThreadPoolExecutor 類是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務(wù)。
線程池的優(yōu)點
Java中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過程中,合理地使用線程池能夠帶來3個好處。
-
降低資源消耗。
通過重復(fù)利用已創(chuàng)建的線程來降低線程創(chuàng)建和銷毀的造成的消耗 -
提高響應(yīng)速度
當(dāng)任務(wù)到達時,任務(wù)可不用等待線程創(chuàng)建就能立即執(zhí)行 -
提高線程的可管理性
線程是稀缺資源,不能無限制的創(chuàng)建,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控,增強穩(wěn)定性。
線程池的實現(xiàn)原理
當(dāng)向線程池提交一個任務(wù)之后,線程池是如何處理這個任務(wù)的呢?
我們來看一下線程池的主要處理流程,處理流程圖如下:
從圖中可以看出,當(dāng)提交一個新任務(wù)到線程池時,線程池的處理流程如下。
-
1)線程池判斷核心線程池里的線程是否都在執(zhí)行任務(wù)。如果不是,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進入下個流程。
-
2)線程池判斷工作隊列是否已經(jīng)滿。如果工作隊列沒有滿,則將新提交的任務(wù)存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。
-
3)線程池判斷線程池的線程是否都處于工作狀態(tài)。如果沒有,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來處理這個任務(wù)。
ThreadPoolExecutor執(zhí)行execute()方法的示意圖 如下所示:
ThreadPoolExecutor執(zhí)行execute方法分下面4種情況。
-
1)如果當(dāng)前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟 需要獲取全局鎖)。
-
2)如果運行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
-
3)如果無法將任務(wù)加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí) 行這一步驟需要獲取全局鎖)。
-
4)如果創(chuàng)建新線程將使當(dāng)前運行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設(shè)計思路,是為了在執(zhí)行execute()方法時,盡可能地避免獲取全局鎖(那將會是一個嚴(yán)重的可伸縮瓶頸)。
在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。
ThreadPoolExecutor中線程執(zhí)行任務(wù)的示意圖如下
線程池中的線程執(zhí)行任務(wù)分兩種情況,如下。
-
1)在execute()方法中創(chuàng)建一個線程時,會讓這個線程執(zhí)行當(dāng)前任務(wù)。
-
2)這個線程執(zhí)行完上圖中1的任務(wù)后,會反復(fù)從BlockingQueue獲取任務(wù)來執(zhí)行。
線程池的使用
創(chuàng)建線程池
我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池
其中一個構(gòu)造函數(shù)如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}可以抽象為
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);參數(shù)解讀:
-
corePoolSize (線程池的基本大小):當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程。
-
maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如果使用了無界的任務(wù)隊列這個參數(shù)就沒什么效果。
-
keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務(wù)很多,并且每個任務(wù)執(zhí)行的時間比較短,可以調(diào)大時間,提高線程的利用率。
-
TimeUnit (線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
-
runnableTaskQueue(任務(wù)隊列):用于保存等待執(zhí)行的任務(wù)的阻塞隊列。可以選擇以下幾 個阻塞隊列。
·ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按FIFO(先進先出)原 則對元素進行排序。
·LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。
·SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。
·PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列。
- RejectedExecutionHandler (飽和策略):當(dāng)隊列和線程池都滿了,說明線程池處于飽和狀 態(tài),那么必須采取一種策略處理提交的新任務(wù)。默認(rèn)策略為AbortPolicy,表示無法 處理新任務(wù)時拋出異常。
在JDK 1.5中Java線程池框架提供了以下4種策略。
·AbortPolicy:直接拋出異常。
·CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù)。
·DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)。
·DiscardPolicy:不處理,丟棄掉。
當(dāng)然,也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄 日志或持久化存儲不能處理的任務(wù)。
- ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設(shè)置有意義的名字,代碼如下。
向線程池中提交任務(wù)
可以使用兩個方法向線程池提交任務(wù),分別為execute()和submit()方法。
execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。
通過以下代碼可知execute()方法輸入的任務(wù)是一個Runnable類的實例。
threadsPool.execute(new Runnable() {@Overridepublic void run() {// TODO Auto-generated method stub} });submit()方法用于提交需要返回值的任務(wù)。
線程池會返回一個future類型的對象,通過這個future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完。
Future<Object> future = executor.submit(harReturnValuetask);try {Object s = future.get();} catch (InterruptedException e) {// 處理中斷異常} catch (ExecutionException e) {// 處理無法執(zhí)行任務(wù)異常} finally {// 關(guān)閉線程池executor.shutdown(); }關(guān)閉線程池
可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池。
它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠無法終止。
但是它們存在一定的區(qū)別:
-
shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,
-
而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true。
當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true。
至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。
合理的配置線程池
要想合理地配置線程池,就必須首先分析任務(wù)特性.
可以從以下幾個角度來分析。
- ·任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
- ·任務(wù)的優(yōu)先級:高、中和低。
- ·任務(wù)的執(zhí)行時間:長、中和短。
- ·任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。
CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu +1個線程的線程池。
由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*N cpu 。
混合型的任務(wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。
可以通過
Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先執(zhí)行。
注意:如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠不能執(zhí)行。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時間越長,則CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU。
建議使用有界隊列。有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點兒,比如幾千。
有一次,我們系統(tǒng)里后臺任務(wù)線程池的隊列和線程池全滿了,不斷拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因為后臺任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞,任務(wù)積壓在線程池里。如果當(dāng)時我們設(shè)置成無界隊列,那么線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。當(dāng)然,我們的系統(tǒng)所有的任務(wù)是用單獨的服務(wù)器部署的,我們使用不同規(guī)模的線程池完成不同類型的任務(wù),但是出現(xiàn)這樣問題時也會影響到其他任務(wù)。
線程池的監(jiān)控
如果在系統(tǒng)中大量使用線程池,則有必要對線程池進行監(jiān)控,方便在出現(xiàn)問題時,可以根據(jù)線程池的使用狀況快速定位問題。可以通過線程池提供的參數(shù)進行監(jiān)控,在監(jiān)控線程池的時候可以使用以下屬性。
-
taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
-
·completedTaskCount:線程池在運行過程中已完成的任務(wù)數(shù)量,小于或等于taskCount。
-
·largestPoolSize:線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過。
-
·getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會自動銷 毀,所以這個大小只增不減。
-
·getActiveCount:獲取活動的線程數(shù)。
通過擴展線程池進行監(jiān)控。可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,這幾個方法在線程池里是空方法。
protected void beforeExecute(Thread t, Runnable r) { }protected void afterExecute(Runnable r, Throwable t) { }protected void terminated() { }也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來進行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時間、最大執(zhí)行時間和最小執(zhí)行時間等。
總結(jié)
以上是生活随笔為你收集整理的Java-Java中的线程池原理分析及使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring-AOP @AspectJ切
- 下一篇: Java-Java I/O流解读之基于字