【多线程】线程池拒绝策略详解与自定义拒绝策略
線程池的拒絕策略
ThreadPoolExecutor內部有實現4個拒絕策略,默認為AbortPolicy策略
- CallerRunsPolicy:由調用execute方法提交任務的線程來執行這個任務
- AbortPolicy:拋出異常RejectedExecutionException拒絕提交任務
- DiscardPolicy:直接拋棄任務,不做任何處理
- DiscardOldestPolicy:去除任務隊列中的第一個任務,重新提交
線程池中,有三個重要的參數,決定影響了拒絕策略:corePoolSize - 核心線程數,也即最小的線程數。workQueue - 阻塞隊列 。 maximumPoolSize - 最大線程數
當提交任務數大于 corePoolSize 的時候,會優先將任務放到 workQueue 阻塞隊列中。當阻塞隊列飽和后,會擴充線程池中線程數,直到達到 maximumPoolSize 最大線程數配置。此時,再多余的任務,則會觸發線程池的拒絕策略了。
總結起來,也就是一句話,當提交的任務數大于(workQueue.size() + maximumPoolSize ),就會觸發線程池的拒絕策略。
拒絕策略的源碼
CallerRunsPolicy
/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.* 用于拒絕任務的處理程序,* 可以直接在{@code execute}方法的調用線程中運行被拒絕的任務* 除非執行器已被關閉,否則將丟棄該任務。*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.* 創建一個{@code CallerRunsPolicy}。*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 除非執行器已關閉,否則在調用者線程中執行任務,* r 在這種情況下,該任務將被丟棄。** @param r the runnable task requested to be executed* r 請求執行的可運行任務* @param e the executor attempting to execute this task* e 嘗試執行此任務的執行者*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}分析:
CallerRunsPolicy:線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
這個策略顯然不想放棄執行任務。但是由于池中已經沒有任何資源了,那么就直接使用調用該execute的線程本身來執行。(開始我總不想丟棄任務的執行,但是對某些應用場景來講,很有可能造成當前線程也被阻塞。如果所有線程都是不能執行的,很可能導致程序沒法繼續跑了。需要視業務情景而定吧。)
這樣生產者雖然沒有被阻塞,但提交任務也會被暫停。
但這種策略也有隱患,當生產者較少時,生產者消費任務的時間里,消費者可能已經把任務都消費完了,隊列處于空狀態,當生產者執行完任務后才能再繼續生產任務,這個過程中可能導致消費者線程的饑餓。
AbortPolicy
/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.* 拋出{@code RejectedExecutionException}的拒絕任務處理程序。*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 總是拋出RejectedExecutionException* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}分析:
該策略是默認飽和策略。
使用該策略時在飽和時會拋出RejectedExecutionException(繼承自RuntimeException),調用者可捕獲該異常自行處理。
DiscardPolicy
/*** A handler for rejected tasks that silently discards the* rejected task.* 拒絕任務的處理程序,默認丟棄拒絕任務。*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不執行任何操作,這具有丟棄任務 r 的作用。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}分析:
如代碼所示,不做任何處理直接拋棄任務
DiscardOldestPolicy
/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.* 處理被拒絕任務的處理程序,它丟棄最舊的未處理請求,* 然后重試{@code execute},* 除非執行器*被關閉,在這種情況下,該任務將被丟棄。*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.* 獲取并忽略執行者*會立即執行的下一個任務(如果一個任務立即可用),* 然后重試任務r的執行,除非執行者*被關閉,在這種情況下,任務r會被丟棄。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}分析:
如代碼,先將阻塞隊列中的頭元素出隊拋棄,再嘗試提交任務。如果此時阻塞隊列使用PriorityBlockingQueue優先級隊列,將會導致優先級最高的任務被拋棄,因此不建議將該種策略配合優先級隊列使用。
自定義策略
看完發現默認的幾個拒絕策略并不是特別的友好,那么可不可以咱們自己搞個呢?
可以發現,所有的拒絕策略都是實現了 RejectedExecutionHandler 接口
public interface RejectedExecutionHandler {/*** Method that may be invoked by a {@link ThreadPoolExecutor} when* {@link ThreadPoolExecutor#execute execute} cannot accept a* task. This may occur when no more threads or queue slots are* available because their bounds would be exceeded, or upon* shutdown of the Executor.** <p>In the absence of other alternatives, the method may throw* an unchecked {@link RejectedExecutionException}, which will be* propagated to the caller of {@code execute}.** @param r the runnable task requested to be executed* @param executor the executor attempting to execute this task* @throws RejectedExecutionException if there is no remedy*/void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }這個接口只有一個 rejectedExecution 方法。
r 為待執行任務;executor 為線程池;方法可能會拋出拒絕異常。
那么咱們就可以通過實現 RejectedExecutionHandler 接口擴展
兩個栗子:一
netty自己實現的線程池里面私有的一個拒絕策略。單獨啟動一個新的臨時線程來執行任務。
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}}兩個栗子:二
dubbo的一個例子,它直接繼承的 AbortPolicy ,加強了日志輸出,并且輸出dump文件
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg);dumpJStack();throw new RejectedExecutionException(msg);} }自己玩
參考類似的思路,最簡單的做法,我們可以直接定義一個RejectedExecutionHandler,當隊列滿時改為調用BlockingQueue.put來實現生產者的阻塞:
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};這樣,我們就無需再關心Queue和Consumer的邏輯,只要把精力集中在生產者和消費者線程的實現邏輯上,只管往線程池提交任務就行了。
相比最初的設計,這種方式的代碼量能減少不少,而且能避免并發環境的很多問題。當然,你也可以采用另外的手段,例如在提交時采用信號量做入口限制等,但是如果僅僅是要讓生產者阻塞,那就顯得復雜了。
總結
四種線程池拒絕策略,具體使用哪種策略,還得根據實際業務場景才能做出抉擇。
總結
以上是生活随笔為你收集整理的【多线程】线程池拒绝策略详解与自定义拒绝策略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【多线程】ThreadPoolExecu
- 下一篇: 【MaxCompute】学习笔记基础说明