日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

多线程:当你提交任务时,线程队列已经满了,这时会发生什么?

發(fā)布時(shí)間:2025/3/21 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程:当你提交任务时,线程队列已经满了,这时会发生什么? 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

?

首先我調(diào)用Executors創(chuàng)建的線程池出來(lái)的對(duì)象是ThreadPoolExecutor,ScheduledThreadPoolExecutor,DelegatedExecutorService這三個(gè)類中的一個(gè)! 而ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類, DelegatedExecutorService是對(duì)ExecutorService進(jìn)行一層包裝.

今天我們這里不討論每一種線程池是干什么的, 只討論RejectedExecutionException的原由, 所以我只拿ThreadPoolExecutor這個(gè)作為例子說(shuō)了.

本文基于JDK1.8源碼進(jìn)行分析

當(dāng)我們調(diào)用Executors.newFixedThreadPool(int?nThreads )時(shí)會(huì)創(chuàng)建一個(gè)線程池給我. 源碼這個(gè)方法的實(shí)現(xiàn)是:

public static ExecutorService newFixedThreadPool(int nThreads) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
? ? }


那么我跟進(jìn)去這個(gè)構(gòu)造方法


public ThreadPoolExecutor(int corePoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue) {
? ? ? ? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
? ? ? ? ? ? ?Executors.defaultThreadFactory(), defaultHandler);
? ? }
?
public ThreadPoolExecutor(int corePoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? TimeUnit unit,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? BlockingQueue<Runnable> workQueue,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {
? ? ? ? if (corePoolSize < 0 ||
? ? ? ? ? ? maximumPoolSize <= 0 ||
? ? ? ? ? ? maximumPoolSize < corePoolSize ||
? ? ? ? ? ? keepAliveTime < 0)
? ? ? ? ? ? throw new IllegalArgumentException();
? ? ? ? if (workQueue == null || threadFactory == null || handler == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? this.corePoolSize = corePoolSize;
? ? ? ? this.maximumPoolSize = maximumPoolSize;
? ? ? ? this.workQueue = workQueue;
? ? ? ? this.keepAliveTime = unit.toNanos(keepAliveTime);
? ? ? ? this.threadFactory = threadFactory;
? ? ? ? this.handler = handler;
? ? }

Ok,當(dāng)我看到這里的時(shí)候, 我已經(jīng)看到我想要的了.RejectedExecutionHandler這個(gè)是拒絕執(zhí)行者. 那么我看看這個(gè)我傳進(jìn)去的defaultHandler是什么, 它就是終止策略
private static final RejectedExecutionHandler defaultHandler =
? ? ? ? new AbortPolicy();
這個(gè)類的實(shí)現(xiàn)
public static class AbortPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates an <tt>AbortPolicy</tt>.
? ? ? ? ?*/
? ? ? ? public AbortPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Always throws RejectedExecutionException.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?* @throws RejectedExecutionException always.
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? ? ? throw new RejectedExecutionException();
? ? ? ? }
? ? }

直接拋這個(gè)異常出去. 那么我明白了, 就是因?yàn)槲业木€程池的拒絕策略是AbortPolicy,所以會(huì)導(dǎo)致拋異常!??好, 那么現(xiàn)在我知道為什么拋異常了, 是不是就夠了呢? 一般來(lái)說(shuō)用來(lái)解決問(wèn)題是夠了, 但是我還想研究下什么情況下會(huì)拋這個(gè)異常, 和它的終極解決方案!

我們先來(lái)看看線程池的四種拒絕策略:

?注:?當(dāng)線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒(méi)有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。注:默認(rèn)策略!!!!!!

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)

ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
?

1:解決方案的探索非常簡(jiǎn)單,??無(wú)非就是RejectedExecutionHandler嘛, 我Ctrl+T看看他有什么實(shí)現(xiàn)類就Ok了嘛,??它有四個(gè)實(shí)現(xiàn)類.AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy.?

關(guān)于AbortPolicy我剛才已經(jīng)說(shuō)了,它的拒絕策略就是拋異常. 說(shuō)說(shuō)下面三個(gè).

CallerRunsPolicy這個(gè)的拒絕策略是如果線程池沒(méi)有shutDown,就會(huì)執(zhí)行需要執(zhí)行的Runnable

?

public static class CallerRunsPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates a <tt>CallerRunsPolicy</tt>.
? ? ? ? ?*/
? ? ? ? public CallerRunsPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Executes task r in the caller's thread, unless the executor
? ? ? ? ?* has been shut down, in which case the task is discarded.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? ? ? if (!e.isShutdown()) {
? ? ? ? ? ? ? ? r.run();
? ? ? ? ? ? }
? ? ? ? }
? ? }

所以,我們解決拋異常的就直接用CallerRunsPolicy這個(gè)策略就可以了.
再來(lái)DiscardPolicy, 這個(gè)策略就是忽略, 即你隨意提交任務(wù), 我不鳥你就完了.

public static class DiscardPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates a <tt>DiscardPolicy</tt>.
? ? ? ? ?*/
? ? ? ? public DiscardPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Does nothing, which has the effect of discarding task r.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? }
? ? }
DiscardOldestPolicy策略是說(shuō)忽略最老的任務(wù),然后執(zhí)行我們提交的.
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
? ? ? ? /**
? ? ? ? ?* Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
? ? ? ? ?*/
? ? ? ? public DiscardOldestPolicy() { }
?
? ? ? ? /**
? ? ? ? ?* Obtains and ignores the next task that the executor
? ? ? ? ?* would otherwise execute, if one is immediately available,
? ? ? ? ?* and then retries execution of task r, unless the executor
? ? ? ? ?* is shut down, in which case task r is instead discarded.
? ? ? ? ?* @param r the runnable task requested to be executed
? ? ? ? ?* @param e the executor attempting to execute this task
? ? ? ? ?*/
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
? ? ? ? ? ? if (!e.isShutdown()) {
? ? ? ? ? ? ? ? e.getQueue().poll();
? ? ? ? ? ? ? ? e.execute(r);
? ? ? ? ? ? }
? ? ? ? }
? ? }

從實(shí)現(xiàn)也可以看出來(lái), 把隊(duì)列里面頂部的彈掉, 再執(zhí)行我們的任務(wù)
大家可以根據(jù)不同所需, 在創(chuàng)建線程池之后, 用你們的ThreadPoolExecutor.setRejectedExecutionHandler(RejectedExecutionHandler handler)去設(shè)置你們的拒絕策略

2:那么本文還要繼續(xù)探索何時(shí)這些拒絕策略會(huì)被調(diào)用呢? 我以ThreadPoolExecutor的execute()方法說(shuō)事了, ScheduledThreadPoolExecutor的submit方法是大同小異的,請(qǐng)大家自己跟代碼去吧.

execute方法的javadoc中有這么一句話:

If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached,
the task is handled by the current RejectedExecutionHandler

看看execute的實(shí)現(xiàn)先

public void execute(Runnable command) {
? ? ? ? if (command == null)
? ? ? ? ? ? throw new NullPointerException();
? ? ? ? if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
? ? ? ? ? ? if (runState == RUNNING && workQueue.offer(command)) {
? ? ? ? ? ? ? ? if (runState != RUNNING || poolSize == 0)
? ? ? ? ? ? ? ? ? ? ensureQueuedTaskHandled(command);
? ? ? ? ? ? }
? ? ? ? ? ? else if (!addIfUnderMaximumPoolSize(command))
? ? ? ? ? ? ? ? reject(command); // is shutdown or saturated
? ? ? ? }
? ? }


這里一共涉及到三個(gè)方法,addIfUnderCorePoolSize,ensureQueuedTaskHandled,addIfUnderMaximumPoolSize
只要執(zhí)行到reject(command)這里就會(huì)調(diào)用我們那個(gè)Handler的rejectedExecution()方法.

那三個(gè)方法以及javadoc都告訴我們,??如果這個(gè)線程池已經(jīng)shutdown或者容量滿了之后, 就會(huì)調(diào)用拒絕策略.. 請(qǐng)注意,. 從實(shí)現(xiàn)來(lái)看,??這個(gè)容量滿了是指當(dāng)前的線程池以及其緩沖隊(duì)列的容量都滿了 才是滿了, 而不是線程池的數(shù)量。


既然提到阻塞隊(duì)列已滿說(shuō)明不是LinkedBlockingQueue!因?yàn)長(zhǎng)inkedBlockingQueue是近似無(wú)界的!

由于默認(rèn)是AbortPolicy拒絕策略,因此極可能是拋出RejectedExecutionException異常,如果是其它的策略就要另當(dāng)別論!面試是要闡述清楚!


?

總結(jié)

以上是生活随笔為你收集整理的多线程:当你提交任务时,线程队列已经满了,这时会发生什么?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。