可动态调节参数的线程池实现
背景
線程池是一種基于池化思想管理線程的工具,使用線程池可以減少創(chuàng)建銷毀線程的開銷,避免線程過(guò)多導(dǎo)致系統(tǒng)資源耗盡。在高并發(fā)的任務(wù)處理場(chǎng)景,線程池的使用是必不可少的。在雙11主圖價(jià)格表達(dá)項(xiàng)目中為了提升處理性能,很多地方使用到了線程池。隨著線程池的使用,逐漸發(fā)現(xiàn)一個(gè)問(wèn)題,線程池的參數(shù)如何設(shè)置?
線程池參數(shù)中有三個(gè)比較關(guān)鍵的參數(shù),分別是corePoolSize(核心線程數(shù))、maximumPoolSize(最大線程數(shù))、workQueueSzie(工作隊(duì)列大小)。根據(jù)任務(wù)的類型可以區(qū)分為IO密集型和CPU密集型,對(duì)于CPU密集型,一般經(jīng)驗(yàn)是設(shè)置corePoolSize=CPU核數(shù)+1,對(duì)于IO密集型需要根據(jù)具體的RT和流量來(lái)設(shè)置,沒有普適的經(jīng)驗(yàn)值。然而,我們一般遇到的情況多數(shù)是處理IO密集型任務(wù),如果線程池參數(shù)不可動(dòng)態(tài)調(diào)節(jié),就沒辦法根據(jù)實(shí)際情況實(shí)時(shí)調(diào)整處理速度,只能通過(guò)發(fā)布代碼調(diào)整參數(shù)。
如果線程池參數(shù)不合理會(huì)導(dǎo)致什么問(wèn)題呢?下面列舉幾種可能出現(xiàn)的場(chǎng)景:
最大線程數(shù)設(shè)置偏小,工作隊(duì)列大小設(shè)置偏小,導(dǎo)致服務(wù)接口大量拋出RejectedExecutionException。
最大線程數(shù)設(shè)置偏小,工作隊(duì)列大小設(shè)置過(guò)大,任務(wù)堆積過(guò)度,接口響應(yīng)時(shí)長(zhǎng)變長(zhǎng)。
最大線程數(shù)設(shè)置過(guò)大,線程調(diào)度開銷增大,處理速度反而下降。
核心線程數(shù)設(shè)置過(guò)小,流量突增時(shí)需要先創(chuàng)建線程,導(dǎo)致響應(yīng)時(shí)長(zhǎng)過(guò)大。
核心線程數(shù)設(shè)置過(guò)大,空閑線程太多,占用系統(tǒng)資源。
線程池任務(wù)調(diào)度機(jī)制
要明白線程池參數(shù)對(duì)運(yùn)行時(shí)的影響,就必須理解其中的原理,所以下面先簡(jiǎn)單總結(jié)了線程池的核心原理。
Java中的線程池核心實(shí)現(xiàn)類是ThreadPoolExecutor,ThreadPoolExecutor一方面維護(hù)自身的生命周期,另一方面同時(shí)管理線程和任務(wù),使兩者良好的結(jié)合從而執(zhí)行并行任務(wù)。用戶無(wú)需關(guān)注如何創(chuàng)建線程,如何調(diào)度線程來(lái)執(zhí)行任務(wù),用戶只需提供Runnable對(duì)象,將任務(wù)的運(yùn)行邏輯提交到執(zhí)行器(Executor)中,由Executor框架完成線程的調(diào)配和任務(wù)的執(zhí)行部分。
ThreadPoolExecutor是如何運(yùn)行,如何同時(shí)維護(hù)線程和執(zhí)行任務(wù)的呢?其運(yùn)行機(jī)制如下圖所示:
所有任務(wù)的調(diào)度都是由execute方法完成的,這部分完成的工作是:檢查現(xiàn)在線程池的運(yùn)行狀態(tài)、運(yùn)行線程數(shù)、運(yùn)行策略,決定接下來(lái)執(zhí)行的流程,是直接申請(qǐng)線程執(zhí)行,或是緩沖到隊(duì)列中執(zhí)行,亦或是直接拒絕該任務(wù)。其執(zhí)行過(guò)程如下:
首先檢測(cè)線程池運(yùn)行狀態(tài),如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態(tài)下執(zhí)行任務(wù)。
如果workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)。
如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中。
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)。
如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根據(jù)拒絕策略來(lái)處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。
其執(zhí)行流程如下圖所示:
動(dòng)態(tài)調(diào)節(jié)線程池參數(shù)實(shí)現(xiàn)
線程池相關(guān)的重要參數(shù)有三個(gè),分別是核心線程數(shù)、最大線程數(shù)和工作隊(duì)列大小,接下來(lái)將闡述如何實(shí)現(xiàn)動(dòng)態(tài)調(diào)節(jié)線程池參數(shù)。
調(diào)節(jié)核心和最大線程數(shù)的原理
ThreadPoolExecutor已經(jīng)提供了兩個(gè)方法在運(yùn)行時(shí)設(shè)置核心線程數(shù)和最大線程數(shù),分別是ThreadPoolExecutor.setCorePoolSize()和ThreadPoolExecutor.setMaximumPoolSize()。
setCorePoolSize方法的執(zhí)行流程是:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的corePoolSize,然后,如果新的值比原始值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀,如果新的值比原來(lái)的值要大且工作隊(duì)列不為空,則會(huì)創(chuàng)建新的工作線程。流程圖如下:
setMaximumPoolSize方法執(zhí)行流程是:首先會(huì)覆蓋之前構(gòu)造函數(shù)設(shè)置的maximumPoolSize,然后,如果新的值比原來(lái)的值要小,當(dāng)多余的工作線程下次變成空閑狀態(tài)的時(shí)候會(huì)被中斷并銷毀。
調(diào)節(jié)工作隊(duì)列大小的原理
線程池中是以生產(chǎn)者消費(fèi)者模式,通過(guò)一個(gè)阻塞隊(duì)列來(lái)緩存任務(wù),工作線程從阻塞隊(duì)列中獲取任務(wù)。工作隊(duì)列的接口是阻塞隊(duì)列(BlockingQueue),在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?#xff0c;當(dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。
目前JDK提供了以下阻塞隊(duì)列的實(shí)現(xiàn):
但是很不幸,這些阻塞隊(duì)列的實(shí)現(xiàn)都不支持動(dòng)態(tài)調(diào)整大小,那么為什么不自己實(shí)現(xiàn)一個(gè)可動(dòng)態(tài)調(diào)整大小的阻塞隊(duì)列呢。重復(fù)造輪子是不可取的,所以我選擇改造輪子。LinkedBlockingQueue是比較常用的一個(gè)阻塞隊(duì)列,它無(wú)法修改大小的原因是capacity字段設(shè)置成了final?private final int capacity;。如果我把final去掉,并提供修改capacity的方法,是不是就滿足我們的需求呢?事實(shí)證明是可行的,文章末尾上傳了ResizeLinkedBlockingQueue的實(shí)現(xiàn)。
結(jié)合Diamond進(jìn)行實(shí)現(xiàn)
Diamond可以管理我們的配置,如果可以通過(guò)Diamond實(shí)現(xiàn)線程池參數(shù)管理那就再好不過(guò)了。接下來(lái)就開始上代碼了,首先實(shí)現(xiàn)一個(gè)Diamond配置管理類DispatchConfig,然后,實(shí)現(xiàn)一個(gè)線程池管理的工廠方法StreamExecutorFactory。
DispatchConfig類是一個(gè)靜態(tài)類,在初始化的時(shí)候獲取了對(duì)應(yīng)Diamond的內(nèi)容并設(shè)置了監(jiān)聽,使用的時(shí)候只需要DispatchConfig.getConfig().getCorePoolSize()。
/**
-
@author moda
*/
@Slf4j
@Data
public class DispatchConfig {
public static final String DATA_ID = “com.alibaba.mkt.turbo.DispatchConfig”;
public static final String GROUP_ID = “mkt-turbo”;
private static DispatchConfig config;static {
try {
String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
config = JSON.parseObject(content, DispatchConfig.class);
Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
@Override
public void receiveConfigInfo(String content) {
try {
config = JSON.parseObject(content, DispatchConfig.class);
} catch (Throwable t) {
log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
}
}
});
} catch (Exception e) {
log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
}
}public static DispatchConfig getConfig() {
return config;
}private int corePoolSize = 10;
private int maximumPoolSize = 30;
private int workQueueSize = 1024;
/**
- 商品分批處理每批大小
*/
private int itemBatchProcessPageSize = 200;
}
StreamExecutorFactory是一個(gè)靜態(tài)類,維護(hù)了一個(gè)靜態(tài)屬性executor,并通過(guò)initExecutor()進(jìn)行初始化。在初始化的時(shí)候,工作隊(duì)列使用了可調(diào)節(jié)大小的阻塞隊(duì)列ResizeLinkedBlockingQueue,并設(shè)置了監(jiān)聽Diamond變更。Diamond發(fā)生變更的時(shí)候通過(guò)在callback中對(duì)比值是否發(fā)生改變,如果發(fā)生改變則調(diào)整workQueueSize、corePoolSize、maximumPoolSize。使用的時(shí)候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能動(dòng)態(tài)修改線程池參數(shù)。
- 商品分批處理每批大小
/**
-
@author moda
*/
@Slf4j
public class StreamExecutorFactory {
private static final String THREAD_NAME = “mkt-turbo_stream_dispatch”;private static ThreadPoolExecutor executor = initExecutor();
private static ThreadPoolExecutor initExecutor() {
Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {@Overridepublic void receiveConfigInfo(String content) {try {DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);if (workQueue.getCapacity() != config.getWorkQueueSize()) {workQueue.setCapacity(config.getWorkQueueSize());}if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());}if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());}} catch (Throwable t) {log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);}}});return threadPoolExecutor;
ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
ResizeLinkedBlockingQueue workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
//拒絕策略,調(diào)用者線程處理
RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
log.warn(msg);
if (!e.isShutdown()) {
r.run();
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
DispatchConfig.getConfig().getCorePoolSize(),
DispatchConfig.getConfig().getMaximumPoolSize(),
10,
TimeUnit.SECONDS,
workQueue,
nameThreadFactory,
rejectedExecutionHandler
);}
public static Executor getExecutor() {
return executor;
}
}
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的可动态调节参数的线程池实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 实时计算 Flink 版 最佳实践
- 下一篇: 消息队列RocketMQ性能测试案例