日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分支合并 Fork-Join 框架

發布時間:2025/3/15 编程问答 15 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分支合并 Fork-Join 框架 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、什么是 Fork-Join

Fork/Join框架是Java7提供了的一個用于并行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架,這種開發方法也叫分治編程。分治編程可以極大地利用CPU資源,提高任務執行的效率,也是目前與多線程有關的前沿技術。

框架圖:

  • fork():利用另一個 ForkJoinPool 線程異步執行新創建的子任務
  • join():讀取第一個子任務的結果,尚未完成就等待

二、傳統的分治編程會遇到什么問題

分治的原理上面已經介紹了,就是切割大任務成小任務來完成。看起來好像也不難實現啊!為什么專門弄一個新的框架呢?
我們先看一下,在不使用 Fork-Join 框架時,使用普通的線程池是怎么實現的。

  • 我們往一個線程池提交了一個大任務,規定好任務切割的閥值。
  • 由線程池中線程(假設是線程A)執行大任務,發現大任務的大小大于閥值,于是切割成兩個子任務,并調用 submit()提交到線程池,得到返回的子任務的 Future。
  • 線程A就調用返回的 Future 的 get() 方法阻塞等待子任務的執行結果。
  • 池中的其他線程(除線程A外,線程A被阻塞)執行兩個子任務,然后判斷子任務的大小有沒有超過閥值,如果超過,則按照步驟2繼續切割,否則,才計算并返回結果。
  • 看起來一切都很美好。真的嗎?別忘了, 每一個切割任務的線程(如線程A)都被阻塞了,直到其子任務完成,才能繼續往下運行 。如果任務太大了,需要切割多次,那么就會有多個線程被阻塞,性能將會急速下降。更糟糕的是,如果你的線程池的線程數量是有上限的,極可能會造成池中所有線程被阻塞,線程池無法執行任務。

    三、普通線程池實現分治時阻塞的問題

    public class NormalThreadPoolDivideAndConquer {//固定大小的線程池,池中線程數量為3static ExecutorService fixPoolExecutors = Executors.newFixedThreadPool(3);public static void main(String[] args) throws InterruptedException, ExecutionException {//計算 1+2+...+10 的結果CountTaskCallable task = new CountTaskCallable(1,10);//提交主人翁Future<Integer> future = fixPoolExecutors.submit(task);System.out.println("計算的結果:"+future.get());} } class CountTaskCallable implements Callable<Integer> {//設置閥值為2private static final int THRESHOLD = 2;private int start;private int end;public CountTaskCallable(int start, int end) {super();this.start = start;this.end = end;}@Overridepublic Integer call() throws Exception {int sum = 0;//判斷任務的大小是否超過閥值,也即是兩個相加的數的差值不能大于2,在這里意味著需要分為大于4個子任務進行計算,而線程池只有3個,機會造成阻塞boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {System.out.println("切割的任務:"+start+"加到"+end+" 執行此任務的線程是 "+Thread.currentThread().getName());int middle = (start + end) / 2;CountTaskCallable leftTaskCallable = new CountTaskCallable(start, middle);CountTaskCallable rightTaskCallable = new CountTaskCallable(middle + 1, end);// 將子任務提交到線程池中Future<Integer> leftFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(leftTaskCallable);Future<Integer> rightFuture = NormalThreadPoolDivideAndConquer.fixPoolExecutors.submit(rightTaskCallable);//阻塞等待子任務的執行結果int leftResult = leftFuture.get();int rightResult = rightFuture.get();// 合并子任務的執行結果sum = leftResult + rightResult;}return sum;} }
    • 運行結果:
    切割的任務:1加到10 執行此任務的線程是 pool-1-thread-1 切割的任務:1加到5 執行此任務的線程是 pool-1-thread-2 切割的任務:6加到10 執行此任務的線程是 pool-1-thread-3

    池的線程只有三個,當任務分割了三次后,池中的線程也就都被阻塞了,無法再執行任何任務,一直卡著動不了。為了解決這個問題,工作竊取算法呼之欲出

    四、工作竊取算法

    針對上面的問題,Fork-Join 框架使用了“工作竊取(work-stealing)”算法。工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。在《Java 并發編程的藝術》對工作竊取算法的解釋:

    使用工作竊取算法有什么優勢呢?

    • 假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
  • Fork-Join 框架使用工作竊取算法
  • Fork-Join 框架的線程池ForkJoinPool 的任務分為“外部任務” 和 “內部任務”。
  • “外部任務”是放在ForkJoinPool 的全局隊列里;
  • ForkJoinPool 池中的每個線程都維護著一個內部隊列,用于存放“內部任務”。
  • 線程切割任務得到的子任務就會作為“內部任務”放到內部隊列中。
  • 當此線程要想要拿到子任務的計算結果時,先判斷子任務有沒有完成,如果沒有完成,則再判斷子任務有沒有被其他線程“竊取”,一旦子任務被竊取了則去執行本線程“內部隊列”的其他任務,或者掃描其他的任務隊列,竊取任務,如果子任務沒有被竊取,則由本線程來完成。
  • 最后,當線程完成了其“內部任務”,處于空閑的狀態時,就會去掃描其他的任務隊列,竊取任務
  • 工作竊取算法的優點
    Fork-Join 框架中的工作竊取算法的優點可以總結為以下兩點:
  • 線程是不會因為等待某個子任務的完成或者沒有內部任務要執行而被阻塞等待、掛起,而是會掃描所有的隊列,竊取任務,直到所有隊列都為空時,才會被掛起。
  • Fork-Join框架在多CPU的環境下,能提供很好的并行性能。在使用普通線程池的情況下,當CPU不再是性能瓶頸時,能并行地運行多個線程,然而卻因為要互斥訪問一個任務隊列而導致性能提高不上去。(所以ForkJoin適合在多核環境下,單核環境使用ForkJoin沒什么意思。)而 Fork-Join框架為每個線程為維護著一個內部任務隊列,以及一個全局的任務隊列,而且任務隊列都是雙向隊列,可從首尾兩端來獲取任務,極大地減少了競爭的可能性,提高并行的性能。
  • 五、Fork-Join 框架的使用介紹

  • Fork/Join有三個核心類:
    • ForkJoinPool: 執行任務的線程池,繼承了 AbstractExecutorService 類。
    • ForkJoinWorkerThread:執行任務的工作線程(即ForkJoinPool線程池里的線程)。每個線程都維護著一個內部隊列,用于存放“內部任務”。繼承了 Thread類。
    • ForkJoinTask: 一個用于ForkJoinPool的任務抽象類。實現了 Future 接口

    因為ForkJoinTask比較復雜,抽象方法比較多,日常使用時一般不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類,實現 compute() 方法:

    • RecursiveTask: 子任務帶返回結果時使用
    • RecursiveAction: 子任務不帶返回結果時使用

    compute 方法的實現模式一般是:

    if 任務足夠小直接返回結果 else分割成N個子任務依次調用每個子任務的fork方法執行子任務依次調用每個子任務的join方法合并執行結果

    六、Fork-Join 例子演示

    • 計算 1+2+…+12 的結果。

    使用Fork/Join框架首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那么我們設置分割的閾值是2,由于是12個數字相加。同時,觀察執行任務的線程名稱,理解工作竊取算法的實現。

    public class CountTest {public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool forkJoinPool = new ForkJoinPool();//創建一個計算任務,計算 由1加到12CountTask countTask = new CountTask(1, 12);Future<Integer> future = forkJoinPool.submit(countTask);System.out.println("最終的計算結果:" + future.get());} }class CountTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= THRESHOLD;//任務已經足夠小,可以直接計算,并返回結果if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}System.out.println("執行計算任務,計算 " + start + "到 " + end + "的和 ,結果是:" + sum + " 執行此任務的線程:" + Thread.currentThread().getName());} else { //任務過大,需要切割System.out.println("任務過大,切割的任務: " + start + "加到 " + end + "的和 執行此任務的線程:" + Thread.currentThread().getName());int middle = (start + end) / 2;//切割成兩個子任務CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);//執行子任務leftTask.fork();rightTask.fork();//等待子任務的完成,并獲取執行結果int leftResult = leftTask.join();int rightResult = rightTask.join();//合并子任務sum = leftResult + rightResult;}return sum;} }
    • 運行結果:
    任務過大,切割的任務: 1加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-1 任務過大,切割的任務: 7加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-3 任務過大,切割的任務: 1加到 6的和 執行此任務的線程:ForkJoinPool-1-worker-2 執行計算任務,計算 79的和 ,結果是:24 執行此任務的線程:ForkJoinPool-1-worker-3 執行計算任務,計算 13的和 ,結果是:6 執行此任務的線程:ForkJoinPool-1-worker-1 執行計算任務,計算 46的和 ,結果是:15 執行此任務的線程:ForkJoinPool-1-worker-1 執行計算任務,計算 1012的和 ,結果是:33 執行此任務的線程:ForkJoinPool-1-worker-3 最終的計算結果:78

    從結果可以看出:

    提交的計算任務是由線程1執行,線程1進行了第一次切割,切割成兩個子任務 “7加到12“ 和
    ”1加到6“,并提交這兩個子任務。然后這兩個任務便被 線程2、線程3 給竊取了。線程1 的內部隊列中已經沒有任務了,這時候,線程2、線程3
    也分別進行了一次任務切割并各自提交了兩個子任務,于是線程1也去竊取任務(這里竊取的都是線程2的子任務)。

    • RecursiveAction 演示
      遍歷指定目錄(含子目錄)找尋指定類型文件
    public class FindDirsFiles extends RecursiveAction{/*** 當前任務需要搜尋的目錄*/private File path;public FindDirsFiles(File path) {this.path = path;}public static void main(String [] args){try {// 用一個 ForkJoinPool 實例調度總任務ForkJoinPool pool = new ForkJoinPool();FindDirsFiles task = new FindDirsFiles(new File("D:/"));//異步調用pool.execute(task);System.out.println("Task is Running......");Thread.sleep(1);int otherWork = 0;for(int i=0;i<1000000;i++){otherWork = otherWork+i;}System.out.println("Main Thread done sth......,otherWork=" + otherWork);//阻塞的方法task.join();System.out.println("Task end");} catch (Exception e) {e.printStackTrace();}}@Overrideprotected void compute() {List<FindDirsFiles> subTasks = new ArrayList<>();File[] files = path.listFiles();if(files!=null) {for(File file:files) {if(file.isDirectory()) {subTasks.add(new FindDirsFiles(file));}else {//遇到文件,檢查if(file.getAbsolutePath().endsWith("txt")) {System.out.println("文件:"+file.getAbsolutePath());}}}if(!subTasks.isEmpty()) {for (FindDirsFiles subTask : invokeAll(subTasks)) {//等待子任務執行完成subTask.join();}}} } }

    參考文章
    參考文章

    總結

    以上是生活随笔為你收集整理的分支合并 Fork-Join 框架的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。