生活随笔
收集整理的這篇文章主要介紹了
浅谈Java的Fork/Join并发框架
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
1. Fork/Join是什么
? Oracle的官方給出的定義是:Fork/Join框架是一個(gè)實(shí)現(xiàn)了ExecutorService接口的多線程處理器。它可以把一個(gè)大的任務(wù)劃分為若干個(gè)小的任務(wù)并發(fā)執(zhí)行,充分利用可用的資源,進(jìn)而提高應(yīng)用的執(zhí)行效率。
? Fork/Join實(shí)現(xiàn)了ExecutorService,所以它的任務(wù)也需要放在線程池中執(zhí)行。它的不同在于它使用了工作竊取算法,空閑的線程可以從滿負(fù)荷的線程中竊取任務(wù)來(lái)幫忙執(zhí)行。(我個(gè)人理解的工作竊取大意就是:由于線程池中的每個(gè)線程都有一個(gè)隊(duì)列,而且線程間互不影響。那么線程每次都從自己的任務(wù)隊(duì)列的頭部獲取一個(gè)任務(wù)出來(lái)執(zhí)行。如果某個(gè)時(shí)候一個(gè)線程的任務(wù)隊(duì)列空了,而其余的線程任務(wù)隊(duì)列中還有任務(wù),那么這個(gè)線程就會(huì)從其他線程的任務(wù)隊(duì)列中取一個(gè)任務(wù)出來(lái)幫忙執(zhí)行。就像偷取了其他人的工作一樣)
Fork/Join框架的核心是繼承了AbstractExecutorService的ForkJoinPool類(lèi),它保證了工作竊取算法和ForkJoinTask的正常工作。
下面是引用Oracle官方定義的原文:
The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.
As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
2. Fork/Join的基本用法
(1)Fork/Join基類(lèi)
? 上文已經(jīng)提到,Fork/Join就是要講一個(gè)大的任務(wù)分割成若干小的任務(wù),所以第一步當(dāng)然是要做任務(wù)的分割,大致方式如下:
if ?(這個(gè)任務(wù)足夠小){? ??執(zhí)行要做的任務(wù)? }?else ?{? ??將任務(wù)分割成兩小部分? ??執(zhí)行兩小部分并等待執(zhí)行結(jié)果? }?
要實(shí)現(xiàn)FrokJoinTask我們需要一個(gè)繼承了RecursiveTask或RecursiveAction的基類(lèi),并根據(jù)自身業(yè)務(wù)情況將上面的代碼放入基類(lèi)的coupute方法中。RecursiveTask和RecursiveAction都繼承了FrokJoinTask,它倆的區(qū)別就是RecursiveTask有返回值而RecursiveAction沒(méi)有。下面是我做的一個(gè)選出字符串列表中還有"a"的元素的Demo:
@Override ? protected ?List<String>?compute()?{? ????? ????if ?(end?-? this .start?<?threshold)?{? ????????List<String>?temp?=?list.subList(this .start,?end);? ????????return ?temp.parallelStream().filter(s?->?s.contains( "a" )).collect(Collectors.toList());? ????}?else ?{? ????????? ????????? ????????int ?middle?=?( this .start?+?end)?/? 2 ;? ????????ForkJoinTest?left?=?new ?ForkJoinTest(list,? this .start,?middle,?threshold);? ????????ForkJoinTest?right?=?new ?ForkJoinTest(list,?middle,?end,?threshold);? ????????? ????????left.fork();? ????????right.fork();? ????????? ????????List<String>?join?=?left.join();? ????????join.addAll(right.join());? ????????return ?join;? ????}? }?
(2)執(zhí)行類(lèi)
做好了基類(lèi)就可以開(kāi)始調(diào)用了,調(diào)用時(shí)首先我們需要Fork/Join線程池ForkJoinPool,然后向線程池中提交一個(gè)ForkJoinTask并得到結(jié)果。ForkJoinPool的submit方法的入?yún)⑹且粋€(gè)ForkJoinTask,返回值也是一個(gè)ForkJoinTask,它提供一個(gè)get方法可以獲取到執(zhí)行結(jié)果。
代碼如下:
ForkJoinPool?pool?=? new ?ForkJoinPool();? ? ForkJoinTask<List<String>>?future?=?pool.submit(forkJoinService);? System.out.println(future.get());? ? pool.shutdown();?
就這樣我們就完成了一個(gè)簡(jiǎn)單的Fork/Join的開(kāi)發(fā)。
提示:Java8中java.util.Arrays的parallelSort()方法和java.util.streams包中封裝的方法也都用到了Fork/Join。(細(xì)心的讀者可能注意到我在Fork/Join中也有用到stream,所以其實(shí)這個(gè)Fork/Join是多余的,因?yàn)閟tream已經(jīng)實(shí)現(xiàn)了Fork/Join,不過(guò)這只是一個(gè)Demo展示,沒(méi)有任何實(shí)際用處也就無(wú)所謂了)
引用官方原文:
One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems.
Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release.
附完整代碼以便以后參考:
1. 定義抽象類(lèi)(用于拓展,此例中沒(méi)有實(shí)際作用,可以不定義此類(lèi)):
import ?java.util.concurrent.RecursiveTask;? ? ? ? ? ? ? ? public ? abstract ? class ?ForkJoinService<T>? extends ?RecursiveTask<T>{? ????@Override ? ????protected ? abstract ?T?compute();? }?
2. 定義基類(lèi)
import ?java.util.List;? import ?java.util.stream.Collectors;? ? ? ? ? ? ? ? public ? class ?ForkJoinTest? extends ?ForkJoinService<List<String>>?{? ? ????private ? static ?ForkJoinTest?forkJoinTest;? ????private ? int ?threshold;?? ? ????private ?List<String>?list;? ? ? ????private ?ForkJoinTest(List<String>?list,? int ?threshold)?{? ????????this .list?=?list;? ????????this .threshold?=?threshold;? ????}? ? ????@Override ? ????protected ?List<String>?compute()?{? ????????? ????????if ?(list.size()?<?threshold)?{? ????????????return ?list.parallelStream().filter(s?->?s.contains( "a" )).collect(Collectors.toList());? ????????}?else ?{? ????????????? ????????????int ?middle?=?list.size()?/? 2 ;? ????????????List<String>?leftList?=?list.subList(0 ,?middle);? ????????????List<String>?rightList?=?list.subList(middle,?list.size());? ????????????ForkJoinTest?left?=?new ?ForkJoinTest(leftList,?threshold);? ????????????ForkJoinTest?right?=?new ?ForkJoinTest(rightList,?threshold);? ????????????? ????????????left.fork();? ????????????right.fork();? ????????????? ????????????List<String>?join?=?left.join();? ????????????join.addAll(right.join());? ????????????return ?join;? ????????}? ????}? ? ????? ? ? ? ? ? ????public ? static ?ForkJoinService<List<String>>?getInstance(List<String>?list,? int ?threshold)?{? ????????if ?(forkJoinTest?==? null )?{? ????????????synchronized ?(ForkJoinTest. class )?{? ????????????????if ?(forkJoinTest?==? null )?{? ????????????????????forkJoinTest?=?new ?ForkJoinTest(list,?threshold);? ????????????????}? ????????????}? ????????}? ????????return ?forkJoinTest;? ????}? }?
3. 執(zhí)行類(lèi)?
import ?java.util.ArrayList;? import ?java.util.Arrays;? import ?java.util.List;? import ?java.util.concurrent.ExecutionException;? import ?java.util.concurrent.ForkJoinPool;? import ?java.util.concurrent.ForkJoinTask;? ? ? ? ? ? ? ? public ? class ?Test?{? ? ????public ? static ? void ?main(String?args[])? throws ?ExecutionException,?InterruptedException?{? ? ????????String[]?strings?=?{"a" ,? "ah" ,? "b" ,? "ba" ,? "ab" ,? "ac" ,? "sd" ,? "fd" ,? "ar" ,? "te" ,? "se" ,? "te" ,? ????????????????"sdr" ,? "gdf" ,? "df" ,? "fg" ,? "gh" ,? "oa" ,? "ah" ,? "qwe" ,? "re" ,? "ty" ,? "ui" };? ????????List<String>?stringList?=?new ?ArrayList<>(Arrays.asList(strings));? ? ????????ForkJoinPool?pool?=?new ?ForkJoinPool();? ????????ForkJoinService<List<String>>?forkJoinService?=?ForkJoinTest.getInstance(stringList,?20 );? ????????? ????????ForkJoinTask<List<String>>?future?=?pool.submit(forkJoinService);? ????????System.out.println(future.get());? ????????? ????????pool.shutdown();? ? ????}? ? }? from: http://mdsa.51cto.com/art/201708/547101.htm
總結(jié)
以上是生活随笔 為你收集整理的浅谈Java的Fork/Join并发框架 的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔 網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔 推薦給好友。