叉叉框架_叉/连接框架
叉叉框架
本文是我們名為Java Concurrency Essentials的學(xué)院課程的一部分。
在本課程中,您將深入探討并發(fā)的魔力。 將向您介紹并發(fā)和并發(fā)代碼的基礎(chǔ)知識,并學(xué)習(xí)諸如原子性,同步和線程安全性的概念。 在這里查看 !
目錄
1.簡介 2.叉/連接1.簡介
本文介紹了Fork / Join框架,該框架從1.7版開始就是JDK的一部分。 它描述了框架的基本功能,并提供了一些示例以提供一些實踐經(jīng)驗。
2.叉/連接
Fork / Join框架的基類是java.util.concurrent.ForkJoinPool 。 此類實現(xiàn)Executor和ExecutorService這兩個接口,并AbstractExecutorService 。 因此, ForkJoinPool基本上是一個線程池,它承擔(dān)特殊任務(wù),即ForkJoinTask 。 此類實現(xiàn)已知的Future接口以及諸如get() , cancel()和isDone() 。 除此之外,該類還提供了兩個為整個框架命名的方法: fork()和join() 。
調(diào)用fork()將啟動任務(wù)的異步執(zhí)行時,調(diào)用join()將等待直到任務(wù)完成并檢索其結(jié)果。 因此,我們可以將給定任務(wù)拆分為多個較小的任務(wù),分叉每個任務(wù),最后等待所有任務(wù)完成。 這使復(fù)雜問題的實現(xiàn)更加容易。
在計算機科學(xué)中,這種方法也稱為分治法。 每當(dāng)一個問題太復(fù)雜而無法立即解決時,它就會分為多個較小的問題,并且更容易解決。 可以這樣寫成偽代碼:
if(problem.getSize() > THRESHOLD) {SmallerProblem smallerProblem1 = new SmallerProblem();smallerProblem1.fork();SmallerProblem smallerProblem2 = new SmallerProblem();smallerProblem2.fork();return problem.solve(smallerProblem1.join(), smallerProblem2.join()); } else {return problem.solve(); }首先,我們檢查問題的當(dāng)前大小是否大于給定的閾值。 在這種情況下,我們將問題分成較小的問題,對每個新任務(wù)進(jìn)行fork() ,然后通過調(diào)用join()等待結(jié)果。 當(dāng)join()返回每個子任務(wù)的結(jié)果時,我們必須找到較小問題的最佳解決方案,并將其作為最佳解決方案返回。 重復(fù)這些步驟,直到給定的閾值太低并且問題很小,我們可以直接計算其解而無需進(jìn)一步除法。
遞歸任務(wù)
為了更好地掌握此過程,我們實現(xiàn)了一種算法,該算法可在整數(shù)值數(shù)組中找到最小的數(shù)字。 這個問題不是您使用ForkJoinPool在日常工作中解決的問題,但是以下實現(xiàn)非常清楚地顯示了基本原理。 在main()方法中,我們設(shè)置了一個帶有隨機值的整數(shù)數(shù)組,并創(chuàng)建了一個新的ForkJoinPool 。
傳遞給其構(gòu)造函數(shù)的第一個參數(shù)是所需并行度的指示器。 在這里,我們在Runtime查詢可用的CPU內(nèi)核數(shù)。 然后,我們調(diào)用invoke()方法并傳遞FindMin的實例。 FindMin擴展了RecursiveTask類,該類本身是前面提到的ForkJoinTask的子類。 類ForkJoinTask實際上有兩個子類:一個子類用于返回值的任務(wù)( RecursiveTask ),另一個子類用于不返回值的任務(wù)( RecursiveAction )。 超類迫使我們實現(xiàn)compute() 。 在這里,我們看一下整數(shù)數(shù)組的給定切片,并確定當(dāng)前問題是否太大而無法立即解決。
當(dāng)在數(shù)組中找到最小的數(shù)時,要直接解決的最小問題大小是將兩個元素相互比較并返回它們的最小值。 如果當(dāng)前有兩個以上的元素,則將數(shù)組分為兩部分,然后再在這兩個部分中找到最小的數(shù)字。 通過創(chuàng)建兩個新的FindMin實例來完成此操作。
構(gòu)造函數(shù)被提供給數(shù)組以及開始和結(jié)束索引。 然后,我們通過調(diào)用fork()異步開始執(zhí)行這兩個任務(wù)。 該調(diào)用將兩個任務(wù)提交到線程池的隊列中。 線程池實現(xiàn)了一種稱為工作竊取的策略,即,如果所有其他線程都有足夠的工作要做,則當(dāng)前線程會從其他任務(wù)之一中竊取其工作。 這樣可以確保任務(wù)盡快執(zhí)行。
public class FindMin extends RecursiveTask<Integer> {private static final long serialVersionUID = 1L;private int[] numbers;private int startIndex;private int endIndex;public FindMin(int[] numbers, int startIndex, int endIndex) {this.numbers = numbers;this.startIndex = startIndex;this.endIndex = endIndex;}@Overrideprotected Integer compute() {int sliceLength = (endIndex - startIndex) + 1;if (sliceLength > 2) {FindMin lowerFindMin = new FindMin(numbers, startIndex, startIndex + (sliceLength / 2) - 1);lowerFindMin.fork();FindMin upperFindMin = new FindMin(numbers, startIndex + (sliceLength / 2), endIndex);upperFindMin.fork();return Math.min(lowerFindMin.join(), upperFindMin.join());} else {return Math.min(numbers[startIndex], numbers[endIndex]);}}public static void main(String[] args) {int[] numbers = new int[100];Random random = new Random(System.currentTimeMillis());for (int i = 0; i < numbers.length; i++) {numbers[i] = random.nextInt(100);}ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());Integer min = pool.invoke(new FindMin(numbers, 0, numbers.length - 1));System.out.println(min);} }遞歸動作
正如上面在RecursiveTask旁邊提到的,我們還有RecursiveAction類。 與RecursiveTask相比,它不必返回值,因此可以將其用于可以直接在給定數(shù)據(jù)結(jié)構(gòu)上執(zhí)行的異步計算。 這樣的例子是從彩色圖像中計算出灰度圖像。 我們要做的就是遍歷圖像的每個像素,并使用以下公式從RGB值中計算灰度值:
gray = 0.2126 * red + 0.7152 * green + 0.0722 * blue浮點數(shù)表示特定顏色對我們?nèi)祟悓疑母兄龀龅呢暙I(xiàn)。 由于最高值用于綠色,因此可以得出結(jié)論,灰度圖像僅被計算為綠色部分的近3/4。 因此,假設(shè)圖像是代表實際像素數(shù)據(jù)的對象,并且使用setRGB()和getRGB()方法檢索實際RGB值,則基本實現(xiàn)將如下所示:
for (int row = 0; row < height; row++) {for (int column = 0; column < bufferedImage.getWidth(); column++) {int grayscale = computeGrayscale(image.getRGB(column, row));image.setRGB(column, row, grayscale);} }上面的實現(xiàn)在單個CPU機器上運行良好。 但是,如果我們有多個CPU可用,我們可能希望將此工作分配給可用的內(nèi)核。 因此,我們可以使用ForkJoinPool并為圖像的每一行(或每一列)提交一個新任務(wù),而不是遍歷所有像素的兩個嵌套for循環(huán)。 一旦將一行轉(zhuǎn)換為灰度,當(dāng)前線程就可以在另一行上工作。
以下示例實現(xiàn)了此原理:
public class GrayscaleImageAction extends RecursiveAction {private static final long serialVersionUID = 1L;private int row;private BufferedImage bufferedImage;public GrayscaleImageAction(int row, BufferedImage bufferedImage) {this.row = row;this.bufferedImage = bufferedImage;}@Overrideprotected void compute() {for (int column = 0; column < bufferedImage.getWidth(); column++) {int rgb = bufferedImage.getRGB(column, row);int r = (rgb >> 16) & 0xFF;int g = (rgb >> 8) & 0xFF;int b = (rgb & 0xFF);int gray = (int) (0.2126 * (float) r + 0.7152 * (float) g + 0.0722 * (float) b);gray = (gray << 16) + (gray << 8) + gray;bufferedImage.setRGB(column, row, gray);}}public static void main(String[] args) throws IOException {ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());BufferedImage bufferedImage = ImageIO.read(new File(args[0]));for (int row = 0; row < bufferedImage.getHeight(); row++) {GrayscaleImageAction action = new GrayscaleImageAction(row, bufferedImage);pool.execute(action);}pool.shutdown();ImageIO.write(bufferedImage, "jpg", new File(args[1]));} }在main()方法中,我們使用Java的ImageIO類讀取圖像。 返回的BufferedImage實例具有我們需要的所有方法。 我們可以查詢行數(shù)和列數(shù),并檢索和設(shè)置每個像素的RGB值。 因此,我們要做的是遍歷所有行并將新的GrayscaleImageAction提交到我們的ForkJoinPool 。 后者已收到有關(guān)可用處理器的提示,作為其構(gòu)造函數(shù)的參數(shù)。
現(xiàn)在, ForkJoinPool通過調(diào)用其compute()方法來異步啟動任務(wù)。 在此方法中,我們遍歷每行并通過其灰度值更新相應(yīng)的RGB值。 將所有任務(wù)提交給池后,我們在主線程中等待整個池的關(guān)閉,然后使用ImageIO.write()方法將更新的BufferedImage寫回到磁盤。
令人驚訝的是,與不使用可用處理器的情況相比,我們只需要多幾行代碼即可。 這再次顯示了使用java.util.concurrent包的可用資源可以節(jié)省多少工作。
ForkJoinPool提供了三種不同的提交任務(wù)的方法:
- execute(ForkJoinTask) :此方法異步執(zhí)行給定的任務(wù)。 它沒有返回值。
- invoke(ForkJoinTask) :此方法等待任務(wù)返回值。
- submit(ForkJoinTask) :此方法異步執(zhí)行給定的任務(wù)。 它返回對任務(wù)本身的引用。 因此,任務(wù)引用可用于查詢結(jié)果(因為它實現(xiàn)了Future接口)。
有了這些知識,很清楚為什么我們要使用execute()方法提交上述GrayscaleImageAction 。 如果我們改為使用invoke() ,則主線程將等待任務(wù)完成,而我們將不會利用可用的并行度。
仔細(xì)研究ForkJoinTask-API,我們會發(fā)現(xiàn)相同的區(qū)別:
- ForkJoinTask.fork() : ForkJoinTask是異步執(zhí)行的。 它沒有返回值。
- ForkJoinTask.invoke() :立即執(zhí)行ForkJoinTask并在完成后返回結(jié)果。
ForkJoinPool和ExecutorService
既然我們知道ExecutorService和ForkJoinPool ,您可能會問自己為什么我們應(yīng)該使用ForkJoinPool而不是ExecutorService 。 兩者之間的差異不是很大。 兩者都具有execute()和submit()方法,并采用一些常見接口的實例,例如Runnable , Callable , RecursiveAction或RecursiveTask 。
為了更好地理解這些區(qū)別,讓我們嘗試使用ExecutorService從上面實現(xiàn)FindMin類:
public class FindMinTask implements Callable<Integer> {private int[] numbers;private int startIndex;private int endIndex;private ExecutorService executorService;public FindMinTask(ExecutorService executorService, int[] numbers, int startIndex, int endIndex) {this.executorService = executorService;this.numbers = numbers;this.startIndex = startIndex;this.endIndex = endIndex;}public Integer call() throws Exception {int sliceLength = (endIndex - startIndex) + 1;if (sliceLength > 2) {FindMinTask lowerFindMin = new FindMinTask(executorService, numbers, startIndex, startIndex + (sliceLength / 2) - 1);Future<Integer> futureLowerFindMin = executorService.submit(lowerFindMin);FindMinTask upperFindMin = new FindMinTask(executorService, numbers, startIndex + (sliceLength / 2), endIndex);Future<Integer> futureUpperFindMin = executorService.submit(upperFindMin);return Math.min(futureLowerFindMin.get(), futureUpperFindMin.get());} else {return Math.min(numbers[startIndex], numbers[endIndex]);}}public static void main(String[] args) throws InterruptedException, ExecutionException {int[] numbers = new int[100];Random random = new Random(System.currentTimeMillis());for (int i = 0; i < numbers.length; i++) {numbers[i] = random.nextInt(100);}ExecutorService executorService = Executors.newFixedThreadPool(64);Future<Integer> futureResult = executorService.submit(new FindMinTask(executorService, numbers, 0, numbers.length-1));System.out.println(futureResult.get());executorService.shutdown();} }該代碼看起來非常相似,期望我們submit()任務(wù)submit()給ExecutorService ,然后使用返回的Future實例來wait()結(jié)果。 兩種實現(xiàn)之間的主要區(qū)別可以在構(gòu)造線程池的那一點上找到。 在上面的示例中,我們創(chuàng)建了一個具有64(!)個線程的固定線程池。 為什么選擇這么大的數(shù)字? 原因是,對每個返回的Future調(diào)用get()阻塞當(dāng)前線程,直到結(jié)果可用為止。 如果我們僅向可用池提供盡可能多的線程,則程序?qū)⒑谋M資源并無限期地掛起。
ForkJoinPool實現(xiàn)了已經(jīng)提到的工作竊取策略,即每次運行線程必須等待某些結(jié)果時; 該線程從工作隊列中刪除當(dāng)前任務(wù),并執(zhí)行其他準(zhǔn)備運行的任務(wù)。 這樣,當(dāng)前線程不會被阻塞,并且可以用來執(zhí)行其他任務(wù)。 一旦計算出最初暫停的任務(wù)的結(jié)果,該任務(wù)就會再次執(zhí)行,join()方法將返回結(jié)果。 這與普通的ExecutorService有一個重要的區(qū)別,在常規(guī)ExecutorService ,您必須在等待結(jié)果時阻止當(dāng)前線程。
翻譯自: https://www.javacodegeeks.com/2015/09/forkjoin-framework.html
叉叉框架
總結(jié)
以上是生活随笔為你收集整理的叉叉框架_叉/连接框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 雷蛇萨诺狼蛛和达尔优ek815哪个好
- 下一篇: 字节流和字符流哪个不刷新_不喜欢节流吗?