线程执行器
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
????通常我們使用JAVA來開發(fā)一個(gè)簡(jiǎn)單的并發(fā)應(yīng)用時(shí),會(huì)創(chuàng)建一些Runnable對(duì)象,然后創(chuàng)建對(duì)應(yīng)的Thread對(duì)象來執(zhí)行他們,但是,如果需要開發(fā)一個(gè)程序需要運(yùn)行大量并發(fā)任務(wù)的時(shí)候,這個(gè)方法顯然不合適。Java提供了執(zhí)行器框架(Executor Framework)來解決這些問題。
????Executor Framework機(jī)制分離了任務(wù)的創(chuàng)建和執(zhí)行。通過執(zhí)行器,僅需要實(shí)現(xiàn)Runnable接口的對(duì)象,然后把這個(gè)對(duì)象發(fā)送給執(zhí)行器即可。執(zhí)行器通過創(chuàng)建所需要的線程來負(fù)責(zé)這些Runnable對(duì)象的創(chuàng)建、實(shí)例化以及運(yùn)行。執(zhí)行器使用了線程池來提高應(yīng)用程序的性能。當(dāng)發(fā)送一個(gè)任務(wù)執(zhí)行器時(shí),執(zhí)行器會(huì)嘗試使用線程池中的線程來執(zhí)行這個(gè)任務(wù),避免了不斷地創(chuàng)建和銷毀線程而導(dǎo)致系統(tǒng)性能下降。
????執(zhí)行器框架另一個(gè)重要的優(yōu)勢(shì)是Callable接口。這個(gè)接口的主方法是call(),可以返回結(jié)果。當(dāng)發(fā)送一個(gè)Callable對(duì)象給執(zhí)行器時(shí),將獲得一個(gè)實(shí)現(xiàn)了Future接口的對(duì)象。可以使用這個(gè)對(duì)象來控制Callable對(duì)象的狀態(tài)和結(jié)果。
1、創(chuàng)建線程執(zhí)行器。
????使用執(zhí)行器框架(Executor Framework)的第一步是創(chuàng)建ThreadPoolExecutor對(duì)象。可以使用ThreadPoolExecutor類提供的四個(gè)構(gòu)造器或者使用Executor工廠類來創(chuàng)建ThreadPoolExecutor對(duì)象。一旦有了執(zhí)行器,就可以將Runnable或者Callable對(duì)象發(fā)送給它去執(zhí)行了。下面將用實(shí)例來演示Java創(chuàng)建線程執(zhí)行器。
package?org.concurrency.executorframework; import?java.util.Date; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?定義一個(gè)任務(wù)類,實(shí)現(xiàn)Runnable接口*?只是定義,不執(zhí)行*/ public?class?Task?implements?Runnable?{private?Date?initDate;//存儲(chǔ)任務(wù)創(chuàng)建時(shí)間private?String?name;//存儲(chǔ)任務(wù)的名稱public?Task()?{}public?Task(String?name)?{initDate?=?new?Date();this.name?=?name;}@Overridepublic?void?run()?{//?TODO?Auto-generated?method?stubSystem.out.printf("%s:?Task?%s?Created?on:?%s\n",Thread.currentThread().getName(),name,initDate);System.out.printf("%s:?Task?%s?Started?on:?%s\n",Thread.currentThread().getName(),name,initDate);try?{Long?duration?=?(long)(Math.random()*10);System.out.printf("%s:?Task?%s:?Doing?a?task?during?%d?seconds\n",Thread.currentThread().getName(),name,duration);TimeUnit.SECONDS.sleep(duration);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.printf("%s:?Task?%s:?Finished?on:?%s\n",Thread.currentThread().getName(),name,new?Date());} }package?org.concurrency.executorframework; import?java.util.concurrent.Executors; import?java.util.concurrent.ThreadPoolExecutor; /***?@author?Administrator*?它將執(zhí)行通過執(zhí)行器接收到的每一個(gè)任務(wù)。*/ public?class?Server?{private?ThreadPoolExecutor?executor;public?Server()?{/*線程執(zhí)行器的創(chuàng)建有兩個(gè)方式:*?一個(gè)是直接使用ThreadPoolExecutor的構(gòu)造器來實(shí)現(xiàn)*?一個(gè)是通過Executors工廠類來構(gòu)造執(zhí)行器和其他相關(guān)對(duì)象。*?但是由于TheadPoolExecutor構(gòu)造器在使用上的復(fù)雜性,推薦使用Executors工廠類類創(chuàng)建。*?這里使用了Executors工廠類的newCacheThreadPoolExecutor()方法來創(chuàng)建一個(gè)緩存線程池*?返回一個(gè)ExecutorService對(duì)象,因此被強(qiáng)制轉(zhuǎn)換成ThreadPoolExecutor類型。*?使用線程池的優(yōu)點(diǎn)是減少新建線程所花費(fèi)的時(shí)間。此類緩存池的缺點(diǎn)是,如果發(fā)送過多任務(wù)給執(zhí)行器,系統(tǒng)的復(fù)合會(huì)過載。*?當(dāng)且僅當(dāng)線程的數(shù)量是合理的,或者線程只會(huì)運(yùn)行很短的時(shí)間時(shí),適合采用緩存線程池類。*?*/executor?=?(ThreadPoolExecutor)?Executors.newCachedThreadPool();}/***?創(chuàng)建了執(zhí)行器之后,就可以使用執(zhí)行器的execute()方法來發(fā)送Runnable或者Callable類型的任務(wù)。*?這里的Task是實(shí)現(xiàn)了Runnable接口的對(duì)象。*?這里也有一些執(zhí)行器相關(guān)的日志信息:*?getPoolSize():返回執(zhí)行器線程池中實(shí)際的線程數(shù)*?getActiveCount():返回執(zhí)行器中正在執(zhí)行任務(wù)的線程數(shù)*?getCompleteTaskCount():返回執(zhí)行器中已經(jīng)完成的任務(wù)數(shù)*?*/public?void?executeTask(Task?task){System.out.printf("Server:?A?new?task?hs?arrived\n");executor.execute(task);System.out.printf("Server:?Pool?Size:?%d\n",executor.getPoolSize());System.out.printf("Server:?Active?Count:?%d\n",executor.getActiveCount());System.out.printf("Server:?Completed?Tasks:?%d\n",executor.getCompletedTaskCount());}/***?執(zhí)行器以及ThreadPoolExecutor類一個(gè)重要的特性是,通常需要顯示地區(qū)結(jié)束,如果不這樣做,那么執(zhí)行器將繼續(xù)執(zhí)行。*?為了完成執(zhí)行器的執(zhí)行,可以使用ThreadPoolExecutor類的shutdown()方法。當(dāng)執(zhí)行器執(zhí)行完所有待運(yùn)行的任務(wù),它將結(jié)束執(zhí)行。*?如果再shutdown()方法之后,有新的任務(wù)發(fā)送給執(zhí)行器,那么會(huì)報(bào)出RejectExecutionException異常。*?*/public?void?endServer(){executor.shutdown();} }package?org.concurrency.executorframework; /***?@author?Administrator*?main主程序,循環(huán)創(chuàng)建Task*/ public?class?Task_Main?{public?static?void?main(String[]?args)?{//?TODO?Auto-generated?method?stubServer?server?=?new?Server();for(int?i?=?0;i?<?100;i++){Task?task?=?new?Task("Task"+i);server.executeTask(task);}server.endServer();} }執(zhí)行結(jié)果:????
ThreadPoolExecutor類提供了其他結(jié)束執(zhí)行器的方法:
shutdownNow():這個(gè)方法會(huì)立即關(guān)閉執(zhí)行器。執(zhí)行器將不再執(zhí)行那些正在等待執(zhí)行的任務(wù)。這個(gè)方法將返回等待執(zhí)行的任務(wù)列表。調(diào)用時(shí),正在執(zhí)行的任務(wù)將繼續(xù)執(zhí)行,但這個(gè)方法不等待這個(gè)任務(wù)的完成。
isTerminated():如果調(diào)用了shutdown()或shutdownNow()方法,并且執(zhí)行器完成了關(guān)閉過程,那么這個(gè)方法將返回true。
isShutdown():如果調(diào)用了shutdown()方法,則返回true。
awaitTermination(long timeout,TimeUnit unit):這個(gè)方法將阻塞所調(diào)用的線程,知道執(zhí)行器完成任務(wù)或者達(dá)到所指定的timeout值。
2、創(chuàng)建固定大小的線程執(zhí)行器
????當(dāng)使用Executors類的newCachedThreadPool()方法創(chuàng)建的ThreadPoolExecutor時(shí),執(zhí)行器運(yùn)行過程中將碰到線程數(shù)量問題。如果線程池中沒有空閑的線程可用,那么執(zhí)行器將為接收到的每一個(gè)任務(wù)創(chuàng)建一個(gè)新的線程,當(dāng)發(fā)送大量的任務(wù)給執(zhí)行器并且任務(wù)需要持續(xù)較長(zhǎng)的時(shí)間時(shí),系統(tǒng)將會(huì)超負(fù)荷,應(yīng)用程序也將隨之不佳。
????為了避免這個(gè)問題,Executors工廠類提供了一個(gè)方法來床架一個(gè)固定大小的線程執(zhí)行器。這個(gè)執(zhí)行器有一個(gè)線程數(shù)的最大值,如果發(fā)送超過這個(gè)最大值的任務(wù)給執(zhí)行器,執(zhí)行器將不會(huì)創(chuàng)建額外的線程,剩下的任務(wù)將被阻塞直到執(zhí)行器有空閑的線程可用。這個(gè)特性可以保證執(zhí)行器不會(huì)給應(yīng)用程序帶來性能不佳的問題。
????可以對(duì)上述示例進(jìn)行修改
public?Server()?{executor?=?(ThreadPoolExecutor)?Executors.newFixedThreadPool(5);}public?void?executeTask(Task?task){System.out.printf("Server:?A?new?task?hs?arrived\n");executor.execute(task);System.out.printf("Server:?Pool?Size:?%d\n",executor.getPoolSize());System.out.printf("Server:?Active?Count:?%d\n",executor.getActiveCount());System.out.printf("Server:?Completed?Tasks:?%d\n",executor.getCompletedTaskCount());System.out.printf("Server:?Task?Count:?%d\n",executor.getTaskCount());}????在這個(gè)示例中使用了Executors工廠類的newFixedThreadPool()方法來創(chuàng)建執(zhí)行器。這個(gè)方法創(chuàng)建了具有線程數(shù)量最大值的執(zhí)行器。如果發(fā)送超過線程數(shù)的任務(wù)給執(zhí)行器,剩余的任務(wù)將被阻塞知道線程池里有空閑的線程來處理他們。
3、在執(zhí)行器中執(zhí)行任務(wù)并返回結(jié)果
????執(zhí)行器框架(Executor Framework)的優(yōu)勢(shì)之一是,可以運(yùn)行并發(fā)任務(wù)并返回結(jié)果。Callable:這個(gè)接口聲明了call()方法。可以在這個(gè)方法里實(shí)現(xiàn)任務(wù)的具體邏輯操作。Callable接口是一個(gè)泛型接口,這意味著必須聲明call()方法返回的數(shù)據(jù)類型。Future:這個(gè)接口聲明了一些方法來獲取由Callable對(duì)象產(chǎn)生的結(jié)果,并管理它們的狀態(tài)。
package?org.concurrency.executorframework.callable; import?java.util.ArrayList; import?java.util.List; import?java.util.Random; import?java.util.concurrent.Callable; import?java.util.concurrent.ExecutionException; import?java.util.concurrent.Executors; import?java.util.concurrent.Future; import?java.util.concurrent.ThreadPoolExecutor; import?java.util.concurrent.TimeUnit; public?class?FactorialCalculator?implements?Callable<Integer>?{private?Integer?number;//存儲(chǔ)任務(wù)即將用來計(jì)算的數(shù)字public?FactorialCalculator(Integer?number)?{this.number?=?number;}@Overridepublic?Integer?call()?throws?Exception?{//?TODO?Auto-generated?method?stubint?result?=?1;if(number?==0?||?number?==1){result?=?1;}else{for(int?i?=2;i<number;i++){result?*=?i;TimeUnit.MILLISECONDS.sleep(20);}}System.out.printf("%s:?%d\n",Thread.currentThread().getName(),result);return?result;}public?static?void?main(String[]?args)?{/*通過Executors工廠類的newFixedThreadPool()方法創(chuàng)建ThreadPoolExecutor執(zhí)行器來運(yùn)行任務(wù)。這里最多創(chuàng)建2個(gè)線程*/ThreadPoolExecutor?executor?=?(ThreadPoolExecutor)?Executors.newFixedThreadPool(2);List<Future<Integer>>?resultList?=?new?ArrayList<Future<Integer>>();Random?random?=?new?Random();for(int?i=0;i<10;i++){int?number?=?random.nextInt(10);FactorialCalculator?calculator?=?new?FactorialCalculator(number);Future<Integer>?result?=?executor.submit(calculator);resultList.add(result);}do{System.out.printf("Main:?Number?of?Completed?Tasks:%d\n",executor.getCompletedTaskCount());for(int?i=0;i<resultList.size();i++){Future<Integer>?result?=?resultList.get(i);System.out.printf("Main:?Task?%d:?%s\n",i,result.isDone());try?{TimeUnit.MILLISECONDS.sleep(50);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}}}while(executor.getCompletedTaskCount()?<?resultList.size());System.out.printf("Main:?Results\n");for(int?i?=0;i<resultList.size();i++){Future<Integer>?result?=?resultList.get(i);Integer?number?=?null;try?{number?=?result.get();}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}?catch?(ExecutionException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.printf("Main:?Task?%d\n",i,number);}executor.shutdown();} }????在本節(jié)中我們學(xué)習(xí)了如何使用Callable接口來啟動(dòng)并發(fā)任務(wù)并返回結(jié)果。我們編寫了FactorialCaculator類,它實(shí)現(xiàn)了帶有泛型參數(shù)Integer類型的Callable接口。因此,這個(gè)Integer類型將作為調(diào)用call()方法時(shí)返回的類型。
????我們通過submit()方法發(fā)送一個(gè)Callable對(duì)象給執(zhí)行去執(zhí)行,這個(gè)submit()方法接收Callable對(duì)象作為參數(shù),并返回Future對(duì)象。Future對(duì)象可以用于以下兩個(gè)目的。
控制任務(wù)狀態(tài):可以取消任務(wù)或者檢查任務(wù)是否已經(jīng)完成。為了達(dá)到這個(gè)目的,可使用isDone()方法來檢查任務(wù)是否已經(jīng)完成。
公國(guó)call()方法獲取返回結(jié)果。為了達(dá)到這個(gè)目的,可以使用get()方法。這個(gè)方法一直等待直到Callable對(duì)象的call()方法執(zhí)行完成并返回結(jié)果。如果get()方法在等待結(jié)果時(shí)中斷了,則會(huì)拋出異常。如果call()方法拋出異常,那個(gè)get()也會(huì)拋出異常。
4、運(yùn)行多個(gè)任務(wù)并處理第一個(gè)結(jié)果
????并發(fā)編程中比較常見的一個(gè)問題是,當(dāng)采用多個(gè)并發(fā)任務(wù)解決一個(gè)問題時(shí),往往只關(guān)系這些任務(wù)的第一個(gè)結(jié)果。例如允許兩種驗(yàn)證機(jī)制,只要有一種驗(yàn)證機(jī)制成功,那么就驗(yàn)證通過。這主要是用到了ThreadPoolExecutor類的invokeAny()方法。
5、運(yùn)行多個(gè)任務(wù)并處理所有結(jié)果。
????執(zhí)行器框架(Executor Framework)允許執(zhí)行并發(fā)任務(wù)而不需要去考慮線程創(chuàng)建和執(zhí)行。它還提供了可以用來控制在執(zhí)行器中執(zhí)行任務(wù)的狀態(tài)和獲取任務(wù)結(jié)果的Future類。
6、在執(zhí)行器中周期性執(zhí)行任務(wù)。
????執(zhí)行器框架提供了ThreadPoolExecutor類,通過線程池來執(zhí)行并發(fā)任務(wù)從而避免了執(zhí)行所有線程的創(chuàng)建操作。當(dāng)一個(gè)任務(wù)給執(zhí)行器后,根據(jù)執(zhí)行器的配置,它將盡快地執(zhí)行這個(gè)任務(wù)。當(dāng)任務(wù)執(zhí)行結(jié)束后,這個(gè)任務(wù)就會(huì)從執(zhí)行器中刪除;如果想再次執(zhí)行這個(gè)任務(wù),則需要再次發(fā)送這個(gè)任務(wù)到執(zhí)行器。
????但是執(zhí)行器框架提供了ScheduledThreadPoolExecutor類來執(zhí)行周期性的任務(wù)。通過Executors工廠類的newScheduledThreadPoolExecutor()方法創(chuàng)建ScheduledThreadPoolExecutor執(zhí)行器對(duì)象。這個(gè)方法接收一個(gè)表示線程中的線程數(shù)類作參數(shù)。一旦有了可以執(zhí)行周期性的執(zhí)行器,就可以發(fā)送任務(wù)給這個(gè)執(zhí)行器。使用scheduledAtFixedRate()方法發(fā)送任務(wù)。scheduledAtFixedRate()方法返回一個(gè)ScheduledFuture對(duì)象,ScheduledFuture接口則擴(kuò)展了Future接口,于是它帶有了定時(shí)任務(wù)的相關(guān)操作方法。使用getDelay()方法返回任務(wù)到下一次執(zhí)行時(shí)所要等待的剩余時(shí)間。我們將通過一個(gè)實(shí)例來演示周期性執(zhí)行任務(wù)
package?org.concurrency.executorframework.scheduled; import?java.util.Date; /***?@author?Administrator*?創(chuàng)建任務(wù)線程*/ public?class?Task?implements?Runnable?{private?String?name;public?Task(String?name)?{this.name?=?name;} //?@Override //?public?String?call()?throws?Exception?{ //??//?TODO?Auto-generated?method?stub //??System.out.printf("%s:?Starting?at?:?%s\n",name,new?Date()); //??return?"Hello,world"; //?}@Overridepublic?void?run()?{//?TODO?Auto-generated?method?stubSystem.out.printf("%s:?Starting?at?:?%s\n",name,new?Date()); //??return?"Hello,world";} }package?org.concurrency.executorframework.scheduled; import?java.util.Date; import?java.util.concurrent.Executors; import?java.util.concurrent.ScheduledFuture; import?java.util.concurrent.ScheduledThreadPoolExecutor; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?主線程類*/ public?class?Main?{public?static?void?main(String[]?args)?{//?TODO?Auto-generated?method?stub/*使用scheduledThreadPoolExecutor()方法創(chuàng)建ScheduledExecutorService對(duì)象,并轉(zhuǎn)化為ScheduledThreadPoolExecutor*?這個(gè)方法接收一個(gè)表示線程數(shù)量的整數(shù)作為參數(shù)。*?*/ScheduledThreadPoolExecutor?executor?=?(ScheduledThreadPoolExecutor)?Executors.newScheduledThreadPool(1);System.out.printf("Main:?Starting?at:?%s\n",new?Date());Task?task?=?new?Task("Task");/**?使用scheduledAtFixedRate()方法發(fā)送任務(wù)。這個(gè)方法接收四個(gè)參數(shù)*?1.被周期執(zhí)行的任務(wù)*?2.執(zhí)行第一次任務(wù)執(zhí)行后的延時(shí)時(shí)間*?3.兩次執(zhí)行的時(shí)間周期*?4.第2個(gè)和第3個(gè)參數(shù)的時(shí)間單位*?兩次執(zhí)行之間的周期是指任務(wù)咋兩次執(zhí)行開始的時(shí)間間隔。*?這期間可能會(huì)存在多個(gè)任務(wù)實(shí)例*?*/ScheduledFuture<?>?result?=?executor.scheduleAtFixedRate(task,?1,?2,?TimeUnit.SECONDS);for(int?i?=?0;i<10;i++){System.out.printf("Main:?Delay:?%d\n",result.getDelay(TimeUnit.MILLISECONDS));try?{TimeUnit.MILLISECONDS.sleep(500);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}}executor.shutdown();try?{TimeUnit.SECONDS.sleep(5);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.printf("Main:?Finished?at:?%s\n",new?Date());} }????執(zhí)行結(jié)果截圖:
7、在執(zhí)行器中取消任務(wù)。如果需要取消已經(jīng)發(fā)送給執(zhí)行器的任務(wù),則需要使用Future接口的cancle()方法來執(zhí)行取消操作。
8、在執(zhí)行器中控制任務(wù)的完成。FutureTask類中提供了一個(gè)名為done()方法,允許在執(zhí)行器中的任務(wù)執(zhí)行結(jié)束后還可以執(zhí)行一些代碼。例如生成報(bào)表,通過郵件發(fā)送結(jié)果或釋放一些系統(tǒng)資源等。我們可以可以覆蓋FutureTask類的done()方法來控制任務(wù)的完成。
9、在執(zhí)行器中分離任務(wù)的啟動(dòng)和出結(jié)果的處理
????通常情況下,使用執(zhí)行器執(zhí)行并發(fā)任務(wù)時(shí),將Runnable或Callable任務(wù)發(fā)送給執(zhí)行器,并獲得Future對(duì)象來控制任務(wù)。此外,還會(huì)碰到如下情形,需要在一個(gè)對(duì)象里發(fā)送任務(wù)給執(zhí)行器,然后唉另一個(gè)對(duì)象里處理結(jié)果。對(duì)于這種情況,Java API提供了CompletionService類。
????CompletionService類有一個(gè)方法用來發(fā)送任務(wù)給執(zhí)行器,還有一個(gè)方法為下一個(gè)已經(jīng)執(zhí)行結(jié)束的任務(wù)獲取Future對(duì)象。
????我們將通過一個(gè)實(shí)例學(xué)習(xí)如何使用CompletionService類,在執(zhí)行器中分離任務(wù)的啟動(dòng)與結(jié)果的處理。
package?org.concurrency.executorframework.callable; import?java.util.concurrent.Callable; import?java.util.concurrent.TimeUnit; /***?@author?Administrator**/ public?class?ReportGenerator?implements?Callable<String>?{/*用來表示數(shù)據(jù)和報(bào)告*/private?String?sender;private?String?title;public?ReportGenerator(String?sender,?String?title)?{super();this.sender?=?sender;this.title?=?title;}@Overridepublic?String?call()?throws?Exception?{//?TODO?讓線程休眠一段隨機(jī)時(shí)間long?duration?=?(long)?(Math.random()*10);System.out.printf("%s_%s:?ReportGenerator:Generating?a?report?during?%d?seconds\n",this.sender,this.title,duration);TimeUnit.SECONDS.sleep(duration);String?ret?=?sender+":"+title;return?ret;} }package?org.concurrency.executorframework.callable; import?java.util.concurrent.CompletionService; /***?@author?Administrator*?用來模擬請(qǐng)求報(bào)告*/ public?class?ReportRequest?implements?Runnable?{private?String?name;private?CompletionService<String>?service;public?ReportRequest(String?name,?CompletionService<String>?service)?{this.name?=?name;this.service?=?service;}@Overridepublic?void?run()?{//?TODO?創(chuàng)建了ReportGenerator對(duì)象,并使用submit()方法將此對(duì)對(duì)象發(fā)送給CompletionService。ReportGenerator?reportGenerator?=?new?ReportGenerator(name,?"Report");service.submit(reportGenerator);} }package?org.concurrency.executorframework.callable; import?java.util.concurrent.CompletionService; import?java.util.concurrent.ExecutionException; import?java.util.concurrent.Future; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?這個(gè)類將獲取ReportGenerator任務(wù)的結(jié)果*/ public?class?ReportProcessor?implements?Runnable?{private?CompletionService<String>?service;private?boolean?end;public?ReportProcessor(CompletionService<String>?service)?{this.service?=?service;end?=?false;}@Overridepublic?void?run()?{//?TODO?獲取下一個(gè)已經(jīng)完成任務(wù)的Future對(duì)象;當(dāng)然這個(gè)任務(wù)是采用CompletionService來完成/*當(dāng)?*完成服務(wù)*任務(wù)結(jié)束,這些任務(wù)中的一個(gè)任務(wù)就執(zhí)行結(jié)束了,完成服務(wù)中存儲(chǔ)著Future對(duì)象,用來空載它在隊(duì)列中的隊(duì)形?*?調(diào)用poll()方法訪問這個(gè)隊(duì)列,查看是否有任務(wù)已經(jīng)完成,如果有就返回隊(duì)列中的第一個(gè)元素,即一個(gè)任務(wù)執(zhí)行完成后的Future對(duì)象。*?當(dāng)poll()方法返回Future對(duì)象后,它將從隊(duì)列中刪除這個(gè)Future對(duì)象。*?*/while(!end){try?{Future<String>?result?=?service.poll(20,?TimeUnit.SECONDS);if(result?!=?null){String?report?=?result.get();System.out.println("ReportReciver:Report?Received:"+?report);}}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}?catch?(ExecutionException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.println("ReportSender:?End");}}public?void?setEnd(boolean?end)?{this.end?=?end;} }package?org.concurrency.executorframework.callable; import?java.util.concurrent.CompletionService; import?java.util.concurrent.ExecutorCompletionService; import?java.util.concurrent.ExecutorService; import?java.util.concurrent.Executors; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?線程啟動(dòng)類*/ public?class?Main?{public?static?void?main(String[]?args)?{//?TODO?Auto-generated?method?stubExecutorService?executor?=?Executors.newCachedThreadPool();CompletionService<String>?service?=?new?ExecutorCompletionService<>(executor);ReportRequest?faceRequest?=?new?ReportRequest("Face",?service);ReportRequest?onlineRequest?=?new?ReportRequest("Online",?service);Thread?faceThread?=?new?Thread(faceRequest);Thread?onlineThread?=?new?Thread(onlineRequest);ReportProcessor?processor?=?new?ReportProcessor(service);Thread?senderThread?=?new?Thread(processor);System.out.println("Main:?Staring?the?Threads");faceThread.start();onlineThread.start();senderThread.start();try?{System.out.println("Main:?Waiting?for?the?reportgenerators.");faceThread.join();onlineThread.join();}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.println("Main:?Shutting?down?the?executor.");executor.shutdown();try?{/*調(diào)用awaitTerminated()方法等待所有任務(wù)執(zhí)行結(jié)束*/executor.awaitTermination(1,?TimeUnit.DAYS);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}processor.setEnd(true);System.out.println("Main:Ends");} }執(zhí)行結(jié)果截圖:
10、處理在執(zhí)行器中被拒絕的任務(wù)。
當(dāng)我們想結(jié)束執(zhí)行器的執(zhí)行時(shí),調(diào)用shutdown()方法來表示執(zhí)行器應(yīng)當(dāng)結(jié)束,但是,執(zhí)行器只有等待正在運(yùn)行的任務(wù)或者等待執(zhí)行的任務(wù)結(jié)束后,才能真正結(jié)束。如果在此期間發(fā)送給一個(gè)任務(wù)給執(zhí)行器,這個(gè)任務(wù)會(huì)被拒絕,ThreadPoolExecutor提供了一套機(jī)制來處理被拒絕的任務(wù)。這些任務(wù)實(shí)現(xiàn)了RejectExecutionHandler接口。
本文出自 “阿酷博客源” 博客,請(qǐng)務(wù)必保留此出處http://aku28907.blog.51cto.com/5668513/1788500
轉(zhuǎn)載于:https://my.oschina.net/mrku/blog/693731
總結(jié)
- 上一篇: dedecms--需要注意的细节
- 下一篇: 重构随笔——重构的原则