日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java并发编程学习5--forkJoin

發布時間:2024/4/13 编程问答 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java并发编程学习5--forkJoin 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【概念

分支和并框架的目的是以遞歸的方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體的結果,它是ExecutorService的一個實現,它把子任務分配給線程池(ForkJoinPool)中的工作線程。某些應用可能對每個處理器內核飯別試用一個線程,來完成計算密集任務,例如圖像處理。java7引入forkjoin框架,專門用來支持這一類應用。假設有一個處理任務,它可以很自然的分解成子任務。

【使用方式

要把任務提交到線程池,必須創建RecursiveTask<R>的一個子類,其中R是并行化任務產生的結果(如果沒有結果使用RecursiveAction類型)。然后在子類中實現product abstract R compute()方法即可。這個方法同時實現了“拆分子任務”,“任務不可拆時”的處理邏輯。如下所示:

if(任務足夠小){順序計算該任務的值; }else{將任務分成兩個子任務;遞歸調用本方法;合并每個子任務的結果; }

【最佳實踐

  • 對一個任務調用join()方法會阻塞調用方,直到該任務結束。因此,有必要在兩個子任務的計算都開始之后再調用join()。否則,你得到的版本會比原始的順序執行更加緩慢,因為每個子任務都需要等到另一個子任務完成后才能開始計算,中途還要加上開啟線程的開銷。
  • 不應該在RecursiveTask<R>的內部使用ForkJoinPool.invoke(),相反你應該始終調用compute()或者fork(),只有順序代碼才使用ForkJoinPool.invoke()來啟動并行運算。
  • 對子任務調用fork()可以把他排進ForkJoinPool。同時對左邊和右邊的子任務調用它似乎很自然,但是這樣做的效率比直接對其中一個調用compute()低。對一個子任務調用compute()的話,可以使一個子任務重用當前線程,避免線程池中多分配一個任務帶來的開銷。
  • 不應該認為多核系統中,分支合并就比順序計算要快。一個任務可以分解成多個獨立的子任務,才能讓性能在并行化時有所提升。所有的子任務運行時間應該比分出新任務花費的時間要長。通常我們把輸入輸出放在一個方法中,計算在另一個方法中。
  • 【工作竊取

    我們很難確定要滿足什么條件才可以不再拆分任務。但是分出大量的小任務是一個好的選擇,因為在理想情況下,劃分并行任務時因該讓每個任務都花費相同的時間。讓cpu的所有內核都一樣的繁忙,但是現實中我們的子任務花費的時間大不相同,這是因為有許多我們無法確認的情況出現:io,rpc,分配效率等等。分支合并框架使用:工作竊取來解決內核之間任務不匹配的問題。讓所有任務大致相同的被平均分配到forkjoinpool的每個線程上。每個線程都擁有一個雙向鏈式隊列來保存它的任務,每完成一個任務就從隊列頭部取出下一個任務執行。當一個線程的任務隊列已空,而其他線程還在繁忙,這個空閑線程就隨機選擇一個繁忙線程并從其隊列尾部拿走一個任務開始執行,直到所有的任務執行完畢。

    【例子

    1.輸出數組中有多少個數字小于0.5

    public class ExerciseFilter {//數據源static double numbers[] = new double[100];static {for(int i = 0 ; i < 100 ; i++){numbers[i] = i + 1;}numbers[0] = 0.08;numbers[1] = 0.18;numbers[11] = 0.18;}public static void main(String[] args) {Counter counter = new Counter(numbers,x -> x < 0.5);//使用單例ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();//啟動并行任務pool.invoke(counter);System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());} }class Counter extends RecursiveTask<Integer>{//分界線,當一個數組的長度 < 1000 就不再繼續拆分public static final int THRESHOLD = 1000;//數組private double[] values;//判斷條件private DoublePredicate filter;public Counter(double [] values,DoublePredicate filter){this.values = values;this.filter = filter;}@Overrideprotected Integer compute() {//任務足夠小就不再拆分if(values.length < THRESHOLD ){//返回該數組中有多少數字滿足判斷邏輯int count = 0;for(int i = 0; i < values.length ; i++){if(filter.test(values[i])){count++;}}return count;}else {//將打數組拆分成兩個int mid = values.length / 2;Counter first = new Counter(Arrays.copyOfRange(values,0,mid),filter);//第一個子任務提交到線程池first.fork();Counter second = new Counter(Arrays.copyOfRange(values,mid,values.length),filter);//當前線程執行第二個子任務,節約一個線程的開銷int secondResult = second.compute();//等待第一個子任務執行完畢int firstResult = first.join();return firstResult + secondResult;}} }

    2.列表中求和

    public class ExerciseSum {//數據源static int sum[] = new int[100];static {for(int i = 0 ; i < 100 ; i++){sum[i] = i + 1;}}public static void main(String[] args) {CounterSum counter = new CounterSum(sum);ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();pool.invoke(counter);System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());} }class CounterSum extends RecursiveTask<Integer> {//最小拆分單位:每個小數組length = 10public static final int THRESHOLD = 10;private int[] values;public CounterSum(int [] values){this.values = values;}@Overrideprotected Integer compute() {if(values.length < THRESHOLD ){int count = 0;for(int i = 0; i < values.length ; i++){count += values[i];}return count;}else {int mid = values.length / 2;CounterSum first = new CounterSum(Arrays.copyOfRange(values,0,mid));first.fork();CounterSum second = new CounterSum(Arrays.copyOfRange(values,mid,values.length));int secondResult = second.compute();int firstResult = first.join();return firstResult + secondResult;}} }

    3.排序

    public class ExerciseSort {//數據源static int num[] = new int[100];static {Random r = new Random();for(int i = 0 ; i < 100 ; i++){num[i] = r.nextInt(100);}}public static void main(String[] args) {CounterSort counter = new CounterSort(num);//使用單例ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();//啟動并行任務pool.invoke(counter);System.out.println((System.currentTimeMillis() - st));Arrays.stream(counter.join()).forEach(System.out::println);} }class CounterSort extends RecursiveTask<int[]> {//最小拆分單位:每個小數組length = 10public static final int THRESHOLD = 10;//待排序數組private int[] values;public CounterSort(int [] values){this.values = values;}@Overrideprotected int[] compute() {if(values.length < THRESHOLD ){int[] result = new int[values.length];//1.單數組排序result = Arrays.stream(values).sorted().toArray();return result;}else {int mid = values.length / 2;CounterSort first = new CounterSort(Arrays.copyOfRange(values,0,mid));first.fork();CounterSort second = new CounterSort(Arrays.copyOfRange(values,mid,values.length));int[] secondResult = second.compute();int[] firstResult = first.join();//兩個數組混合排序int[] combineRsult = combineIntArray(firstResult,secondResult);return combineRsult;}}private int[] combineIntArray(int[] a1,int[] a2){int result[] = new int[a1.length + a2.length];int a1Len = a1.length;int a2Len = a2.length;int destLen = result.length;for(int index = 0 , a1Index = 0 , a2Index = 0 ; index < destLen ; index++) {int value1 = a1Index >= a1Len?Integer.MAX_VALUE:a1[a1Index];int value2 = a2Index >= a2Len?Integer.MAX_VALUE:a2[a2Index];if(value1 < value2) {a1Index++;result[index] = value1;}else {a2Index++;result[index] = value2;}}return result;} }

    總結

    以上是生活随笔為你收集整理的java并发编程学习5--forkJoin的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。