java并发编程--Executor框架
摘要:
? ? ? ?Eexecutor作為靈活且強(qiáng)大的異步執(zhí)行框架,其支持多種不同類型的任務(wù)執(zhí)行策略,提供了一種標(biāo)準(zhǔn)的方法將任務(wù)的提交過程和執(zhí)行過程解耦開發(fā),基于生產(chǎn)者-消費(fèi)者模式,其提交任務(wù)的線程相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)的線程相當(dāng)于消費(fèi)者,并用Runnable來表示任務(wù),Executor的實(shí)現(xiàn)還提供了對生命周期的支持,以及統(tǒng)計(jì)信息收集,應(yīng)用程序管理機(jī)制和性能監(jiān)視等機(jī)制。
?
1.Exexctor簡介
Executor的UML圖:(常用的幾個(gè)接口和子類)
Executor:一個(gè)接口,其定義了一個(gè)接收Runnable對象的方法executor,其方法簽名為executor(Runnable command),
?
ExecutorService:是一個(gè)比Executor使用更廣泛的子類接口,其提供了生命周期管理的方法,以及可跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況返回Future的方法
?
AbstractExecutorService:ExecutorService執(zhí)行方法的默認(rèn)實(shí)現(xiàn)
?
ScheduledExecutorService:一個(gè)可定時(shí)調(diào)度任務(wù)的接口
?
ScheduledThreadPoolExecutor:ScheduledExecutorService的實(shí)現(xiàn),一個(gè)可定時(shí)調(diào)度任務(wù)的線程池
?
ThreadPoolExecutor:線程池,可以通過調(diào)用Executors以下靜態(tài)工廠方法來創(chuàng)建線程池并返回一個(gè)ExecutorService對象:
?
2.ThreadPoolExecutor構(gòu)造函數(shù)的各個(gè)參數(shù)說明
ThreadPoolExecutor方法簽名:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) //后兩個(gè)參數(shù)為可選參數(shù)參數(shù)說明:
corePoolSize:核心線程數(shù),如果運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行新任務(wù),即使線程池中的其他線程是空閑的
?
maximumPoolSize:最大線程數(shù),可允許創(chuàng)建的線程數(shù),corePoolSize和maximumPoolSize設(shè)置的邊界自動(dòng)調(diào)整池大小:
corePoolSize?<運(yùn)行的線程數(shù)<?maximumPoolSize:僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程
corePoolSize=運(yùn)行的線程數(shù)=?maximumPoolSize:創(chuàng)建固定大小的線程池
?
keepAliveTime:如果線程數(shù)多于corePoolSize,則這些多余的線程的空閑時(shí)間超過keepAliveTime時(shí)將被終止
?
unit:keepAliveTime參數(shù)的時(shí)間單位
?
workQueue:保存任務(wù)的阻塞隊(duì)列,與線程池的大小有關(guān):
? 當(dāng)運(yùn)行的線程數(shù)少于corePoolSize時(shí),在有新任務(wù)時(shí)直接創(chuàng)建新線程來執(zhí)行任務(wù)而無需再進(jìn)隊(duì)列
? 當(dāng)運(yùn)行的線程數(shù)等于或多于corePoolSize,在有新任務(wù)添加時(shí)則選加入隊(duì)列,不直接創(chuàng)建線程
? 當(dāng)隊(duì)列滿時(shí),在有新任務(wù)時(shí)就創(chuàng)建新線程
?
threadFactory:使用ThreadFactory創(chuàng)建新線程,默認(rèn)使用defaultThreadFactory創(chuàng)建線程
?
handle:定義處理被拒絕任務(wù)的策略,默認(rèn)使用ThreadPoolExecutor.AbortPolicy,任務(wù)被拒絕時(shí)將拋出RejectExecutorException
?
3.Executors:提供了一系列靜態(tài)工廠方法用于創(chuàng)建各種線程池
? ?newFixedThreadPool:創(chuàng)建可重用且固定線程數(shù)的線程池,如果線程池中的所有線程都處于活動(dòng)狀態(tài),此時(shí)再提交任務(wù)就在隊(duì)列中等待,直到有可用線程;如果線程池中的某個(gè)線程由于異常而結(jié)束時(shí),線程池就會(huì)再補(bǔ)充一條新線程。
方法簽名:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,//使用一個(gè)基于FIFO排序的阻塞隊(duì)列,在所有corePoolSize線程都忙時(shí)新任務(wù)將在隊(duì)列中等待new LinkedBlockingQueue<Runnable>()); }? ?newSingleThreadExecutor:創(chuàng)建一個(gè)單線程的Executor,如果該線程因?yàn)楫惓6Y(jié)束就新建一條線程來繼續(xù)執(zhí)行后續(xù)的任務(wù)
方法簽名:
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService//corePoolSize和maximumPoolSize都等于,表示固定線程池大小為1(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }? ?newScheduledThreadPool:創(chuàng)建一個(gè)可延遲執(zhí)行或定期執(zhí)行的線程池
方法簽名:
例1:(使用newScheduledThreadPool來模擬心跳機(jī)制)
1 public class HeartBeat {2 public static void main(String[] args) {3 ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);4 Runnable task = new Runnable() {5 public void run() {6 System.out.println("HeartBeat.........................");7 }8 };9 executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS); //5秒后第一次執(zhí)行,之后每隔3秒執(zhí)行一次 10 } 11 }輸出:
HeartBeat....................... //5秒后第一次輸出 HeartBeat....................... //每隔3秒輸出一個(gè)? ?newCachedThreadPool:創(chuàng)建可緩存的線程池,如果線程池中的線程在60秒未被使用就將被移除,在執(zhí)行新的任務(wù)時(shí),當(dāng)線程池中有之前創(chuàng)建的可用線程就重 ? ? ?用可用線程,否則就新建一條線程
方法簽名:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,//使用同步隊(duì)列,將任務(wù)直接提交給線程new SynchronousQueue<Runnable>()); }?
例2:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public?class?ThreadPoolTest { ????public?static?void?main(String[] args)?throws?InterruptedException { ?????ExecutorService threadPool = Executors.newCachedThreadPool();//線程池里面的線程數(shù)會(huì)動(dòng)態(tài)變化,并可在線程線被移除前重用 ????????for?(int?i =?1; i <=?3; i ++) { ????????????final??int?task = i;???//10個(gè)任務(wù) ????????????//TimeUnit.SECONDS.sleep(1); ????????????threadPool.execute(new?Runnable() {????//接受一個(gè)Runnable實(shí)例 ????????????????public?void?run() { ????????????????????????System.out.println("線程名字: "?+ Thread.currentThread().getName() +??"? 任務(wù)名為: "+task); ????????????????} ????????????}); ????????} ????} } |
輸出:(為每個(gè)任務(wù)新建一條線程,共創(chuàng)建了3條線程)
線程名字: pool-1-thread-1 任務(wù)名為: 1 線程名字: pool-1-thread-2 任務(wù)名為: 2 線程名字: pool-1-thread-3 任務(wù)名為: 3去掉第6行的注釋其輸出如下:(始終重復(fù)利用一條線程,因?yàn)閚ewCachedThreadPool能重用可用線程)
線程名字: pool-1-thread-1 任務(wù)名為: 1 線程名字: pool-1-thread-1 任務(wù)名為: 2 線程名字: pool-1-thread-1 任務(wù)名為: 3通過使用Executor可以很輕易的實(shí)現(xiàn)各種調(diào)優(yōu) ?管理 ?監(jiān)視 ?記錄日志和錯(cuò)誤報(bào)告等待。
?
4.Executor的生命周期
ExecutorService提供了管理Eecutor生命周期的方法,ExecutorService的生命周期包括了:運(yùn)行 ?關(guān)閉和終止三種狀態(tài)。
?
ExecutorService在初始化創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。
shutdown方法等待提交的任務(wù)執(zhí)行完成并不再接受新任務(wù),在完成全部提交的任務(wù)后關(guān)閉
shutdownNow方法將強(qiáng)制終止所有運(yùn)行中的任務(wù)并不再允許提交新任務(wù)
?
可以將一個(gè)Runnable(如例2)或Callable(如例3)提交給ExecutorService的submit方法執(zhí)行,最終返回一上Futire用來獲得任務(wù)的執(zhí)行結(jié)果或取消任務(wù)
例3:(任務(wù)執(zhí)行完成后并返回執(zhí)行結(jié)果)
| 1 2 3 4 5 6 7 8 9 10 11 | public?class?CallableAndFuture { ????public?static?void?main(String[] args)?throws?ExecutionException, InterruptedException { ????????ExecutorService executor = Executors.newSingleThreadExecutor(); ????????Future<String> future = executor.submit(new?Callable<String>() {???//接受一上callable實(shí)例 ????????????public?String call()?throws?Exception { ????????????????return?"MOBIN"; ????????????} ????????}); ????????System.out.println("任務(wù)的執(zhí)行結(jié)果:"+future.get()); ????} } |
輸出:
任務(wù)的執(zhí)行結(jié)果:MOBIN?
ExecutorCompletionService:實(shí)現(xiàn)了CompletionService,將執(zhí)行完成的任務(wù)放到阻塞隊(duì)列中,通過take或poll方法來獲得執(zhí)行結(jié)果
例4:(啟動(dòng)10條線程,誰先執(zhí)行完成就返回誰)
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public?class?CompletionServiceTest { ????public?static?void?main(String[] args)?throws?InterruptedException, ExecutionException { ????????ExecutorService executor = Executors.newFixedThreadPool(10);????????//創(chuàng)建含10.條線程的線程池 ????????CompletionService completionService =?new?ExecutorCompletionService(executor); ????????for?(int?i =1; i <=10; i ++) { ????????????final??int?result = i; ????????????completionService.submit(new?Callable() { ????????????????public?Object call()?throws?Exception { ????????????????????Thread.sleep(new?Random().nextInt(5000));???//讓當(dāng)前線程隨機(jī)休眠一段時(shí)間 ????????????????????return?result; ????????????????} ????????????}); ????????} ????????System.out.println(completionService.take().get());???//獲取執(zhí)行結(jié)果 ????} } |
輸出結(jié)果可能每次都不同(在1到10之間)
3?
通過Executor來設(shè)計(jì)應(yīng)用程序可以簡化開發(fā)過程,提高開發(fā)效率,并有助于實(shí)現(xiàn)并發(fā),在開發(fā)中如果需要?jiǎng)?chuàng)建線程可優(yōu)先考慮使用Executor
from:?https://www.cnblogs.com/MOBIN/p/5436482.html?
總結(jié)
以上是生活随笔為你收集整理的java并发编程--Executor框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Executors创建的4种线程池的使用
- 下一篇: 由浅入深理解Java线程池及线程池的如何