日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~

發(fā)布時(shí)間:2023/12/4 java 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~ 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

線程池大家都## 標(biāo)題很熟悉,無(wú)論是平時(shí)的業(yè)務(wù)開(kāi)發(fā)還是框架中間件都會(huì)用到,大部分都是基于JDK線程池ThreadPoolExecutor做的封裝,

都會(huì)牽涉到這幾個(gè)核心參數(shù)的設(shè)置:核心線程數(shù),等待(任務(wù))隊(duì)列,最大線程數(shù),拒絕策略等。

但如果線程池設(shè)置不當(dāng)就會(huì)引起一系列問(wèn)題, 下面就說(shuō)下我最近碰到的問(wèn)題。

案件還原

比如你有一個(gè)項(xiàng)目中有個(gè)接口部分功能使用了線程池,這個(gè)功能會(huì)去調(diào)用多個(gè)第三方接口,都有一定的耗時(shí),為了不影響主流程的性能,不增加整體響應(yīng)時(shí)間,所以放在線程池里和主線程并行執(zhí)行,等線程池里的任務(wù)執(zhí)行完通過(guò)future.get的方式獲取線程池里的線程執(zhí)行結(jié)果,然后合并到主流程的結(jié)果里返回,大致流程如下:


線程池參數(shù)為:

  • coresize:50
  • max:200
  • queuesize:1
  • keepalivetime:60s
  • 拒絕策略為reject

假設(shè)每次請(qǐng)求提交5個(gè)task到線程池,平均每個(gè)task是耗時(shí)50ms

沒(méi)過(guò)一會(huì)就收到了線程池滿了走了拒絕策略的報(bào)錯(cuò)

結(jié)合你對(duì)線程池的了解,先思考下為什么

線程池的工作流程如下:

根據(jù)這個(gè)我們來(lái)列一個(gè)時(shí)間線

  • 項(xiàng)目剛啟動(dòng) 第1次請(qǐng)求(每次5個(gè)task提交到線程池),創(chuàng)建5個(gè)核心線程
  • 第2次請(qǐng)求 繼續(xù)創(chuàng)建5個(gè)(共10個(gè)核心線程了)
  • 直到第10次 核心線程數(shù)會(huì)達(dá)滿50個(gè)
  • 核心線程處理完之后核心線程會(huì)干嘛呢
  • 根據(jù) jdk1.8的線程池的源碼:
    線程池的線程處理處理了交給它的task之后,它會(huì)去getTask()

    源碼如下:

    private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;} //加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//注意這段Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

    請(qǐng)注意上面代碼中的bool類(lèi)型的timed的賦值邏輯,

    由于allowCoreThreadTimeOut默認(rèn)為false,也就是說(shuō):

    只要?jiǎng)?chuàng)建的線程數(shù)量超過(guò)了核心線程數(shù),那么干完手上活后的線程(不管是核心線程,還是超過(guò)隊(duì)列后新開(kāi)的線程)就會(huì)走進(jìn)

    //線程狀態(tài)為 timedwaiting workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

    由于我們上面步驟里面還沒(méi)有超過(guò)coresize所以會(huì)走進(jìn)

    //線程狀態(tài)為 waiting workQueue.take()

    所以答案是:上面步驟干活的核心線程處理完之后核心線程會(huì)進(jìn)入waiting狀態(tài),
    只要隊(duì)列一有活就會(huì)被喚醒去干活。

  • 到第11次的時(shí)候
    好家伙,到這步驟的時(shí)候 ,核心線程數(shù)已滿,那么就往隊(duì)列里面塞,但是設(shè)置的queuesize=1,
    每次有5個(gè)task,那就是說(shuō)往隊(duì)列里面塞1個(gè),剩下4個(gè)(別較真我懂你意思)要?jiǎng)?chuàng)建新的max線程了。
  • 結(jié)果:

    核心線程數(shù):50
    隊(duì)列:1
    max線程:4個(gè)
    因?yàn)?0個(gè)核心線程在waiting中,所以隊(duì)列只要一add,就會(huì)立馬被消費(fèi),假設(shè)消費(fèi)的這個(gè)核心線程名字是小A。

    這里要細(xì)品一下:

    這里已經(jīng)總線程數(shù)大于核心線程數(shù)了,那么getTask()里面

    // timed=trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    那么小A干完活就會(huì)走進(jìn)

    //線程狀態(tài)為 timedwaiting workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

    此處核心線程小A就會(huì)變成timedwaiting的狀態(tài)(keepalive設(shè)置的是60s)

  • 到第12次的時(shí)候
    繼續(xù)往隊(duì)列塞1個(gè),創(chuàng)建4個(gè)max線程,max線程已經(jīng)有8個(gè)了
  • 這里 又會(huì)有一個(gè)新的核心線程小B ,會(huì)變成timedwaiting狀態(tài)了

    max線程們干完手上的活后,也會(huì)去調(diào)用getTask() 也會(huì)進(jìn)入timedwaiting狀態(tài)

    因?yàn)閝ueuesize=1,狼多肉少

  • 繼續(xù)下去,那么最終會(huì)變成
    max滿了,線程們都在timedwaiting(keepalive設(shè)置的是60s)
  • 新的提交就會(huì)走拒絕策略了

    問(wèn)題總結(jié)

    其實(shí)核心與非核心對(duì)于線程池來(lái)說(shuō)都是一樣的,只要一旦線程數(shù)超過(guò)了核心線程數(shù),那么線程就會(huì)走進(jìn)timewaiting

    把queuesize調(diào)大就好了?
    這里又有一個(gè)新的注意點(diǎn):
    上面舉例的是I/O密集型業(yè)務(wù),queuesize不是越大越好的,
    因?yàn)?#xff1a;

    線程池新創(chuàng)建的線程會(huì)優(yōu)先處理新請(qǐng)求進(jìn)來(lái)的任務(wù),而不是去處理隊(duì)列里的任務(wù),隊(duì)列里的任務(wù)只能等核心線程數(shù)忙完了才能被執(zhí)行,這樣可能造成隊(duì)列里的任務(wù)長(zhǎng)時(shí)間等待,導(dǎo)致隊(duì)列積壓,尤其是I/O密集場(chǎng)景

    慎用CallRunnerPolicy這個(gè)拒絕策略
    一定得理解這個(gè)策略會(huì)帶來(lái)什么影響,

    先看下這個(gè)拒絕策略的源碼


    如果你提交線程池的任務(wù)即時(shí)失敗也沒(méi)有關(guān)系的話,用這個(gè)拒絕策略是致命的,
    因?yàn)橐坏┏^(guò)線程池的負(fù)載后開(kāi)始吞噬tomcat線程。

    用future.get的方式慎用DiscardPolicy這個(gè)拒絕策略

    如果需要得到線程池里的線程執(zhí)行結(jié)果,使用future的方式,拒絕策略不建議使用DiscardPolicy,這種丟棄策略雖然不執(zhí)行子線程的任務(wù),

    但是還是會(huì)返回future對(duì)象(其實(shí)在這種情況下我們已經(jīng)不需要線程池返回的結(jié)果了),然后后續(xù)代碼即使判斷了future!=null也沒(méi)用,

    這樣的話還是會(huì)走到future.get()方法,如果get方法沒(méi)有設(shè)置超時(shí)時(shí)間會(huì)導(dǎo)致一直阻塞下去

    類(lèi)似下面的偽代碼:

    // 如果線程池已滿,新的請(qǐng)求會(huì)被直接執(zhí)行拒絕策略,此時(shí)如果拒絕策略設(shè)置的是DiscardPolicy丟棄任務(wù), // 則還是會(huì)返回future對(duì)象, 這樣的話后續(xù)流程還是可能會(huì)走到get獲取結(jié)果的邏輯 Future<String> future = executor.submit(() -> {// 業(yè)務(wù)邏輯,比如調(diào)用第三方接口等操作return result; });// 主流程調(diào)用邏輯 if(future != null) // 如果拒絕策略是DiscardPolicy還是會(huì)走到下面代碼future.get(超時(shí)時(shí)間); // 調(diào)用方阻塞等待結(jié)果返回,直到超時(shí)

    推薦解決方案

  • 用動(dòng)態(tài)線程池,可以動(dòng)態(tài)修改coresize,maxsize,queuesize,keepalivetime
    對(duì)線程池的核心指標(biāo)進(jìn)行埋點(diǎn)監(jiān)控,可以通過(guò)繼承 ThreadPoolExecutor 然后Override掉beforeExecute,afterExecute,shutdown,shutdownNow方法,進(jìn)行埋點(diǎn)記錄到es
    可以埋點(diǎn)的數(shù)據(jù)有:
    包括線程池運(yùn)行狀態(tài)、核心線程數(shù)、最大線程數(shù)、任務(wù)等待數(shù)、已完成任務(wù)數(shù)、線程池異常關(guān)閉等信息

    基于以上數(shù)據(jù),我們可以實(shí)時(shí)監(jiān)控和排查定位問(wèn)題
  • 參考代碼:

    /*** 自定義線程池<p>* 1.監(jiān)控線程池狀態(tài)及異常關(guān)閉等情況<p>* 2.監(jiān)控線程池運(yùn)行時(shí)的各項(xiàng)指標(biāo), 比如:任務(wù)執(zhí)行時(shí)間、任務(wù)等待數(shù)、已完成任務(wù)數(shù)、任務(wù)異常信息、核心線程數(shù)、最大線程數(shù)等<p>* author: maoyingxu*/ public class ThreadPoolExt extends ThreadPoolExecutor{private TimeUnit timeUnit;public ThreadPoolExt(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.timeUnit = unit;} //加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天@Overrideprotected void beforeExecute(Thread t, Runnable r) {monitor("ThreadPool monitor data:"); // 監(jiān)控線程池運(yùn)行時(shí)的各項(xiàng)指標(biāo)}@Overrideprotected void afterExecute(Runnable r, Throwable ex) {// 記錄線程池執(zhí)行任務(wù)的時(shí)間ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("ThreadPool task executeTime:{0}", executeTime));if (ex != null) { // 監(jiān)控線程池中的線程執(zhí)行是否異常LogUtils.warn("unknown exception caught in ThreadPool afterExecute:", ex);}}@Overridepublic void shutdown() {monitor("ThreadPool will be shutdown:"); // 線程池將要關(guān)閉事件,此方法會(huì)等待線程池中正在執(zhí)行的任務(wù)和隊(duì)列中等待的任務(wù)執(zhí)行完畢再關(guān)閉super.shutdown();}@Overridepublic List<Runnable> shutdownNow() {monitor("ThreadPool going to immediately be shutdown:"); // 線程池立即關(guān)閉事件,此方法會(huì)立即關(guān)閉線程池,但是會(huì)返回隊(duì)列中等待的任務(wù)// 記錄被丟棄的任務(wù), 目前只記錄日志, 后續(xù)可根據(jù)業(yè)務(wù)場(chǎng)景做進(jìn)一步處理List<Runnable> dropTasks = null;try {dropTasks = super.shutdownNow();ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("{0}ThreadPool discard task count:{1}{2}",System.lineSeparator(), dropTasks!=null ? dropTasks.size() : 0, System.lineSeparator()));} catch (Exception e) {LogUtils.addClogException("ThreadPool shutdownNow error", e);}//加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天return dropTasks;}/*** 監(jiān)控線程池運(yùn)行時(shí)的各項(xiàng)指標(biāo), 比如:任務(wù)等待數(shù)、任務(wù)異常信息、已完成任務(wù)數(shù)、核心線程數(shù)、最大線程數(shù)等* @param title*/private void monitor(String title){try {// 線程池監(jiān)控信息記錄, 這里需要注意寫(xiě)ES的時(shí)機(jī),尤其是多個(gè)子線程的日志合并到主流程的記錄方式String threadPoolMonitor = MessageFormat.format("{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +"task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +"thread name:{13}{14}",System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, threadPoolMonitor);LogUtils.info(title, threadPoolMonitor);ELKLogUtils.addFieldValue(APPIndexedLogTag.THREAD_POOL_USE_RATE, useRate); // ES埋點(diǎn)線程池使用率, useRate = (getActiveCount()/getMaximumPoolSize())*100Cat.logEvent(key, String.valueOf(useRate)); // 報(bào)警設(shè)置} catch (Exception e) {LogUtils.addClogException("ThreadPool monitor error", e);}}}
  • 重寫(xiě)線程池拒絕策略, 拒絕策略主要參考了 Dubbo的線程池拒絕策略
  • public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {// 省略部分代碼@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "+ "%d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg); // 記錄最大負(fù)載情況下線程池的核心線程數(shù),活躍數(shù),最大線程數(shù)等參數(shù)dumpJStack(); // 記錄線程堆棧信息包括鎖爭(zhēng)用信息throw new RejectedExecutionException(msg);}private void dumpJStack() {long now = System.currentTimeMillis();//dump every 10 minutes 每隔10分鐘記錄一次if (now - lastPrintTime < TEN_MINUTES_MILLS) {return;}//加入Java開(kāi)發(fā)交流君樣:756584822一起吹水聊天if (!guard.tryAcquire()) { // 加鎖訪問(wèn)return;}ExecutorService pool = Executors.newSingleThreadExecutor(); // 這里單獨(dú)開(kāi)啟一個(gè)新的線程去執(zhí)行(阿里的Java開(kāi)發(fā)規(guī)范不允許直接調(diào)用Executors.newSingleThreadExecutor, 估計(jì)dubbo那時(shí)候還沒(méi)出開(kāi)發(fā)規(guī)范...)pool.execute(() -> {String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));SimpleDateFormat sdf;String os = System.getProperty(OS_NAME_KEY).toLowerCase();// window system don't support ":" in file nameif (os.contains(OS_WIN_PREFIX)) {sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);} else {sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);}String dateStr = sdf.format(new Date());//try-with-resourcestry (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {JVMUtil.jstack(jStackStream);} catch (Throwable t) {logger.error("dump jStack error", t);} finally {guard.release();}lastPrintTime = System.currentTimeMillis();});//must shutdown thread pool ,if not will lead to OOMpool.shutdown();}}

    最后,祝大家早日學(xué)有所成,拿到滿意offer

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

    總結(jié)

    以上是生活随笔為你收集整理的面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。