JUC系列(十) | ForkJoin框架 并行处理任务
多線程一直Java開發中的難點,也是面試中的常客,趁著還有時間,打算鞏固一下JUC方面知識,我想機會隨處可見,但始終都是留給有準備的人的,希望我們都能加油!!!
沉下去,再浮上來,我想我們會變的不一樣的。
🚤Fork&Join框架
1)介紹
Fork/Join框架是從Java1.7開始提供的一個并行處理任務的框架,它的基本思路是將一個大任務分解成若干個小任務,并行處理多個小任務,最后再匯總合并這些小任務的結果便可得到原來的大任務結果。
1、Fork :遞歸式的將大任務分割成合適大小的小任務。
2、Join:執行任務并合并結果。
2)相關類
我們要使用 Fork/Join 框架,首先需要創建一個 ForkJoin 任務。 ForkJoin 類提供了在任務中執行 fork 和 join 的機制。
通常情況下我們都是直接繼承ForkJoinTask 的子類,Fork/Join框架提供了兩個子類:
-
RecursiveAction:一個遞歸無結果的ForkJoinTask(沒有返回值)任務
-
RecursiveTask:一個遞歸有結果的ForkJoinTask(有返回值)任務
-
ForkJoinTask 主要方法:
fork() // 在當前線程運行的線程池中安排一個異步執行。簡單的理解就是再創建一個子任務。 join() //當任務完成的時候返回計算結果。 invoke() //開始執行任務,如果必要,等待計算完成。
ForkJoinPool:另外ForkJoinTask需要通過 ForkJoinPool 來執行
RecursiveTask:一個遞歸有結果的ForkJoinTask(有返回值)任務
🚁Fork 方法
調用fork方法時,程序會將新創建的子任務放入當前線程的workQueue隊列中,Fork/Join框架將根據當前正在并發執行的ForkJoinTask任務的ForkJoinWorkerThread線程狀態,來決定是讓這個任務在隊列中等待,還是創建一個新的ForkJoinWorkerThread線程運行它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運行它。
public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);else//將給定的任務添加到提交者當前隊列的提交隊列中,如果為 null 或存在競爭,則創建一個。ForkJoinPool.common.externalPush(this);return this; }push方法把當前任務存放在 ForkJoinTask 數組隊列里。然后再調用 ForkJoinPool 的 signalWork()方法喚醒或創建一個工作線程來執行任務。代碼如下:
//推送任務。 僅由非共享隊列中的所有者調用。 final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a;int s = top, d, cap, m;ForkJoinPool p = pool;if ((a = array) != null && (cap = a.length) > 0) {QA.setRelease(a, (m = cap - 1) & s, task);top = s + 1;if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&p != null) { // size 0 or 1VarHandle.fullFence();// signalWork方法的意義在于,如果運行的工作程序太少,則嘗試創建或釋放工作程序。p.signalWork(); }// 如果array的剩余空間不夠了,則進行增加else if (d == m)growArray(false); } }🪂Join 方法
Join 方法的主要作用是阻塞當前線程并等待獲取結果。代碼如下:
通過 doJoin()方法得到當前任務的狀態來判斷返回什么結果,任務狀態有 4 種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)//假如任務狀態是拋出異常狀態,就直接拋出對應的異常//若是任務狀態是被取消狀態,則直接拋出CancellationException異常。reportException(s);//如若任務狀態是已經完成,則直接立馬返回任務結果。//即使此任務異常完成,如果不知道此任務已完成,則返回null return getRawResult(); }讓我們分析一下 doJoin 方法的實現
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;//1. 首先通過查看任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;//2. 如果沒有執行完任務,則從任務數組里取出任務并執行。//3. 如果任務順利執行完成,則設置任務狀態為 NORMAL,假如出現異常,則記錄異常,并將任務狀態設置為 EXCEPTIONAL。return (s = status) < 0 ?s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s ://幫助和/或阻塞,直到給定的任務完成或超時wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone(); }🚤Fork/Join 框架的異常處理
ForkJoinTask 在執行的時候可能會拋出異常,但因為它并不是在主線程中運行,故此沒有辦法在主線程中去捕獲異常,這種問題當然 ForkJoinTask 也是提供了API來處理的啦,如下:
🌍入門案例
場景: 生成一個計算任務,計算 1+2+3…+1000,每 100 個數切分一個 子任務
import java.util.concurrent.RecursiveTask; /** * 遞歸累加 */ public class TaskExample extends RecursiveTask<Long> {private int start;private int end;private long sum;/*** 構造函數* @param start* @param end*/public TaskExample(int start, int end){this.start = start;this.end = end;}@Overrideprotected Long compute() {System.out.println("任務" + start + "=========" + end + "累加開始");//大于 100 個數相加切分,小于直接加if(end - start <= 100){for (int i = start; i <= end; i++) {//累加sum += i;}}else {//切分為 2 塊int middle = start + 100;//遞歸調用,切分為 2 個小任務TaskExample taskExample1 = new TaskExample(start, middle);TaskExample taskExample2 = new TaskExample(middle + 1, end);//執行:異步taskExample1.fork();taskExample2.fork();//同步阻塞獲取執行結果sum = taskExample1.join() + taskExample2.join();}//加完返回return sum;} } import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; /** * 分支合并案例 */ public class ForkJoinPoolDemo {/*** 生成一個計算任務,計算 1+2+3.........+1000* @param args*/public static void main(String[] args) {//定義任務TaskExample taskExample = new TaskExample(1, 1000);//定義執行對象ForkJoinPool forkJoinPool = new ForkJoinPool();//加入任務執行ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);//輸出結果try {System.out.println(result.get());}catch (Exception e){e.printStackTrace();}finally {forkJoinPool.shutdown();}} }🌈自言自語
最近又開始了JUC的學習,感覺Java內容真的很多,但是為了能夠走的更遠,還是覺得應該需要打牢一下基礎。
最近在持續更新中,如果你覺得對你有所幫助,也感興趣的話,關注我吧,讓我們
一起學習,一起討論吧。
你好,我是博主寧在春,Java學習路上的一顆小小的種子,也希望有一天能扎根長成蒼天大樹。
希望與君共勉😁
我們:待別時相見時,都已有所成。
總結
以上是生活随笔為你收集整理的JUC系列(十) | ForkJoin框架 并行处理任务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java设计模式-外观模式
- 下一篇: SpringBoot 整合 Thymel