java并发编程学习5--forkJoin
【概念
分支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個(gè)子任務(wù)的結(jié)果合并起來生成整體的結(jié)果,它是ExecutorService的一個(gè)實(shí)現(xiàn),它把子任務(wù)分配給線程池(ForkJoinPool)中的工作線程。某些應(yīng)用可能對每個(gè)處理器內(nèi)核飯別試用一個(gè)線程,來完成計(jì)算密集任務(wù),例如圖像處理。java7引入forkjoin框架,專門用來支持這一類應(yīng)用。假設(shè)有一個(gè)處理任務(wù),它可以很自然的分解成子任務(wù)。
【使用方式
要把任務(wù)提交到線程池,必須創(chuàng)建RecursiveTask<R>的一個(gè)子類,其中R是并行化任務(wù)產(chǎn)生的結(jié)果(如果沒有結(jié)果使用RecursiveAction類型)。然后在子類中實(shí)現(xiàn)product abstract R compute()方法即可。這個(gè)方法同時(shí)實(shí)現(xiàn)了“拆分子任務(wù)”,“任務(wù)不可拆時(shí)”的處理邏輯。如下所示:
if(任務(wù)足夠小){順序計(jì)算該任務(wù)的值; }else{將任務(wù)分成兩個(gè)子任務(wù);遞歸調(diào)用本方法;合并每個(gè)子任務(wù)的結(jié)果; }【最佳實(shí)踐
【工作竊取
我們很難確定要滿足什么條件才可以不再拆分任務(wù)。但是分出大量的小任務(wù)是一個(gè)好的選擇,因?yàn)樵诶硐肭闆r下,劃分并行任務(wù)時(shí)因該讓每個(gè)任務(wù)都花費(fèi)相同的時(shí)間。讓cpu的所有內(nèi)核都一樣的繁忙,但是現(xiàn)實(shí)中我們的子任務(wù)花費(fèi)的時(shí)間大不相同,這是因?yàn)橛性S多我們無法確認(rèn)的情況出現(xiàn):io,rpc,分配效率等等。分支合并框架使用:工作竊取來解決內(nèi)核之間任務(wù)不匹配的問題。讓所有任務(wù)大致相同的被平均分配到forkjoinpool的每個(gè)線程上。每個(gè)線程都擁有一個(gè)雙向鏈?zhǔn)疥?duì)列來保存它的任務(wù),每完成一個(gè)任務(wù)就從隊(duì)列頭部取出下一個(gè)任務(wù)執(zhí)行。當(dāng)一個(gè)線程的任務(wù)隊(duì)列已空,而其他線程還在繁忙,這個(gè)空閑線程就隨機(jī)選擇一個(gè)繁忙線程并從其隊(duì)列尾部拿走一個(gè)任務(wù)開始執(zhí)行,直到所有的任務(wù)執(zhí)行完畢。
【例子
1.輸出數(shù)組中有多少個(gè)數(shù)字小于0.5
public class ExerciseFilter {//數(shù)據(jù)源static double numbers[] = new double[100];static {for(int i = 0 ; i < 100 ; i++){numbers[i] = i + 1;}numbers[0] = 0.08;numbers[1] = 0.18;numbers[11] = 0.18;}public static void main(String[] args) {Counter counter = new Counter(numbers,x -> x < 0.5);//使用單例ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();//啟動(dòng)并行任務(wù)pool.invoke(counter);System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());} }class Counter extends RecursiveTask<Integer>{//分界線,當(dāng)一個(gè)數(shù)組的長度 < 1000 就不再繼續(xù)拆分public static final int THRESHOLD = 1000;//數(shù)組private double[] values;//判斷條件private DoublePredicate filter;public Counter(double [] values,DoublePredicate filter){this.values = values;this.filter = filter;}@Overrideprotected Integer compute() {//任務(wù)足夠小就不再拆分if(values.length < THRESHOLD ){//返回該數(shù)組中有多少數(shù)字滿足判斷邏輯int count = 0;for(int i = 0; i < values.length ; i++){if(filter.test(values[i])){count++;}}return count;}else {//將打數(shù)組拆分成兩個(gè)int mid = values.length / 2;Counter first = new Counter(Arrays.copyOfRange(values,0,mid),filter);//第一個(gè)子任務(wù)提交到線程池first.fork();Counter second = new Counter(Arrays.copyOfRange(values,mid,values.length),filter);//當(dāng)前線程執(zhí)行第二個(gè)子任務(wù),節(jié)約一個(gè)線程的開銷int secondResult = second.compute();//等待第一個(gè)子任務(wù)執(zhí)行完畢int firstResult = first.join();return firstResult + secondResult;}} }2.列表中求和
public class ExerciseSum {//數(shù)據(jù)源static int sum[] = new int[100];static {for(int i = 0 ; i < 100 ; i++){sum[i] = i + 1;}}public static void main(String[] args) {CounterSum counter = new CounterSum(sum);ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();pool.invoke(counter);System.out.println((System.currentTimeMillis() - st) + " : " + counter.join());} }class CounterSum extends RecursiveTask<Integer> {//最小拆分單位:每個(gè)小數(shù)組length = 10public static final int THRESHOLD = 10;private int[] values;public CounterSum(int [] values){this.values = values;}@Overrideprotected Integer compute() {if(values.length < THRESHOLD ){int count = 0;for(int i = 0; i < values.length ; i++){count += values[i];}return count;}else {int mid = values.length / 2;CounterSum first = new CounterSum(Arrays.copyOfRange(values,0,mid));first.fork();CounterSum second = new CounterSum(Arrays.copyOfRange(values,mid,values.length));int secondResult = second.compute();int firstResult = first.join();return firstResult + secondResult;}} }3.排序
public class ExerciseSort {//數(shù)據(jù)源static int num[] = new int[100];static {Random r = new Random();for(int i = 0 ; i < 100 ; i++){num[i] = r.nextInt(100);}}public static void main(String[] args) {CounterSort counter = new CounterSort(num);//使用單例ForkJoinPool pool = ForkJoinPool.commonPool();long st = System.currentTimeMillis();//啟動(dòng)并行任務(wù)pool.invoke(counter);System.out.println((System.currentTimeMillis() - st));Arrays.stream(counter.join()).forEach(System.out::println);} }class CounterSort extends RecursiveTask<int[]> {//最小拆分單位:每個(gè)小數(shù)組length = 10public static final int THRESHOLD = 10;//待排序數(shù)組private int[] values;public CounterSort(int [] values){this.values = values;}@Overrideprotected int[] compute() {if(values.length < THRESHOLD ){int[] result = new int[values.length];//1.單數(shù)組排序result = Arrays.stream(values).sorted().toArray();return result;}else {int mid = values.length / 2;CounterSort first = new CounterSort(Arrays.copyOfRange(values,0,mid));first.fork();CounterSort second = new CounterSort(Arrays.copyOfRange(values,mid,values.length));int[] secondResult = second.compute();int[] firstResult = first.join();//兩個(gè)數(shù)組混合排序int[] combineRsult = combineIntArray(firstResult,secondResult);return combineRsult;}}private int[] combineIntArray(int[] a1,int[] a2){int result[] = new int[a1.length + a2.length];int a1Len = a1.length;int a2Len = a2.length;int destLen = result.length;for(int index = 0 , a1Index = 0 , a2Index = 0 ; index < destLen ; index++) {int value1 = a1Index >= a1Len?Integer.MAX_VALUE:a1[a1Index];int value2 = a2Index >= a2Len?Integer.MAX_VALUE:a2[a2Index];if(value1 < value2) {a1Index++;result[index] = value1;}else {a2Index++;result[index] = value2;}}return result;} }總結(jié)
以上是生活随笔為你收集整理的java并发编程学习5--forkJoin的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HMAC-SHA1加密
- 下一篇: linux中shell变量$#,$@,$