日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并发编程-22J.U.C组件拓展之Fork/Join框架

發(fā)布時間:2025/3/21 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发编程-22J.U.C组件拓展之Fork/Join框架 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • Fork/Join框架概述
  • 工作竊取算法
    • 優(yōu)點
    • 缺點
  • Fork/Join框架的設(shè)計
    • ForkJoinTask
    • ForkJoinPool
  • 示例
  • Fork/Join框架的異常處理
  • 代碼

Fork/Join框架概述

Fork/Join框架是Java 7提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。

Fork就是把一個大任務(wù)切分為若干子任務(wù)并行的執(zhí)行,Join就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。


工作竊取算法

工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行.

假如我們需要做一個比較大的任務(wù),可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,把這些子任務(wù)分別放到不同的隊列里,并為每個
隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng).

比如A線程負(fù)責(zé)處理A列里的任務(wù)。但是,有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。

干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列被竊取任務(wù)線程永遠從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠從雙端隊列的尾部拿任務(wù)執(zhí)行

優(yōu)點

  • 充分利用線程進行并行計算,減少了線程間的競爭。

缺點

  • 在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。
  • 該算法會消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列

Fork/Join框架的設(shè)計

如果讓我們來設(shè)計一個Fork/Join框架,該如何設(shè)計?

  • Step1: 分割任務(wù)
  • 首先我們需要有一個fork類來把大任務(wù)分割成子任務(wù),有可能子任務(wù)還是很大,所以還需要不停地分割,直到分割出的子任務(wù)足夠小。

  • Step2: 執(zhí)行任務(wù)并合并結(jié)果
  • 分割的子任務(wù)分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務(wù)執(zhí)行。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊列里,啟動一個線程從隊列里拿數(shù)據(jù),然后合并這些數(shù)據(jù)。

    Fork/Join使用ForkJoinTask、ForkJoinPool兩個類來完成以上兩件事情。

    ForkJoinTask

    ForkJoinTask:我們要使用ForkJoin框架,必須首先創(chuàng)建一個ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類。

    Fork/Join框架提供了以下兩個子類

    • RecursiveAction:用于沒有返回結(jié)果的任務(wù)
    • RecursiveTask:用于有返回結(jié)果的任務(wù)

    ForkJoinPool

    ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執(zhí)行。

    任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護的雙端隊列中,進入隊列的頭部。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時,它會隨機從其他工作線程的隊列的尾部獲取一個任務(wù)。


    示例

    需求:計算1+2+3+4的結(jié)果

    使用Fork/Join框架首先要考慮到的是如何分割任務(wù),如果希望每個子任務(wù)最多執(zhí)行兩個數(shù)的相加,那么我們設(shè)置分割的閾值是2.

    由于是4個數(shù)字相加,所以Fork/Join框架會把這個任務(wù)fork成兩個子任務(wù),子任務(wù)一負(fù)責(zé)計算1+2,子任務(wù)二負(fù)責(zé)計算3+4,然后再join兩個子任務(wù)的結(jié)果。

    因為是有結(jié)果的任務(wù),所以必須繼承RecursiveTask

    package com.artisan.example.aqs;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask;@Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> {private static final long serialVersionUID = 5547724989189856681L;public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任務(wù)足夠小就計算任務(wù)boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任務(wù)大于閾值,就分裂成兩個子任務(wù)計算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 執(zhí)行子任務(wù)leftTask.fork();rightTask.fork();// 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任務(wù)sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();//生成一個計算任務(wù),計算1+2+3+4 + ... + 100ForkJoinTaskExample task = new ForkJoinTaskExample(1, 4);//執(zhí)行一個任務(wù)Future<Integer> result = forkjoinPool.submit(task);try {log.info("result:{}", result.get());} catch (Exception e) {log.error("exception", e);}} }

    ForkJoinTask與一般任務(wù)的主要區(qū)別在于它需要實現(xiàn)compute方法.

    在這個方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù).

    如果不足夠小,就必須分割成兩個子任務(wù),每個子任務(wù)在調(diào)用fork方法時,又會進入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成子任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。

    使用join方法會等待子任務(wù)執(zhí)行完并得到其結(jié)果。


    Fork/Join框架的異常處理

    ForkJoinTask在執(zhí)行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被取消了,并且可以通過ForkJoinTask的getException方法獲取異常.

    if(task.isCompletedAbnormally()){System.out.println(task.getException());}

    getException方法返回Throwable對象,

    • 如果任務(wù)被取消了則返回CancellationException,
    • 如果任務(wù)沒有完成或者沒有拋出異常則返回null。

    代碼

    https://github.com/yangshangwei/ConcurrencyMaster

    《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

    總結(jié)

    以上是生活随笔為你收集整理的并发编程-22J.U.C组件拓展之Fork/Join框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。