日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别

發(fā)布時(shí)間:2023/12/2 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前記:

?

  • jdk官方文檔(javadoc)是學(xué)習(xí)的最好,最權(quán)威的參考。
  • 文章分上中下。上篇中主要介紹ThreadPoolExecutor接受任務(wù)相關(guān)的兩方面入?yún)⒌囊饬x和區(qū)別,池大小參數(shù)corePoolSize和maximumPoolSize,BlockingQueue選型(SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue);中篇中主要聊聊與keepAliveTime這個(gè)參數(shù)相關(guān)的話題;下片中介紹一下一些比較少用的該類的API,及他的近親:ScheduledThreadPoolExecutor。

  • 如果理解錯誤,請直接指出。
  • ?

    ?

    查看JDK幫助文檔,可以發(fā)現(xiàn)該類比較簡單,繼承自AbstractExecutorService,而AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口。

    ?

    ThreadPoolExecutor的完整構(gòu)造方法的簽名是:

    ?

    ThreadPoolExecutor(int?corePoolSize, int?maximumPoolSize, long?keepAliveTime, TimeUnit?unit, BlockingQueue<Runnable>?workQueue, ThreadFactory?threadFactory, RejectedExecutionHandler?handler)?

    ?

    先記著,后面慢慢解釋。

    ?

    ===============================神奇分割線==================================

    ?

    其實(shí)對于ThreadPoolExecutor的構(gòu)造函數(shù)網(wǎng)上有N多的解釋的,大多講得都很好,不過我想先換個(gè)方式,從Executors這個(gè)類入手。因?yàn)樗膸讉€(gè)構(gòu)造工廠構(gòu)造方法名字取得令人很容易了解有什么特點(diǎn)。但是其實(shí)Executors類的底層實(shí)現(xiàn)便是ThreadPoolExecutor!

    ?

    ThreadPoolExecutor是Executors類的底層實(shí)現(xiàn)。

    ?

    在JDK幫助文檔中,有如此一段話:

    強(qiáng)烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進(jìn)行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個(gè)后臺線程),它們均為大多數(shù)使用場景預(yù)定義了設(shè)置。

    ?

    可以推斷出ThreadPoolExecutor與Executors類必然關(guān)系密切。

    ?

    ===============================神奇分割線==================================

    ?

    ?

    OK,那就來看看源碼吧,從newFixedThreadPool開始。

    ?

    ExecutorService newFixedThreadPool(int nThreads):固定大小線程池。

    ?

    可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實(shí)際上,后面會介紹,如果使用無界queue的話maximumPoolSize參數(shù)是沒有意義的),keepAliveTime和unit的設(shè)值表名什么?-就是該實(shí)現(xiàn)不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個(gè)特點(diǎn),他是無界的

    ?

    Java代碼 ?
  • public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{??
  • ????????return?new?ThreadPoolExecutor(nThreads,?nThreads,??
  • ??????????????????????????????????????0L,?TimeUnit.MILLISECONDS,??
  • ??????????????????????????????????????new?LinkedBlockingQueue<Runnable>());??
  • ????}??
  • ?

    ExecutorService newSingleThreadExecutor():單線程。

    ?

    可以看到,與fixedThreadPool很像,只不過fixedThreadPool中的入?yún)⒅苯油嘶癁?

    ?

    ?

    Java代碼 ?
  • public?static?ExecutorService?newSingleThreadExecutor()?{??
  • ????????return?new?FinalizableDelegatedExecutorService??
  • ????????????(new?ThreadPoolExecutor(1,?1,??
  • ????????????????????????????????????0L,?TimeUnit.MILLISECONDS,??
  • ????????????????????????????????????new?LinkedBlockingQueue<Runnable>()));??
  • ????}??
  • ?

    ?

    ExecutorService newCachedThreadPool():無界線程池,可以進(jìn)行自動線程回收。

    ?

    這個(gè)實(shí)現(xiàn)就有意思了。首先是無界的線程池,所以我們可以發(fā)現(xiàn)maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對于該BlockingQueue有些陌生,簡單說:該QUEUE中,每個(gè)插入操作必須等待另一個(gè)

    線程的對應(yīng)移除操作。比如,我先添加一個(gè)元素,接下來如果繼續(xù)想嘗試添加則會阻塞,直到另一個(gè)線程取走一個(gè)元素,反之亦然。(想到什么?就是緩沖區(qū)為1的生產(chǎn)者消費(fèi)者模式^_^)

    注意到介紹中的自動回收線程的特性嗎,為什么呢?先不說,但注意到該實(shí)現(xiàn)中corePoolSize和maximumPoolSize的大小不同。

    ?

    ?

    Java代碼 ?
  • public?static?ExecutorService?newCachedThreadPool()?{??
  • ????????return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,??
  • ??????????????????????????????????????60L,?TimeUnit.SECONDS,??
  • ??????????????????????????????????????new?SynchronousQueue<Runnable>());??
  • ????}??
  • ?

    ===============================神奇分割線==================================

    ?

    到此如果有很多疑問,那是必然了(除非你也很了解了)

    ?

    先從BlockingQueue<Runnable>?workQueue這個(gè)入?yún)㈤_始說起。在JDK中,其實(shí)已經(jīng)說得很清楚了,一共有三種類型的queue。以下為引用:(我會稍微修改一下,并用紅色突出顯示)

    ?

    ?

    所有 BlockingQueue 都可用于傳輸和保持提交的任務(wù)。可以使用此隊(duì)列與池大小進(jìn)行交互:
    • 如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。(什么意思?如果當(dāng)前運(yùn)行的線程小于corePoolSize,則任務(wù)根本不會存放,添加到queue中,而是直接抄家伙(thread)開始運(yùn)行
    • 如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊(duì)列而不添加新的線程
    • 如果無法將請求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。
    先不著急舉例子,因?yàn)槭紫刃枰纐ueue上的三種類型。
    排隊(duì)有三種通用策略:
  • 直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時(shí)出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長的可能性。
  • 無界隊(duì)列。使用無界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無界隊(duì)列;例如,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長的可能性。
  • 有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量。 ?
  • ?

    ===============================神奇分割線==================================

    ?

    到這里,該了解的理論已經(jīng)夠多了,可以調(diào)節(jié)的就是corePoolSize和maximumPoolSizes 這對參數(shù)還有就是BlockingQueue的選擇。

    ?

    例子一:使用直接提交策略,也即SynchronousQueue。

    ?

    首先SynchronousQueue是無界的,也就是說他存數(shù)任務(wù)的能力是沒有限制的,但是由于該Queue本身的特性在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加。在這里不是核心線程便是新創(chuàng)建的線程,但是我們試想一樣下,下面的場景。

    ?

    我們使用一下參數(shù)構(gòu)造ThreadPoolExecutor:

    ?

    ?

    Java代碼 ?
  • new?ThreadPoolExecutor(??
  • ????????????2,?3,?30,?TimeUnit.SECONDS,???
  • ????????????new?<span?style="white-space:?normal;">SynchronousQueue</span><Runnable>(),???
  • ????????????new?RecorderThreadFactory("CookieRecorderPool"),???
  • ????????????new?ThreadPoolExecutor.CallerRunsPolicy());??
  • ?當(dāng)核心線程已經(jīng)有2個(gè)正在運(yùn)行.

    ?

  • 此時(shí)繼續(xù)來了一個(gè)任務(wù)(A),根據(jù)前面介紹的“如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊(duì)列而不添加新的線程。”,所以A被添加到queue中。
  • 又來了一個(gè)任務(wù)(B),且核心2個(gè)線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由于使用的SynchronousQueue,所以一定無法加入進(jìn)去。
  • 此時(shí)便滿足了上面提到的“如果無法將請求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出maximumPoolSize,在這種情況下,任務(wù)將被拒絕。”,所以必然會新建一個(gè)線程來運(yùn)行這個(gè)任務(wù)。
  • 暫時(shí)還可以,但是如果這三個(gè)任務(wù)都還沒完成,連續(xù)來了兩個(gè)任務(wù),第一個(gè)添加入queue中,后一個(gè)呢?queue中無法插入,而線程數(shù)達(dá)到了maximumPoolSize,所以只好執(zhí)行異常策略了。
  • 所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發(fā)生(如果希望限制就直接使用有界隊(duì)列)。對于使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內(nèi)部依賴性的請求集時(shí)出現(xiàn)鎖
    什么意思?如果你的任務(wù)A1,A2有內(nèi)部關(guān)聯(lián),A1需要先運(yùn)行,那么先提交A1,再提交A2,當(dāng)使用SynchronousQueue我們可以保證,A1必定先被執(zhí)行,在A1么有被執(zhí)行前,A2不可能添加入queue中
    例子二:使用無界隊(duì)列策略,即LinkedBlockingQueue
    這個(gè)就拿newFixedThreadPool來說,根據(jù)前文提到的規(guī)則: 寫道 如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。 那么當(dāng)任務(wù)繼續(xù)增加,會發(fā)生什么呢? 寫道 ? 如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊(duì)列,而不添加新的線程。

    ?OK,此時(shí)任務(wù)變加入隊(duì)列之中了,那什么時(shí)候才會添加新線程呢?

    ?

    寫道 如果無法將請求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。

    這里就很有意思了,可能會出現(xiàn)無法加入隊(duì)列嗎?不像SynchronousQueue那樣有其自身的特點(diǎn),對于無界隊(duì)列來說,總是可以加入的(資源耗盡,當(dāng)然另當(dāng)別論)。換句說,永遠(yuǎn)也不會觸發(fā)產(chǎn)生新的線程!corePoolSize大小的線程數(shù)會一直運(yùn)行,忙完當(dāng)前的,就從隊(duì)列中拿任務(wù)開始運(yùn)行。所以要防止任務(wù)瘋長,比如任務(wù)運(yùn)行的實(shí)行比較長,而添加任務(wù)的速度遠(yuǎn)遠(yuǎn)超過處理任務(wù)的時(shí)間,而且還不斷增加,如果任務(wù)內(nèi)存大一些,不一會兒就爆了,呵呵。

    ?

    可以仔細(xì)想想哈。

    ?

    例子三:有界隊(duì)列,使用ArrayBlockingQueue。

    ?

    這個(gè)是最為復(fù)雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點(diǎn)便是可以防止資源耗盡的情況發(fā)生。

    ?

    舉例來說,請看如下構(gòu)造方法:

    ?

    Java代碼 ?
  • new?ThreadPoolExecutor(??
  • ????????????2,?4,?30,?TimeUnit.SECONDS,???
  • ????????????new?ArrayBlockingQueue<Runnable>(2),???
  • ????????????new?RecorderThreadFactory("CookieRecorderPool"),???
  • ????????????new?ThreadPoolExecutor.CallerRunsPolicy());??
  • 假設(shè),所有的任務(wù)都永遠(yuǎn)無法執(zhí)行完。

    ?

    對于首先來的A,B來說直接運(yùn)行,接下來,如果來了C,D,他們會被放到queu中,如果接下來再來E,F,則增加線程運(yùn)行E,F。但是如果再來任務(wù),隊(duì)列無法再接受了,線程數(shù)也到達(dá)最大的限制了,所以就會使用拒絕策略來處理。

    ?

    總結(jié):

  • ThreadPoolExecutor的使用還是很有技巧的。
  • 使用無界queue可能會耗盡系統(tǒng)資源。
  • 使用有界queue可能不能很好的滿足性能,需要調(diào)節(jié)線程數(shù)和queue大小
  • 線程數(shù)自然也有開銷,所以需要根據(jù)不同應(yīng)用進(jìn)行調(diào)節(jié)。
  • 通常來說對于靜態(tài)任務(wù)可以歸為:
  • 數(shù)量大,但是執(zhí)行時(shí)間很短
  • 數(shù)量小,但是執(zhí)行時(shí)間較長
  • 數(shù)量又大執(zhí)行時(shí)間又長
  • 除了以上特點(diǎn)外,任務(wù)間還有些內(nèi)在關(guān)系
  • 看完這篇問文章后,希望能夠可以選擇合適的類型了

    總結(jié)

    以上是生活随笔為你收集整理的ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。