日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

(转)ThreadPoolExecutor最佳实践--如何选择队列

發(fā)布時(shí)間:2023/12/3 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 (转)ThreadPoolExecutor最佳实践--如何选择队列 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

轉(zhuǎn)自:?https://blog.hufeifei.cn/2018/08/12/Java/ThreadPoolExecutor%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5--%E5%A6%82%E4%BD%95%E9%80%89%E6%8B%A9%E9%98%9F%E5%88%97/?

?

前一篇文章《如何選擇線程數(shù)》講了如何決定線程池中線程個(gè)數(shù),這篇文章討論“如何選擇工作隊(duì)列”。

再次強(qiáng)調(diào)一下,ThreadPoolExecutor最核心的四點(diǎn):

1、當(dāng)有任務(wù)提交的時(shí)候,會(huì)創(chuàng)建核心線程去執(zhí)行任務(wù)(即使有核心線程空閑);

2、當(dāng)核心線程數(shù)達(dá)到corePoolSize時(shí),后續(xù)提交的都會(huì)進(jìn)BlockingQueue中排隊(duì);

3、當(dāng)BlockingQueue滿了(offer失敗),就會(huì)創(chuàng)建臨時(shí)線程(臨時(shí)線程空閑超過一定時(shí)間后,會(huì)被銷毀);

4、當(dāng)線程總數(shù)達(dá)到maximumPoolSize時(shí),后續(xù)提交的任務(wù)都會(huì)被RejectedExecutionHandler拒絕。

1、BlockingQueue

線程池中工作隊(duì)列由BlockingQueue實(shí)現(xiàn)類提供功能,BlockingQueue定義了這么幾組方法:

Summary of BlockingQueue methods
?Throws exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()not applicablenot applicable

阻塞隊(duì)列是最典型的“生產(chǎn)者消費(fèi)者”模型:

  • 生產(chǎn)者調(diào)用put()方法將生產(chǎn)的元素入隊(duì),消費(fèi)者調(diào)用take()方法;
  • 當(dāng)隊(duì)列滿了,生產(chǎn)者調(diào)用的put()方法會(huì)阻塞,直到隊(duì)列有空間可入隊(duì);
  • 當(dāng)隊(duì)列為空,消費(fèi)者調(diào)用的get()方法會(huì)阻塞,直到隊(duì)列有元素可消費(fèi);

但是需要十分注意的是:ThreadPoolExecutor提交任務(wù)時(shí)使用offer方法(不阻塞),工作線程從隊(duì)列取任務(wù)使用take方法(阻塞)。正是因?yàn)門hreadPoolExecutor使用了不阻塞的offer方法,所以當(dāng)隊(duì)列容量已滿,線程池會(huì)去創(chuàng)建新的臨時(shí)線程;同樣因?yàn)楣ぷ骶€程使用take()方法取任務(wù),所以當(dāng)沒有任務(wù)可取的時(shí)候線程池的線程將會(huì)空閑阻塞。

事實(shí)上,工作線程的超時(shí)銷毀是調(diào)用offer(e, time, unit)實(shí)現(xiàn)的。

2、JDK提供的阻塞隊(duì)列實(shí)現(xiàn)

JDK中提供了以下幾個(gè)BlockingQueue實(shí)現(xiàn)類:

?

2.1、ArrayBlockingQueue

這是一個(gè)由數(shù)組實(shí)現(xiàn)容量固定的有界阻塞隊(duì)列。這個(gè)隊(duì)列的實(shí)現(xiàn)非常簡(jiǎn)單:

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x; // 入隊(duì)if (++putIndex == items.length) // 如果指針到了末尾putIndex = 0; // 下一個(gè)入隊(duì)的位置變?yōu)?count++;notEmpty.signal(); // 提醒消費(fèi)者線程消費(fèi) } private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null; // 出隊(duì)置空if (++takeIndex == items.length) // 如果指針到了末尾takeIndex = 0; // 下一個(gè)出隊(duì)的位置變?yōu)?count--;if (itrs != null)itrs.elementDequeued();notFull.signal(); // 提醒生產(chǎn)者線程生產(chǎn)return x; }

通過簡(jiǎn)單的指針循環(huán)實(shí)現(xiàn)了一個(gè)環(huán)形隊(duì)列:

下面有一張維基百科關(guān)于環(huán)形緩沖區(qū)的的動(dòng)畫,雖然動(dòng)畫描述內(nèi)容與ArrayBlockingQueue實(shí)現(xiàn)有所差異,但貴在生動(dòng)形象(著實(shí)找不到更好的動(dòng)畫了)。

ArrayBlockingQueue主要復(fù)雜在迭代,允許迭代中修改隊(duì)列(刪除元素時(shí)會(huì)更新迭代器),并不會(huì)拋出ConcurrentModificationException;好在大多數(shù)場(chǎng)景中我們不會(huì)迭代阻塞隊(duì)列。

2.2、SynchronousQueue

這是一個(gè)非常有意思的集合,更準(zhǔn)確的說它并不是一個(gè)集合容器,因?yàn)?strong>它沒有容量。你可以“偷偷地”把它看作new ArrayBlockingQueue(0),之所以用”偷偷地”這么齷齪的詞,首先是因?yàn)锳rrayBlockingQueue在capacity<1時(shí)會(huì)拋異常,其次ArrayBlockingQueue(0)并不能實(shí)現(xiàn)SynchronousQueue這么強(qiáng)大的功能。

正如SynchronousQueue的名字所描述一樣——“同步隊(duì)列”,它專門用于生產(chǎn)者線程與消費(fèi)者線程之間的同步

  • 因?yàn)樗魏螘r(shí)候都是空的,所以消費(fèi)者線程調(diào)用take()方法的時(shí)候就會(huì)發(fā)生阻塞,直到有一個(gè)生產(chǎn)者線程生產(chǎn)了一個(gè)元素,消費(fèi)者線程就可以拿到這個(gè)元素并返回。
  • 同樣的,你也可以認(rèn)為任何時(shí)候都是滿的,所以生產(chǎn)者線程調(diào)用put()方法的時(shí)候就會(huì)發(fā)生阻塞,直到有一個(gè)消費(fèi)者線程消費(fèi)了一個(gè)元素,生產(chǎn)者才會(huì)返回。

另外還有幾點(diǎn)需要注意:

  • SynchronousQueue不能遍歷,因?yàn)樗鼪]有元素可以遍歷;
  • 所有的阻塞隊(duì)列都不允許插入null元素,因?yàn)楫?dāng)生產(chǎn)者生產(chǎn)了一個(gè)null的時(shí)候,消費(fèi)者調(diào)用poll()返回null,無(wú)法判斷是生產(chǎn)者生產(chǎn)了一個(gè)null元素,還是隊(duì)列本身就是空。

CachedThreadPool使用的就是同步隊(duì)列

Copy

1 2 3 4 5 public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }

因?yàn)镾ynchronousQueue無(wú)容量的特性,所以CachedThreadPool不會(huì)對(duì)任務(wù)進(jìn)行排隊(duì),如果線程池中沒有空閑線程,CachedThreadPool會(huì)立即創(chuàng)建一個(gè)新線程來(lái)接收這個(gè)任務(wù)。

所以使用CachedThreadPool要注意避免提交長(zhǎng)時(shí)間阻塞的任務(wù),可能會(huì)由于線程數(shù)過多而導(dǎo)致內(nèi)存溢出(OutOfOutOfMemoryError)。

2.3、LinkedBlockingQueue

這是一個(gè)由單鏈表實(shí)現(xiàn)默認(rèn)無(wú)界的阻塞隊(duì)列。LinkedBlockingQueue提供了一個(gè)可選有界的構(gòu)造函數(shù),而在未指明容量時(shí),容量默認(rèn)為Integer.MAX_VALUE。

按照官方文檔的說法LinkedBlockingQueue是一種可選有界(optionally-bounded)阻塞隊(duì)列

SingleThreadPool和FixedThreadPool使用的就是LinkedBlockingQueue

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory)); }

因?yàn)镕ixedThreadPool使用無(wú)界的LinkedBlockingQueue,所以當(dāng)沒有線程空閑時(shí),新提交的任務(wù)都會(huì)提交到阻塞隊(duì)列中,由于隊(duì)列永遠(yuǎn)也不會(huì)滿,FixedThreadPool永遠(yuǎn)也不會(huì)創(chuàng)建新的臨時(shí)線程。

但是需要注意的是,不要往FixedThreadPool提交過多的任務(wù),因?yàn)樗形刺幚淼娜蝿?wù)都會(huì)到LinkedBlockingQueue中排隊(duì),隊(duì)列中任務(wù)過多也可能會(huì)導(dǎo)致內(nèi)存溢出。雖然這個(gè)過程會(huì)比較緩慢,因?yàn)殛?duì)列中的請(qǐng)求所占用的資源比線程占用的資源要少得多。

2.4、其他隊(duì)列

DelayQueue和PriorityBlockingQueue底層都是使用二叉堆實(shí)現(xiàn)優(yōu)先級(jí)阻塞隊(duì)列

區(qū)別在于:

  • 前者要求隊(duì)列中的元素實(shí)現(xiàn)Delayed接口,通過執(zhí)行時(shí)延從隊(duì)列中提取任務(wù),時(shí)間沒到任務(wù)取不出來(lái);
  • 后者對(duì)元素沒有要求,可以實(shí)現(xiàn)Comparable接口也可以提供Comparator來(lái)對(duì)隊(duì)列中的元素進(jìn)行比較,跟時(shí)間沒有任何關(guān)系,僅僅是按照優(yōu)先級(jí)取任務(wù)。

當(dāng)我們提交的任務(wù)有優(yōu)先順序時(shí)可以考慮選用這兩種隊(duì)列

事實(shí)上ScheduledThreadPoolExecutor內(nèi)部實(shí)現(xiàn)了一個(gè)類似于DelayQueue的隊(duì)列。

除了這兩個(gè),BlockingQueue還有兩個(gè)子接口BlockingDeque(雙端阻塞隊(duì)列),TransferQueue(傳輸隊(duì)列)

并且兩個(gè)接口都有自己唯一的實(shí)現(xiàn)類:

?

  • LinkedBlockingDeque:使用雙向隊(duì)列實(shí)現(xiàn)的雙端阻塞隊(duì)列,雙端意味著可以像普通隊(duì)列一樣FIFO(先進(jìn)先出),可以以像棧一樣FILO(先進(jìn)后出)
  • LinkedTransferQueue:它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的結(jié)合體,但是把它用在ThreadPoolExecutor中,和無(wú)限制的LinkedBlockingQueue行為一致。

?

3、讓生產(chǎn)者阻塞的線程池

前面說到CachedThreadPool和FixedThreadPool都有可能導(dǎo)致內(nèi)存溢出,前者是由于線程數(shù)過多,后者是由于隊(duì)列任務(wù)過多。而究其根本就是因?yàn)槿蝿?wù)生產(chǎn)速度遠(yuǎn)大于線程池處理任務(wù)的速度。

所以有一個(gè)想法就是讓生產(chǎn)任務(wù)的線程在任務(wù)處理不過來(lái)的時(shí)候休息一會(huì)兒——也就是阻塞住任務(wù)生產(chǎn)者。

但是前面提到過ThreadPoolExecutor內(nèi)部將任務(wù)提交到隊(duì)列時(shí),使用的是不阻塞的offer方法。

我提供的第一種方式是:重寫offer方法把它變成阻塞式。

3.1、重寫B(tài)lockingQueue的offer

這種處理方式是將原來(lái)非阻塞的offer覆蓋,使用阻塞的put方法實(shí)現(xiàn)。

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class ThreadPoolTest {private static class Task implements Runnable {private int taskId;Task(int taskId) {this.taskId = taskId;}@Override public void run() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException ignore) {}System.out.println("task " + taskId + " end");}}public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(2) {@Override public boolean offer(Runnable runnable) {try {super.put(runnable); // 使用阻塞的put重寫offer方法} catch (InterruptedException e) {e.printStackTrace();}return true;}});threadPool.submit(new Task(1));System.out.println("task 1 submitted");threadPool.submit(new Task(2));System.out.println("task 2 submitted");threadPool.submit(new Task(3));System.out.println("task 3 submitted");threadPool.submit(new Task(4));System.out.println("task 4 submitted");threadPool.submit(new Task(5));System.out.println("task 5 submitted");threadPool.submit(new Task(6));System.out.println("task 6 submitted");threadPool.shutdown();}}

執(zhí)行的過程中會(huì)發(fā)現(xiàn)Task5要等到線程池中的一個(gè)任務(wù)執(zhí)行完成后,才能提交成功。

這種方式把BlockingQueue的行為修改了,這時(shí)線程池的maximumPoolSize形同虛設(shè),因?yàn)門hreadPoolExecutor調(diào)用offer入隊(duì)失敗返回false后才會(huì)創(chuàng)建臨時(shí)線程。現(xiàn)在offer改成了阻塞式的,實(shí)際上永遠(yuǎn)是返回true,所以永遠(yuǎn)都不會(huì)創(chuàng)建臨時(shí)線程,maximumPoolSize的限制也就沒有什么意義了。

3.2、重寫拒絕策略

在介紹第二種方式之前,先簡(jiǎn)單介紹JDK中提供了四種拒絕策略:

?

  • AbortPolicy——拋出RejectedExecutionException異常的方式拒絕任務(wù)。
  • DiscardPolicy——什么都不干,靜默地丟棄任務(wù)
  • DiscardOldestPolicy——把隊(duì)列中最老的任務(wù)拿出來(lái)扔掉
  • CallerRunsPolicy——在任務(wù)提交的線程把任務(wù)給執(zhí)行了

ThreadPoolExecutor默認(rèn)使用AbortPolicy

DiscardPolicy和DiscardOldestPolicy兩種策略看上去都不怎么靠譜,除非真有這種特別的需求,比如客戶端應(yīng)用中網(wǎng)絡(luò)請(qǐng)求擁堵(服務(wù)端宕機(jī)或網(wǎng)絡(luò)不通暢)的話可以選擇拋棄最老的請(qǐng)求,大多數(shù)情況還是使用默認(rèn)的拒絕策略。

我們的第二種做法就是寫一個(gè)自己的RejectedExecutionHandler。這種方式相對(duì)“溫柔”一些,在線程池提交任務(wù)的最后一步——被線程池拒絕的任務(wù),可以在拒絕后調(diào)用隊(duì)列的put()方法,讓任務(wù)的提交者阻塞,直到隊(duì)列中任務(wù)被被線程池執(zhí)行后,隊(duì)列有了多余空間,調(diào)用方才返回。

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class ThreadPoolTest {private static class Task implements Runnable {private int taskId;Task(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException ignore) {}System.out.println("task " + taskId + " end");}}private static class BlockCallerPolicy implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {executor.getQueue().put(r);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2), new BlockCallerPolicy());threadPool.submit(new Task(1));System.out.println("task 1 submitted");threadPool.submit(new Task(2));System.out.println("task 2 submitted");threadPool.submit(new Task(3));System.out.println("task 3 submitted");threadPool.submit(new Task(4));System.out.println("task 4 submitted");threadPool.submit(new Task(5));System.out.println("task 5 submitted");threadPool.submit(new Task(6));System.out.println("task 6 submitted");threadPool.shutdown();}}

使用這種方式的好處是線程池仍可以設(shè)置maximumPoolSize,當(dāng)任務(wù)入隊(duì)失敗仍可以創(chuàng)建臨時(shí)線程執(zhí)行任務(wù),只有當(dāng)線程總數(shù)大于maximumPoolSize時(shí),任務(wù)才會(huì)被拒絕。

4、Tomcat中的線程池

作為一個(gè)最常用的Java應(yīng)用服務(wù)器之一,Tomcat中線程池還是值得我們借鑒學(xué)習(xí)的。

注意下面代碼來(lái)自Tomcat8.5.27,版本不同實(shí)現(xiàn)可能略有差異

org.apache.catelina.core.StandardThreadExecutor

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class StandardThreadExecutor extends LifecycleMBeanBaseimplements Executor, ResizableExecutor {// Tomcat線程池默認(rèn)的配置protected int threadPriority = Thread.NORM_PRIORITY;protected boolean daemon = true;protected String namePrefix = "tomcat-exec-";protected int maxThreads = 200;protected int minSpareThreads = 25;protected int maxIdleTime = 60000;...protected boolean prestartminSpareThreads = false;protected int maxQueueSize = Integer.MAX_VALUE;protected void startInternal() throws LifecycleException {// 任務(wù)隊(duì)列:這里你看到的是一個(gè)無(wú)界隊(duì)列,但是隊(duì)列里面進(jìn)行了特殊處理taskqueue = new TaskQueue(maxQueueSize);TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());// 創(chuàng)建線程池,這里的ThreadPoolExecutor是Tomcat繼承自JDK的ThreadPoolExecutorexecutor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), // 核心線程數(shù)與最大線程數(shù)maxIdleTime, TimeUnit.MILLISECONDS, // 默認(rèn)6萬(wàn)毫秒的超時(shí)時(shí)間,也就是一分鐘taskqueue, tf); // 玄機(jī)在任務(wù)隊(duì)列的設(shè)置executor.setThreadRenewalDelay(threadRenewalDelay);if (prestartminSpareThreads) {executor.prestartAllCoreThreads(); // 預(yù)熱所有的核心線程}taskqueue.setParent(executor);setState(LifecycleState.STARTING);}... }

org.apache.tomcat.util.threads.ThreadPoolExecutor

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {private final AtomicInteger submittedCount = new AtomicInteger(0);private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);@Overrideprotected void afterExecute(Runnable r, Throwable t) {submittedCount.decrementAndGet(); // 執(zhí)行完成后提交數(shù)量減一if (t == null) {// 如果有必要拋個(gè)異常讓線程終止stopCurrentThreadIfNeeded();}}@Overridepublic void execute(Runnable command) {execute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet(); // 提交時(shí)數(shù)量加一try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {// 如果任務(wù)被拒絕,則強(qiáng)制入隊(duì)if (!queue.force(command, timeout, unit)) {// 由于TaskQueue默認(rèn)無(wú)界,所以默認(rèn)強(qiáng)制入隊(duì)會(huì)成功submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet(); // 任務(wù)被拒絕,任務(wù)數(shù)減一throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet(); // 任務(wù)被拒絕,任務(wù)數(shù)減一throw rx;}}} }

org.apache.tomcat.util.threads.TaskQueue

Copy

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class TaskQueue extends LinkedBlockingQueue<Runnable> {private volatile ThreadPoolExecutor parent = null;public boolean force(Runnable o) {if ( parent==null || parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");// 因?yàn)長(zhǎng)inkedBlockingQueue無(wú)界,所以調(diào)用offer強(qiáng)制入隊(duì)return super.offer(o);}public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit);}@Overridepublic boolean offer(Runnable o) {// 不是上面Tomcat中定義地ThreadPoolExecutor,不做任何檢查if (parent==null) return super.offer(o);// 線程數(shù)達(dá)到最大線程數(shù),嘗試入隊(duì)if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);// 提交的任務(wù)數(shù)小于線程數(shù),也就是有空余線程,入隊(duì)讓空閑線程取任務(wù)if (parent.getSubmittedCount() < parent.getPoolSize()) return super.offer(o);// 走到這說明線程池沒有空閑線程// 這里返回false,改變了LinkedBlockingQueue默認(rèn)的行為// 使得Tomcat可以創(chuàng)建臨時(shí)線程if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 到這里說明臨時(shí)線程也沒有空閑,只能排隊(duì)了return super.offer(o);} }

Tomcat的線程池?cái)U(kuò)展了JDK線程池的功能,主要體現(xiàn)在兩點(diǎn):

  • Tomcat的ThreadPoolExecutor使用的TaskQueue,是無(wú)界的LinkedBlockingQueue,但是通過taskQueue的offer方法覆蓋了LinkedBlockingQueue的offer方法,改寫了規(guī)則,使得線程池能在任務(wù)較多的情況下增長(zhǎng)線程池?cái)?shù)量——JDK是先排隊(duì)再漲線程池,Tomcat則是先漲線程池再排隊(duì)。
  • Tomcat的ThreadPoolExecutor改寫了execute方法,當(dāng)任務(wù)被reject時(shí),捕獲異常,并強(qiáng)制入隊(duì)。

參考鏈接:

支持生產(chǎn)阻塞的線程池 :http://ifeve.com/blocking-threadpool-executor/

Disruptor框架:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf

線程池調(diào)整的重要性:https://blog.bramp.net/post/2015/12/17/the-importance-of-tuning-your-thread-pools/

線程池調(diào)整的重要性(譯):http://www.importnew.com/17633.html

SynchronousQueue與TransferQueue的區(qū)別:https://stackoverflow.com/questions/7317579/difference-between-blockingqueue-and-transferqueue/7317650

Tomcat配置線程池:https://tomcat.apache.org/tomcat-8.5-doc/config/executor.html

總結(jié)

以上是生活随笔為你收集整理的(转)ThreadPoolExecutor最佳实践--如何选择队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。