日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 中的fork join框架

發布時間:2024/2/28 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 中的fork join框架 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • ForkJoinPool
    • ForkJoinWorkerThread
    • ForkJoinTask
    • 在ForkJoinPool中提交Task

java 中的fork join框架

fork join框架是java 7中引入框架,這個框架的引入主要是為了提升并行計算的能力。

fork join主要有兩個步驟,第一就是fork,將一個大任務分成很多個小任務,第二就是join,將第一個任務的結果join起來,生成最后的結果。如果第一步中并沒有任何返回值,join將會等到所有的小任務都結束。

還記得之前的文章我們講到了thread pool的基本結構嗎?

  • ExecutorService - ForkJoinPool 用來調用任務執行。
  • workerThread - ForkJoinWorkerThread 工作線程,用來執行具體的任務。
  • task - ForkJoinTask 用來定義要執行的任務。
  • 下面我們從這三個方面來詳細講解fork join框架。

    ForkJoinPool

    ForkJoinPool是一個ExecutorService的一個實現,它提供了對工作線程和線程池的一些便利管理方法。

    public class ForkJoinPool extends AbstractExecutorService

    一個work thread一次只能處理一個任務,但是ForkJoinPool并不會為每個任務都創建一個單獨的線程,它會使用一個特殊的數據結構double-ended queue來存儲任務。這樣的結構可以方便的進行工作竊取(work-stealing)。

    什么是work-stealing呢?

    默認情況下,work thread從分配給自己的那個隊列頭中取出任務。如果這個隊列是空的,那么這個work thread會從其他的任務隊列尾部取出任務來執行,或者從全局隊列中取出。這樣的設計可以充分利用work thread的性能,提升并發能力。

    下面看下怎么創建一個ForkJoinPool。

    最常見的方法就是使用ForkJoinPool.commonPool()來創建,commonPool()為所有的ForkJoinTask提供了一個公共默認的線程池。

    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

    另外一種方式是使用構造函數:

    ForkJoinPool forkJoinPool = new ForkJoinPool(2);

    這里的參數是并行級別,2指的是線程池將會使用2個處理器核心。

    ForkJoinWorkerThread

    ForkJoinWorkerThread是使用在ForkJoinPool的工作線程。

    public class ForkJoinWorkerThread extends Thread }

    和一般的線程不一樣的是它定義了兩個變量:

    final ForkJoinPool pool; // the pool this thread works infinal ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

    一個是該worker thread所屬的ForkJoinPool。 另外一個是支持 work-stealing機制的Queue。

    再看一下它的run方法:

    public void run() {if (workQueue.array == null) { // only run onceThrowable exception = null;try {onStart();pool.runWorker(workQueue);} catch (Throwable ex) {exception = ex;} finally {try {onTermination(exception);} catch (Throwable ex) {if (exception == null)exception = ex;} finally {pool.deregisterWorker(this, exception);}}}}

    簡單點講就是從Queue中取出任務執行。

    ForkJoinTask

    ForkJoinTask是ForkJoinPool中運行的任務類型。通常我們會用到它的兩個子類:RecursiveAction和RecursiveTask。

    他們都定義了一個需要實現的compute()方法用來實現具體的業務邏輯。不同的是RecursiveAction只是用來執行任務,而RecursiveTask可以有返回值。

    既然兩個類都帶了Recursive,那么具體的實現邏輯也會跟遞歸有關,我們舉個使用RecursiveAction來打印字符串的例子:

    public class CustomRecursiveAction extends RecursiveAction {private String workload = "";private static final int THRESHOLD = 4;private static Logger logger =Logger.getAnonymousLogger();public CustomRecursiveAction(String workload) {this.workload = workload;}@Overrideprotected void compute() {if (workload.length() > THRESHOLD) {ForkJoinTask.invokeAll(createSubtasks());} else {processing(workload);}}private List<CustomRecursiveAction> createSubtasks() {List<CustomRecursiveAction> subtasks = new ArrayList<>();String partOne = workload.substring(0, workload.length() / 2);String partTwo = workload.substring(workload.length() / 2, workload.length());subtasks.add(new CustomRecursiveAction(partOne));subtasks.add(new CustomRecursiveAction(partTwo));return subtasks;}private void processing(String work) {String result = work.toUpperCase();logger.info("This result - (" + result + ") - was processed by "+ Thread.currentThread().getName());} }

    上面的例子使用了二分法來打印字符串。

    我們再看一個RecursiveTask的例子:

    public class CustomRecursiveTask extends RecursiveTask<Integer> {private int[] arr;private static final int THRESHOLD = 20;public CustomRecursiveTask(int[] arr) {this.arr = arr;}@Overrideprotected Integer compute() {if (arr.length > THRESHOLD) {return ForkJoinTask.invokeAll(createSubtasks()).stream().mapToInt(ForkJoinTask::join).sum();} else {return processing(arr);}}private Collection<CustomRecursiveTask> createSubtasks() {List<CustomRecursiveTask> dividedTasks = new ArrayList<>();dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, 0, arr.length / 2)));dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, arr.length / 2, arr.length)));return dividedTasks;}private Integer processing(int[] arr) {return Arrays.stream(arr).filter(a -> a > 10 && a < 27).map(a -> a * 10).sum();} }

    和上面的例子很像,不過這里我們需要有返回值。

    在ForkJoinPool中提交Task

    有了上面的兩個任務,我們就可以在ForkJoinPool中提交了:

    int[] intArray= {12,12,13,14,15};CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);int result = forkJoinPool.invoke(customRecursiveTask);System.out.println(result);

    上面的例子中,我們使用invoke來提交,invoke將會等待任務的執行結果。

    如果不使用invoke,我們也可以將其替換成fork()和join():

    customRecursiveTask.fork();int result2= customRecursiveTask.join();System.out.println(result2);

    fork() 是將任務提交給pool,但是并不觸發執行, join()將會真正的執行并且得到返回結果。

    本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/forkjoin

    更多精彩內容且看:

    • 區塊鏈從入門到放棄系列教程-涵蓋密碼學,超級賬本,以太坊,Libra,比特幣等持續更新
    • Spring Boot 2.X系列教程:七天從無到有掌握Spring Boot-持續更新
    • Spring 5.X系列教程:滿足你對Spring5的一切想象-持續更新
    • java程序員從小工到專家成神之路(2020版)-持續更新中,附詳細文章教程

    更多教程請參考 flydean的博客

    超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的java 中的fork join框架的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。