面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~
線程池大家都## 標題很熟悉,無論是平時的業務開發還是框架中間件都會用到,大部分都是基于JDK線程池ThreadPoolExecutor做的封裝,
都會牽涉到這幾個核心參數的設置:核心線程數,等待(任務)隊列,最大線程數,拒絕策略等。
但如果線程池設置不當就會引起一系列問題, 下面就說下我最近碰到的問題。
案件還原
比如你有一個項目中有個接口部分功能使用了線程池,這個功能會去調用多個第三方接口,都有一定的耗時,為了不影響主流程的性能,不增加整體響應時間,所以放在線程池里和主線程并行執行,等線程池里的任務執行完通過future.get的方式獲取線程池里的線程執行結果,然后合并到主流程的結果里返回,大致流程如下:
線程池參數為:
- coresize:50
- max:200
- queuesize:1
- keepalivetime:60s
- 拒絕策略為reject
假設每次請求提交5個task到線程池,平均每個task是耗時50ms
沒過一會就收到了線程池滿了走了拒絕策略的報錯
結合你對線程池的了解,先思考下為什么
線程池的工作流程如下:
根據這個我們來列一個時間線
根據 jdk1.8的線程池的源碼:
線程池的線程處理處理了交給它的task之后,它會去getTask()
源碼如下:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;} //加入Java開發交流君樣:756584822一起吹水聊天int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//注意這段Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}請注意上面代碼中的bool類型的timed的賦值邏輯,
由于allowCoreThreadTimeOut默認為false,也就是說:
只要創建的線程數量超過了核心線程數,那么干完手上活后的線程(不管是核心線程,還是超過隊列后新開的線程)就會走進
//線程狀態為 timedwaiting workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)由于我們上面步驟里面還沒有超過coresize所以會走進
//線程狀態為 waiting workQueue.take()所以答案是:上面步驟干活的核心線程處理完之后核心線程會進入waiting狀態,
只要隊列一有活就會被喚醒去干活。
好家伙,到這步驟的時候 ,核心線程數已滿,那么就往隊列里面塞,但是設置的queuesize=1,
每次有5個task,那就是說往隊列里面塞1個,剩下4個(別較真我懂你意思)要創建新的max線程了。
結果:
核心線程數:50
隊列:1
max線程:4個
因為50個核心線程在waiting中,所以隊列只要一add,就會立馬被消費,假設消費的這個核心線程名字是小A。
這里要細品一下:
這里已經總線程數大于核心線程數了,那么getTask()里面
// timed=trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;那么小A干完活就會走進
//線程狀態為 timedwaiting workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)此處核心線程小A就會變成timedwaiting的狀態(keepalive設置的是60s)
繼續往隊列塞1個,創建4個max線程,max線程已經有8個了
這里 又會有一個新的核心線程小B ,會變成timedwaiting狀態了
max線程們干完手上的活后,也會去調用getTask() 也會進入timedwaiting狀態
因為queuesize=1,狼多肉少
max滿了,線程們都在timedwaiting(keepalive設置的是60s)
新的提交就會走拒絕策略了
問題總結
其實核心與非核心對于線程池來說都是一樣的,只要一旦線程數超過了核心線程數,那么線程就會走進timewaiting
把queuesize調大就好了?
這里又有一個新的注意點:
上面舉例的是I/O密集型業務,queuesize不是越大越好的,
因為:
線程池新創建的線程會優先處理新請求進來的任務,而不是去處理隊列里的任務,隊列里的任務只能等核心線程數忙完了才能被執行,這樣可能造成隊列里的任務長時間等待,導致隊列積壓,尤其是I/O密集場景
慎用CallRunnerPolicy這個拒絕策略
一定得理解這個策略會帶來什么影響,
先看下這個拒絕策略的源碼
如果你提交線程池的任務即時失敗也沒有關系的話,用這個拒絕策略是致命的,
因為一旦超過線程池的負載后開始吞噬tomcat線程。
用future.get的方式慎用DiscardPolicy這個拒絕策略
如果需要得到線程池里的線程執行結果,使用future的方式,拒絕策略不建議使用DiscardPolicy,這種丟棄策略雖然不執行子線程的任務,
但是還是會返回future對象(其實在這種情況下我們已經不需要線程池返回的結果了),然后后續代碼即使判斷了future!=null也沒用,
這樣的話還是會走到future.get()方法,如果get方法沒有設置超時時間會導致一直阻塞下去
類似下面的偽代碼:
// 如果線程池已滿,新的請求會被直接執行拒絕策略,此時如果拒絕策略設置的是DiscardPolicy丟棄任務, // 則還是會返回future對象, 這樣的話后續流程還是可能會走到get獲取結果的邏輯 Future<String> future = executor.submit(() -> {// 業務邏輯,比如調用第三方接口等操作return result; });// 主流程調用邏輯 if(future != null) // 如果拒絕策略是DiscardPolicy還是會走到下面代碼future.get(超時時間); // 調用方阻塞等待結果返回,直到超時推薦解決方案
對線程池的核心指標進行埋點監控,可以通過繼承 ThreadPoolExecutor 然后Override掉beforeExecute,afterExecute,shutdown,shutdownNow方法,進行埋點記錄到es
可以埋點的數據有:
包括線程池運行狀態、核心線程數、最大線程數、任務等待數、已完成任務數、線程池異常關閉等信息
基于以上數據,我們可以實時監控和排查定位問題
參考代碼:
/*** 自定義線程池<p>* 1.監控線程池狀態及異常關閉等情況<p>* 2.監控線程池運行時的各項指標, 比如:任務執行時間、任務等待數、已完成任務數、任務異常信息、核心線程數、最大線程數等<p>* author: maoyingxu*/ public class ThreadPoolExt extends ThreadPoolExecutor{private TimeUnit timeUnit;public ThreadPoolExt(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.timeUnit = unit;} //加入Java開發交流君樣:756584822一起吹水聊天@Overrideprotected void beforeExecute(Thread t, Runnable r) {monitor("ThreadPool monitor data:"); // 監控線程池運行時的各項指標}@Overrideprotected void afterExecute(Runnable r, Throwable ex) {// 記錄線程池執行任務的時間ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("ThreadPool task executeTime:{0}", executeTime));if (ex != null) { // 監控線程池中的線程執行是否異常LogUtils.warn("unknown exception caught in ThreadPool afterExecute:", ex);}}@Overridepublic void shutdown() {monitor("ThreadPool will be shutdown:"); // 線程池將要關閉事件,此方法會等待線程池中正在執行的任務和隊列中等待的任務執行完畢再關閉super.shutdown();}@Overridepublic List<Runnable> shutdownNow() {monitor("ThreadPool going to immediately be shutdown:"); // 線程池立即關閉事件,此方法會立即關閉線程池,但是會返回隊列中等待的任務// 記錄被丟棄的任務, 目前只記錄日志, 后續可根據業務場景做進一步處理List<Runnable> dropTasks = null;try {dropTasks = super.shutdownNow();ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("{0}ThreadPool discard task count:{1}{2}",System.lineSeparator(), dropTasks!=null ? dropTasks.size() : 0, System.lineSeparator()));} catch (Exception e) {LogUtils.addClogException("ThreadPool shutdownNow error", e);}//加入Java開發交流君樣:756584822一起吹水聊天return dropTasks;}/*** 監控線程池運行時的各項指標, 比如:任務等待數、任務異常信息、已完成任務數、核心線程數、最大線程數等* @param title*/private void monitor(String title){try {// 線程池監控信息記錄, 這里需要注意寫ES的時機,尤其是多個子線程的日志合并到主流程的記錄方式String threadPoolMonitor = MessageFormat.format("{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +"task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +"thread name:{13}{14}",System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, threadPoolMonitor);LogUtils.info(title, threadPoolMonitor);ELKLogUtils.addFieldValue(APPIndexedLogTag.THREAD_POOL_USE_RATE, useRate); // ES埋點線程池使用率, useRate = (getActiveCount()/getMaximumPoolSize())*100Cat.logEvent(key, String.valueOf(useRate)); // 報警設置} catch (Exception e) {LogUtils.addClogException("ThreadPool monitor error", e);}}}最后,祝大家早日學有所成,拿到滿意offer
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 京东面试官:呦,你对中间件 Mycat了
- 下一篇: 被问到了!为什么一定要使用分布式,内行啊