A Look at Monitoring ExecutorService

Published: 2025/3/15

This article looks at how an ExecutorService can be monitored.

InstrumentedExecutorService

metrics-core-4.0.2-sources.jar!/com/codahale/metrics/InstrumentedExecutorService.java

/**
 * An {@link ExecutorService} that monitors the number of tasks submitted, running,
 * completed and also keeps a {@link Timer} for the task duration.
 * <p/>
 * It will register the metrics using the given (or auto-generated) name as classifier, e.g:
 * "your-executor-service.submitted", "your-executor-service.running", etc.
 */
public class InstrumentedExecutorService implements ExecutorService {
    private static final AtomicLong NAME_COUNTER = new AtomicLong();

    private final ExecutorService delegate;
    private final Meter submitted;
    private final Counter running;
    private final Meter completed;
    private final Timer idle;
    private final Timer duration;

    /**
     * Wraps an {@link ExecutorService} uses an auto-generated default name.
     *
     * @param delegate {@link ExecutorService} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     */
    public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {
        this(delegate, registry, "instrumented-delegate-" + NAME_COUNTER.incrementAndGet());
    }

    /**
     * Wraps an {@link ExecutorService} with an explicit name.
     *
     * @param delegate {@link ExecutorService} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     * @param name name for this executor service.
     */
    public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
        this.delegate = delegate;
        this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
        this.running = registry.counter(MetricRegistry.name(name, "running"));
        this.completed = registry.meter(MetricRegistry.name(name, "completed"));
        this.idle = registry.timer(MetricRegistry.name(name, "idle"));
        this.duration = registry.timer(MetricRegistry.name(name, "duration"));
    }

    @Override
    public void execute(Runnable runnable) {
        submitted.mark();
        delegate.execute(new InstrumentedRunnable(runnable));
    }

    @Override
    public Future<?> submit(Runnable runnable) {
        submitted.mark();
        return delegate.submit(new InstrumentedRunnable(runnable));
    }

    @Override
    public <T> Future<T> submit(Runnable runnable, T result) {
        submitted.mark();
        return delegate.submit(new InstrumentedRunnable(runnable), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        submitted.mark();
        return delegate.submit(new InstrumentedCallable<>(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAll(instrumented);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAll(instrumented, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws ExecutionException, InterruptedException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAny(instrumented);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAny(instrumented, timeout, unit);
    }

    private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Callable<T>> tasks) {
        final List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());
        for (Callable<T> task : tasks) {
            instrumented.add(new InstrumentedCallable<>(task));
        }
        return instrumented;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
        return delegate.awaitTermination(l, timeUnit);
    }

    //......
}
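  • InstrumentedExecutorService implements ExecutorService. It wraps a plain JDK ExecutorService and weaves metric collection into each of its methods.
  • It tracks five metrics: submitted tasks (submitted, a Meter), tasks currently running (running, a Counter), completed tasks (completed, a Meter), time spent waiting between submission and start (idle, a Timer), and execution time (duration, a Timer).
  • Everything except submitted has to be measured inside the task itself, so the Runnable and Callable are wrapped as well. That is the job of InstrumentedRunnable and InstrumentedCallable.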
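The wrapping idea can be tried without the metrics library. The following is only a minimal sketch: CountingExecutorSketch and its demo method are hypothetical names, it covers execute only, and plain AtomicLong counters stand in for the Meter and Counter types the real class uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the library class: AtomicLong replaces Meter/Counter.
final class CountingExecutorSketch {
    final AtomicLong submitted = new AtomicLong();
    final AtomicLong running = new AtomicLong();
    final AtomicLong completed = new AtomicLong();
    private final ExecutorService delegate;

    CountingExecutorSketch(ExecutorService delegate) {
        this.delegate = delegate;
    }

    // Count the submission on the caller's thread, then hand a wrapped task to the delegate.
    void execute(Runnable task) {
        submitted.incrementAndGet();
        delegate.execute(() -> {
            running.incrementAndGet();
            try {
                task.run();
            } finally {
                // Runs even if the task throws, so the counters stay consistent.
                running.decrementAndGet();
                completed.incrementAndGet();
            }
        });
    }

    // Runs three empty tasks and returns {submitted, running, completed}.
    static long[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountingExecutorSketch sketch = new CountingExecutorSketch(pool);
        for (int i = 0; i < 3; i++) {
            sketch.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {sketch.submitted.get(), sketch.running.get(), sketch.completed.get()};
    }
}
```

Once the pool has terminated, running is back at zero and completed equals submitted, which is the invariant the finally block is there to protect.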

InstrumentedRunnable

private class InstrumentedRunnable implements Runnable {
    private final Runnable task;
    private final Timer.Context idleContext;

    InstrumentedRunnable(Runnable task) {
        this.task = task;
        this.idleContext = idle.time();
    }

    @Override
    public void run() {
        idleContext.stop();
        running.inc();
        final Timer.Context durationContext = duration.time();
        try {
            task.run();
        } finally {
            durationContext.stop();
            running.dec();
            completed.mark();
        }
    }
}
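  • Records idle, duration, running and completed. The idle timer starts when the wrapper is constructed (at submission) and stops when run() begins, so it measures how long the task waited for a thread.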
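The difference between idle and duration is easiest to see when a task has to queue. Below is a rough sketch using only System.nanoTime(); IdleDurationSketch, TimedTask and demo are hypothetical names, and the 20 ms hold is an arbitrary value chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class IdleDurationSketch {

    // Hypothetical wrapper: the idle clock starts when the wrapper is constructed.
    static final class TimedTask implements Runnable {
        private final Runnable task;
        private final long createdAt = System.nanoTime();
        volatile long idleNanos = -1;
        volatile long durationNanos = -1;

        TimedTask(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            final long startedAt = System.nanoTime();
            idleNanos = startedAt - createdAt; // time spent waiting for a thread
            try {
                task.run();
            } finally {
                durationNanos = System.nanoTime() - startedAt; // time spent executing
            }
        }
    }

    // Returns {idleNanos, durationNanos} for a task that had to wait in the queue.
    static long[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker thread so the next task must queue.
            pool.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            TimedTask queued = new TimedTask(() -> { });
            pool.execute(queued);
            // Hold the gate closed for at least 20 ms (arbitrary illustration value).
            final long t0 = System.nanoTime();
            while (System.nanoTime() - t0 < TimeUnit.MILLISECONDS.toNanos(20)) {
                Thread.sleep(1);
            }
            gate.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return new long[] {queued.idleNanos, queued.durationNanos};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because the queued task cannot start until the gate opens, its idle time is at least the 20 ms hold, while its duration stays tiny. A growing idle value with a flat duration is the usual sign that a pool is undersized.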

InstrumentedCallable

private class InstrumentedCallable<T> implements Callable<T> {
    private final Callable<T> callable;

    InstrumentedCallable(Callable<T> callable) {
        this.callable = callable;
    }

    @Override
    public T call() throws Exception {
        running.inc();
        final Timer.Context context = duration.time();
        try {
            return callable.call();
        } finally {
            context.stop();
            running.dec();
            completed.mark();
        }
    }
}
  • Records duration, running and completed. Unlike InstrumentedRunnable, this version does not record idle.

ExecutorServiceMetrics

micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java

/**
 * Monitors the status of executor service pools. Does not record timings on operations executed in the {@link ExecutorService},
 * as this requires the instance to be wrapped. Timings are provided separately by wrapping the executor service
 * with {@link TimedExecutorService}.
 *
 * @author Jon Schneider
 * @author Clint Checketts
 */
@NonNullApi
@NonNullFields
public class ExecutorServiceMetrics implements MeterBinder {
    @Nullable
    private final ExecutorService executorService;
    private final Iterable<Tag> tags;

    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName, Iterable<Tag> tags) {
        this.executorService = executorService;
        this.tags = Tags.concat(tags, "name", executorServiceName);
    }

    //......

    /**
     * Record metrics on the use of an {@link Executor}.
     *
     * @param registry The registry to bind metrics to.
     * @param executor The executor to instrument.
     * @param executorName Will be used to tag metrics with "name".
     * @param tags Tags to apply to all recorded metrics.
     * @return The instrumented executor, proxied.
     */
    public static Executor monitor(MeterRegistry registry, Executor executor, String executorName, Iterable<Tag> tags) {
        if (executor instanceof ExecutorService) {
            return monitor(registry, (ExecutorService) executor, executorName, tags);
        }
        return new TimedExecutor(registry, executor, executorName, tags);
    }

    /**
     * Record metrics on the use of an {@link ExecutorService}.
     *
     * @param registry The registry to bind metrics to.
     * @param executor The executor to instrument.
     * @param executorServiceName Will be used to tag metrics with "name".
     * @param tags Tags to apply to all recorded metrics.
     * @return The instrumented executor, proxied.
     */
    public static ExecutorService monitor(MeterRegistry registry, ExecutorService executor, String executorServiceName, Iterable<Tag> tags) {
        new ExecutorServiceMetrics(executor, executorServiceName, tags).bindTo(registry);
        return new TimedExecutorService(registry, executor, executorServiceName, tags);
    }

    @Override
    public void bindTo(MeterRegistry registry) {
        if (executorService == null) {
            return;
        }

        String className = executorService.getClass().getName();

        if (executorService instanceof ThreadPoolExecutor) {
            monitor(registry, (ThreadPoolExecutor) executorService);
        } else if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {
            monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));
        } else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {
            monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));
        } else if (executorService instanceof ForkJoinPool) {
            monitor(registry, (ForkJoinPool) executorService);
        }
    }

    private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
        if (tp == null) {
            return;
        }

        FunctionCounter.builder("executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)
            .tags(tags)
            .description("The approximate total number of tasks that have completed execution")
            .baseUnit("tasks")
            .register(registry);

        Gauge.builder("executor.active", tp, ThreadPoolExecutor::getActiveCount)
            .tags(tags)
            .description("The approximate number of threads that are actively executing tasks")
            .baseUnit("threads")
            .register(registry);

        Gauge.builder("executor.queued", tp, tpRef -> tpRef.getQueue().size())
            .tags(tags)
            .description("The approximate number of threads that are queued for execution")
            .baseUnit("threads")
            .register(registry);

        Gauge.builder("executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)
            .tags(tags)
            .description("The current number of threads in the pool")
            .baseUnit("threads")
            .register(registry);
    }

    //......
}
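  • ExecutorServiceMetrics implements the MeterBinder interface. It also provides static methods, aptly named monitor, that return an ExecutorService with metrics attached.
  • monitor first creates an ExecutorServiceMetrics and binds it to the MeterRegistry, then returns a TimedExecutorService wrapping the original executor.
  • For a ThreadPoolExecutor, bindTo registers four metrics: executor.completed (FunctionCounter), executor.active (Gauge), executor.queued (Gauge) and executor.pool.size (Gauge). Each one reads a getter on the pool, so no task wrapping is needed for them.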
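The four pool metrics are backed by standard ThreadPoolExecutor getters, so they can be inspected directly. The sketch below is not Micrometer code; PoolGaugeSketch, snapshot and demo are hypothetical names, and the pool is set up so the values are predictable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGaugeSketch {

    // Reads the same getters the gauges use, in the order:
    // completed, active, queued, pool size.
    static long[] snapshot(ThreadPoolExecutor tp) {
        return new long[] {
            tp.getCompletedTaskCount(),
            tp.getActiveCount(),
            tp.getQueue().size(),
            tp.getPoolSize()
        };
    }

    // One worker is held busy while two more tasks sit in the queue.
    static long[] demo() {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            tp.execute(() -> {
                started.countDown();
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            tp.execute(() -> { });
            tp.execute(() -> { });
            started.await(); // the first task is running, the other two are queued
            return snapshot(tp);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown(); // release the worker so the pool can drain
            tp.shutdown();
        }
    }
}
```

At the moment of the snapshot nothing has completed, one thread is active, two tasks are queued and the pool holds one thread. These getters are documented as approximate under concurrent use, which is why the metric descriptions above use the word "approximate".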

TimedExecutorService

micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/internal/TimedExecutorService.java

/**
 * An {@link java.util.concurrent.ExecutorService} that is timed
 *
 * @author Jon Schneider
 */
public class TimedExecutorService implements ExecutorService {
    private final ExecutorService delegate;
    private final Timer timer;

    public TimedExecutorService(MeterRegistry registry, ExecutorService delegate, String executorServiceName, Iterable<Tag> tags) {
        this.delegate = delegate;
        this.timer = registry.timer("executor", Tags.concat(tags ,"name", executorServiceName));
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return delegate.submit(timer.wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return delegate.submit(() -> timer.record(task), result);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return delegate.submit(() -> timer.record(task));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return delegate.invokeAll(wrapAll(tasks));
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.invokeAll(wrapAll(tasks), timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return delegate.invokeAny(wrapAll(tasks));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.invokeAny(wrapAll(tasks), timeout, unit);
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(timer.wrap(command));
    }

    private <T> Collection<? extends Callable<T>> wrapAll(Collection<? extends Callable<T>> tasks) {
        return tasks.stream().map(timer::wrap).collect(toList());
    }
}
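  • Wraps an ExecutorService and adds a Timer named "executor": every submitted task is passed through timer.record or timer.wrap, so its execution time is recorded.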

Timer.record

micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/Timer.java

    /**
     * Executes the runnable `f` and records the time taken.
     *
     * @param f Function to execute and measure the execution time.
     */
    void record(Runnable f);

    /**
     * Wrap a {@link Runnable} so that it is timed when invoked.
     *
     * @param f The Runnable to time when it is invoked.
     * @return The wrapped Runnable.
     */
    default Runnable wrap(Runnable f) {
        return () -> record(f);
    }

    /**
     * Wrap a {@link Callable} so that it is timed when invoked.
     *
     * @param f The Callable to time when it is invoked.
     * @param <T> The return type of the callable.
     * @return The wrapped callable.
     */
    default <T> Callable<T> wrap(Callable<T> f) {
        return () -> recordCallable(f);
    }
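  • The wrap methods are thin default methods that defer to record (or to recordCallable for a Callable). record itself is abstract and left to the implementing class.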
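The same shape, an abstract recording method plus default wrap helpers, can be reproduced in a few lines. This is an illustrative sketch only: RecorderSketch and CountingRecorder are hypothetical names, and the implementation counts invocations instead of timing them.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical interface mirroring the shape described above; not Micrometer's Timer.
interface RecorderSketch {
    void record(Runnable f);

    <T> T recordCallable(Callable<T> f) throws Exception;

    // Wrapping does no work by itself; recording happens when the wrapper is invoked.
    default Runnable wrap(Runnable f) {
        return () -> record(f);
    }

    default <T> Callable<T> wrap(Callable<T> f) {
        return () -> recordCallable(f);
    }
}

// Hypothetical implementation that only counts how often recording happened.
final class CountingRecorder implements RecorderSketch {
    final AtomicInteger calls = new AtomicInteger();

    @Override
    public void record(Runnable f) {
        calls.incrementAndGet();
        f.run();
    }

    @Override
    public <T> T recordCallable(Callable<T> f) throws Exception {
        calls.incrementAndGet();
        return f.call();
    }

    // Returns {calls after wrapping, calls after invoking both wrappers, callable result}.
    static int[] demo() {
        CountingRecorder recorder = new CountingRecorder();
        Runnable plainRunnable = () -> { };
        Callable<Integer> plainCallable = () -> 42;
        Runnable wrappedRunnable = recorder.wrap(plainRunnable);
        Callable<Integer> wrappedCallable = recorder.wrap(plainCallable);
        int before = recorder.calls.get(); // wrapping alone records nothing
        wrappedRunnable.run();
        int value;
        try {
            value = wrappedCallable.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return new int[] {before, recorder.calls.get(), value};
    }
}
```

Keeping wrap as a default method means every implementation gets the wrapping behaviour for free and only has to decide how a measurement is stored.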

AbstractTimer

micrometer-core-1.0.3-sources.jar!/io/micrometer/core/instrument/AbstractTimer.java

    @Override
    public void record(Runnable f) {
        final long s = clock.monotonicTime();
        try {
            f.run();
        } finally {
            final long e = clock.monotonicTime();
            record(e - s, TimeUnit.NANOSECONDS);
        }
    }

    @Override
    public final void record(long amount, TimeUnit unit) {
        if (amount >= 0) {
            histogram.recordLong(TimeUnit.NANOSECONDS.convert(amount, unit));
            recordNonNegative(amount, unit);

            if (intervalEstimator != null) {
                intervalEstimator.recordInterval(clock.monotonicTime());
            }
        }
    }
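  • record(Runnable) measures elapsed time with a monotonic clock and passes it to record(long, TimeUnit), which ignores negative amounts, converts the value to nanoseconds and feeds it into a histogram.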
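To make the flow concrete, here is a rough stand-alone sketch of the same two-step record logic. HistogramTimerSketch is a hypothetical class, System.nanoTime() stands in for the clock, and a fixed set of buckets with made-up bounds stands in for the library's histogram.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

final class HistogramTimerSketch {
    // Hypothetical bucket upper bounds in nanoseconds: 1 ms, 10 ms, 100 ms.
    // Anything larger falls into a final overflow bucket.
    private static final long[] BOUNDS = {1_000_000L, 10_000_000L, 100_000_000L};

    private final AtomicLongArray buckets = new AtomicLongArray(BOUNDS.length + 1);
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    // Time the runnable with a monotonic clock; record even if it throws.
    void record(Runnable f) {
        final long s = System.nanoTime();
        try {
            f.run();
        } finally {
            record(System.nanoTime() - s, TimeUnit.NANOSECONDS);
        }
    }

    // Negative amounts are ignored; everything else is normalised to nanoseconds.
    void record(long amount, TimeUnit unit) {
        if (amount < 0) {
            return;
        }
        final long nanos = TimeUnit.NANOSECONDS.convert(amount, unit);
        int i = 0;
        while (i < BOUNDS.length && nanos > BOUNDS[i]) {
            i++;
        }
        buckets.incrementAndGet(i);
        count.incrementAndGet();
        totalNanos.addAndGet(nanos);
    }

    long count() {
        return count.get();
    }

    long bucket(int index) {
        return buckets.get(index);
    }

    long totalNanos() {
        return totalNanos.get();
    }
}
```

Normalising to a single unit before bucketing keeps the histogram independent of the unit the caller used, which matches the conversion step in the excerpt above.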

Summary

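Both Dropwizard Metrics and Micrometer provide wrappers that add metrics to an ExecutorService. Micrometer goes a step further: its static monitor method binds the pool metrics and returns the timed wrapper in one call, which is very convenient.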

doc

  • InstrumentedExecutorService
  • ExecutorServiceMetrics

Reposted from: https://my.oschina.net/go4it/blog/2223506
