(转)ThreadPoolExecutor最佳实践--如何选择队列
轉自:?https://blog.hufeifei.cn/2018/08/12/Java/ThreadPoolExecutor%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5--%E5%A6%82%E4%BD%95%E9%80%89%E6%8B%A9%E9%98%9F%E5%88%97/?
?
前一篇文章《如何選擇線程數》講了如何決定線程池中線程個數,這篇文章討論“如何選擇工作隊列”。
再次強調一下,ThreadPoolExecutor最核心的四點:
1、當有任務提交的時候,會創建核心線程去執行任務(即使有核心線程空閑);
2、當核心線程數達到corePoolSize時,后續提交的都會進BlockingQueue中排隊;
3、當BlockingQueue滿了(offer失敗),就會創建臨時線程(臨時線程空閑超過一定時間后,會被銷毀);
4、當線程總數達到maximumPoolSize時,后續提交的任務都會被RejectedExecutionHandler拒絕。
1、BlockingQueue
線程池中工作隊列由BlockingQueue實現類提供功能,BlockingQueue定義了這么幾組方法:
| ? | Throws exception | Special value | Blocks | Times out |
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not applicable | not applicable |
阻塞隊列是最典型的“生產者消費者”模型:
- 生產者調用put()方法將生產的元素入隊,消費者調用take()方法;
- 當隊列滿了,生產者調用的put()方法會阻塞,直到隊列有空間可入隊;
- 當隊列為空,消費者調用的get()方法會阻塞,直到隊列有元素可消費;
但是需要十分注意的是:ThreadPoolExecutor提交任務時使用offer方法(不阻塞),工作線程從隊列取任務使用take方法(阻塞)。正是因為ThreadPoolExecutor使用了不阻塞的offer方法,所以當隊列容量已滿,線程池會去創建新的臨時線程;同樣因為工作線程使用take()方法取任務,所以當沒有任務可取的時候線程池的線程將會空閑阻塞。
事實上,工作線程的超時銷毀是調用offer(e, time, unit)實現的。
2、JDK提供的阻塞隊列實現
JDK中提供了以下幾個BlockingQueue實現類:
?
2.1、ArrayBlockingQueue
這是一個由數組實現的容量固定的有界阻塞隊列。這個隊列的實現非常簡單:
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x; // 入隊if (++putIndex == items.length) // 如果指針到了末尾putIndex = 0; // 下一個入隊的位置變為0count++;notEmpty.signal(); // 提醒消費者線程消費 } private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null; // 出隊置空if (++takeIndex == items.length) // 如果指針到了末尾takeIndex = 0; // 下一個出隊的位置變為0count--;if (itrs != null)itrs.elementDequeued();notFull.signal(); // 提醒生產者線程生產return x; } |
通過簡單的指針循環實現了一個環形隊列:
下面有一張維基百科關于環形緩沖區的的動畫,雖然動畫描述內容與ArrayBlockingQueue實現有所差異,但貴在生動形象(著實找不到更好的動畫了)。
ArrayBlockingQueue主要復雜在迭代,允許迭代中修改隊列(刪除元素時會更新迭代器),并不會拋出ConcurrentModificationException;好在大多數場景中我們不會迭代阻塞隊列。
2.2、SynchronousQueue
這是一個非常有意思的集合,更準確的說它并不是一個集合容器,因為它沒有容量。你可以“偷偷地”把它看作new ArrayBlockingQueue(0),之所以用”偷偷地”這么齷齪的詞,首先是因為ArrayBlockingQueue在capacity<1時會拋異常,其次ArrayBlockingQueue(0)并不能實現SynchronousQueue這么強大的功能。
正如SynchronousQueue的名字所描述一樣——“同步隊列”,它專門用于生產者線程與消費者線程之間的同步:
- 因為它任何時候都是空的,所以消費者線程調用take()方法的時候就會發生阻塞,直到有一個生產者線程生產了一個元素,消費者線程就可以拿到這個元素并返回。
- 同樣的,你也可以認為任何時候都是滿的,所以生產者線程調用put()方法的時候就會發生阻塞,直到有一個消費者線程消費了一個元素,生產者才會返回。
另外還有幾點需要注意:
- SynchronousQueue不能遍歷,因為它沒有元素可以遍歷;
- 所有的阻塞隊列都不允許插入null元素,因為當生產者生產了一個null的時候,消費者調用poll()返回null,無法判斷是生產者生產了一個null元素,還是隊列本身就是空。
CachedThreadPool使用的就是同步隊列:
Copy
| 1 2 3 4 5 | public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); } |
因為SynchronousQueue無容量的特性,所以CachedThreadPool不會對任務進行排隊,如果線程池中沒有空閑線程,CachedThreadPool會立即創建一個新線程來接收這個任務。
所以使用CachedThreadPool要注意避免提交長時間阻塞的任務,可能會由于線程數過多而導致內存溢出(OutOfOutOfMemoryError)。
2.3、LinkedBlockingQueue
這是一個由單鏈表實現的默認無界的阻塞隊列。LinkedBlockingQueue提供了一個可選有界的構造函數,而在未指明容量時,容量默認為Integer.MAX_VALUE。
按照官方文檔的說法LinkedBlockingQueue是一種可選有界(optionally-bounded)阻塞隊列。
SingleThreadPool和FixedThreadPool使用的就是LinkedBlockingQueue
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory)); } |
因為FixedThreadPool使用無界的LinkedBlockingQueue,所以當沒有線程空閑時,新提交的任務都會提交到阻塞隊列中,由于隊列永遠也不會滿,FixedThreadPool永遠也不會創建新的臨時線程。
但是需要注意的是,不要往FixedThreadPool提交過多的任務,因為所有未處理的任務都會到LinkedBlockingQueue中排隊,隊列中任務過多也可能會導致內存溢出。雖然這個過程會比較緩慢,因為隊列中的請求所占用的資源比線程占用的資源要少得多。
2.4、其他隊列
DelayQueue和PriorityBlockingQueue底層都是使用二叉堆實現的優先級阻塞隊列。
區別在于:
- 前者要求隊列中的元素實現Delayed接口,通過執行時延從隊列中提取任務,時間沒到任務取不出來;
- 后者對元素沒有要求,可以實現Comparable接口也可以提供Comparator來對隊列中的元素進行比較,跟時間沒有任何關系,僅僅是按照優先級取任務。
當我們提交的任務有優先順序時可以考慮選用這兩種隊列
事實上ScheduledThreadPoolExecutor內部實現了一個類似于DelayQueue的隊列。
除了這兩個,BlockingQueue還有兩個子接口BlockingDeque(雙端阻塞隊列),TransferQueue(傳輸隊列)
并且兩個接口都有自己唯一的實現類:
?
- LinkedBlockingDeque:使用雙向隊列實現的雙端阻塞隊列,雙端意味著可以像普通隊列一樣FIFO(先進先出),可以以像棧一樣FILO(先進后出)
- LinkedTransferQueue:它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的結合體,但是把它用在ThreadPoolExecutor中,和無限制的LinkedBlockingQueue行為一致。
?
3、讓生產者阻塞的線程池
前面說到CachedThreadPool和FixedThreadPool都有可能導致內存溢出,前者是由于線程數過多,后者是由于隊列任務過多。而究其根本就是因為任務生產速度遠大于線程池處理任務的速度。
所以有一個想法就是讓生產任務的線程在任務處理不過來的時候休息一會兒——也就是阻塞住任務生產者。
但是前面提到過ThreadPoolExecutor內部將任務提交到隊列時,使用的是不阻塞的offer方法。
我提供的第一種方式是:重寫offer方法把它變成阻塞式。
3.1、重寫BlockingQueue的offer
這種處理方式是將原來非阻塞的offer覆蓋,使用阻塞的put方法實現。
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | public class ThreadPoolTest {private static class Task implements Runnable {private int taskId;Task(int taskId) {this.taskId = taskId;}@Override public void run() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException ignore) {}System.out.println("task " + taskId + " end");}}public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(2) {@Override public boolean offer(Runnable runnable) {try {super.put(runnable); // 使用阻塞的put重寫offer方法} catch (InterruptedException e) {e.printStackTrace();}return true;}});threadPool.submit(new Task(1));System.out.println("task 1 submitted");threadPool.submit(new Task(2));System.out.println("task 2 submitted");threadPool.submit(new Task(3));System.out.println("task 3 submitted");threadPool.submit(new Task(4));System.out.println("task 4 submitted");threadPool.submit(new Task(5));System.out.println("task 5 submitted");threadPool.submit(new Task(6));System.out.println("task 6 submitted");threadPool.shutdown();}} |
執行的過程中會發現Task5要等到線程池中的一個任務執行完成后,才能提交成功。
這種方式把BlockingQueue的行為修改了,這時線程池的maximumPoolSize形同虛設,因為ThreadPoolExecutor調用offer入隊失敗返回false后才會創建臨時線程。現在offer改成了阻塞式的,實際上永遠是返回true,所以永遠都不會創建臨時線程,maximumPoolSize的限制也就沒有什么意義了。
3.2、重寫拒絕策略
在介紹第二種方式之前,先簡單介紹JDK中提供了四種拒絕策略:
?
- AbortPolicy——拋出RejectedExecutionException異常的方式拒絕任務。
- DiscardPolicy——什么都不干,靜默地丟棄任務
- DiscardOldestPolicy——把隊列中最老的任務拿出來扔掉
- CallerRunsPolicy——在任務提交的線程把任務給執行了
ThreadPoolExecutor默認使用AbortPolicy
DiscardPolicy和DiscardOldestPolicy兩種策略看上去都不怎么靠譜,除非真有這種特別的需求,比如客戶端應用中網絡請求擁堵(服務端宕機或網絡不通暢)的話可以選擇拋棄最老的請求,大多數情況還是使用默認的拒絕策略。
我們的第二種做法就是寫一個自己的RejectedExecutionHandler。這種方式相對“溫柔”一些,在線程池提交任務的最后一步——被線程池拒絕的任務,可以在拒絕后調用隊列的put()方法,讓任務的提交者阻塞,直到隊列中任務被被線程池執行后,隊列有了多余空間,調用方才返回。
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | public class ThreadPoolTest {private static class Task implements Runnable {private int taskId;Task(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException ignore) {}System.out.println("task " + taskId + " end");}}private static class BlockCallerPolicy implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {executor.getQueue().put(r);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2), new BlockCallerPolicy());threadPool.submit(new Task(1));System.out.println("task 1 submitted");threadPool.submit(new Task(2));System.out.println("task 2 submitted");threadPool.submit(new Task(3));System.out.println("task 3 submitted");threadPool.submit(new Task(4));System.out.println("task 4 submitted");threadPool.submit(new Task(5));System.out.println("task 5 submitted");threadPool.submit(new Task(6));System.out.println("task 6 submitted");threadPool.shutdown();}} |
使用這種方式的好處是線程池仍可以設置maximumPoolSize,當任務入隊失敗仍可以創建臨時線程執行任務,只有當線程總數大于maximumPoolSize時,任務才會被拒絕。
4、Tomcat中的線程池
作為一個最常用的Java應用服務器之一,Tomcat中線程池還是值得我們借鑒學習的。
注意下面代碼來自Tomcat8.5.27,版本不同實現可能略有差異
org.apache.catelina.core.StandardThreadExecutor
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | public class StandardThreadExecutor extends LifecycleMBeanBaseimplements Executor, ResizableExecutor {// Tomcat線程池默認的配置protected int threadPriority = Thread.NORM_PRIORITY;protected boolean daemon = true;protected String namePrefix = "tomcat-exec-";protected int maxThreads = 200;protected int minSpareThreads = 25;protected int maxIdleTime = 60000;...protected boolean prestartminSpareThreads = false;protected int maxQueueSize = Integer.MAX_VALUE;protected void startInternal() throws LifecycleException {// 任務隊列:這里你看到的是一個無界隊列,但是隊列里面進行了特殊處理taskqueue = new TaskQueue(maxQueueSize);TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());// 創建線程池,這里的ThreadPoolExecutor是Tomcat繼承自JDK的ThreadPoolExecutorexecutor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), // 核心線程數與最大線程數maxIdleTime, TimeUnit.MILLISECONDS, // 默認6萬毫秒的超時時間,也就是一分鐘taskqueue, tf); // 玄機在任務隊列的設置executor.setThreadRenewalDelay(threadRenewalDelay);if (prestartminSpareThreads) {executor.prestartAllCoreThreads(); // 預熱所有的核心線程}taskqueue.setParent(executor);setState(LifecycleState.STARTING);}... } |
org.apache.tomcat.util.threads.ThreadPoolExecutor
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {private final AtomicInteger submittedCount = new AtomicInteger(0);private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);@Overrideprotected void afterExecute(Runnable r, Throwable t) {submittedCount.decrementAndGet(); // 執行完成后提交數量減一if (t == null) {// 如果有必要拋個異常讓線程終止stopCurrentThreadIfNeeded();}}@Overridepublic void execute(Runnable command) {execute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet(); // 提交時數量加一try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {// 如果任務被拒絕,則強制入隊if (!queue.force(command, timeout, unit)) {// 由于TaskQueue默認無界,所以默認強制入隊會成功submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet(); // 任務被拒絕,任務數減一throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet(); // 任務被拒絕,任務數減一throw rx;}}} } |
org.apache.tomcat.util.threads.TaskQueue
Copy
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public class TaskQueue extends LinkedBlockingQueue<Runnable> {private volatile ThreadPoolExecutor parent = null;public boolean force(Runnable o) {if ( parent==null || parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");// 因為LinkedBlockingQueue無界,所以調用offer強制入隊return super.offer(o);}public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit);}@Overridepublic boolean offer(Runnable o) {// 不是上面Tomcat中定義地ThreadPoolExecutor,不做任何檢查if (parent==null) return super.offer(o);// 線程數達到最大線程數,嘗試入隊if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);// 提交的任務數小于線程數,也就是有空余線程,入隊讓空閑線程取任務if (parent.getSubmittedCount() < parent.getPoolSize()) return super.offer(o);// 走到這說明線程池沒有空閑線程// 這里返回false,改變了LinkedBlockingQueue默認的行為// 使得Tomcat可以創建臨時線程if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 到這里說明臨時線程也沒有空閑,只能排隊了return super.offer(o);} } |
Tomcat的線程池擴展了JDK線程池的功能,主要體現在兩點:
- Tomcat的ThreadPoolExecutor使用的TaskQueue,是無界的LinkedBlockingQueue,但是通過taskQueue的offer方法覆蓋了LinkedBlockingQueue的offer方法,改寫了規則,使得線程池能在任務較多的情況下增長線程池數量——JDK是先排隊再漲線程池,Tomcat則是先漲線程池再排隊。
- Tomcat的ThreadPoolExecutor改寫了execute方法,當任務被reject時,捕獲異常,并強制入隊。
參考鏈接:
支持生產阻塞的線程池 :http://ifeve.com/blocking-threadpool-executor/
Disruptor框架:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf
線程池調整的重要性:https://blog.bramp.net/post/2015/12/17/the-importance-of-tuning-your-thread-pools/
線程池調整的重要性(譯):http://www.importnew.com/17633.html
SynchronousQueue與TransferQueue的區別:https://stackoverflow.com/questions/7317579/difference-between-blockingqueue-and-transferqueue/7317650
Tomcat配置線程池:https://tomcat.apache.org/tomcat-8.5-doc/config/executor.html
總結
以上是生活随笔為你收集整理的(转)ThreadPoolExecutor最佳实践--如何选择队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 魏蜀吴三国地图及实力对比( 魏蜀吴形成三
- 下一篇: (转)threadPoolExecuto