分支合并 Fork-Join 框架
一、什么是 Fork-Join
Fork/Join框架是Java7提供了的一個用于并行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架,這種開發方法也叫分治編程。分治編程可以極大地利用CPU資源,提高任務執行的效率,也是目前與多線程有關的前沿技術。
框架圖:
- fork():利用另一個 ForkJoinPool 線程異步執行新創建的子任務
- join():讀取第一個子任務的結果,尚未完成就等待
二、傳統的分治編程會遇到什么問題
分治的原理上面已經介紹了,就是切割大任務成小任務來完成。看起來好像也不難實現啊!為什么專門弄一個新的框架呢?
我們先看一下,在不使用 Fork-Join 框架時,使用普通的線程池是怎么實現的。
看起來一切都很美好。真的嗎?別忘了, 每一個切割任務的線程(如線程A)都被阻塞了,直到其子任務完成,才能繼續往下運行 。如果任務太大了,需要切割多次,那么就會有多個線程被阻塞,性能將會急速下降。更糟糕的是,如果你的線程池的線程數量是有上限的,極可能會造成池中所有線程被阻塞,線程池無法執行任務。
三、普通線程池實現分治時阻塞的問題
public class NormalThreadPoolDivideAndConquer {//固定大小的線程池,池中線程數量為3static ExecutorService fixPoolExecutors = Executors.newFixedThreadPool(3);public static void main(String[] args) throws InterruptedException, ExecutionException {//計算 1+2+...+10 的結果CountTaskCallable task = new CountTaskCallable(1,10);//提交主人翁Future<Integer> future = fixPoolExecutors.submit(task);System.out.println("計算的結果:"+future.get());} } class CountTaskCallable implements Callable<Integer> {//設置閥值為2private static final int THRESHOLD = 2;private int start;private int end;public CountTaskCallable(int start, int end) {super();this.start = start;this.end = end;}@Overridepublic Integer call() throws Exception {int sum = 0;//判斷任務的大小是否超過閥值,也即是兩個相加的數的差值不能大于2,在這里意味著需要分為大于4個子任務進行計算,而線程池只有3個,機會造成阻塞boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {System.out.println("切割的任務:"+start+"加到"+end+" 執行此任務的線程是 "+Thread.currentThread().getName());int middle = (start + end) / 2;CountTaskCallable leftTaskCallable = new CountTaskCallable(start, middle);CountTaskCallable rightTaskCallable = new CountTaskCallable(middle + 1, end);// 將子任務提交到線程池中Future<Integer> leftFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(leftTaskCallable);Future<Integer> rightFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(rightTaskCallable);//阻塞等待子任務的執行結果int leftResult = leftFuture.get();int rightResult = rightFuture.get();// 合并子任務的執行結果sum = leftResult + rightResult;}return sum;} }- 運行結果:
池的線程只有三個,當任務分割了三次后,池中的線程也就都被阻塞了,無法再執行任何任務,一直卡著動不了。為了解決這個問題,工作竊取算法呼之欲出
四、工作竊取算法
針對上面的問題,Fork-Join 框架使用了“工作竊取(work-stealing)”算法。工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。在《Java 并發編程的藝術》對工作竊取算法的解釋:
使用工作竊取算法有什么優勢呢?
- 假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
Fork-Join 框架中的工作竊取算法的優點可以總結為以下兩點:
五、Fork-Join 框架的使用介紹
- ForkJoinPool: 執行任務的線程池,繼承了 AbstractExecutorService 類。
- ForkJoinWorkerThread:執行任務的工作線程(即ForkJoinPool線程池里的線程)。每個線程都維護著一個內部隊列,用于存放“內部任務”。繼承了 Thread類。
- ForkJoinTask: 一個用于ForkJoinPool的任務抽象類。實現了 Future 接口
因為ForkJoinTask比較復雜,抽象方法比較多,日常使用時一般不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類,實現 compute() 方法:
- RecursiveTask: 子任務帶返回結果時使用
- RecursiveAction: 子任務不帶返回結果時使用
compute 方法的實現模式一般是:
if 任務足夠小直接返回結果 else分割成N個子任務依次調用每個子任務的fork方法執行子任務依次調用每個子任務的join方法合并執行結果六、Fork-Join 例子演示
- 計算 1+2+…+12 的結果。
使用Fork/Join框架首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那么我們設置分割的閾值是2,由于是12個數字相加。同時,觀察執行任務的線程名稱,理解工作竊取算法的實現。
public class CountTest {public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool forkJoinPool = new ForkJoinPool();//創建一個計算任務,計算 由1加到12CountTask countTask = new CountTask(1, 12);Future<Integer> future = forkJoinPool.submit(countTask);System.out.println("最終的計算結果:" + future.get());} }class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= THRESHOLD;//任務已經足夠小,可以直接計算,并返回結果if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}System.out.println("執行計算任務,計算 " + start + "到 " + end + "的和 ,結果是:" + sum + " 執行此任務的線程:" + Thread.currentThread().getName());} else { //任務過大,需要切割System.out.println("任務過大,切割的任務: " + start + "加到 " + end + "的和 執行此任務的線程:" + Thread.currentThread().getName());int middle = (start + end) / 2;//切割成兩個子任務CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);//執行子任務leftTask.fork();rightTask.fork();//等待子任務的完成,并獲取執行結果int leftResult = leftTask.join();int rightResult = rightTask.join();//合并子任務sum = leftResult + rightResult;}return sum;} }- 運行結果:
從結果可以看出:
提交的計算任務是由線程1執行,線程1進行了第一次切割,切割成兩個子任務 “7加到12“ 和
”1加到6“,并提交這兩個子任務。然后這兩個任務便被 線程2、線程3 給竊取了。線程1 的內部隊列中已經沒有任務了,這時候,線程2、線程3
也分別進行了一次任務切割并各自提交了兩個子任務,于是線程1也去竊取任務(這里竊取的都是線程2的子任務)。
- RecursiveAction 演示
遍歷指定目錄(含子目錄)找尋指定類型文件
參考文章
參考文章
總結
以上是生活随笔為你收集整理的分支合并 Fork-Join 框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 产品经理,如何降噪学习?
- 下一篇: 5G零售行业应用白皮书