Thread Pool Monitoring Approaches
5ycode
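CTO at a consumer-lending company, focused on researching and applying Java technology, including the JVM, DDD, software design, source-code reading, and experience sharing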
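After reading "Java Thread Pool Implementation Principles and Their Practice in Meituan's Business", I asked myself: if I had to build monitoring for thread pools, how would I do it?
To monitor a thread pool, we first need to be clear about why we are monitoring it.
Monitoring exists to head off problems before they happen and to prevent production incidents, or at least to let us step in before an incident actually occurs.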
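Symptoms of a thread pool in trouble:
- Asynchronous processing consumes tasks too slowly, so tasks pile up and responses slow down; or, with a bounded queue, new submissions get rejected;
- When a thread pool is used to fan out parallel requests, a surge in request volume creates a processing backlog and slower responses;
- Inaccurate estimates of business load lead to poorly sized thread pool resources.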
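Metrics worth monitoring on a thread pool include:
1. Queue saturation;
2. The rate at which tasks are submitted per unit of time far exceeding the rate at which they are consumed.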
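Both metrics can be derived from `ThreadPoolExecutor`'s public getters. The following is a minimal sketch under a few assumptions. Saturation is taken to be queued tasks divided by total queue capacity. Deltas of `getTaskCount()` and `getCompletedTaskCount()` between two samples stand in for the submit and consume rates; the JDK documents both counters as approximate. `PoolMetrics`, `Sample`, and `backlogGrowthPerSecond` are hypothetical names. Tasks rejected by the pool never appear in `getTaskCount()`, so rejections would still need their own counter.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: computes the two metrics from ThreadPoolExecutor's public getters. */
public class PoolMetrics {

    /** Queue saturation in [0, 1]: queued tasks divided by the queue's total capacity. */
    public static double queueSaturation(ThreadPoolExecutor pool) {
        BlockingQueue<Runnable> queue = pool.getQueue();
        int queued = queue.size();
        long capacity = (long) queued + queue.remainingCapacity();
        // A SynchronousQueue has capacity 0 and never holds tasks: report 0 instead of dividing by zero
        return capacity == 0 ? 0.0 : (double) queued / capacity;
    }

    /** Counter snapshot; two snapshots give submit and completion rates over the interval between them. */
    public static final class Sample {
        final long submitted; // getTaskCount(): approximate number of tasks ever scheduled
        final long completed; // getCompletedTaskCount(): approximate number of tasks finished
        final long nanos;

        public Sample(ThreadPoolExecutor pool) {
            this.submitted = pool.getTaskCount();
            this.completed = pool.getCompletedTaskCount();
            this.nanos = System.nanoTime();
        }
    }

    /** Submit rate minus completion rate (tasks/second); a persistently positive value means a backlog is building. */
    public static double backlogGrowthPerSecond(Sample before, Sample after) {
        double seconds = Math.max(after.nanos - before.nanos, 1L) / 1e9;
        double submitRate = (after.submitted - before.submitted) / seconds;
        double completeRate = (after.completed - before.completed) / seconds;
        return submitRate - completeRate;
    }

    public static void main(String[] args) throws InterruptedException {
        // One worker and a queue of 10; the first task blocks so that later tasks stay queued
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        Sample before = new Sample(pool);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { });
        }
        Sample after = new Sample(pool);
        System.out.println("saturation=" + queueSaturation(pool));
        System.out.println("backlog growing: " + (backlogGrowthPerSecond(before, after) > 0));
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In practice, the two samples would be taken on a fixed schedule, and an alert would fire when saturation or backlog growth stays high across several consecutive intervals.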
Monitoring approaches:
Approach 1: Extend ThreadPoolExecutor and override some of its methods
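```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A thread pool that can be monitored
 * @author yxkong
 * @version 1.0
 * @date 2021/3/22 13:29
 */
public class ThreadPoolExecutorMonitor extends ThreadPoolExecutor {

    /** Start time of the task currently running on this worker thread, used to record execution time */
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void shutdown() {
        // Record the pool's state before shutting down
        System.out.printf("shutdown: completed=%d, active=%d, tasks=%d, queued=%d%n",
                this.getCompletedTaskCount(), // tasks that have finished
                this.getActiveCount(),        // threads currently running a task
                this.getTaskCount(),          // tasks ever scheduled (approximate)
                this.getQueue().size());      // tasks still waiting in the queue
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // submit() wraps tasks in a FutureTask that captures exceptions, so t is null; get() surfaces them
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null) {
            // Handle the exception
            System.out.println(t);
        }
        // Record the task's execution time
        Long start = startNanos.get();
        if (start != null) {
            startNanos.remove();
            System.out.printf("task took %d ms%n", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }
}
```

Approach 2: Customize ThreadFactory, BlockingQueue, and RejectedExecutionHandler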
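- ThreadFactory: names the threads, which makes them easier to manage as a group;
- BlockingQueue: allows the queue length to be adjusted at runtime (resizing an array-backed queue requires dealing with locking and performance; a linked-list-backed queue avoids this);
- RejectedExecutionHandler: decides what happens when the queue is full (it could grow the queue dynamically, but be careful not to exhaust JVM memory or end up unable to create the queue).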
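To make the BlockingQueue point concrete, here is a minimal sketch of a resizable queue. It is hypothetical: the class name `ResizableLinkedBlockingQueue` and its `setCapacity` method are assumptions, not the author's `CustomLinkedBlockingQueue`. The sketch relies on one JDK detail: `ThreadPoolExecutor.execute()` enqueues through `offer(E)`. Only `offer` (and `add`, which delegates to it) is bounded here; `put` and the timed `offer` are not. Because the structure is linked, changing capacity is a single volatile write, and no elements have to be copied.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical resizable task queue (a sketch, not the author's CustomLinkedBlockingQueue).
 * The underlying LinkedBlockingQueue is unbounded; the effective bound is enforced in offer(E),
 * which is what ThreadPoolExecutor.execute() uses. put() and the timed offer() are NOT bounded.
 */
public class ResizableLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private volatile int capacity;
    // Serializes producers so that the size check and the insert cannot interleave between them
    private final ReentrantLock offerLock = new ReentrantLock();

    public ResizableLinkedBlockingQueue(int capacity) {
        super(Integer.MAX_VALUE);
        setCapacity(capacity);
    }

    /** Takes effect for the next offer; elements already queued are never dropped. */
    public void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be > 0");
        }
        this.capacity = capacity;
    }

    public int getCapacity() {
        return capacity;
    }

    @Override
    public boolean offer(E e) {
        offerLock.lock();
        try {
            // Consumers can only shrink size() concurrently, so the bound cannot be overshot
            if (size() >= capacity) {
                return false; // ThreadPoolExecutor then tries a non-core thread, or rejects the task
            }
            return super.offer(e);
        } finally {
            offerLock.unlock();
        }
    }

    @Override
    public int remainingCapacity() {
        return Math.max(capacity - size(), 0);
    }
}
```

Usage: `new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS, new ResizableLinkedBlockingQueue<>(10))`, and later `((ResizableLinkedBlockingQueue<Runnable>) pool.getQueue()).setCapacity(50)`. If the capacity is shrunk below the current size, the queued tasks stay put and new offers fail until the queue drains.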
Custom thread pools
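The bean definitions construct their pools with a `NamedThreadFactory`, but the article does not show its source. Here is a minimal sketch of what such a factory could look like. The body is an assumption; only the one-argument constructor taking a pool name is taken from the bean code.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical NamedThreadFactory: threads are named "<poolName>-<n>" so they are easy to spot in logs and thread dumps. */
public class NamedThreadFactory implements ThreadFactory {

    private final String poolName;
    // Per-factory counter, so each pool numbers its threads from 1
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, poolName + "-" + threadNumber.getAndIncrement());
        // Explicitly non-daemon; new Thread() would otherwise inherit the creating thread's daemon flag
        t.setDaemon(false);
        return t;
    }
}
```

With this factory, a pool built with `new NamedThreadFactory("bizThreadPool")` shows up in a thread dump as `bizThreadPool-1`, `bizThreadPool-2`, and so on, instead of the generic `pool-N-thread-M`.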
```java
// Bean definitions inside a Spring @Configuration class

/**
 * Custom business thread pool
 * @return
 */
@Bean("bizThreadPool")
public ThreadPoolExecutor bizThreadPool() {
    return new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10),
            new NamedThreadFactory("bizThreadPool"));
}

/**
 * Custom log thread pool; its queue capacity can be changed at runtime
 * @return
 */
@Bean("logThreadPool")
public ThreadPoolExecutor logThreadPool() {
    return new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS,
            new CustomLinkedBlockingQueue<>(10),
            new NamedThreadFactory("logThreadPool"));
}
```

Monitoring and dynamically adjusting the thread pools
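```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// ResultBean, ResultBeanUtil and CustomLinkedBlockingQueue are project classes (not shown)

@RestController
@RequestMapping("/threadpool")
@Slf4j
public class ThreadPoolController {

    /**
     * Collects all thread pools. It is better to create pools by hand than to rely on Spring's defaults.
     * This takes a shortcut by letting Spring inject every ThreadPoolExecutor bean into a Map;
     * in a plain Java project, register the pools yourself.
     */
    @Autowired
    public Map<String, ThreadPoolExecutor> map;

    /** Lists all thread pools */
    @GetMapping("/list")
    public ResultBean<Map<String, ThreadPoolExecutor>> list() {
        return ResultBeanUtil.success("Fetched all thread pools", map);
    }

    @GetMapping("/get")
    public ResultBean<ThreadPoolExecutor> getThreadPool(String threadPool) {
        ThreadPoolExecutor executor = map.get(threadPool);
        if (executor == null) {
            return ResultBeanUtil.noData("No matching thread pool found");
        }
        return ResultBeanUtil.success("Fetched thread pool", executor);
    }

    @PostMapping("/modify")
    public ResultBean<ThreadPoolExecutor> modifyThreadPool(String threadPool, Integer coreSize,
                                                           Integer maximumPoolSize, Integer capacity) {
        ThreadPoolExecutor executor = map.get(threadPool);
        if (executor == null) {
            return ResultBeanUtil.noData("No matching thread pool found");
        }
        if (coreSize == null || maximumPoolSize == null || coreSize > maximumPoolSize) {
            return ResultBeanUtil.paramEmpty("coreSize and maximumPoolSize are required, and coreSize must not exceed maximumPoolSize");
        }
        // Order matters: setMaximumPoolSize rejects a max below the current core size,
        // and since JDK 9 setCorePoolSize rejects a core size above the current max
        if (maximumPoolSize >= executor.getCorePoolSize()) {
            executor.setMaximumPoolSize(maximumPoolSize);
            executor.setCorePoolSize(coreSize);
        } else {
            executor.setCorePoolSize(coreSize);
            executor.setMaximumPoolSize(maximumPoolSize);
        }
        // Start all core threads now; setCorePoolSize by itself only starts extra threads when tasks are queued
        executor.prestartAllCoreThreads();
        // Let idle core threads time out after keepAliveTime as well (when the core size shrinks,
        // setCorePoolSize already terminates the excess threads once they become idle)
        executor.allowCoreThreadTimeOut(true);
        BlockingQueue<Runnable> queue = executor.getQueue();
        if (capacity != null && queue instanceof CustomLinkedBlockingQueue) {
            ((CustomLinkedBlockingQueue<Runnable>) queue).setCapacity(capacity);
        }
        return ResultBeanUtil.success("Thread pool updated", executor);
    }

    @PostMapping("/test")
    public ResultBean<Void> test(String threadPool, Integer size) {
        if (size == null || size == 0) {
            return ResultBeanUtil.paramEmpty("size must not be empty");
        }
        ThreadPoolExecutor executor = map.get(threadPool);
        if (executor == null) {
            return ResultBeanUtil.noData("No matching thread pool found");
        }
        for (int i = 0; i < size; i++) {
            int finalI = i;
            executor.submit(() -> log.info("task {} executed", finalI));
        }
        return ResultBeanUtil.success();
    }
}
```

Approach 3: Monitor via an agent and expose an HTTP service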
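An agent cannot count on the host application having Spring MVC. One option for the HTTP side is the JDK's built-in `com.sun.net.httpserver.HttpServer`. This is a hypothetical sketch: the class name `PoolHttpEndpoint`, the `/threadpool/list` path, and the plain-text output format are assumptions. The pool map is passed in rather than taken from any particular registry.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical agent-side endpoint built on the JDK's HttpServer instead of Spring MVC. */
public class PoolHttpEndpoint {

    /** Serves GET /threadpool/list as plain text, one line per pool. Port 0 picks a free port. */
    public static HttpServer start(int port, Map<String, ThreadPoolExecutor> pools) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/threadpool/list", exchange -> {
            StringBuilder body = new StringBuilder();
            pools.forEach((name, pool) -> body.append(name)
                    .append(" active=").append(pool.getActiveCount())
                    .append(" poolSize=").append(pool.getPoolSize())
                    .append(" queued=").append(pool.getQueue().size())
                    .append(" completed=").append(pool.getCompletedTaskCount())
                    .append('\n'));
            byte[] bytes = body.toString().getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
            // -1 tells HttpServer there is no body; 0 would switch to chunked encoding
            exchange.sendResponseHeaders(200, bytes.length == 0 ? -1 : bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start(); // no executor set: requests are handled on the server's own dispatcher thread
        return server;
    }

    public static void main(String[] args) throws IOException {
        ThreadPoolExecutor biz = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        HttpServer server = start(8080, Collections.singletonMap("bizThreadPool", biz));
        System.out.println("curl http://127.0.0.1:" + server.getAddress().getPort() + "/threadpool/list");
    }
}
```

Binding to 127.0.0.1 keeps the endpoint local by default; exposing it more widely would call for some form of access control.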
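A few points to note here:
1. ThreadPoolExecutor is loaded by the Bootstrap ClassLoader, so the class that holds the pools must also be loaded by the Bootstrap ClassLoader; otherwise the class definition cannot be found (NoClassDefFoundError);
2. If the target is a custom Executor class that extends ThreadPoolExecutor, class loading is not a concern.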
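Point 1 can be seen directly. The advice is inlined into `ThreadPoolExecutor`, and class references in that code are resolved through `ThreadPoolExecutor`'s own defining loader, the bootstrap loader. That loader sees JDK classes but not classes that exist only on the application class path. This small check is an illustration with a hypothetical class name; it uses `Class.forName(name, false, null)`, where a `null` loader means "ask the bootstrap class loader".

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical illustration of which classes the bootstrap class loader can see. */
public class LoaderCheck {

    /** Returns true if the bootstrap class loader alone can find the named class. */
    static boolean visibleFromBootstrap(String className) {
        try {
            // A null loader delegates the lookup to the bootstrap class loader
            Class.forName(className, false, null);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The bootstrap loader is represented as null
        System.out.println("ThreadPoolExecutor loader: " + ThreadPoolExecutor.class.getClassLoader());
        System.out.println("LoaderCheck loader: " + LoaderCheck.class.getClassLoader());
        System.out.println("ThreadPoolExecutor visible from bootstrap: "
                + visibleFromBootstrap("java.util.concurrent.ThreadPoolExecutor"));
        // An application class is invisible to code defined by the bootstrap loader
        System.out.println("LoaderCheck visible from bootstrap: "
                + visibleFromBootstrap(LoaderCheck.class.getName()));
    }
}
```

A registry class referenced from the advice would fail the same way as `LoaderCheck` unless it is placed on the bootstrap class path.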
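Solutions to problem 1:
1. Use -Xbootclasspath/a: …/a.jar so that the holder class is loaded by the Bootstrap ClassLoader;
2. Use byte-buddy to enhance the class so that it is forcibly loaded by the Bootstrap ClassLoader.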
```java
// Part of the agent class; uses net.bytebuddy.agent.builder.AgentBuilder, net.bytebuddy.asm.Advice,
// net.bytebuddy.matcher.ElementMatchers, java.lang.instrument.Instrumentation

/**
 * Enhancement for ThreadPoolExecutor
 * @param instrumentation
 */
private static void threadPoolExecutor(Instrumentation instrumentation) {
    new AgentBuilder.Default()
            .disableClassFormatChanges()
            // By default, classes loaded by the bootstrap class loader are not instrumented;
            // ignoring everything except ThreadPoolExecutor makes it eligible
            .ignore(ElementMatchers.noneOf(ThreadPoolExecutor.class))
            //.with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
            //.with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
            .with(AgentBuilder.TypeStrategy.Default.REDEFINE)
            .with(AgentBuilder.InjectionStrategy.UsingUnsafe.INSTANCE)
            .type(ElementMatchers.is(ThreadPoolExecutor.class))
            //.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.Executor")))
            //.or(ElementMatchers.hasSuperType(ElementMatchers.named("java.util.concurrent.ExecutorService")))
            // Four-argument Transformer; newer Byte Buddy versions add a ProtectionDomain parameter
            .transform((builder, typeDescription, classLoader, javaModule) ->
                    builder.visit(Advice.to(ThreadPoolExecutorFinalizeAdvice.class).on(ElementMatchers.named("finalize")))
                           .visit(Advice.to(ThreadPoolExecutorExecuteAdvice.class).on(ElementMatchers.named("execute"))))
            .installOn(instrumentation);
}
```

This exposes one unified interface, so individual projects do not have to implement it themselves.
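The agent code refers to `ThreadPoolMonitorData` (with `add` and `alls`), but its source is not shown. The following is a hypothetical sketch of such a registry, named `PoolRegistry` so it is not mistaken for the author's class. One design choice is worth flagging. If the registry held strong references, every registered pool would stay reachable from a static map and could never be finalized or collected. A cleanup hook on `finalize`, like the `ThreadPoolExecutorFinalizeAdvice` registered in the agent code (whose body is not shown), would then never fire. Holding `WeakReference`s avoids that. The class uses only JDK types, so it can be placed on the bootstrap class path.

```java
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Hypothetical registry in the spirit of ThreadPoolMonitorData. Meant for the bootstrap class path,
 * so it depends only on JDK classes. Pools are held weakly so registration does not keep them alive.
 */
public final class PoolRegistry {

    private static final Map<String, WeakReference<ThreadPoolExecutor>> POOLS = new ConcurrentHashMap<>();

    private PoolRegistry() {
    }

    /** Called from the execute() advice; registering the same key again simply overwrites it. */
    public static void add(String key, ThreadPoolExecutor executor) {
        POOLS.put(key, new WeakReference<>(executor));
    }

    public static void remove(String key) {
        POOLS.remove(key);
    }

    /** Snapshot of pools that are still alive; entries whose pool has been garbage-collected are pruned. */
    public static Map<String, ThreadPoolExecutor> alls() {
        Map<String, ThreadPoolExecutor> live = new LinkedHashMap<>();
        Iterator<Map.Entry<String, WeakReference<ThreadPoolExecutor>>> it = POOLS.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, WeakReference<ThreadPoolExecutor>> entry = it.next();
            ThreadPoolExecutor pool = entry.getValue().get();
            if (pool == null) {
                it.remove(); // the pool has been collected
            } else {
                live.put(entry.getKey(), pool);
            }
        }
        return live;
    }
}
```

A pool with live worker threads stays reachable through those threads anyway. The weak reference only lets the pool go once it has shut down and its threads have exited.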
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class MonitorTest {

    @Test
    public void test() {
        // ThreadPoolMonitorData sits on the bootstrap class path, so its loader prints as null
        System.out.println(ThreadPoolMonitorData.class.getClassLoader());
        System.out.println(ThreadPoolMonitorData.alls());
        System.out.println(ThreadPoolMonitor.class.getClassLoader());
        ThreadPoolExecutor pool = threadpool();
        pool.submit(() -> System.out.println("pool executing 1:" + Thread.currentThread().getName()));
        pool.submit(() -> System.out.println("pool executing 2:" + Thread.currentThread().getName()));
        pool.submit(() -> System.out.println("pool executing 3:" + Thread.currentThread().getName()));
        ExecutorService executorService = threadpool1();
        executorService.submit(() -> System.out.println("executorService executing 1:" + Thread.currentThread().getName()));
        ThreadPoolMonitorData.alls().forEach((key, val) ->
                System.out.println("ThreadPoolMonitorData key=" + key + " val:" + val));
        ThreadPoolMonitor monitor = new ThreadPoolMonitor();
        monitor.alls().forEach((key, val) ->
                System.out.println("ThreadPoolMonitor key=" + key + " val:" + val));
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private ThreadPoolExecutor threadpool() {
        return new ThreadPoolExecutor(5, 10, 200, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
    }

    private ExecutorService threadpool1() {
        return Executors.newCachedThreadPool();
    }
}
```

```java
public class ThreadPoolExecutorExecuteAdvice {

    /**
     * Listens on entry to execute() of every thread pool.
     * The hook is on execute() rather than the constructor: in constructor enter advice,
     * @Advice.This is unavailable because the instance is not yet initialized.
     * Methods annotated with @Advice.OnMethodEnter must be static.
     * @param obj the ThreadPoolExecutor instance
     * @param abc the submitted task (unused)
     */
    @Advice.OnMethodEnter
    public static void executeBefore(@Advice.This Object obj, @Advice.Argument(0) Object abc) {
        try {
            ThreadPoolExecutor executor = (ThreadPoolExecutor) obj;
            // This code is inlined into ThreadPoolExecutor, so ThreadPoolMonitorData must be bootstrap-loaded
            ThreadPoolMonitorData.add(executor.hashCode() + "", executor);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

Output (the first line is `null` because the Bootstrap ClassLoader is printed as null):

```
null
{}
sun.misc.Launcher$AppClassLoader@18b4aac2
pool executing 1:pool-3-thread-1
pool executing 2:pool-3-thread-2
pool executing 3:pool-3-thread-3
executorService executing 1:pool-4-thread-1
ThreadPoolMonitorData key=1564698139 val:java.util.concurrent.ThreadPoolExecutor@5d43661b[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 3]
ThreadPoolMonitorData key=171421438 val:java.util.concurrent.ThreadPoolExecutor@a37aefe[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
```

Each key is the decimal form of the hash code shown in the pool's `toString()` (0x5d43661b = 1564698139).

The data gathered by monitoring still has to be collected centrally somewhere.
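The recommended approach: a unified standard plus agent-based collection, gathering whatever data the actual situation calls for to drive monitoring and dynamic adjustment.
For the complete implementation, see:
Thread Pool Monitoring - bytebuddy agent mode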
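However the pools are discovered, the collection side can be as simple as a scheduled task that samples every registered pool. This is a minimal sketch under two assumptions: the pools are available as a `Map<String, ThreadPoolExecutor>` (for example from Spring injection or an agent-side registry), and printing to stdout stands in for a real log or metrics sink. `PoolCollector`, `describe`, and `start` are hypothetical names.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic collector: samples each pool on a fixed schedule and emits one line per pool. */
public class PoolCollector {

    /** Formats one pool's configuration and current state as a single line. */
    static String describe(String name, ThreadPoolExecutor pool) {
        BlockingQueue<Runnable> q = pool.getQueue();
        return String.format(Locale.ROOT,
                "pool=%s core=%d max=%d size=%d active=%d queued=%d remaining=%d completed=%d",
                name, pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getPoolSize(),
                pool.getActiveCount(), q.size(), q.remainingCapacity(), pool.getCompletedTaskCount());
    }

    /** Starts sampling; the returned scheduler should be shut down together with the application. */
    static ScheduledExecutorService start(Map<String, ThreadPoolExecutor> pools, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "pool-collector");
            t.setDaemon(true); // monitoring alone should not keep the JVM alive
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            try {
                pools.forEach((name, pool) -> System.out.println(describe(name, pool)));
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later runs of a scheduleAtFixedRate task
                e.printStackTrace();
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor biz = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        ScheduledExecutorService collector = start(Collections.singletonMap("bizThreadPool", biz), 1);
        Thread.sleep(1500);
        collector.shutdown();
        biz.shutdown();
    }
}
```

The try/catch inside the scheduled task matters: once an execution of a `scheduleAtFixedRate` task throws, the JDK silently stops running it.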
總結