當(dāng)我們通過Executor提交一組并發(fā)執(zhí)行的任務(wù),并且希望在每一個(gè)任務(wù)完成后能立即得到結(jié)果,有兩種方式可以采取:
?
方式一:
通過一個(gè)list來保存一組future,然后在循環(huán)中輪訓(xùn)這組future,直到每個(gè)future都已完成。如果我們不希望出現(xiàn)因?yàn)榕旁谇懊娴娜蝿?wù)阻塞導(dǎo)致后面先完成的任務(wù)的結(jié)果沒有及時(shí)獲取的情況,那么在調(diào)用get方式時(shí),需要將超時(shí)時(shí)間設(shè)置為0?
Java代碼 ?
public?class?CompletionServiceTest?{????????static?class?Task?implements?Callable<String>{??????????private?int?i;????????????????????public?Task(int?i){??????????????this.i?=?i;??????????}????????????@Override??????????public?String?call()?throws?Exception?{??????????????Thread.sleep(10000);??????????????return?Thread.currentThread().getName()?+?"執(zhí)行完任務(wù):"?+?i;??????????}?????????}????????????public?static?void?main(String[]?args){??????????testUseFuture();??????}????????????private?static?void?testUseFuture(){??????????int?numThread?=?5;??????????ExecutorService?executor?=?Executors.newFixedThreadPool(numThread);??????????List<Future<String>>?futureList?=?new?ArrayList<Future<String>>();??????????for(int?i?=?0;i<numThread;i++?){??????????????Future<String>?future?=?executor.submit(new?CompletionServiceTest.Task(i));??????????????futureList.add(future);??????????}????????????????????????????while(numThread?>?0){??????????????for(Future<String>?future?:?futureList){??????????????????String?result?=?null;??????????????????try?{??????????????????????result?=?future.get(0,?TimeUnit.SECONDS);??????????????????}?catch?(InterruptedException?e)?{??????????????????????e.printStackTrace();??????????????????}?catch?(ExecutionException?e)?{??????????????????????e.printStackTrace();??????????????????}?catch?(TimeoutException?e)?{??????????????????????//超時(shí)異常直接忽略??????????????????}??????????????????if(null?!=?result){??????????????????????futureList.remove(future);??????????????????????numThread--;??????????????????????System.out.println(result);??????????????????????//此處必須break,否則會(huì)拋出并發(fā)修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)??????????????????????break;??????????????????}??????????????}??????????}??????}??}?? ?方式二:
第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達(dá)到代碼最簡(jiǎn)化的效果。
Java代碼 ?
public?class?CompletionServiceTest?{????????static?class?Task?implements?Callable<String>{??????????private?int?i;????????????????????public?Task(int?i){??????????????this.i?=?i;??????????}????????????@Override??????????public?String?call()?throws?Exception?{??????????????Thread.sleep(10000);??????????????return?Thread.currentThread().getName()?+?"執(zhí)行完任務(wù):"?+?i;??????????}?????????}????????????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException{??????????testExecutorCompletionService();??????}????????????private?static?void?testExecutorCompletionService()?throws?InterruptedException,?ExecutionException{??????????int?numThread?=?5;??????????ExecutorService?executor?=?Executors.newFixedThreadPool(numThread);??????????CompletionService<String>?completionService?=?new?ExecutorCompletionService<String>(executor);??????????for(int?i?=?0;i<numThread;i++?){??????????????completionService.submit(new?CompletionServiceTest.Task(i));??????????}??}????????????????????for(int?i?=?0;i<numThread;i++?){???????????????????System.out.println(completionService.take().get());??????????}????????????????}?? ?
ExecutorCompletionService分析:
?CompletionService是Executor和BlockingQueue的結(jié)合體。
Java代碼 ?
public?ExecutorCompletionService(Executor?executor)?{??????????if?(executor?==?null)??????????????throw?new?NullPointerException();??????????this.executor?=?executor;??????????this.aes?=?(executor?instanceof?AbstractExecutorService)????????????????(AbstractExecutorService)?executor?:?null;??????????this.completionQueue?=?new?LinkedBlockingQueue<Future<V>>();??????}?? ?任務(wù)的提交和執(zhí)行都是委托給Executor來完成。當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)首先將被包裝為一個(gè)QueueingFuture,
Java代碼 ?
public?Future<V>?submit(Callable<V>?task)?{??????????if?(task?==?null)?throw?new?NullPointerException();??????????RunnableFuture<V>?f?=?newTaskFor(task);??????????executor.execute(new?QueueingFuture(f));??????????return?f;??????}?? ?QueueingFuture是FutureTask的一個(gè)子類,通過改寫該子類的done方法,可以實(shí)現(xiàn)當(dāng)任務(wù)完成時(shí),將結(jié)果放入到BlockingQueue中。
?
Java代碼 ?
private?class?QueueingFuture?extends?FutureTask<Void>?{??????????QueueingFuture(RunnableFuture<V>?task)?{??????????????super(task,?null);??????????????this.task?=?task;??????????}??????????protected?void?done()?{?completionQueue.add(task);?}??????????private?final?Future<V>?task;??????}?? ?而通過使用BlockingQueue的take或poll方法,則可以得到結(jié)果。在BlockingQueue不存在元素時(shí),這兩個(gè)操作會(huì)阻塞,一旦有結(jié)果加入,則立即返回。
Java代碼 ?
public?Future<V>?take()?throws?InterruptedException?{??????return?completionQueue.take();??}????public?Future<V>?poll()?{??????return?completionQueue.poll();??}?原文:http://xw-z1985.iteye.com/blog/1997077
轉(zhuǎn)載于:https://www.cnblogs.com/langtianya/p/5013353.html
總結(jié)
以上是生活随笔為你收集整理的获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。