日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ExecutorCompletionService分析及使用

發布時間:2024/1/17 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ExecutorCompletionService分析及使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

當我們通過Executor提交一組并發執行的任務,并且希望在每一個任務完成后能立即得到結果,有兩種方式可以采取:

方式一:

通過一個list來保存一組future,然后在循環中輪訓這組future,直到每個future都已完成。如果我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,那么在調用get方式時,需要將超時時間設置為0?

1 public class ExecutorCompletionServiceTest { 2 3 static class Task implements Callable<String> { 4 private int i; 5 6 public Task(int i) { 7 this.i = i; 8 } 9 10 @Override 11 public String call() throws Exception { 12 Thread.sleep(10000); 13 return Thread.currentThread().getName() + "執行完任務:" + i; 14 } 15 } 16 17 public static void main(String[] args) { 18 testUseFuture(); 19 } 20 21 private static void testUseFuture() { 22 int numThread = 5; 23 ExecutorService executor = Executors.newFixedThreadPool(numThread); 24 List<Future<String>> futureList = new ArrayList<Future<String>>(); 25 for (int i = 0; i < numThread; i++) { 26 Future<String> future = executor 27 .submit(new ExecutorCompletionServiceTest.Task(i)); 28 futureList.add(future); 29 } 30 31 while (numThread > 0) { 32 for (Future<String> future : futureList) { 33 String result = null; 34 try { 35 result = future.get(0, TimeUnit.SECONDS); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } catch (ExecutionException e) { 39 e.printStackTrace(); 40 } catch (TimeoutException e) { 41 // 超時異常直接忽略 42 } 43 if (null != result) { 44 futureList.remove(future); 45 numThread--; 46 System.out.println(result); 47 // 此處必須break,否則會拋出并發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決) 48 break; 49 } 50 } 51 } 52 } 53 }

?方式二:

第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達到代碼最簡化的效果。

1 public class ExecutorCompletionServiceTest { 2 3 static class Task implements Callable<String> { 4 private int i; 5 6 public Task(int i) { 7 this.i = i; 8 } 9 10 @Override 11 public String call() throws Exception { 12 Thread.sleep(10000); 13 return Thread.currentThread().getName() + "執行完任務:" + i; 14 } 15 } 16 17 public static void main(String[] args) throws InterruptedException, ExecutionException { 18 testExecutorCompletionService(); 19 } 20 21 private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ 22 int numThread = 5; 23 ExecutorService executorService = Executors.newFixedThreadPool(numThread); 24 CompletionService<String> completionService = new ExecutorCompletionService<>(executorService); 25 for (int i = 0; i < numThread; i++) { 26 completionService.submit(new ExecutorCompletionServiceTest.Task(i)); 27 } 28 for (int i = 0; i < numThread; i++) { 29 System.out.println(completionService.take().get()); 30 } 31 executorService.shutdown(); 32 } 33 }

ExecutorCompletionService實現了CompletionService接口,CompletionService是Executor和BlockingQueue的結合體。可以看下構造函數

1 public ExecutorCompletionService(Executor executor) { 2 if (executor == null) 3 throw new NullPointerException(); 4 this.executor = executor; 5 this.aes = (executor instanceof AbstractExecutorService) ? 6 (AbstractExecutorService) executor : null; 7 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 8 }

任務的提交和執行都是委托給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture,

1 public Future<V> submit(Callable<V> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<V> f = newTaskFor(task); 4 executor.execute(new QueueingFuture(f)); 5 return f; 6 }

QueueingFuture是FutureTask的一個子類,通過改寫該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。

1 private class QueueingFuture extends FutureTask<Void> { 2 QueueingFuture(RunnableFuture<V> task) { 3 super(task, null); 4 this.task = task; 5 } 6 protected void done() { completionQueue.add(task); } 7 private final Future<V> task; 8 }

而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。

1 public Future<V> take() throws InterruptedException { 2 return completionQueue.take(); 3 } 4 5 public Future<V> poll() { 6 return completionQueue.poll(); 7 }

?

總結

以上是生活随笔為你收集整理的ExecutorCompletionService分析及使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。