日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable

發(fā)布時(shí)間:2023/12/6 java 59 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、Exectuor框架簡(jiǎn)介?? ? ?

?Java從1.5版本開(kāi)始,為簡(jiǎn)化多線程并發(fā)編程,引入全新的并發(fā)編程包:java.util.concurrent及其并發(fā)編程框架(Executor框架)。?Executor框架是指java 5中引入的一系列并發(fā)庫(kù)中與executor相關(guān)的一些功能類(lèi),其中包括線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他們的關(guān)系為? ?

??

?

?

在Executor框架中,使用執(zhí)行器(Exectuor)來(lái)管理Thread對(duì)象,從而簡(jiǎn)化了并發(fā)編程。

?

二、認(rèn)識(shí)Exectuor(執(zhí)行器)

1、并發(fā)編程的一種編程方式是把任務(wù)拆分為一系列的小任務(wù),即Runnable,然后將這些任務(wù)提交給一個(gè)Executor執(zhí)行,Executor.execute(Runnalbe)?。Executor在執(zhí)行時(shí)使用其內(nèi)部的線程池來(lái)完成操作。

? ? ? Executor的子接口有:ExecutorService,ScheduledExecutorService,已知實(shí)現(xiàn)類(lèi):AbstractExecutorService,ScheduledThreadPoolExecutor,ThreadPoolExecutor。

?

2、Executor屬于public類(lèi)型的接口??梢杂糜谔峤?#xff0c;管理或者執(zhí)行Runnable任務(wù)。實(shí)現(xiàn)Executor接口的class還可以控制Runnable任務(wù)執(zhí)行線程的具體細(xì)節(jié)。包括線程使用的細(xì)節(jié)、調(diào)度等。一般來(lái)說(shuō),Runnable任務(wù)開(kāi)辟在新線程中的使用方法為:new Thread(new RunnableTask())).start()

?

3、但在Executor中,可以使用Executor而不用顯示地創(chuàng)建線程。例如,可以使用以下方法創(chuàng)建線程,而不是像第2點(diǎn)中為一種任務(wù)中的每個(gè)任務(wù)都調(diào)用new Thread(...)的方法。

?

Java代碼??
  • Exectuor?executor?=?anExecutor();??
  • executor.execute(new?RunnableTask());?//?異步執(zhí)行??
  • executor.execute(new?RunnableTask());??
  • ?

    ?

    ?

    4、但是,Executor接口并沒(méi)有嚴(yán)格地要求執(zhí)行必須是異步/同步的,一切都相當(dāng)自由。在最簡(jiǎn)單的情況下,執(zhí)行程序可以在調(diào)用者的線程中立即運(yùn)行已提交的任務(wù),

    ?

    Java代碼??
  • class?DirectExecutor?implements?Executor?{????????
  • ???????public?void?execute(Runnable?r)?{????????????
  • ??????????????r.run();??????
  • ???????}????
  • }??
  • ?更常見(jiàn)的是,任務(wù)在某個(gè)不是調(diào)用者線程的線程中執(zhí)行的。如在另一個(gè)線程中啟動(dòng):

    ?

    Java代碼??
  • class?ThreadPerTaskExecutor?implements?Executor?{????????
  • ???????????public?void?execute(Runnable?r)?{????????????
  • ??????????????new?Thread(r).start();????????
  • ????????????}????
  • }??
  • ?

    ?

    ?也可以在實(shí)現(xiàn)中用另一個(gè)Executor來(lái)序列化執(zhí)行過(guò)程:

    ?

    Java代碼??
  • class?SerialExecutor?implements?Executor?{????
  • ????final?Queue<Runnable>?tasks?=?new?ArrayDeque<Runnable>();????
  • ????final?Executor?executor;????
  • ????Runnable?active;????
  • ????
  • ????SerialExecutor(Executor?executor)?{????
  • ????????this.executor?=?executor;????
  • ????}????
  • ????
  • ????public?synchronized?void?execute(final?Runnable?r)?{????
  • ????????tasks.offer(new?Runnable()?{????
  • ????????????public?void?run()?{????
  • ????????????????try?{????
  • ????????????????????r.run();????
  • ????????????????}?finally?{????
  • ????????????????????scheduleNext();????
  • ????????????????}????
  • ????????????}????
  • ????????});????
  • ????????if?(active?==?null)?{????
  • ????????????scheduleNext();????
  • ????????}????
  • ????}????
  • ????
  • ????protected?synchronized?void?scheduleNext()?{????
  • ????????if?((active?=?tasks.poll())?!=?null)?{????
  • ????????????executor.execute(active);????
  • ????????}????
  • ????}????
  • }????
  • ?

    ?

    ?

    ?

    ?5、ThreadPoolExecutor類(lèi)提供了一個(gè)可供可擴(kuò)展的線程池實(shí)現(xiàn)。Executors類(lèi)為Executor接口及其實(shí)現(xiàn)提供了便捷的工廠方法。

    ?

    6、 Executor中的方法execute。void execute(Runnable command)表示在未來(lái)的某個(gè)時(shí)間執(zhí)行給定的命令。該命令可能在新的線程、已經(jīng)入池的線程或者正在調(diào)用的線程中執(zhí)行。

    ?

    三、Executors類(lèi): 主要用于提供線程池相關(guān)的操作

    Executors類(lèi),提供了一系列工廠方法用于創(chuàng)建線程池,返回的線程池都實(shí)現(xiàn)了ExecutorService接口。

    ?1、public static ExecutorService newFiexedThreadPool(int Threads)?創(chuàng)建固定數(shù)目線程的線程池。

    ?

    2、public static ExecutorService newCachedThreadPool():創(chuàng)建一個(gè)可緩存的線程池,調(diào)用execute 將重用以前構(gòu)造的線程(如果線程可用)。如果沒(méi)有可用的線程,則創(chuàng)建一個(gè)新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

    ?

    3、public static ExecutorService newSingleThreadExecutor():創(chuàng)建一個(gè)單線程化的Executor。

    ?

    4、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    創(chuàng)建一個(gè)支持定時(shí)及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來(lái)替代Timer類(lèi)。

    ?

    ?四、ExecutorService與生命周期

    ?

    1、ExecutorService可以理解為程序員提供了一堆操作Executor的API

    ?

    2、ExecutorService擴(kuò)展了Executor并添加了一些生命周期管理的方法。一個(gè)Executor的生命周期有三種狀態(tài)

    運(yùn)行、關(guān)閉和終止。

    ? ? ?Executor創(chuàng)建時(shí)處于運(yùn)行狀態(tài)。當(dāng)調(diào)用ExecutorService.shutdown()后,處于關(guān)閉狀態(tài),isShutdown()方法返回true。這時(shí),不應(yīng)該再向Executor中添加任務(wù),所有已添加的任務(wù)執(zhí)行完畢后,Executor處于終止?fàn)顟B(tài),isTerminated()返回true。如果Executor處于關(guān)閉狀態(tài),往Executor提交任務(wù)會(huì)拋出unchecked exception RejectedExecutionException。

    ?

    3、本質(zhì)

    ? ? 接口ExecutorService 表述了異步執(zhí)行的機(jī)制,并且可以讓任務(wù)在后臺(tái)執(zhí)行。一個(gè)ExecutorService 實(shí)例因此特別像一個(gè)線程池。事實(shí)上,在 java.util.concurrent 包中的 ExecutorService 的實(shí)現(xiàn)就是一個(gè)線程池的實(shí)現(xiàn)。

    ?

    Java代碼??
  • ExecutorService?executorService?=?Executors.newFixedThreadPool(10);??
  • ???
  • executorService.execute(new?Runnable()?{??
  • ????public?void?run()?{??
  • ????????System.out.println("Asynchronous?task");??
  • ????}??
  • });??
  • ???
  • executorService.shutdown();??
  • ?

    ?

    ? ?該示例代碼首先使用 newFixedThreadPool() 工廠方法創(chuàng)建一個(gè)ExecutorService ,上述代碼創(chuàng)建了一個(gè)可以容納10個(gè)線程任務(wù)的線程池。其次,向 execute() 方法中傳遞一個(gè)異步的 Runnable 接口的實(shí)現(xiàn),這樣做會(huì)讓 ExecutorService 中的某個(gè)線程執(zhí)行這個(gè)Runnable 線程。

    ?

    4、任務(wù)的委托

    下方展示了一個(gè)線程的把任務(wù)委托異步執(zhí)行的ExecutorService的示意圖。



    ?一旦線程把任務(wù)委托給 ExecutorService,該線程就會(huì)繼續(xù)執(zhí)行與運(yùn)行任務(wù)無(wú)關(guān)的其它任務(wù)。

    ?

    5、ExecutorService 的實(shí)現(xiàn)

    由于 ExecutorService 只是一個(gè)接口,ExecutorService 接口在 java.util.concurrent 包中有如下實(shí)現(xiàn)類(lèi):

    • ThreadPoolExecutor
    • ScheduledThreadPoolExecutor

    6、ExecutorService 使用方法

    這里有幾種不同的方式讓你將任務(wù)委托給一個(gè)ExecutorService:

    ?

    Java代碼??
  • execute(Runnable)??
  • submit(Runnable)??
  • submit(Callable)??
  • invokeAny()??
  • invokeAll()??
  • ?

    ?

    7、execute(Runnable)

    方法 execute(Runnable) 接收一個(gè)java.lang.Runnable 對(duì)象作為參數(shù),并且以異步的方式執(zhí)行它。如下是一個(gè)使用 ExecutorService 執(zhí)行 Runnable 的例子:

    ?

    Java代碼??
  • ExecutorService?executorService?=?Executors.newSingleThreadExecutor();??
  • ???
  • executorService.execute(new?Runnable()?{??
  • ????public?void?run()?{??
  • ????????System.out.println("Asynchronous?task");??
  • ????}??
  • });??
  • ???????
  • executorService.shutdown();??
  • 使用這種方式?jīng)]有辦法獲取執(zhí)行 Runnable 之后的結(jié)果,如果你希望獲取運(yùn)行之后的返回值,就必須使用接收 Callable 參數(shù)的 execute() 方法。接下來(lái)會(huì)提到。

    ?

    ?

    8、submit(Runnable)

    方法 submit(Runnable) 同樣接收一個(gè)Runnable 的實(shí)現(xiàn)作為參數(shù),但是會(huì)返回一個(gè)Future 對(duì)象。這個(gè)Future 對(duì)象可以用于判斷 Runnable 是否結(jié)束執(zhí)行。如下是一個(gè)ExecutorService 的 submit() 方法的例子:

    ?

    Java代碼??
  • Future?future?=?executorService.submit(new?Runnable()?{??
  • ????public?void?run()?{??
  • ????????System.out.println("Asynchronous?task");??
  • ????}??
  • });??
  • //如果任務(wù)結(jié)束執(zhí)行則返回?null??
  • System.out.println("future.get()="?+?future.get());??
  • ?

    ?

    9、submit(Callable)

    方法 submit(Callable) 和方法 submit(Runnable) 比較類(lèi)似,但是區(qū)別則在于它們接收不同的參數(shù)類(lèi)型。Callable 的實(shí)例與 Runnable 的實(shí)例很類(lèi)似,但是 Callable 的 call() 方法可以返回一個(gè)結(jié)果。方法 Runnable.run() 則不能返回結(jié)果。

    Callable 的返回值可以從方法 submit(Callable) 返回的 Future 對(duì)象中獲取。如下是一個(gè) ExecutorService Callable 的樣例:

    ?

    Java代碼??
  • Future?future?=?executorService.submit(new?Callable(){??
  • ????public?Object?call()?throws?Exception?{??
  • ????????System.out.println("Asynchronous?Callable");??
  • ????????return?"Callable?Result";??
  • ????}??
  • });??
  • ???
  • System.out.println("future.get()?=?"?+?future.get());??
  • ?上述樣例代碼會(huì)輸出如下結(jié)果:

    ?

    Java代碼??
  • Asynchronous?Callable??
  • future.get()?=?Callable?Result??
  • ?

    10、inVokeAny()

    方法 invokeAny() 接收一個(gè)包含 Callable 對(duì)象的集合作為參數(shù)。調(diào)用該方法不會(huì)返回 Future 對(duì)象,而是返回集合中某一個(gè)Callable 對(duì)象的結(jié)果,而且無(wú)法保證調(diào)用之后返回的結(jié)果是哪一個(gè) Callable,只知道它是這些 Callable 中一個(gè)執(zhí)行結(jié)束的 Callable 對(duì)象。如果一個(gè)任務(wù)運(yùn)行完畢或者拋出異常,方法會(huì)取消其它的 Callable 的執(zhí)行。
    以下是一個(gè)樣例:

    Java代碼??
  • ExecutorService?executorService?=?Executors.newSingleThreadExecutor();??
  • ???
  • Set<Callable<String>>?callables?=?new?HashSet<Callable<String>>();??
  • ???
  • callables.add(new?Callable<String>()?{??
  • ????public?String?call()?throws?Exception?{??
  • ????????return?"Task?1";??
  • ????}??
  • });??
  • callables.add(new?Callable<String>()?{??
  • ????public?String?call()?throws?Exception?{??
  • ????????return?"Task?2";??
  • ????}??
  • });??
  • callables.add(new?Callable<String>()?{??
  • ????public?String?call()?throws?Exception?{??
  • ????????return?"Task?3";??
  • ????}??
  • });??
  • ???
  • String?result?=?executorService.invokeAny(callables);??
  • ???
  • System.out.println("result?=?"?+?result);??
  • ???
  • executorService.shutdown();??
  • ?以上樣例代碼會(huì)打印出在給定的集合中的某一個(gè)Callable 的返回結(jié)果。嘗試運(yùn)行后發(fā)現(xiàn)每次結(jié)果都在改變。有時(shí)候返回結(jié)果是"Task 1",有時(shí)候是"Task 2",等等。

    ?

    11、invokeAll()

    方法 invokeAll() 會(huì)調(diào)用存在于參數(shù)集合中的所有 Callable 對(duì)象,并且返回一個(gè)包含 Future 對(duì)象的集合,你可以通過(guò)這個(gè)返回的集合來(lái)管理每個(gè) Callable 的執(zhí)行結(jié)果。需要注意的是,任務(wù)有可能因?yàn)楫惓6鴮?dǎo)致運(yùn)行結(jié)束,所以它可能并不是真的成功運(yùn)行了。但是我們沒(méi)有辦法通過(guò) Future 對(duì)象來(lái)了解到這個(gè)差異。

    12、ExecutorService服務(wù)的關(guān)閉

    ? ? ? 當(dāng)使用 ExecutorService 完畢之后,我們應(yīng)該關(guān)閉它,這樣才能保證線程不會(huì)繼續(xù)保持運(yùn)行狀態(tài)。?
    ? ? ? 舉例來(lái)說(shuō),如果你的程序通過(guò) main() 方法啟動(dòng),并且主線程退出了你的程序,如果還有一個(gè)活動(dòng)的 ExecutorService 存在于程序中,那么程序?qū)?huì)繼續(xù)保持運(yùn)行狀態(tài)。存在于 ExecutorService 中的活動(dòng)線程會(huì)阻止Java虛擬機(jī)關(guān)閉。?
    ? ? ? 為了關(guān)閉在 ExecutorService 中的線程,需要調(diào)用 shutdown() 方法。但ExecutorService 并不會(huì)馬上關(guān)閉,而是不再接收新的任務(wù),一旦所有的線程結(jié)束執(zhí)行當(dāng)前任務(wù),ExecutorServie 才會(huì)真的關(guān)閉。所有在調(diào)用 shutdown() 方法之前提交到 ExecutorService 的任務(wù)都會(huì)執(zhí)行。?
    ? ? ?如果你希望立即關(guān)閉 ExecutorService,你可以調(diào)用 shutdownNow() 方法。這個(gè)方法會(huì)嘗試馬上關(guān)閉所有正在執(zhí)行的任務(wù),并且跳過(guò)所有已經(jīng)提交但是還沒(méi)有運(yùn)行的任務(wù)。但是對(duì)于正在執(zhí)行的任務(wù),是否能夠成功關(guān)閉它是無(wú)法保證的,有可能他們真的被關(guān)閉掉了,也有可能它會(huì)一直執(zhí)行到任務(wù)結(jié)束。這是一個(gè)最好的嘗試。

    ?

    五、CompletionService

    ? ? ? ? 根據(jù)上面的介紹我們知道,現(xiàn)在在Java中使用多線程通常不會(huì)再使用Thread對(duì)象了。而是會(huì)用到j(luò)ava.util.concurrent包下的ExecutorService來(lái)初始化一個(gè)線程池供我們使用。使用ExecutorService類(lèi)的時(shí)候,我們常維護(hù)一個(gè)list保存submit的callable task所返回的Future對(duì)象。然后在主線程中遍歷這個(gè)list并調(diào)用Future的get()方法取到Task的返回值。

    ? ? ? ?其實(shí)除了使用ExecutorService外,還可通過(guò)CompletionService包裝ExecutorService,然后調(diào)用其take()方法去取Future對(duì)象。

    ? ? ? ?CompletionService和ExecutorService的主要的區(qū)別在于submit的task不一定是按照加入自己維護(hù)的list順序完成的。

    ? ? ? ?ExecutorService中從list中遍歷的每個(gè)Future對(duì)象并不一定處于完成狀態(tài),這時(shí)調(diào)用get()方法就會(huì)被阻塞住,如果系統(tǒng)是設(shè)計(jì)成每個(gè)線程完成后就能根據(jù)其結(jié)果繼續(xù)做后面的事,這樣對(duì)于處于list后面的但是先完成的線程就會(huì)增加了額外的等待時(shí)間。

    ? ? ? ?而CompletionService的實(shí)現(xiàn)是維護(hù)一個(gè)保存Future對(duì)象的BlockingQueue。只有當(dāng)這個(gè)Future對(duì)象狀態(tài)是結(jié)束的時(shí)候,才會(huì)加入到這個(gè)Queue中,take()方法其實(shí)就是Producer-Consumer中的Consumer。它會(huì)從Queue中取出Future對(duì)象,如果Queue是空的,就會(huì)阻塞在那里,直到有完成的Future對(duì)象加入到Queue中。所以,先完成的必定先被取出。這樣就減少了不必要的等待時(shí)間。

    ?

    六、使用Callable,Future返回結(jié)果

    ? ? ? ?Future<V>代表一個(gè)異步執(zhí)行的操作,通過(guò)get()方法可以獲得操作的結(jié)果,如果異步操作還沒(méi)有完成,則,get()會(huì)使當(dāng)前線程阻塞。FutureTask<V>實(shí)現(xiàn)了Future<V>和Runable<V>。Callable代表一個(gè)有返回值的操作。

  • Callable<Integer>?func?=?new?Callable<Integer>(){??
  • ????public?Integer?call()?throws?Exception?{??
  • ????????System.out.println("inside?callable");??
  • ????????Thread.sleep(1000);??
  • ????????return?new?Integer(8);??
  • ????}?????????
  • };????????
  • FutureTask<Integer>?futureTask??=?new?FutureTask<Integer>(func);??
  • Thread?newThread?=?new?Thread(futureTask);??
  • newThread.start();??
  • ??
  • try?{??
  • ????System.out.println("blocking?here");??
  • ????Integer?result?=?futureTask.get();??
  • ????System.out.println(result);??
  • }?catch?(InterruptedException?ignored)?{??
  • }?catch?(ExecutionException?ignored)?{??
  • }?
  • ?

    ? ? ? ?ExecutoreService提供了submit()方法,傳遞一個(gè)Callable,或Runnable,返回Future。如果Executor后臺(tái)線程池還沒(méi)有完成Callable的計(jì)算,則調(diào)用返回Future對(duì)象的get()方法,會(huì)阻塞直到計(jì)算完成。

    ? ? ? ?

    ? ? ? ?Java5以后可以利用Future來(lái)跟蹤異步計(jì)算的結(jié)果。在此之前主線程要想獲得工作線程(異步計(jì)算線程)的結(jié)果是比較麻煩的事情,需要我們進(jìn)行特殊的程序結(jié)構(gòu)設(shè)計(jì),比較繁瑣而且容易出錯(cuò)。有了Future我們就可以設(shè)計(jì)出比較優(yōu)雅的異步計(jì)算程序結(jié)構(gòu)模型:根據(jù)分而治之的思想,我們可以把異步計(jì)算的線程按照職責(zé)分為3類(lèi):

    ? ? ? 1. 異步計(jì)算的發(fā)起線程(控制線程):負(fù)責(zé)異步計(jì)算任務(wù)的分解和發(fā)起,把分解好的任務(wù)交給異步計(jì)算的work線程去執(zhí)行,發(fā)起異步計(jì)算后,發(fā)起線程可以獲得Futrue的集合,從而可以跟蹤異步計(jì)算結(jié)果。

    ? ? ? 2. 異步計(jì)算work線程:負(fù)責(zé)具體的計(jì)算任務(wù)

    ? ? ? 3. 異步計(jì)算結(jié)果收集線程:從發(fā)起線程那里獲得Future的集合,并負(fù)責(zé)監(jiān)控Future的狀態(tài),根據(jù)Future的狀態(tài)來(lái)處理異步計(jì)算的結(jié)果。



    本文轉(zhuǎn)自農(nóng)夫山泉?jiǎng)e墅博客園博客,原文鏈接:http://www.cnblogs.com/yaowen/p/6323689.html,如需轉(zhuǎn)載請(qǐng)自行聯(lián)系原作者

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

    總結(jié)

    以上是生活随笔為你收集整理的Java并发编程 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。