JAVA并行框架:Fork/Join
轉(zhuǎn)載自?https://www.cnblogs.com/dongguacai/p/6021859.html
JAVA并行框架:Fork/Join
一、背景
雖然目前處理器核心數(shù)已經(jīng)發(fā)展到很大數(shù)目,但是按任務(wù)并發(fā)處理并不能完全充分的利用處理器資源,因?yàn)橐话愕膽?yīng)用程序沒有那么多的并發(fā)處理任務(wù)。基于這種現(xiàn)狀,考慮把一個任務(wù)拆分成多個單元,每個單元分別得到執(zhí)行,最后合并每個單元的結(jié)果。
Fork/Join框架是JAVA7提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
二、工作竊取算法
指的是某個線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。使用的場景是一個大任務(wù)拆分成多個小任務(wù),為了減少線程間的競爭,把這些子任務(wù)分別放到不同的隊(duì)列中,并且每個隊(duì)列都有單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對應(yīng)。但是會出現(xiàn)這樣一種情況:A線程處理完了自己隊(duì)列的任務(wù),B線程的隊(duì)列里還有很多任務(wù)要處理。A是一個很熱情的線程,想過去幫忙,但是如果兩個線程訪問同一個隊(duì)列,會產(chǎn)生競爭,所以A想了一個辦法,從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。而B線程永遠(yuǎn)是從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行(任務(wù)是一個個獨(dú)立的小任務(wù)),這樣感覺A線程像是小偷在竊取B線程的東西一樣。
工作竊取算法的優(yōu)點(diǎn):
?????????利用了線程進(jìn)行并行計(jì)算,減少了線程間的競爭。
工作竊取算法的缺點(diǎn):
???????? 1、如果雙端隊(duì)列中只有一個任務(wù)時,線程間會存在競爭。
???????? 2、竊取算法消耗了更多的系統(tǒng)資源,如會創(chuàng)建多個線程和多個雙端隊(duì)列。
三、框架設(shè)計(jì)
?Fork/Join中兩個重要的類:
1、ForkJoinTask:使用該框架,需要創(chuàng)建一個ForkJoin任務(wù),它提供在任務(wù)中執(zhí)行fork和join操作的機(jī)制。一般情況下,我們并不需要直接繼承ForkJoinTask類,只需要繼承它的子類,它的子類有兩個:
a、RecursiveAction:用于沒有返回結(jié)果的任務(wù)。
b、RecursiveTask:用于有返回結(jié)果的任務(wù)。
2、ForkJoinPool:任務(wù)ForkJoinTask需要通過ForkJoinPool來執(zhí)行。
1 package test;2 3 import java.util.concurrent.ExecutionException;4 import java.util.concurrent.ForkJoinPool;5 import java.util.concurrent.Future;6 import java.util.concurrent.RecursiveTask;7 8 9 public class CountTask extends RecursiveTask<Integer> 10 { 11 private static final long serialVersionUID = 1L; 12 //閾值 13 private static final int THRESHOLD = 2; 14 private int start; 15 private int end; 16 17 public CountTask(int start, int end) 18 { 19 this.start = start; 20 this.end = end; 21 } 22 23 @Override 24 protected Integer compute() 25 { 26 int sum = 0; 27 //判斷任務(wù)是否足夠小 28 boolean canCompute = (end - start) <= THRESHOLD; 29 if(canCompute) 30 { 31 //如果小于閾值,就進(jìn)行運(yùn)算 32 for(int i=start; i<=end; i++) 33 { 34 sum += i; 35 } 36 } 37 else 38 { 39 //如果大于閾值,就再進(jìn)行任務(wù)拆分 40 int middle = (start + end)/2; 41 CountTask leftTask = new CountTask(start,middle); 42 CountTask rightTask = new CountTask(middle+1,end); 43 //執(zhí)行子任務(wù) 44 leftTask.fork(); 45 rightTask.fork(); 46 //等待子任務(wù)執(zhí)行完,并得到執(zhí)行結(jié)果 47 int leftResult = leftTask.join(); 48 int rightResult = rightTask.join(); 49 //合并子任務(wù) 50 sum = leftResult + rightResult; 51 52 } 53 return sum; 54 } 55 56 public static void main(String[] args) 57 { 58 ForkJoinPool forkJoinPool = new ForkJoinPool(); 59 CountTask task = new CountTask(1,6); 60 //執(zhí)行一個任務(wù) 61 Future<Integer> result = forkJoinPool.submit(task); 62 try 63 { 64 System.out.println(result.get()); 65 } 66 catch (InterruptedException e) 67 { 68 e.printStackTrace(); 69 } 70 catch (ExecutionException e) 71 { 72 e.printStackTrace(); 73 } 74 75 } 76 77 }這個程序是將1+2+3+4+5+6拆分成1+2;3+4;5+6三個部分進(jìn)行子程序進(jìn)行計(jì)算后合并。
四、源碼解讀
1、leftTask.fork();
1 public final ForkJoinTask<V> fork() { 2 Thread t; 3 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 4 ((ForkJoinWorkerThread)t).workQueue.push(this); 5 else 6 ForkJoinPool.common.externalPush(this); 7 return this; 8 }fork方法內(nèi)部會先判斷當(dāng)前線程是否是ForkJoinWorkerThread的實(shí)例,如果滿足條件,則將task任務(wù)push到當(dāng)前線程所維護(hù)的雙端隊(duì)列中。
1 final void push(ForkJoinTask<?> task) {2 ForkJoinTask<?>[] a; ForkJoinPool p;3 int b = base, s = top, n;4 if ((a = array) != null) { // ignore if queue removed5 int m = a.length - 1; // fenced write for task visibility6 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);7 U.putOrderedInt(this, QTOP, s + 1);8 if ((n = s - b) <= 1) {9 if ((p = pool) != null) 10 p.signalWork(p.workQueues, this); 11 } 12 else if (n >= m) 13 growArray(); 14 } 15 }在push方法中,會調(diào)用ForkJoinPool的signalWork方法喚醒或創(chuàng)建一個工作線程來異步執(zhí)行該task任務(wù)。
2、
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}通過doJoin方法返回的任務(wù)狀態(tài)來判斷,如果不是NORMAL,則拋異常:
private void reportException(int s) {if (s == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL)rethrow(getThrowableException());}來看下doJoin方法:
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();}先查看任務(wù)狀態(tài),如果已經(jīng)完成,則直接返回任務(wù)狀態(tài);如果沒有完成,則從任務(wù)隊(duì)列中取出任務(wù)并執(zhí)行。
?
總結(jié)
以上是生活随笔為你收集整理的JAVA并行框架:Fork/Join的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Fork/Join 框架
- 下一篇: 深入分析 ThreadLocal 内存泄