日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并行数据处理与性能详解与ForkJoin框架

發布時間:2025/1/21 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并行数据处理与性能详解与ForkJoin框架 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 一、并行流
      • 1、將順序流轉換成并行流
      • 2、測量流的性能
    • 二、分之/合并框架ForkJoinPool
      • 1、使用RecursiveTask
    • 三、Spliterator

本章節可以讓你用Stream接口不費力氣就能對數據集執行并行操作,可以聲明性的講順序流變成并行流。

一、并行流

Stream接口可以調用方法parallelStream很容易把集合轉換為并行流。所謂并行流就是把內容分成多個數據塊,用不同線程處理每塊數據。

1、將順序流轉換成并行流

可以將流轉換成并行流,調用方法parallel。例:

public static long parallelSum(long n) {return Stream.iterate(1L,i -> i+1).limit(n).parallel().reduce(0L,Long::sum);}

如果從并行流變成順序流可以調用sequential這個方法完成。

stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();

2、測量流的性能

按常理并行求和方法應該比迭代方法性能好。然而在軟件工程上,靠猜是絕對不行的,因此我們要進行實戰看結果。

public class ParallelStreams {public static long measureSumPerf(Function<Long, Long> adder, long n) {long fastest = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long start = System.nanoTime();long sum = adder.apply(n);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("Result: " + sum);if (duration < fastest) fastest = duration;}return fastest;}public static long iterativeSum(long n) {long result = 0;for (long i = 1L; i <= n; i++) {result += i;}return result;}public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}public static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}public static void main(String[] args) {System.out.println("Sequential sum done in:" +measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");System.out.println("Iterative sum done in:" +measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );} }

運行結果:

運行結果相當令人失望,求和方法并行是順序版本的將近10倍,為什么會出現這樣的結果,實際上有兩個問題:
1、iterate生成是裝箱的對象,必須拆箱才能求和。
2、很難把iterate分成多個獨立塊來并行執行。iterate很難分割成能獨立執行的小塊,因為每次應用這個函數都要 依賴前一次應用執行結果。

使用有針對性的方法,避免裝箱拆箱操作和能分成獨立塊并行。可以使用,LongStream.rangeClosed與iterate相比有兩個優點。
1、產生原始long數字,沒有裝箱拆箱操作。
2、會生成數字范圍,很容易拆成小塊。
例:

public static long rangedSum(long n) {return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);}public static long parallelRangedSum(long n) {return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);}System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::rangedSum, 10_000_000) + " msecs" );System.out.println("Parallel sum done in: " +measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs" );

執行結果:

得到結果終于比順序執行快,我們使用并行流的時候一定要正確使用,比如算法改變了某些共享狀態。

二、分之/合并框架ForkJoinPool

分之/合并框架的目的是有遞歸方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體結果。ForkJoinPool是ExecutorService的一個實現,它把子任務分配給線程池(稱為ForkJoinPool)中的工作線程。

1、使用RecursiveTask

要把任務提交到線程池中,必須創建RecursiveTask的子類,重寫compute方法。R是并行化產生的結果類型。如果任務不返回結果用RecursiveAction。
protected abstract R compute();這個方法同時定義了將任務拆分成子任務的邏輯,和任務無法在拆分時,生成單個任務邏輯。

if (任務足夠小或不可分) {
順序計算該任務
} else {
將任務分成兩個子任務
遞歸調用本方法,拆分每個子任務,等待所有子任務完成
合并每個子任務的結果
}

用分之/合并框架并行求和

public class ForkJoinSumCalculator extends RecursiveTask<Long> {private final long[] numbers;private final int start;private final int end;public static final long THRESHOLD = 10_000;public ForkJoinSumCalculator(long[] numbers) {this(numbers,0,numbers.length);}private ForkJoinSumCalculator(long[] numbers, int start, int end) {this.numbers = numbers;this.start = start;this.end = end;}@Overrideprotected Long compute() {//該任務負責求和的部分大小int length = end - start;//如果大小小于或等于閾值,順序執行結果if (length <= THRESHOLD) {return computeSequentially();}//創建一個子任務為數組的前一半求和ForkJoinSumCalculator leftTask =new ForkJoinSumCalculator(numbers, start, start + length/2);//利用另一個ForkJoinPool線程異步執行創建子任務leftTask.fork();//創建一個數組另一半求和ForkJoinSumCalculator rightTask =new ForkJoinSumCalculator(numbers, start + length/2, end);Long rightResult = rightTask.compute();//讀取第一個子任務結果,如果尚未完成就等待Long leftResult = leftTask.join();return leftResult + rightResult;}private long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {{sum += numbers[i];}}return sum;}public static long forkJoinSum(long n) {long[] numbers = LongStream.rangeClosed(1, n).toArray();ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);return new ForkJoinPool().invoke(task);}public static void main(String[] args) {System.out.println(forkJoinSum(10000000));} }

使用分之/合并框架需注意幾點:
1、對于一個任務調用join方法會阻塞調用方,直到該任務結束。因此,在兩個子任務的計算開始之后 再調用它。
2、不在RecursiveTask子類內部使用invoke方法

三、Spliterator

Spliterator是java8中加入的另一個新接口,字面意思是可分迭代器,主要作用于并行執行。

public interface Spliterator<T> {、//按順序一個一個使用Spliterator元素,如果有其它元素遍歷返回trueboolean tryAdvance(Consumer<? super T> action);//可以把元素劃分出去分給第二個SpliteratorSpliterator<T> trySplit();//還剩多少元素要遍歷long estimateSize();//特性int characteristics(); }

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的并行数据处理与性能详解与ForkJoin框架的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。