基于Fork/Join框架实现对大型浮点数数组排序(归并算法和插入排序算法)
分支/合并框架
說明
重點是那個浮點數(shù)數(shù)組排序的例子,從主函數(shù)展開,根據(jù)序號看
1、GitHub代碼歡迎star。你們輕輕的一點,對我鼓勵特大,我有一個習慣,看完別人的文章是會點贊的。
2、個人認為學習語言最好的方式就是模仿、思考別人為什么這么寫。結合栗子效果更好,也能記住知識點。
3、只因為自己知識欠缺,語言組織能力不行,所以只能以這樣方式記錄。感覺效果很好。
4、過年前一天,過年中、過年第一天。聽力三遍《我們不一樣》,感謝你們的鼓勵,點贊。
前面的知識點都利用率 現(xiàn)在計算機提供的并行機制,然而我們還需要粒度更細的并行機制。例如:考慮使用遞歸計算Fibonacci數(shù)列的方法,
finonacci( n - 1) + finonacci( n - 2) 復制代碼可以將這兩個子任務分配給每個新的線程,當他們計算完成時,將結果相加。事實上,每個字問題的計算又可以分解為兩個子問題,直到不可細分位置 這類算法被稱為分治算法復雜的問題被分解為較小的問題,在根據(jù)字問題的解推導出原始問題的解。這樣問題就容易并行化。Java SE 7引入了新的分支/合并(Fork/Join)框架以簡化這類分治算法的實現(xiàn)。
大型任務被分解為若干塊,然后放入隊列用于后續(xù)計算,在隊列中,任務還可以將自身分解為更小的部分。線程會從隊列中能夠如取出任務并執(zhí)行。當所有線程結束后 ,將各部分結果合并得到最終結果。“分支”是指任務分解,“合并”是指結果合并。每個工作線程都維著任務的雙端隊列。隊列中后來的任務先執(zhí)行。當工作中沒有任務時需要執(zhí)行的時候,會嘗試從其他線程“竊取”任務,如果”竊取失敗,沒有其他 工作可做,就會下線。“竊取”的好處是減少了工作隊列中爭用情況。
像Task這樣的大型任務將被分解為兩個或更多的子任務,每個子任務可以進一步分解為新的子任務。直到子任務變得足夠簡單并得到解決。子任務敵對得到解決。
ForkJoinPool類
ForkJoinPool類是用于執(zhí)行ForkJoinTask的ExecutorSerivce。與其他ExecutorService的不同之處在于ForkJoinPool采用了前面提到的“工作竊取”機制。在構造過程中,可以在構造函中指定線程池的大小。如果使用的是默認無參構造函數(shù),那么會創(chuàng)建大小等于可用處理器數(shù)量的線程池。盡管已之地功能線程池的大小,但線程還會在嘗試維護更多活躍線程的任意時刻動態(tài)調整自身大小。ForkJoinPool提供了相應方法用于管理和監(jiān)控那些提交的任務。ForkJoinPool與其他ExecutorService的另一個重大區(qū)別在于:線程池需要在程序結束時顯示停止,因為其中所有的線程都處于守護狀態(tài)
有三種不同的方式可以將任務提交給ForkJoinPool。在異步執(zhí)行模式下,可以調用execute方法,并將ForkJoinTask作為參數(shù)。至于任務本身需要調用fork方法將任務在多個線程間分解。如果需要等待計算結果,可以調用ForkJoinPool的invoke方法。在ForkJoinTask中,可以接著調用invoke方法。invoke方法開始執(zhí)行任務并在任務結束后返回結果。如果底層失敗就會拋出異常或錯誤。最后可以通過調用ForkJoinPool的submit方法將任務提交給線程池,submit會返回一個Future對象,可以使用該對象檢查任務狀態(tài)和獲取執(zhí)行任務的結果。
ForkJoinTask類
ForkJoinTask類是運行在前面提到的ForkJoinPool中,用來創(chuàng)建任務的抽象類,RecursiveAction和RecursiveTask僅有兩個直接子類。任務在做提交給ForkJoinPool后,便開始執(zhí)行。ForkJoinTask僅包含兩個操作---分支和合并一旦開始執(zhí)行,就會啟動其他子任務。合并操作會等待子操作結束并在結果后提取運行結果。ForkJoinTask實現(xiàn)類Future接口,是Fiture輕量級形式。Future接口的get方法實現(xiàn),可用于等待任務結束并取得結果。還可以通過invoke方法執(zhí)行任務。該方法會在任務結束后返回結果,invokeAl方法可以接受任務集合作為參數(shù),該方法會分解出指定集合中的所有任務,并在每個人物結束后返回,或異常時。
ForkJoinTask類提供若干檢查任務執(zhí)行狀態(tài)的方法,只要任務結束,不管什么方法,isDone方法都用于檢查任務是否結束,返回true表示任務結束。isCompleledAbnormally方法用于檢測任務是否在被取消并且沒有異常的情況下正常結束。返回true表示正常結束。 isCancelled方法用于檢查任務是否被取消,返回true表示正常取消。
通常情況下不會直接繼承ForkJoinTask類,相反,會創(chuàng)建基于RecursiveTask或RecursiveAction的類。兩者均為ForkJoinTask的抽象子類。RucursiveTask用于返回結果的任務, 而RecursiveAction用于不返回結果的任務。無論使用哪一種,都要在子類中實現(xiàn)compute方法,并在其中執(zhí)行任務的主要計算。
大型浮點數(shù)數(shù)組排序
/*** Created by guo on 16/2/2018.* 大型浮點數(shù)數(shù)組排序* 需求:* 1、假設有一些數(shù)目非常大的浮點數(shù)(一百萬個),需要編寫一個程序將這些數(shù)字按升序排序,* 2、針對單線程的常用排序算法需要消耗過長的時間才能生成排好序的數(shù)組。* 3、這類問題恰好與分治模式相契合。我們將數(shù)組分解成多個較小的數(shù)組,并在每個數(shù)組中獨立進行排序。* 4、最后將不斷歸并排好序的數(shù)組合并成更大的數(shù)組以創(chuàng)建最終排好序的數(shù)組。*/ public class ParallelMergeSort {private static ForkJoinPool threadPool;private static final int THRESHOLD = 16;/*** 4、1 sort方法接受對象數(shù)組作為參數(shù),目標是對數(shù)組進行升序排序。* @param objectArray*/private static void sort(Comparable[] objectArray) {//4、2 聲明臨時目標數(shù)組用于存儲排序結果,大小等同于輸入數(shù)組。Comparable[] destArray = new Comparable[objectArray.length];//4、3 創(chuàng)建一個SortTask對象,并調用invoke方法將它提交給線程池。//4、4 SortTask接受4個參數(shù),待排序的數(shù)組,已經用于存儲排序后的對象目標數(shù)組,源數(shù)組中待排序元素的開始索引與結束索引。threadPool.invoke(new SortTask(objectArray, destArray, 0, objectArray.length - 1));}/*** 重點:* --5、SortTask其功能繼承自RecursiveAction類。* a、此排序算法不直接返回結果給調用方,因此基于RecursiveAction類。* b、如果算法具有返回值,比如計算 finonacci( n - 1) + finonacci( n - 2) ,那么就應該繼承RecursiveTask類哦。* c、作為SortTask類的具體實現(xiàn)的一部分,需要重寫compute抽象方法,*/static class SortTask extends RecursiveAction {private Comparable[] sourceArray;private Comparable[] destArray;private int lowerIndex, upperIndex;public SortTask(Comparable[] sourceArray, Comparable[] destArray, int lowerIndex, int upperIndex) {this.sourceArray = sourceArray;this.destArray = destArray;this.lowerIndex = lowerIndex;this.upperIndex = upperIndex;}/*** 6、compute方法用于檢查帶排序原色的大小。*/protected void compute() {//6、1 如果小于預定義的值THRESHOLD(16),就調用insertSort方法進行排序 。if (upperIndex - lowerIndex < THRESHOLD) {insertionSort(sourceArray, lowerIndex, upperIndex);return;}//6、2 如果沒有超過,就創(chuàng)建兩個子任務并遞歸進行調用。//6、3 每個子任務接收原有數(shù)組的一半作為自己的源數(shù)組,//6、3 minIndex定義了原有數(shù)組的中心點。調用invokeAll,將這兩個人物提交給線程池//6、4 分解任務為子任務的過程會一直遞歸執(zhí)行,直到每個子任務變得足夠小位置。//6、5 所有分解得到的子任務都被遞交給線程池,當它們結束時,compute方法會調用merge方法int minIndex = (lowerIndex + upperIndex >>> 1);invokeAll(new SortTask(sourceArray, destArray, lowerIndex, minIndex), new SortTask(sourceArray, destArray, minIndex + 1, upperIndex));merge(sourceArray, destArray, lowerIndex, minIndex, upperIndex);}}/***歸并算法*/public static void merge(Comparable[] sourceArray, Comparable[] destArray, int lowerIndex, int mindIndex, int upperIndex) {//1、 如果源數(shù)組中間索引小于等于右邊的則直接返回。if (sourceArray[mindIndex].compareTo(sourceArray[mindIndex + 1]) <= 0) {return;}/*** 2、調用底層實現(xiàn)的數(shù)組拷貝,是一個本地方法* void arraycopy(Object src, int srcPos,Object dest, int destPos,int length);* src the source array.* srcPos starting position in the source array.* dest the destination array.* destPos starting position in the destination data.* length the number of array elements to be copied.*/System.arraycopy(sourceArray, lowerIndex, destArray, lowerIndex, mindIndex - lowerIndex + 1);int i = lowerIndex;int j = mindIndex + 1;int k = lowerIndex;//3、 將兩個數(shù)組進行歸并,while (k < j && j < upperIndex) {if (destArray[i].compareTo(sourceArray[j]) <= 0) {sourceArray[k++] = destArray[i++];} else {sourceArray[k++] = sourceArray[j++];}}System.arraycopy(destArray, i, sourceArray, k, j - k);}/*** 插入排序 (得好好研究下算法了)* 1、從后向前找到格式的位置插入,* 2、 每步將一個待排序的記錄,按其順序大小插入到前面已經排好的子序列合適的位置。* 3、直到全部插入位置。*/private static void insertionSort(Comparable[] objectArray, int lowerIndex, int upperIndex) {//1、控制比較的輪數(shù),for (int i = lowerIndex + 1; i <= upperIndex; i++) {int j = i;Comparable tempObject = objectArray[j];//2、后一個和前面一個比較,如果前面的小,則把前面的賦值到后面。while (j > lowerIndex && tempObject.compareTo(objectArray[j - 1]) < 0) {objectArray[j] = objectArray[j - 1];--j;}objectArray[j] = tempObject;}}/*** 3、1 使用Random類生成范圍在0-1000之間的數(shù)據(jù)點,并使用這些隨機生成的數(shù)據(jù)初始化數(shù)組每一個元素。* 3、2 創(chuàng)建好的數(shù)據(jù)點的數(shù)目等同于函數(shù)參數(shù)的數(shù)目,這個例子是1000,增大該數(shù)字,可以驗證并行排序算法的效率。* @param length* @return*/public static Double[] createRandomData(int length) {Double[] data = new Double[length];for (int i = 0; i < data.length; i++) {data[i] = length * Math.random();}return data;} }復制代碼主函數(shù)
/*** 主函數(shù)** @param args*/public static void main(String[] args) {//1、獲取當前運行代碼所在機器上的可以用處理器數(shù)目int processors = Runtime.getRuntime().availableProcessors();System.out.println("no of processors:" + processors);//2、1創(chuàng)建大小等同于處理器數(shù)目的線程池,這一數(shù)目是運行在可用硬件最佳數(shù)目//2、2如果創(chuàng)建更大的線程池,就會有CPU競爭的情況發(fā)生,//2、3通過實例化ForkJoinPool,并將線程池大小作為參數(shù)傳入構造函數(shù)以創(chuàng)建線程池threadPool = new ForkJoinPool(processors);//3、構造隨機數(shù)據(jù)的數(shù)組,并輸入以方便嚴重Double[] data = createRandomData(100000);System.out.println("original unsorted data:");for (Double d : data) {System.out.printf("%3.2f ", (double) d);}//4、調用sort方法對生成的數(shù)據(jù)進行排序,并再次輸出數(shù)組,以驗證數(shù)組已經排好序。sort(data);System.out.println("\n\n Sorted Array ");for (double d : data) {System.out.printf("%3.2f ", d);}} 復制代碼總結
以上是生活随笔為你收集整理的基于Fork/Join框架实现对大型浮点数数组排序(归并算法和插入排序算法)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NEO从源码分析看网络通信
- 下一篇: springboot项目打包成war并在