面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~
線程池大家都## 標(biāo)題很熟悉,無(wú)論是平時(shí)的業(yè)務(wù)開(kāi)發(fā)還是框架中間件都會(huì)用到,大部分都是基于JDK線程池ThreadPoolExecutor做的封裝,
都會(huì)牽涉到這幾個(gè)核心參數(shù)的設(shè)置:核心線程數(shù),等待(任務(wù))隊(duì)列,最大線程數(shù),拒絕策略等。
但如果線程池設(shè)置不當(dāng)就會(huì)引起一系列問(wèn)題, 下面就說(shuō)下我最近碰到的問(wèn)題。
案件還原
比如你有一個(gè)項(xiàng)目中有個(gè)接口部分功能使用了線程池,這個(gè)功能會(huì)去調(diào)用多個(gè)第三方接口,都有一定的耗時(shí),為了不影響主流程的性能,不增加整體響應(yīng)時(shí)間,所以放在線程池里和主線程并行執(zhí)行,等線程池里的任務(wù)執(zhí)行完通過(guò)future.get的方式獲取線程池里的線程執(zhí)行結(jié)果,然后合并到主流程的結(jié)果里返回,大致流程如下:
線程池參數(shù)為:
- coresize:50
- max:200
- queuesize:1
- keepalivetime:60s
- 拒絕策略為reject
假設(shè)每次請(qǐng)求提交5個(gè)task到線程池,平均每個(gè)task是耗時(shí)50ms
沒(méi)過(guò)一會(huì)就收到了線程池滿了走了拒絕策略的報(bào)錯(cuò)
結(jié)合你對(duì)線程池的了解,先思考下為什么
線程池的工作流程如下:
根據(jù)這個(gè)我們來(lái)列一個(gè)時(shí)間線
根據(jù) jdk1.8的線程池的源碼:
線程池的線程處理處理了交給它的task之后,它會(huì)去getTask()
源碼如下:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;} //加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//注意這段Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}請(qǐng)注意上面代碼中的bool類(lèi)型的timed的賦值邏輯,
由于allowCoreThreadTimeOut默認(rèn)為false,也就是說(shuō):
只要?jiǎng)?chuàng)建的線程數(shù)量超過(guò)了核心線程數(shù),那么干完手上活后的線程(不管是核心線程,還是超過(guò)隊(duì)列后新開(kāi)的線程)就會(huì)走進(jìn)
//線程狀態(tài)為 timedwaiting workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)由于我們上面步驟里面還沒(méi)有超過(guò)coresize所以會(huì)走進(jìn)
//線程狀態(tài)為 waiting workQueue.take()所以答案是:上面步驟干活的核心線程處理完之后核心線程會(huì)進(jìn)入waiting狀態(tài),
只要隊(duì)列一有活就會(huì)被喚醒去干活。
好家伙,到這步驟的時(shí)候 ,核心線程數(shù)已滿,那么就往隊(duì)列里面塞,但是設(shè)置的queuesize=1,
每次有5個(gè)task,那就是說(shuō)往隊(duì)列里面塞1個(gè),剩下4個(gè)(別較真我懂你意思)要?jiǎng)?chuàng)建新的max線程了。
結(jié)果:
核心線程數(shù):50
隊(duì)列:1
max線程:4個(gè)
因?yàn)?0個(gè)核心線程在waiting中,所以隊(duì)列只要一add,就會(huì)立馬被消費(fèi),假設(shè)消費(fèi)的這個(gè)核心線程名字是小A。
這里要細(xì)品一下:
這里已經(jīng)總線程數(shù)大于核心線程數(shù)了,那么getTask()里面
// timed=trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;那么小A干完活就會(huì)走進(jìn)
//線程狀態(tài)為 timedwaiting workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)此處核心線程小A就會(huì)變成timedwaiting的狀態(tài)(keepalive設(shè)置的是60s)
繼續(xù)往隊(duì)列塞1個(gè),創(chuàng)建4個(gè)max線程,max線程已經(jīng)有8個(gè)了
這里 又會(huì)有一個(gè)新的核心線程小B ,會(huì)變成timedwaiting狀態(tài)了
max線程們干完手上的活后,也會(huì)去調(diào)用getTask() 也會(huì)進(jìn)入timedwaiting狀態(tài)
因?yàn)閝ueuesize=1,狼多肉少
max滿了,線程們都在timedwaiting(keepalive設(shè)置的是60s)
新的提交就會(huì)走拒絕策略了
問(wèn)題總結(jié)
其實(shí)核心與非核心對(duì)于線程池來(lái)說(shuō)都是一樣的,只要一旦線程數(shù)超過(guò)了核心線程數(shù),那么線程就會(huì)走進(jìn)timewaiting
把queuesize調(diào)大就好了?
這里又有一個(gè)新的注意點(diǎn):
上面舉例的是I/O密集型業(yè)務(wù),queuesize不是越大越好的,
因?yàn)?#xff1a;
線程池新創(chuàng)建的線程會(huì)優(yōu)先處理新請(qǐng)求進(jìn)來(lái)的任務(wù),而不是去處理隊(duì)列里的任務(wù),隊(duì)列里的任務(wù)只能等核心線程數(shù)忙完了才能被執(zhí)行,這樣可能造成隊(duì)列里的任務(wù)長(zhǎng)時(shí)間等待,導(dǎo)致隊(duì)列積壓,尤其是I/O密集場(chǎng)景
慎用CallRunnerPolicy這個(gè)拒絕策略
一定得理解這個(gè)策略會(huì)帶來(lái)什么影響,
先看下這個(gè)拒絕策略的源碼
如果你提交線程池的任務(wù)即時(shí)失敗也沒(méi)有關(guān)系的話,用這個(gè)拒絕策略是致命的,
因?yàn)橐坏┏^(guò)線程池的負(fù)載后開(kāi)始吞噬tomcat線程。
用future.get的方式慎用DiscardPolicy這個(gè)拒絕策略
如果需要得到線程池里的線程執(zhí)行結(jié)果,使用future的方式,拒絕策略不建議使用DiscardPolicy,這種丟棄策略雖然不執(zhí)行子線程的任務(wù),
但是還是會(huì)返回future對(duì)象(其實(shí)在這種情況下我們已經(jīng)不需要線程池返回的結(jié)果了),然后后續(xù)代碼即使判斷了future!=null也沒(méi)用,
這樣的話還是會(huì)走到future.get()方法,如果get方法沒(méi)有設(shè)置超時(shí)時(shí)間會(huì)導(dǎo)致一直阻塞下去
類(lèi)似下面的偽代碼:
// 如果線程池已滿,新的請(qǐng)求會(huì)被直接執(zhí)行拒絕策略,此時(shí)如果拒絕策略設(shè)置的是DiscardPolicy丟棄任務(wù), // 則還是會(huì)返回future對(duì)象, 這樣的話后續(xù)流程還是可能會(huì)走到get獲取結(jié)果的邏輯 Future<String> future = executor.submit(() -> {// 業(yè)務(wù)邏輯,比如調(diào)用第三方接口等操作return result; });// 主流程調(diào)用邏輯 if(future != null) // 如果拒絕策略是DiscardPolicy還是會(huì)走到下面代碼future.get(超時(shí)時(shí)間); // 調(diào)用方阻塞等待結(jié)果返回,直到超時(shí)推薦解決方案
對(duì)線程池的核心指標(biāo)進(jìn)行埋點(diǎn)監(jiān)控,可以通過(guò)繼承 ThreadPoolExecutor 然后Override掉beforeExecute,afterExecute,shutdown,shutdownNow方法,進(jìn)行埋點(diǎn)記錄到es
可以埋點(diǎn)的數(shù)據(jù)有:
包括線程池運(yùn)行狀態(tài)、核心線程數(shù)、最大線程數(shù)、任務(wù)等待數(shù)、已完成任務(wù)數(shù)、線程池異常關(guān)閉等信息
基于以上數(shù)據(jù),我們可以實(shí)時(shí)監(jiān)控和排查定位問(wèn)題
參考代碼:
/*** 自定義線程池<p>* 1.監(jiān)控線程池狀態(tài)及異常關(guān)閉等情況<p>* 2.監(jiān)控線程池運(yùn)行時(shí)的各項(xiàng)指標(biāo), 比如:任務(wù)執(zhí)行時(shí)間、任務(wù)等待數(shù)、已完成任務(wù)數(shù)、任務(wù)異常信息、核心線程數(shù)、最大線程數(shù)等<p>* author: maoyingxu*/ public class ThreadPoolExt extends ThreadPoolExecutor{private TimeUnit timeUnit;public ThreadPoolExt(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.timeUnit = unit;} //加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天@Overrideprotected void beforeExecute(Thread t, Runnable r) {monitor("ThreadPool monitor data:"); // 監(jiān)控線程池運(yùn)行時(shí)的各項(xiàng)指標(biāo)}@Overrideprotected void afterExecute(Runnable r, Throwable ex) {// 記錄線程池執(zhí)行任務(wù)的時(shí)間ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("ThreadPool task executeTime:{0}", executeTime));if (ex != null) { // 監(jiān)控線程池中的線程執(zhí)行是否異常LogUtils.warn("unknown exception caught in ThreadPool afterExecute:", ex);}}@Overridepublic void shutdown() {monitor("ThreadPool will be shutdown:"); // 線程池將要關(guān)閉事件,此方法會(huì)等待線程池中正在執(zhí)行的任務(wù)和隊(duì)列中等待的任務(wù)執(zhí)行完畢再關(guān)閉super.shutdown();}@Overridepublic List<Runnable> shutdownNow() {monitor("ThreadPool going to immediately be shutdown:"); // 線程池立即關(guān)閉事件,此方法會(huì)立即關(guān)閉線程池,但是會(huì)返回隊(duì)列中等待的任務(wù)// 記錄被丟棄的任務(wù), 目前只記錄日志, 后續(xù)可根據(jù)業(yè)務(wù)場(chǎng)景做進(jìn)一步處理List<Runnable> dropTasks = null;try {dropTasks = super.shutdownNow();ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("{0}ThreadPool discard task count:{1}{2}",System.lineSeparator(), dropTasks!=null ? dropTasks.size() : 0, System.lineSeparator()));} catch (Exception e) {LogUtils.addClogException("ThreadPool shutdownNow error", e);}//加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天return dropTasks;}/*** 監(jiān)控線程池運(yùn)行時(shí)的各項(xiàng)指標(biāo), 比如:任務(wù)等待數(shù)、任務(wù)異常信息、已完成任務(wù)數(shù)、核心線程數(shù)、最大線程數(shù)等* @param title*/private void monitor(String title){try {// 線程池監(jiān)控信息記錄, 這里需要注意寫(xiě)ES的時(shí)機(jī),尤其是多個(gè)子線程的日志合并到主流程的記錄方式String threadPoolMonitor = MessageFormat.format("{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +"task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +"thread name:{13}{14}",System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, threadPoolMonitor);LogUtils.info(title, threadPoolMonitor);ELKLogUtils.addFieldValue(APPIndexedLogTag.THREAD_POOL_USE_RATE, useRate); // ES埋點(diǎn)線程池使用率, useRate = (getActiveCount()/getMaximumPoolSize())*100Cat.logEvent(key, String.valueOf(useRate)); // 報(bào)警設(shè)置} catch (Exception e) {LogUtils.addClogException("ThreadPool monitor error", e);}}}最后,祝大家早日學(xué)有所成,拿到滿意offer
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 京东面试官:呦,你对中间件 Mycat了
- 下一篇: 面试避坑手册之 Java字节流和字符流总