7. Java8新特性-并行数据处理(parallel)
在JDK7之前,并行處理數據集合非常麻煩。首先需要自己明確的把包含數據的數據結構分成若干個子部分,第二需要給每個子部分分配一個獨立的線程;第三需要在恰當的時候對它們進行同步來避免不希望出現的競爭條件,等待所有線程完成,最后把這些部分合并起來。
Doug Lea 在JDK7中引入了fork/join框架,讓這些操作更穩定,更不易出錯。
本節主要內容:
1. 用并行流并行處理數據
2. 并行流的性能分析
3. fork/join框架
4. 使用Spliterator分割流
學完本節期望能達到:
1. 熟練使用并行流,來加速業務性能
2. 了解流內部的工作原理,以防止誤用的情況
3. 通過Spliterator控制數據塊的劃分方式
并行流
可以通過對數據源調用parallelStream方法來將源轉換為并行流。并行流就是一個把內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流。這樣可以自動將工作負荷轉到多核中并行處理。
考慮下面一個實現:給定正整數n,計算 1 + 2 + … n的和。
使用stream的實現:
將上面的順序流轉換為并行流,實現如下:
private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); }即通過調用方法parallel可將順序流轉換為并行流。
但需要注意的是流僅在終端操作時才開始執行,所以當前流是順序流還是并行流以最靠近終端操作的流類型為準,示例:
list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);此種情況并不會按預想的先使用并行流執行過濾,再按順序流執行映射轉換。而是整個流水線操作都按并行流執行。
配置并行流使用的線程池
并行流內部使用了默認的ForkJoinPool, 它默認的線程數量就是處理器的數量(Runtime.getRuntime().availableProcessors())。也可以通過設置系統屬性來改變它(System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”, “12”))。但它是一個全局設置,會影響所有的并行流,一般而言線程數等于處理器數量是一個合理的數值,不需要修改。
測試流性能
一般而言,同一個功能給我們的感覺是并行流性能會比順序流性能更好。然而在軟件工程中,優化性能的黃金準則是:測量。我們開發了程序,用來測量4種寫法的累加,看看性能如何:
@Slf4j public class SumSample {/*** 順序流、并行流性能測試* 實現1~1億整型數字累加**/public static void main(String[] args) {CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000)));log.info("================================================================================");CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000)));}/*** 內部迭代方式實現累加*/private static long forSum(long n) {long result = 0;for (int i = 1; i <= n; i ++) {result += i;}return result;}/*** 順序流實現累加*/private static long sequentialSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);}/*** 并行流實現累加*/private static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);}/*** long原生流范圍實現累加*/private static long longParallelSum(long n) {return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum);} } // result: 2022-01-18 10:53:59.035 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 58 2022-01-18 10:53:59.039 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 1420 2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.627 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 4167 2022-01-18 10:54:04.628 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================ 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000 2022-01-18 10:54:04.688 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 60使用四種方法實現1~1億個數的累加,這是在i7 2.4GHz 6core/12threads CPU的執行結果。讓人很意外,并非是并行流性能最好,反而是最差的,最樸實的for循環單線程性能最佳。
原因:
通過上面的比較需要意識到:并行編程比較復雜,有時候甚至違反直覺。如果用的不對(如本例,采用了一個不易并行化的操作iterate),甚至會讓性能更差。所以了解parallel方法背后的執行細節非常必要。
LongStream.rangeClosed 代替 iterate
僅高效求和的示例,可用LongStream.rangeClosed高效替代iterate實現并行計算。它的優點是:
通過示例演示它的并行執行性能比同樣是并行流的iterate版本要快了70倍。可見它有效利用了并行。
為什么并行流還是比for慢?
上面的執行結果可以看出LongStream.rangeClosed的性能還是比for略慢一點,原因是:
并行化是有代價的,并行過程中需要對流做遞歸劃分,把流的歸納操作分配到不同的線程,最后合并。且多個核心之間移動數據的代價也很大。
正確使用并行流
使用并行流加速性能需要確保用對,如果計算結果是錯誤的,再快也沒意義。
誤用并行流而產生錯誤的首要原因是使用的算法改變了某些共享狀態。 如下面示例:
從上面示例看出雖然很快,但結果是錯誤的。 原因是total += value非原子操作,出現了競態條件。如果使用同步來修復,就失去了并行的意義。 所以寫并行流時一定要考慮多個線程是否會修改共享對象的可變狀態。
高效使用并行流
一些高效使用并行流的建議:
一些常見的數據源的可分解性匯總:
Fork/Join框架
想要正確的使用并行流,了解它背后的實現原理至關重要。 并行流背后就是采用的Fork/Join框架。
// TODO: 待補充
Spliterator
// TODO: 待補充
小結
行為和性能有時是違反直覺的,因此一定要測量,確保你并沒有把程序拖得更慢。
或處理單個元素特別耗時的時候。
總是比嘗試并行化某些操作更為重要。
上執行,然后將各個子任務的結果合并起來生成整體結果。
總結
以上是生活随笔為你收集整理的7. Java8新特性-并行数据处理(parallel)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: localStorage储存如何正确存储
- 下一篇: Java中的parallelStream