ForkJoinPool 学习示例
? ??
? ? ?在JAVA7之前,并行處理數(shù)據(jù)非常麻煩。第一,你得明確把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干份。第二,你要將每個子部分分配給一個獨立的線程。第三,你要在恰當(dāng)?shù)臅r候?qū)λ鼈冞M行同步避免不希望的競爭條件,等待所有線程完成,最后把這些部分結(jié)果合并起來。在Java 7引入了分支/合并框架,讓這些操作更穩(wěn)定、更不容易出錯。
? ? ?分支/合并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分為更小的任務(wù),然后將每個子任務(wù)的結(jié)果合并起來生成整體結(jié)果。要把子任務(wù)提交到ForkJoinPool必須創(chuàng)建RecursiveTask<R>的子類。需要實現(xiàn)它唯一的抽象方法 protected abstract R compute(); ?在這個方法中定義了將任務(wù)拆分成子任務(wù)的邏輯,以及無法拆分時生成單個子任務(wù)結(jié)果的邏輯。
?
?
? 計算1到10000000的和
?
/*** Desc:Fork/Join框架的目的是以遞歸方式將可以并行的任務(wù)拆分為更小的任務(wù),然后將每個子任務(wù)的結(jié)果合并起來生成一個整體結(jié)果。* 要把任務(wù)提交到ForkJoinPool必須創(chuàng)建RecursiveTask<T> 的一個子類* * @author wei.zw* @since 2016年7月6日 下午9:27:56* @version v 0.1*/ public class ForkJoinSumCalculator extends RecursiveTask<Long> {/** */private static final long serialVersionUID = -8013303660374621470L;private final long[] numbers;private final int start;private final int end;private static final long THRESHOLD = 1000;/*** @param numbers* @param start* @param end*/public ForkJoinSumCalculator(long[] numbers, int start, int end) {super();this.numbers = numbers;this.start = start;this.end = end;}/*** @param numbers*/public ForkJoinSumCalculator(long[] numbers) {super();this.numbers = numbers;this.start = 0;this.end = numbers.length;}/*** @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected Long compute() {int length = end - start;if (length <= THRESHOLD) {return computeSequentially();}ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);leftTask.fork();ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);rightTask.fork();Long rightResult = 0L;try {rightResult = rightTask.get();} catch (Exception e) {}Long leftResult = leftTask.join();return leftResult + rightResult;}/*** * @return* @author wei.zw*/private Long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {sum += numbers[i];}return sum;}public static void main(String[] args) {long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();long start = System.currentTimeMillis();System.out.println(new ForkJoinPool().invoke(new ForkJoinSumCalculator(numbers)) + " 耗時:"+ (System.currentTimeMillis() - start));}}
?結(jié)果是:50000005000000 耗時:37
?
優(yōu)化后的
/*** @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected Long compute() {int length = end - start;if (length <= THRESHOLD) {return computeSequentially();}ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);leftTask.fork();ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);Long rightResult = rightTask.compute();Long leftResult = leftTask.join();return leftResult + rightResult;}
?計算結(jié)果是:50000005000000 耗時:25
?
使用Fork/Join框架的最佳做法:
- 對一個任務(wù)調(diào)用join方法會阻塞調(diào)用方,直到該任務(wù)作出結(jié)果。因此,又必須要在兩個子任務(wù)的計算都開始之后再調(diào)用它。
- 不應(yīng)該在RecursiveTask內(nèi)部使用ForkJoinPool的invoke方法,應(yīng)該直接調(diào)用compute或者fork方法
- 對子任務(wù)調(diào)用fork方法可以將這個子任務(wù)排進ForkJoinPool。同時對左右兩邊的子任務(wù)都調(diào)用似乎很自然,但是這樣做的效率比直接對其中一個調(diào)用compute方法低。這樣做可以為其中一個子任務(wù)重用同一線程,從而避免在線程池中多分配一個任務(wù)造成的開銷。
看完了基本示例,在分析一下源碼;首先看一下RecursiveTask,通過名稱可以知道這是一個遞歸Task.源碼很簡單
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {private static final long serialVersionUID = 5232453952276485270L;//計算結(jié)果V result;//抽象的計算方法protected abstract V compute();//獲取計算結(jié)果public final V getRawResult() {return result;}//設(shè)置計算結(jié)果protected final void setRawResult(V value) {result = value;}//執(zhí)行計算protected final boolean exec() {result = compute();return true;}}RecursiveTask源碼看完以后,繼續(xù)分析ForkJoinTask
??
?
轉(zhuǎn)載于:https://www.cnblogs.com/wei-zw/p/8797732.html
總結(jié)
以上是生活随笔為你收集整理的ForkJoinPool 学习示例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CSS3属性之圆角效果——border-
- 下一篇: 【转】void及void指针的深刻解析