Java 线程池详解
構(gòu)造一個(gè)線程池為什么需要幾個(gè)參數(shù)?如果避免線程池出現(xiàn)OOM?Runnable和Callable的區(qū)別是什么?本文將對(duì)這些問題一一解答,同時(shí)還將給出使用線程池的常見場(chǎng)景和代碼片段。
基礎(chǔ)知識(shí)
Executors創(chuàng)建線程池
Java中創(chuàng)建線程池很簡(jiǎn)單,只需要調(diào)用Executors中相應(yīng)的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不僅隱藏了復(fù)雜性,也為我們埋下了潛在的隱患(OOM,線程耗盡)。
Executors創(chuàng)建線程池便捷方法列表:
小程序使用這些快捷方法沒什么問題,對(duì)于服務(wù)端需要長(zhǎng)期運(yùn)行的程序,創(chuàng)建線程池應(yīng)該直接使用ThreadPoolExecutor的構(gòu)造方法。沒錯(cuò),上述Executors方法創(chuàng)建的線程池就是ThreadPoolExecutor。
ThreadPoolExecutor構(gòu)造方法
Executors中創(chuàng)建線程池的快捷方法,實(shí)際上是調(diào)用了ThreadPoolExecutor的構(gòu)造方法(定時(shí)任務(wù)使用的是ScheduledThreadPoolExecutor),該類構(gòu)造方法參數(shù)列表如下:
//?Java線程池的完整構(gòu)造函數(shù) public?ThreadPoolExecutor(int?corePoolSize,?//?線程池長(zhǎng)期維持的線程數(shù),即使線程處于Idle狀態(tài),也不會(huì)回收。int?maximumPoolSize,?//?線程數(shù)的上限long?keepAliveTime,?TimeUnit?unit,?//?超過corePoolSize的線程的idle時(shí)長(zhǎng),//?超過這個(gè)時(shí)間,多余的線程會(huì)被回收。BlockingQueue<Runnable>?workQueue,?//?任務(wù)的排隊(duì)隊(duì)列ThreadFactory?threadFactory,?//?新線程的產(chǎn)生方式RejectedExecutionHandler?handler)?//?拒絕策略竟然有7個(gè)參數(shù),很無奈,構(gòu)造一個(gè)線程池確實(shí)需要這么多參數(shù)。這些參數(shù)中,比較容易引起問題的有corePoolSize, maximumPoolSize, workQueue以及handler:
corePoolSize和maximumPoolSize設(shè)置不當(dāng)會(huì)影響效率,甚至耗盡線程;
workQueue設(shè)置不當(dāng)容易導(dǎo)致OOM;
handler設(shè)置不當(dāng)會(huì)導(dǎo)致提交任務(wù)時(shí)拋出異常。
正確的參數(shù)設(shè)置方式會(huì)在下文給出。
線程池的工作順序
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
corePoolSize -> 任務(wù)隊(duì)列 -> maximumPoolSize -> 拒絕策略
Runnable和Callable
可以向線程池提交的任務(wù)有兩種:Runnable和Callable,二者的區(qū)別如下:
方法簽名不同,void Runnable.run(), V Callable.call() throws Exception
是否允許有返回值,Callable允許有返回值
是否允許拋出異常,Callable允許拋出異常。
Callable是JDK1.5時(shí)加入的接口,作為Runnable的一種補(bǔ)充,允許有返回值,允許拋出異常。
三種提交任務(wù)的方式:
如何正確使用線程池
避免使用無界隊(duì)列
不要使用Executors.newXXXThreadPool()快捷方法創(chuàng)建線程池,因?yàn)檫@種方式會(huì)使用無界的任務(wù)隊(duì)列,為避免OOM,我們應(yīng)該使用ThreadPoolExecutor的構(gòu)造方法手動(dòng)指定隊(duì)列的最大長(zhǎng)度:
ExecutorService?executorService?=?new?ThreadPoolExecutor(2,?2,0,?TimeUnit.SECONDS,new?ArrayBlockingQueue<>(512),?//?使用有界隊(duì)列,避免OOMnew?ThreadPoolExecutor.DiscardPolicy());明確拒絕任務(wù)時(shí)的行為
任務(wù)隊(duì)列總有占滿的時(shí)候,這是再submit()提交新的任務(wù)會(huì)怎么樣呢?RejectedExecutionHandler接口為我們提供了控制方式,接口定義如下:
public?interface?RejectedExecutionHandler?{void?rejectedExecution(Runnable?r,?ThreadPoolExecutor?executor); }線程池給我們提供了幾種常見的拒絕策略:
線程池默認(rèn)的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關(guān)心任務(wù)被拒絕的事件,可以將拒絕策略設(shè)置成DiscardPolicy,這樣多余的任務(wù)會(huì)悄悄的被忽略。
ExecutorService?executorService?=?new?ThreadPoolExecutor(2,?2,0,?TimeUnit.SECONDS,new?ArrayBlockingQueue<>(512),new?ThreadPoolExecutor.DiscardPolicy());//?指定拒絕策略獲取處理結(jié)果和異常
線程池的處理結(jié)果、以及處理過程中的異常都被包裝到Future中,并在調(diào)用Future.get()方法時(shí)獲取,執(zhí)行過程中的異常會(huì)被包裝成ExecutionException,submit()方法本身不會(huì)傳遞結(jié)果和任務(wù)執(zhí)行過程中的異常。獲取執(zhí)行結(jié)果的代碼可以這樣寫:
ExecutorService?executorService?=?Executors.newFixedThreadPool(4); Future<Object>?future?=?executorService.submit(new?Callable<Object>()?{@Overridepublic?Object?call()?throws?Exception?{throw?new?RuntimeException("exception?in?call~");//?該異常會(huì)在調(diào)用Future.get()時(shí)傳遞給調(diào)用者}});try?{Object?result?=?future.get(); }?catch?(InterruptedException?e)?{//?interrupt }?catch?(ExecutionException?e)?{//?exception?in?Callable.call()e.printStackTrace(); }上述代碼輸出類似如下:
線程池的常用場(chǎng)景
正確構(gòu)造線程池
int?poolSize?=?Runtime.getRuntime().availableProcessors()?*?2; BlockingQueue<Runnable>?queue?=?new?ArrayBlockingQueue<>(512); RejectedExecutionHandler?policy?=?new?ThreadPoolExecutor.DiscardPolicy(); executorService?=?new?ThreadPoolExecutor(poolSize,?poolSize,0,?TimeUnit.SECONDS,queue,policy);獲取單個(gè)結(jié)果
過submit()向線程池提交任務(wù)后會(huì)返回一個(gè)Future,調(diào)用V Future.get()方法能夠阻塞等待執(zhí)行結(jié)果,V get(long timeout, TimeUnit unit)方法可以指定等待的超時(shí)時(shí)間。
獲取多個(gè)結(jié)果
如果向線程池提交了多個(gè)任務(wù),要獲取這些任務(wù)的執(zhí)行結(jié)果,可以依次調(diào)用Future.get()獲得。但對(duì)于這種場(chǎng)景,我們更應(yīng)該使用ExecutorCompletionService,該類的take()方法總是阻塞等待某一個(gè)任務(wù)完成,然后返回該任務(wù)的Future對(duì)象。向CompletionService批量提交任務(wù)后,只需調(diào)用相同次數(shù)的CompletionService.take()方法,就能獲取所有任務(wù)的執(zhí)行結(jié)果,獲取順序是任意的,取決于任務(wù)的完成順序:
void?solve(Executor?executor,?Collection<Callable<Result>>?solvers)throws?InterruptedException,?ExecutionException?{CompletionService<Result>?ecs?=?new?ExecutorCompletionService<Result>(executor);//?構(gòu)造器for?(Callable<Result>?s?:?solvers)//?提交所有任務(wù)ecs.submit(s);int?n?=?solvers.size();for?(int?i?=?0;?i?<?n;?++i)?{//?獲取每一個(gè)完成的任務(wù)Result?r?=?ecs.take().get();if?(r?!=?null)use(r);} }單個(gè)任務(wù)的超時(shí)時(shí)間
V?Future.get(long?timeout,?TimeUnit?unit)方法可以指定等待的超時(shí)時(shí)間,超時(shí)未完成會(huì)拋出TimeoutException。
多個(gè)任務(wù)的超時(shí)時(shí)間
等待多個(gè)任務(wù)完成,并設(shè)置最大等待時(shí)間,可以通過CountDownLatch完成:
public?void?testLatch(ExecutorService?executorService,?List<Runnable>?tasks)throws?InterruptedException{CountDownLatch?latch?=?new?CountDownLatch(tasks.size());for(Runnable?r?:?tasks){executorService.submit(new?Runnable()?{@Overridepublic?void?run()?{try{r.run();}finally?{latch.countDown();//?countDown}}});}latch.await(10,?TimeUnit.SECONDS);?//?指定超時(shí)時(shí)間}線程池和裝修公司
以運(yùn)營(yíng)一家裝修公司做個(gè)比喻。公司在辦公地點(diǎn)等待客戶來提交裝修請(qǐng)求;公司有固定數(shù)量的正式工以維持運(yùn)轉(zhuǎn);旺季業(yè)務(wù)較多時(shí),新來的客戶請(qǐng)求會(huì)被排期,比如接單后告訴用戶一個(gè)月后才能開始裝修;當(dāng)排期太多時(shí),為避免用戶等太久,公司會(huì)通過某些渠道(比如人才市場(chǎng)、熟人介紹等)雇傭一些臨時(shí)工(注意,招聘臨時(shí)工是在排期排滿之后);如果臨時(shí)工也忙不過來,公司將決定不再接收新的客戶,直接拒單。
線程池就是程序中的“裝修公司”,代勞各種臟活累活。上面的過程對(duì)應(yīng)到線程池上:
//?Java線程池的完整構(gòu)造函數(shù) public?ThreadPoolExecutor(int?corePoolSize,?//?正式工數(shù)量int?maximumPoolSize,?//?工人數(shù)量上限,包括正式工和臨時(shí)工long?keepAliveTime,?TimeUnit?unit,?//?臨時(shí)工游手好閑的最長(zhǎng)時(shí)間,超過這個(gè)時(shí)間將被解雇BlockingQueue<Runnable>?workQueue,?//?排期隊(duì)列ThreadFactory?threadFactory,?//?招人渠道RejectedExecutionHandler?handler)?//?拒單方式總結(jié)
Executors為我們提供了構(gòu)造線程池的便捷方法,對(duì)于服務(wù)器程序我們應(yīng)該杜絕使用這些便捷方法,而是直接使用線程池ThreadPoolExecutor的構(gòu)造方法,避免無界隊(duì)列可能導(dǎo)致的OOM以及線程個(gè)數(shù)限制不當(dāng)導(dǎo)致的線程數(shù)耗盡等問題。ExecutorCompletionService提供了等待所有任務(wù)執(zhí)行結(jié)束的有效方式,如果要設(shè)置等待的超時(shí)時(shí)間,則可以通過CountDownLatch完成。
參考
ThreadPoolExecutor API Doc
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
總結(jié)
以上是生活随笔為你收集整理的Java 线程池详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 20万用户同时访问一个热点Key,如何优
- 下一篇: 困扰我多年的Java泛型〈? exten