Java技巧:创建监视友好的ExecutorService
在本文中,我們將擴(kuò)展具有監(jiān)視功能的ExecutorService實(shí)現(xiàn)。 這種監(jiān)視功能將幫助我們?cè)趯?shí)時(shí)生產(chǎn)環(huán)境中測(cè)量多個(gè)池參數(shù),即活動(dòng)線程,工作隊(duì)列大小等。 它還將使我們能夠衡量任務(wù)執(zhí)行時(shí)間,成功任務(wù)計(jì)數(shù)和失敗任務(wù)計(jì)數(shù)。
監(jiān)控庫(kù)
至于監(jiān)控庫(kù),我們將使用Metrics 。 為了簡(jiǎn)單起見,我們將使用ConsoleReporter ,它將向控制臺(tái)報(bào)告指標(biāo)。 對(duì)于生產(chǎn)級(jí)應(yīng)用程序,我們應(yīng)該使用高級(jí)報(bào)告器(即Graphite報(bào)告器)。 如果您不熟悉指標(biāo),那么建議您閱讀入門指南 。
讓我們開始吧。
擴(kuò)展ThreadPoolExecutor
我們將使用ThreadPoolExecutor作為新類型的基類。 我們將其稱為MonitoredThreadPoolExecutor 。 此類將接受MetricRegistry作為其構(gòu)造函數(shù)參數(shù)之一–
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);this.metricRegistry = metricRegistry;}public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,MetricRegistry metricRegistry) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.metricRegistry = metricRegistry;} }注冊(cè)儀表以測(cè)量特定于池的參數(shù)
量表是一個(gè)值的瞬時(shí)量度。 我們將使用它來(lái)測(cè)量不同的池參數(shù),例如活動(dòng)線程數(shù),任務(wù)隊(duì)列大小等。
在注冊(cè)儀表之前,我們需要確定如何為線程池計(jì)算指標(biāo)名稱。 每個(gè)度量標(biāo)準(zhǔn),無(wú)論是儀表,計(jì)時(shí)器還是儀表,都有一個(gè)唯一的名稱。 此名稱用于標(biāo)識(shí)度量標(biāo)準(zhǔn)來(lái)源。 此處的約定是使用點(diǎn)分字符串,該點(diǎn)分字符串通常由要監(jiān)視的類的完全限定名稱構(gòu)成。
對(duì)于我們的線程池,我們將使用其完全限定名稱作為指標(biāo)名稱的前綴。 另外,我們將添加另一個(gè)名為
poolName,客戶端將使用它來(lái)指定特定于實(shí)例的標(biāo)識(shí)符。
實(shí)施這些更改后,該類如下所示–
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;private final String metricsPrefix;public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry,String poolName) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;this.metricsPrefix = MetricRegistry.name(getClass(), poolName);}// Rest of the constructors }現(xiàn)在我們準(zhǔn)備注冊(cè)我們的儀表。 為此,我們將定義一個(gè)私有方法–
private void registerGauges() {metricRegistry.register(MetricRegistry.name(metricsPrefix, "corePoolSize"), (Gauge<Integer>) this::getCorePoolSize);metricRegistry.register(MetricRegistry.name(metricsPrefix, "activeThreads"), (Gauge<Integer>) this::getActiveCount);metricRegistry.register(MetricRegistry.name(metricsPrefix, "maxPoolSize"), (Gauge<Integer>) this::getMaximumPoolSize);metricRegistry.register(MetricRegistry.name(metricsPrefix, "queueSize"), (Gauge<Integer>) () -> getQueue().size()); }對(duì)于我們的示例,我們正在測(cè)量核心池大小,活動(dòng)線程數(shù),最大池大小和任務(wù)隊(duì)列大小。 根據(jù)監(jiān)視要求,我們可以注冊(cè)更多/更少的量規(guī)來(lái)測(cè)量不同的屬性。
現(xiàn)在,所有構(gòu)造函數(shù)都將調(diào)用此私有方法–
public MonitoredThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,MetricRegistry metricRegistry,String poolName ) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);this.metricRegistry = metricRegistry;this.metricsPrefix = MetricRegistry.name(getClass(), poolName);registerGauges(); }測(cè)量任務(wù)執(zhí)行時(shí)間
為了衡量任務(wù)執(zhí)行時(shí)間,我們將覆蓋ThreadPoolExecutor提供的兩個(gè)生命周期方法– beforeExecute和afterExecute 。
顧名思義,執(zhí)行任務(wù)之前,將由執(zhí)行任務(wù)的線程調(diào)用beforeExecute回調(diào)。 此回調(diào)的默認(rèn)實(shí)現(xiàn)不執(zhí)行任何操作。
同樣,在執(zhí)行每個(gè)任務(wù)之后,執(zhí)行任務(wù)的線程將調(diào)用afterExecute回調(diào)。 此回調(diào)的默認(rèn)實(shí)現(xiàn)也不執(zhí)行任何操作。 即使任務(wù)拋出未捕獲的RuntimeException或Error ,也會(huì)調(diào)用此回調(diào)。
我們將在beforeExecute覆蓋中啟動(dòng)一個(gè)Timer ,然后將其用于afterExecute覆蓋中以獲取總的任務(wù)執(zhí)行時(shí)間。 為了存儲(chǔ)對(duì)Timer的引用,我們將在類中引入一個(gè)新的ThreadLocal字段。
回調(diào)的實(shí)現(xiàn)如下:
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {private final MetricRegistry metricRegistry;private final String metricsPrefix;private ThreadLocal<Timer.Context> taskExecutionTimer = new ThreadLocal<>();// Constructors@Overrideprotected void beforeExecute(Thread thread, Runnable task) {super.beforeExecute(thread, task);Timer timer = metricRegistry.timer(MetricRegistry.name(metricsPrefix, "task-execution"));taskExecutionTimer.set(timer.time());}@Overrideprotected void afterExecute(Runnable task, Throwable throwable) {Timer.Context context = taskExecutionTimer.get();context.stop();super.afterExecute(task, throwable);} }記錄由于未捕獲的異常而導(dǎo)致的失敗任務(wù)數(shù)
afterExecute回調(diào)的第二個(gè)參數(shù)是Throwable 。 如果非null,則此Throwable引用導(dǎo)致執(zhí)行終止的未捕獲RuntimeException或Error 。 我們可以使用此信息來(lái)部分計(jì)算由于未捕獲的異常而突然終止的任務(wù)總數(shù)。
要獲得失敗任務(wù)的總數(shù),我們必須考慮另一種情況。 使用execute方法提交的任務(wù)將拋出任何未捕獲的異常,并且它將用作afterExecute回調(diào)的第二個(gè)參數(shù)。 但是,執(zhí)行者服務(wù)會(huì)吞下使用Submit方法提交的任務(wù)。 JavaDoc (重點(diǎn)是我的話)中對(duì)此做了清楚的解釋-
注意:如果將動(dòng)作顯式地或通過(guò)諸如Submit之類的方法包含在任務(wù)(例如FutureTask)中,則這些任務(wù)對(duì)象會(huì)捕獲并維護(hù)計(jì)算異常,因此它們不會(huì)導(dǎo)致突然終止,并且內(nèi)部異常不會(huì)傳遞給此方法。 。 如果您想使用此方法捕獲兩種類型的失敗,則可以進(jìn)一步探查此類情況,例如,在此示例子類中,如果任務(wù)被中止,則打印直接原因或潛在異常。 幸運(yùn)的是,同一文檔還為此提供了一種解決方案,即檢查可運(yùn)行對(duì)象以查看其是否為Future ,然后獲取基礎(chǔ)異常。
結(jié)合這些方法,我們可以如下修改afterExecute方法–
@Override protected void afterExecute(Runnable runnable, Throwable throwable) {Timer.Context context = taskExecutionTimer.get();context.stop();super.afterExecute(runnable, throwable);if (throwable == null && runnable instanceof Future && ((Future) runnable).isDone()) {try {((Future) runnable).get();} catch (CancellationException ce) {throwable = ce;} catch (ExecutionException ee) {throwable = ee.getCause();} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}if (throwable != null) {Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));failedTasksCounter.inc();} }計(jì)算成功任務(wù)的總數(shù)
先前的方法也可以用于計(jì)算成功任務(wù)的總數(shù):完成的任務(wù)不會(huì)拋出任何異?;蝈e(cuò)誤–
@Override protected void afterExecute(Runnable runnable, Throwable throwable) {// Rest of the method body .....if (throwable != null) {Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));failedTasksCounter.inc();} else {Counter successfulTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "successful-tasks"));successfulTasksCounter.inc();} }結(jié)論
在本文中,我們研究了對(duì)ExecutorService實(shí)現(xiàn)的一些監(jiān)視友好的自定義。 像往常一樣,任何建議/改進(jìn)/錯(cuò)誤修復(fù)將不勝感激。 至于示例源代碼,它已上傳到
Github 。
翻譯自: https://www.javacodegeeks.com/2018/05/java-tips-creating-a-monitoring-friendly-executorservice.html
總結(jié)
以上是生活随笔為你收集整理的Java技巧:创建监视友好的ExecutorService的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 联想s700一体机拆机(联想s700一体
- 下一篇: java servlet_Java Se