Future 和 ExecutorCompletionService 对比和使用
附加:Java 4種線程池介紹請查看?
談?wù)刵ew Thread的弊端及Java四種線程池的使用
當(dāng)我們通過Executor提交一組并發(fā)執(zhí)行的任務(wù),并且希望在每一個任務(wù)完成后能立即得到結(jié)果,有兩種方式可以采取:
?
方式一:
通過一個list來保存一組future,然后在循環(huán)中輪訓(xùn)這組future,直到每個future都已完成。如果我們不希望出現(xiàn)因?yàn)榕旁谇懊娴娜蝿?wù)阻塞導(dǎo)致后面先完成的任務(wù)的結(jié)果沒有及時獲取的情況,那么在調(diào)用get方式時,需要將超時時間設(shè)置為0
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執(zhí)行完任務(wù):" + i; } } public static void main(String[] args){ testUseFuture(); } private static void testUseFuture(){ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); List<Future<String>> futureList = new ArrayList<Future<String>>(); for(int i = 0;i<numThread;i++ ){ Future<String> future = executor.submit(new CompletionServiceTest.Task(i)); futureList.add(future); } while(numThread > 0){ for(Future<String> future : futureList){ String result = null; try { result = future.get(0, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { //超時異常直接忽略//future.cancel(true);//超時設(shè)置任務(wù)取消} if(null != result){ futureList.remove(future); numThread--; System.out.println(result); //此處必須break,否則會拋出并發(fā)修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決) break; } } } } }
?
?方式二:
第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達(dá)到代碼最簡化的效果。
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執(zhí)行完任務(wù):" + i; } } public static void main(String[] args) throws InterruptedException, ExecutionException{ testExecutorCompletionService(); } private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ int numThread = 3; ExecutorService executor = Executors.newFixedThreadPool(numThread); CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); for(int i = 0;i<numThread;i++ ){ completionService.submit(new CompletionServiceTest.Task(i)); } } for(int i = 0;i<numThread;i++ ){ System.out.println(completionService.take().get()); //獲取執(zhí)行結(jié)果} }?
ExecutorCompletionService分析:
?CompletionService是Executor和BlockingQueue的結(jié)合體。
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }?
任務(wù)的提交和執(zhí)行都是委托給Executor來完成。在構(gòu)造函數(shù)中創(chuàng)建一個BlockingQueue來保存計算完成的結(jié)果,當(dāng)提交某個任務(wù)時,該任務(wù)首先將被包裝為一個QueueingFuture,
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }?QueueingFuture,這是FutureTask的一個子類,通過改寫該子類的done方法,可以實(shí)現(xiàn)當(dāng)任務(wù)完成時,將結(jié)果放入到BlockingQueue中。
??
private class QueueingFuture extends FutureTask<Void> {
? ? ? ? QueueingFuture(RunnableFuture<V> task) {
? ? ? ? ? ? super(task, null);
? ? ? ? ? ? this.task = task;
? ? ? ? }
? ? ? ? protected void done() { completionQueue.add(task); }
? ? ? ? private final Future<V> task;
? ? }
?
?
?而通過使用BlockingQueue的take(阻塞獲取)或poll(非阻塞獲取)方法,則可以得到結(jié)果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結(jié)果加入,則立即返回。
附加知識點(diǎn):
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到Blocking有新的對象被加入為止;
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回nul
public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); }?
轉(zhuǎn)載于:https://www.cnblogs.com/cnmenglang/p/6273401.html
總結(jié)
以上是生活随笔為你收集整理的Future 和 ExecutorCompletionService 对比和使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一些零碎知识
- 下一篇: Prim算法的3个版本