日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ExecutorCompletionService 源码分析

發布時間:2024/9/30 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ExecutorCompletionService 源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概要

在ExecutorService的submit方法中可以獲取返回值,通過Future的get方法,但是這個Future類存在缺陷,Future接口調用get()方法取得處理后的返回結果時具有阻塞性,也就是說調用Future的get方法時,任務沒有執行完成,則get方法要一直阻塞等到任務完成為止。 這樣大大的影響了系統的性能,這就是Future的最大缺點。為此,java1.5以后提供了CompletionServlice來解決這個問題。

CompletionService 接口CompletionService的功能是異步的方式,一邊生產任務,一邊處理完成的任務結果,這樣可以將執行的任務與處理任務隔離開來進行處理,使用submit執行任務,使用塔克獲取已完成的任務,并按照這些任務的完成的時間順序來處理他們的結果。

示例

向ExecutorService 提交一組任務,哪個任務先完成,就把完成任務的返回結果打印出來。

public class CompletionServiceExecutorDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(10);// 同時運行多個任務,那個任務先返回數據,就先獲取該數據CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);for (int i = 1; i <= 10; i++) {final int seq = i;completionService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {int waitTime = new Random().nextInt(10);TimeUnit.SECONDS.sleep(waitTime);return "callable:"+seq+" 執行時間:"+waitTime+"s";}});}for (int i = 1; i <= 10; i++) {try {Future<String> future = completionService.take();System.out.println(future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}threadPool.shutdown();} }

執行結果如下:

callable:6 執行時間:1s callable:2 執行時間:3s callable:10 執行時間:3s callable:1 執行時間:4s callable:4 執行時間:5s callable:8 執行時間:5s callable:7 執行時間:7s callable:5 執行時間:8s callable:9 執行時間:9s callable:3 執行時間:9s

從打印結果可以看出,這些任務是按照任務執行完成的順序打印的,先執行完就先返回結果。

ExecutorCompletionService 源碼分析

ExecutorCompletionService 類結構如下

public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor; //線程池private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; //任務完成隊列private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); }private final Future<V> task;}

ExecutorCompletionService 類中定義了一個QueueingFuture 的內部類,繼承于FutureTask類,內部重寫了FutureTask的done方法,該方法是在FutureTask任務執行完成后會調用的方法,在FutureTask中該方法未實現任何邏輯。

重寫done方法,在任務處理完成后把該FutureTask任務放入到阻塞隊列(BlockingQueue)中,然后我們就可以從阻塞隊列中take執行完成的任務,進行想用的處理。

這里是實現ExecutorCompletionService的核心邏輯。

newTaskFor 方法

private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}private RunnableFuture<V> newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask<V>(task, result);elsereturn aes.newTaskFor(task, result);}

ExecutorCompletionService 支持Callable和Runnable任務
1. 把用戶提交的Callable任務轉成FutureTask。
2. 把用戶提交的Runnable任務轉成FutureTask。

ExecutorCompletionService 構造方法1

public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();}
  • 連接池(executor)不能為空。
  • 判斷該線程池是否AbstractExecutorService類型,如果是則賦值給aes,否則賦值null
    (aes作用:把用戶提交的Callable和Runnable任務轉換成FutureTask)
  • 創建一個阻塞隊列。(存放執行完成的FutureTask任務)
  • ExecutorCompletionService 構造方法2

    public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = completionQueue;}

    該構造可以指定一個阻塞隊列,其它功能同上構造方法。

    submit方法

    public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture(f));return f;}

    該方法可以向ExecutorCompletionService 中提交要執行的任務。
    支持Callable和Runnable兩種類型的任務。
    如果提交的Runnable任務,則執行完后返回的結果為null。

    public Future<V> take() throws InterruptedException {return completionQueue.take();}

    從阻塞隊列中獲取執行完成的任務的,如果隊列為空且任務沒有全部完成,則阻塞當前線程,直到有任務執行完成。

    public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException {return completionQueue.poll(timeout, unit);}}

    ExecutorCompletionService支持非阻塞方式從阻塞隊列中獲取已完成的任務
    1. 可以通過poll方法來從阻塞隊列中獲取任務,如果隊列為空,則直接返回null,不會阻塞當前線程。
    2.支持等待多長時間來從阻塞隊列中獲取已經完成的任務。

    總結

    ExecutorCompletionService的實現原理是內部使用了FutureTask來實現異步的任務執行。通過一個內部類繼承FutureTask,并實現了FutureTask的一個done方法。該done方法會在任務執行完成之后調用該方法,在任務執行完之后把當前的FutureTask放入到阻塞隊列中。這樣就實現了先執行完成的任務先存放到阻塞隊列中,應用程序可以從阻塞隊列中提前獲取先執行完的任務。

    本人簡書blog地址:http://www.jianshu.com/u/1f0067e24ff8????
    點擊這里快速進入簡書

    GIT地址:http://git.oschina.net/brucekankan/
    點擊這里快速進入GIT

    總結

    以上是生活随笔為你收集整理的ExecutorCompletionService 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。