日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RecursiveTask和RecursiveAction的使用 以及java 8 并行流和顺序流

發布時間:2024/2/28 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RecursiveTask和RecursiveAction的使用 以及java 8 并行流和顺序流 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉載自?https://blog.csdn.net/weixin_41404773/article/details/80733324

什么是Fork/Join框架
????????Fork/Join框架是Java7提供了的一個用于并行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。

?????????我們再通過Fork和Join這兩個單詞來理解下Fork/Join框架,Fork就是把一個大任務切分為若干子任務并行的執行,Join就是合并這些子任務的執行結果,最后得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終匯總這10個子任務的結果。Fork/Join的運行流程圖如下:

工作竊取算法
?????????工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。工作竊取的運行流程圖如下:


????????
????????那么為什么需要使用工作竊取算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
?????

????????工作竊取算法的優點是充分利用線程進行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。并且消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。

ForkJoinPool
????????Java提供了ForkJoinPool來支持將一個任務拆分成多個“小任務”并行計算,再把多個“小任務”的結果合成總的計算結果。

????????ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池。ForkJoinPool提供了如下兩個常用的構造器。

?public ForkJoinPool(int parallelism):創建一個包含parallelism個并行線程的ForkJoinPool
?public ForkJoinPool() :以Runtime.getRuntime().availableProcessors()的返回值作為parallelism來創建ForkJoinPool
?????????創建ForkJoinPool實例后,可以釣魚ForkJoinPool的submit(ForkJoinTask<T> task)或者invoke(ForkJoinTask<T> task)來執行指定任務。其中ForkJoinTask代表一個可以并行、合并的任務。ForkJoinTask是一個抽象類,它有兩個抽象子類:RecursiveAction和RecursiveTask。

RecursiveTask代表有返回值的任務
RecursiveAction代表沒有返回值的任務。


RecursiveAction
下面以一個沒有返回值的大任務為例,介紹一下RecursiveAction的用法。

大任務是:打印0-100的數值。

小任務是:每次只能打印20個數值。

代碼執行


package com.example.jedis.test;

import java.util.concurrent.RecursiveAction;

/**
?*
?* @Author : Wukn
?* @Date : 2018/2/5
?*/
public class RaskDemo extends RecursiveAction {
? ? /**
? ? ?* ?每個"小任務"最多只打印20個數
? ? ? */
? ? private static final int MAX = 20;

? ? private int start;
? ? private int end;

? ? public RaskDemo(int start, int end) {
? ? ? ? this.start = start;
? ? ? ? this.end = end;
? ? }

? ? @Override
? ? protected void compute() {
? ? ? ? //當end-start的值小于MAX時,開始打印
? ? ? ? if((end-start) < MAX) {
? ? ? ? ? ? for(int i= start; i<end;i++) {
? ? ? ? ? ? ? ? System.out.println(Thread.currentThread().getName()+"i的值"+i);
? ? ? ? ? ? }
? ? ? ? }else {
? ? ? ? ? ? // 將大任務分解成兩個小任務
? ? ? ? ? ? int middle = (start + end) / 2;
? ? ? ? ? ? RaskDemo left = new RaskDemo(start, middle);
? ? ? ? ? ? RaskDemo right = new RaskDemo(middle, end);
? ? ? ? ? ? left.fork();
? ? ? ? ? ? right.fork();
? ? ? ? }
? ? }
}

public static void main(String[] args) throws Exception{
? ? // 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的并行線程的ForkJoinPool
? ? ForkJoinPool forkJoinPool = new ForkJoinPool();

? ? // 提交可分解的PrintTask任務
? ? forkJoinPool.submit(new RaskDemo(0, 1000));

? ? //阻塞當前線程直到 ForkJoinPool 中所有的任務都執行結束
? ? forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);

? ? // 關閉線程池
? ? forkJoinPool.shutdown();
}


運行結果

從上面結果來看,ForkJoinPool啟動了四個線程來執行這個打印任務,我的計算機的CPU是四核的。大家還可以看到程序雖然打印了0-999這一千個數字,但是并不是連續打印的,這是因為程序將這個打印任務進行了分解,分解后的任務會并行執行,所以不會按順序打印。

RecursiveTask
下面以一個有返回值的大任務為例,介紹一下RecursiveTask的用法。

大任務是:計算隨機的1000個數字的和。

小任務是:每次只能70個數值的和。


package com.example.jedis.test;

import java.util.concurrent.RecursiveTask;

/**
?*
?* @Author : Wukn
?* @Date : 2018/2/5
?*/
public class RecursiveTaskDemo extends RecursiveTask<Integer> {

? ? /**
? ? ?* ?每個"小任務"最多只打印70個數
? ? ?*/
? ? private static final int MAX = 70;
? ? private int arr[];
? ? private int start;
? ? private int end;


? ? public RecursiveTaskDemo(int[] arr, int start, int end) {
? ? ? ? this.arr = arr;
? ? ? ? this.start = start;
? ? ? ? this.end = end;
? ? }

? ? @Override
? ? protected Integer compute() {
? ? ? ? int sum = 0;
? ? ? ? // 當end-start的值小于MAX時候,開始打印
? ? ? ? if((end - start) < MAX) {
? ? ? ? ? ? for (int i = start; i < end; i++) {
? ? ? ? ? ? ? ? sum += arr[i];
? ? ? ? ? ? }
? ? ? ? ? ? return sum;
? ? ? ? }else {
? ? ? ? ? ? System.err.println("=====任務分解======");
? ? ? ? ? ? // 將大任務分解成兩個小任務
? ? ? ? ? ? int middle = (start + end) / 2;
? ? ? ? ? ? RecursiveTaskDemo left = new RecursiveTaskDemo(arr, start, middle);
? ? ? ? ? ? RecursiveTaskDemo right = new RecursiveTaskDemo(arr, middle, end);
? ? ? ? ? ? // 并行執行兩個小任務
? ? ? ? ? ? left.fork();
? ? ? ? ? ? right.fork();
? ? ? ? ? ? // 把兩個小任務累加的結果合并起來
? ? ? ? ? ? return left.join() + right.join();
? ? ? ? }
? ? }


}
? ? @Test
? ? public void dfs() throws Exception{
? ? ? ? int arr[] = new int[1000];
? ? ? ? Random random = new Random();
? ? ? ? int total = 0;
? ? ? ? // 初始化100個數字元素
? ? ? ? for (int i = 0; i < arr.length; i++) {
? ? ? ? ? ? int temp = random.nextInt(100);
? ? ? ? ? ? // 對數組元素賦值,并將數組元素的值添加到total總和中
? ? ? ? ? ? total += (arr[i] = temp);
? ? ? ? }
? ? ? ? System.out.println("初始化時的總和=" + total);

? ? ? ? // 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的并行線程的ForkJoinPool
? ? ? ? ForkJoinPool forkJoinPool = new ForkJoinPool();

? ? ? ? // 提交可分解的PrintTask任務
// ? ? ? ?Future<Integer> future = forkJoinPool.submit(new RecursiveTaskDemo(arr, 0, arr.length));
// ? ? ? ?System.out.println("計算出來的總和="+future.get());


? ? ? ? Integer integer = forkJoinPool.invoke( new RecursiveTaskDemo(arr, 0, arr.length) ?);
? ? ? ? System.out.println("計算出來的總和=" + integer);

? ? ? ? // 關閉線程池
? ? ? ? forkJoinPool.shutdown();
? ? }


從上面結果來看,ForkJoinPool將任務分解了15次,程序通過SumTask計算出來的結果,和初始化數組時統計出來的總和是相等的,這表明計算結果一切正常。

總結

第一步分割任務 ?

????????首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小。

第二步執行任務并合并結果。

??????分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合并這些數據。

能夠輕松的利用多個 CPU 提供的計算資源來協作完成一個復雜的計算任務,提高運行效率!

java8新的寫法


/************************************** ?并行流 與 順序流 ?******************************************************/

? ? /**
? ? ?*并行流 與 順序流
? ? ?*/
? ? @Test
? ? public void test03() {

? ? ? ? Instant start = Instant.now();
? ? ? ? LongStream.rangeClosed( 0,110 )
? ? ? ? ? ? ? ? //并行流
? ? ? ? ? ? ? ? .parallel()
? ? ? ? ? ? ? ? .reduce( 0,Long::sum );


? ? ? ? LongStream.rangeClosed( 0,110 )
? ? ? ? ? ? ? ? //順序流
? ? ? ? ? ? ? ? .sequential()
? ? ? ? ? ? ? ? .reduce( 0,Long::sum );


? ? ? ? Instant end = Instant.now();
? ? ? ? System.out.println("耗費時間"+ Duration.between( start,end ).toMillis());

}



?

總結

以上是生活随笔為你收集整理的RecursiveTask和RecursiveAction的使用 以及java 8 并行流和顺序流的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。