java高并发(十六)J.U.C之ForkJoin
ForkJoin框架是Java7提供的一個用于并行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
與MapReduce思想非常類似。從字面意思上看,Fork就是把一個大任務切割成若干個子任務并行執行,Join就是合并這些子任務的執行結果,最后得到大任務的結果。主要采用工作竊取算法。
? ? 工作竊取算法是指某個線程從其他隊列里竊取任務來執行。下面是工作竊取的流程圖:
為什么要使用工作竊取算法?
? ? 加入我們需要做一個比較大的任務,我們可以把這個任務分割成若干個互不依賴的子任務,為了減少線程間的競爭,于是把這些子任務分別放到不同的隊列里,為每個隊列創建一個單獨的線程來執行隊列里面的任務,線程和隊列一一對應。比如A線程負責處理A隊列里面的任務,但是有些線程會先把自己的隊列里面的任務干完,而其他線程還有對應的任務等待處理,干完活的線程就會幫其他線程干活,于是就會去其他線程的隊列里竊取一個任務來執行,這時他們會訪問同一個隊列,所以為了減少竊取任務線程與被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務的線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。這種優點就是充分利用線程進行并行計算,減少了線程間的競爭,缺點是在某些情況下還是存在競爭(比如在雙端隊列只有一個任務時),同時也消耗了更多的系統資源(比如創建了多個線程和多個雙端隊列)。
? ? 對于ForkJoin框架而言,當一個任務正在等待他使用ForkJoin操作創建的子任務結束時,執行這個任務的工作線程查找其他未被執行的任務,并開始他的執行,通過這種方式,線程充分利用他的運行時間來提高應用程序的性能,為了實現這個目標ForkJoin框架執行的任務有一些局限性。
局限性
- 任務只能使用Fork和Join操作來作為同步機制,如果使用了其他同步機制,那他們在同步操作時工作線程就不能執行其他任務了。比如在forkjoin框架中使任務進入sleep,在睡眠期間內正在執行這個任務的工作線程將不會執行其他任務了。
- 我們所拆分的任務不應該去執行IO操作。例如讀寫數據文件
- 任務不能拋出檢查異常。必須通過必要的代碼來處理他們
ForkJoin框架的核心是兩個類:ForkJoinPool和ForkJoinTask。ForkJoinPool負責做實現(包括工作竊取算法)他管理工作線程提供任務的狀態以及他們的執行信息。而ForkJoinTask則主要提供在任務中執行Fork和Join操作的機制。
@Slf4j public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任務足夠小就計算任務boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任務大于閾值,就分裂成兩個子任務計算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 執行子任務leftTask.fork();rightTask.fork();//等待任務執行結束合并其結果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任務sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();//生成一個計算任務,計算1+2+3+4...+100ForkJoinTaskExample taskExample = new ForkJoinTaskExample(1, 100);Future<Integer> result = forkJoinPool.submit(taskExample);try{log.info("result:{}", result.get());} catch (InterruptedException e) {log.error("exception", e);e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}} }?
總結
以上是生活随笔為你收集整理的java高并发(十六)J.U.C之ForkJoin的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java高并发(十五)J.U.C之Fut
- 下一篇: java高并发(十七)J.U.C之Blo