并发编程-22J.U.C组件拓展之Fork/Join框架
文章目錄
- Fork/Join框架概述
- 工作竊取算法
- 優(yōu)點
- 缺點
- Fork/Join框架的設(shè)計
- ForkJoinTask
- ForkJoinPool
- 示例
- Fork/Join框架的異常處理
- 代碼
Fork/Join框架概述
Fork/Join框架是Java 7提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
Fork就是把一個大任務(wù)切分為若干子任務(wù)并行的執(zhí)行,Join就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。
工作竊取算法
工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行.
假如我們需要做一個比較大的任務(wù),可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,把這些子任務(wù)分別放到不同的隊列里,并為每個
隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng).
比如A線程負(fù)責(zé)處理A列里的任務(wù)。但是,有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。
干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠從雙端隊列的尾部拿任務(wù)執(zhí)行。
優(yōu)點
- 充分利用線程進行并行計算,減少了線程間的競爭。
缺點
- 在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。
- 該算法會消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列
Fork/Join框架的設(shè)計
如果讓我們來設(shè)計一個Fork/Join框架,該如何設(shè)計?
首先我們需要有一個fork類來把大任務(wù)分割成子任務(wù),有可能子任務(wù)還是很大,所以還需要不停地分割,直到分割出的子任務(wù)足夠小。
分割的子任務(wù)分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務(wù)執(zhí)行。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊列里,啟動一個線程從隊列里拿數(shù)據(jù),然后合并這些數(shù)據(jù)。
Fork/Join使用ForkJoinTask、ForkJoinPool兩個類來完成以上兩件事情。
ForkJoinTask
ForkJoinTask:我們要使用ForkJoin框架,必須首先創(chuàng)建一個ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類。
Fork/Join框架提供了以下兩個子類
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)
- RecursiveTask:用于有返回結(jié)果的任務(wù)
ForkJoinPool
ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執(zhí)行。
任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護的雙端隊列中,進入隊列的頭部。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時,它會隨機從其他工作線程的隊列的尾部獲取一個任務(wù)。
示例
需求:計算1+2+3+4的結(jié)果
使用Fork/Join框架首先要考慮到的是如何分割任務(wù),如果希望每個子任務(wù)最多執(zhí)行兩個數(shù)的相加,那么我們設(shè)置分割的閾值是2.
由于是4個數(shù)字相加,所以Fork/Join框架會把這個任務(wù)fork成兩個子任務(wù),子任務(wù)一負(fù)責(zé)計算1+2,子任務(wù)二負(fù)責(zé)計算3+4,然后再join兩個子任務(wù)的結(jié)果。
因為是有結(jié)果的任務(wù),所以必須繼承RecursiveTask
package com.artisan.example.aqs;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask;@Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> {private static final long serialVersionUID = 5547724989189856681L;public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任務(wù)足夠小就計算任務(wù)boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任務(wù)大于閾值,就分裂成兩個子任務(wù)計算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 執(zhí)行子任務(wù)leftTask.fork();rightTask.fork();// 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任務(wù)sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();//生成一個計算任務(wù),計算1+2+3+4 + ... + 100ForkJoinTaskExample task = new ForkJoinTaskExample(1, 4);//執(zhí)行一個任務(wù)Future<Integer> result = forkjoinPool.submit(task);try {log.info("result:{}", result.get());} catch (Exception e) {log.error("exception", e);}} }ForkJoinTask與一般任務(wù)的主要區(qū)別在于它需要實現(xiàn)compute方法.
在這個方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù).
如果不足夠小,就必須分割成兩個子任務(wù),每個子任務(wù)在調(diào)用fork方法時,又會進入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成子任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。
使用join方法會等待子任務(wù)執(zhí)行完并得到其結(jié)果。
Fork/Join框架的異常處理
ForkJoinTask在執(zhí)行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被取消了,并且可以通過ForkJoinTask的getException方法獲取異常.
if(task.isCompletedAbnormally()){System.out.println(task.getException());}getException方法返回Throwable對象,
- 如果任務(wù)被取消了則返回CancellationException,
- 如果任務(wù)沒有完成或者沒有拋出異常則返回null。
代碼
https://github.com/yangshangwei/ConcurrencyMaster
《新程序員》:云原生和全面數(shù)字化實踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的并发编程-22J.U.C组件拓展之Fork/Join框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 并发编程-21J.U.C组件拓展之Fut
- 下一篇: 并发编程-23J.U.C组件拓展之阻塞队