日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

【java8】并行流Stream

發(fā)布時間:2023/12/8 55 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【java8】并行流Stream 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

流在處理數(shù)據(jù)進(jìn)行一些迭代操作的時候確認(rèn)很方便,但是在執(zhí)行一些耗時或是占用資源很高的任務(wù)時候,串行化的流無法帶來速度/性能上的提升,并不能滿足我們的需要。

通常我們會使用多線程來并行或是分片分解執(zhí)行任務(wù),而在Stream中也提供了這樣的并行方法,下面將會一一介紹這些方法。

將順序流轉(zhuǎn)為并行流

使用parallelStream()方法或者是使用stream().parallel()來轉(zhuǎn)化為并行流。

但是只是可能會返回一個并行的流,流是否能并行執(zhí)行還受到其他一些條件的約束(如是否有序,是否支持并行)。

對順序流調(diào)用parallel方法并不意味著流本身有任何實(shí)際的變化。它在內(nèi)部實(shí)際上就是設(shè)了一個boolean標(biāo)志,表示你想讓調(diào)用parallel之后進(jìn)行的所有操作都并行執(zhí)行。類似地,你只需要對并行流調(diào)用sequential方法就可以把它變成順序流。如果對這個方法調(diào)用了多次,將以最后一次執(zhí)行為準(zhǔn)。

package com.morris.java8.parallel;import java.util.concurrent.TimeUnit; import java.util.stream.IntStream;public class ParallerDemo {public static void main(String[] args) {IntStream list = IntStream.range(0, 6);//開始并行執(zhí)行list.parallel().forEach(i -> {Thread thread = Thread.currentThread();System.err.println("integer:" + i + "," + "currentThread:" + thread.getName());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}});} }

運(yùn)行結(jié)果如下:

integer:3,currentThread:main integer:4,currentThread:ForkJoinPool.commonPool-worker-3 integer:5,currentThread:ForkJoinPool.commonPool-worker-2 integer:1,currentThread:ForkJoinPool.commonPool-worker-1 integer:2,currentThread:ForkJoinPool.commonPool-worker-1 integer:0,currentThread:ForkJoinPool.commonPool-worker-3

從運(yùn)行結(jié)果里面我們可以很清楚的看到parallelStream同時使用了主線程和ForkJoinPool.commonPool創(chuàng)建的線程。 值得說明的是這個運(yùn)行結(jié)果并不是唯一的,實(shí)際運(yùn)行的時候可能會得到多個結(jié)果。

看看流的parallel方法,你可能會想,并行流用的線程是從哪兒來的?有多少個?怎么自定義這個過程呢?

并行流內(nèi)部使用了默認(rèn)的ForkJoinPool,它默認(rèn)的線程數(shù)量就是你的處理器數(shù)量,這個值是由Runtime.getRuntime().availableProcessors()得到的。

但是你可以通過系統(tǒng)屬性java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池大小,如下所示:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

這是一個全局設(shè)置,因此它將影響代碼中所有的并行流。反過來說,目前還無法專為某個并行流指定這個值。一般而言,讓ForkJoinPool的大小等于處理器數(shù)量是個不錯的默認(rèn)值,除非你有很好的理由,否則我們強(qiáng)烈建議你不要修改它。

// 設(shè)置全局并行流并發(fā)線程數(shù) System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 12 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 12

為什么兩次的運(yùn)行結(jié)果是一樣的呢?上面剛剛說過了這是一個全局設(shè)置,java.util.concurrent.ForkJoinPool.common.parallelism是final類型的,整個JVM中只允許設(shè)置一次。既然默認(rèn)的并發(fā)線程數(shù)不能反復(fù)修改,那怎么進(jìn)行不同線程數(shù)量的并發(fā)測試呢?答案是:引入ForkJoinPool。

IntStream range = IntStream.range(1, 100000); // 傳入parallelism new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();

因此,使用parallelStream時需要注意的一點(diǎn)是,多個parallelStream之間默認(rèn)使用的是同一個線程池,所以IO操作盡量不要放進(jìn)parallelStream中,否則會阻塞其他parallelStream。

// 獲取當(dāng)前機(jī)器CPU處理器的數(shù)量 System.out.println(Runtime.getRuntime().availableProcessors());// 輸出 4 // parallelStream默認(rèn)的并發(fā)線程數(shù) System.out.println(ForkJoinPool.getCommonPoolParallelism());// 輸出 3

為什么parallelStream默認(rèn)的并發(fā)線程數(shù)要比CPU處理器的數(shù)量少1個?因?yàn)樽顑?yōu)的策略是每個CPU處理器分配一個線程,然而主線程也算一個線程,所以要占一個名額。 這一點(diǎn)可以從源碼中看出來:

static final int MAX_CAP = 0x7fff; // max #workers - 1 // 無參構(gòu)造函數(shù) public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false); }

測試流的性能

下面通過幾種方式計(jì)算數(shù)據(jù)的和來測試流的性能。

package com.morris.java8.parallel;import java.util.function.Function; import java.util.stream.LongStream; import java.util.stream.Stream;public class ParallerStreamExample {public static void main(String[] args) {long n = 100_000_000;System.out.println("normal:" + recordTime(ParallerStreamExample::normal, n) + " MS");System.out.println("iterator:" + recordTime(ParallerStreamExample::iterator, n) + " MS");// 太耗時,暫時注釋// System.out.println("iteratorParallel:" + recordTime(ParallerStreamExample::iteratorParallel, n) + " MS");System.out.println("longStream:" + recordTime(ParallerStreamExample::longStream, n) + " MS");System.out.println("longStreamParallel:" + recordTime(ParallerStreamExample::longStreamParallel, n) + " MS");}public static long recordTime(Function<Long, Long> function, long n) {long lowestCostTime = Long.MAX_VALUE;for (int i = 0; i < 10; i++) {long startTime = System.currentTimeMillis();function.apply(n);long costTime = System.currentTimeMillis() - startTime;if(costTime < lowestCostTime) {lowestCostTime = costTime;}}return lowestCostTime;}/*** 正常for循環(huán)* @param n* @return*/public static long normal(long n) {long result = 0;for(long i = 1; i <= n; i++) {result += i;}return result;}/*** iterate順序流* @param n* @return*/public static long iterator(long n) {return Stream.iterate(1L, t -> t + 1).limit(n).reduce(0L, Long::sum);}/*** iterate并行流* @param n* @return*/public static long iteratorParallel(long n) {return Stream.iterate(1L, t -> t + 1).parallel().limit(n).reduce(0L, Long::sum);}/*** rangeClosed順序流* @param n* @return*/public static long longStream(long n) {return LongStream.rangeClosed(1, n).sum();}/*** rangeClosed并行流* @param n* @return*/public static long longStreamParallel(long n) {return LongStream.rangeClosed(1, n).parallel().sum();} }

運(yùn)行結(jié)果如下:

normal:33 MS iterator:990 MS longStream:44 MS longStreamParallel:16 MS

結(jié)論:

  • Stream串行性能明顯差于for循環(huán)迭代,因?yàn)镾tream串行還有流水線成本在里面。

  • 并行的Stream API能夠發(fā)揮多核特性,但是有時候不如串行流(比如后面的計(jì)算依賴前面的計(jì)算結(jié)果就不適宜用并行流)

高效使用并行流

下面是一些使用并行流需要思考的方面:

  • 留意裝箱。自動裝箱和拆箱操作會大大降低性能。Java 8中有原始類型流(IntStream、LongStream、DoubleStream)來避免這種操作,但凡有可能都應(yīng)該用這些流。

  • 有些操作本身在并行流上的性能就比順序流差,比如后面的計(jì)算依賴前面的計(jì)算結(jié)果。

  • 還要考慮流的操作流水線的總計(jì)算成本。設(shè)N是要處理的元素的總數(shù),Q是一個元素通過流水線的大致處理成本,則N*Q就是這個對成本的一個粗略的定性估計(jì)。Q值較高就意味著使用并行流時性能好的可能性比較大。

  • 對于較小的數(shù)據(jù)量,選擇并行流幾乎從來都不是一個好的決定。并行處理少數(shù)幾個元素的好處還抵不上并行化造成的額外開銷。

  • 要考慮流背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因?yàn)榍罢哂貌恢闅v就可以平均拆分,而后者則必須遍歷。

  • 流自身的特點(diǎn),以及流水線中的中間操作修改流的方式,都可能會改變分解過程的性能。例如,一個SIZED流可以分成大小相等的兩部分,這樣每個部分都可以比較高效地并行處理,但篩選操作可能丟棄的元素個數(shù)卻無法預(yù)測,導(dǎo)致流本身的大小未知。

  • 還要考慮終端操作中合并步驟的代價是大是小(例如Collector中的combiner方法)。如果這一步代價很大,那么組合每個子流產(chǎn)生的部分結(jié)果所付出的代價就可能會超出通過并行流得到的性能提升。

總結(jié)

以上是生活随笔為你收集整理的【java8】并行流Stream的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。