日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

ScheduledThreadPoolExecutor定时任务线程池执行原理分析

發(fā)布時(shí)間:2025/3/19 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ScheduledThreadPoolExecutor定时任务线程池执行原理分析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、示例代碼

?

@Slf4j public class ScheduleThreadPoolTest {private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);int nCnt = 0;public void testScheduleThread(){log.debug(" fixed time--> start." );executor.scheduleWithFixedDelay(()->{log.debug(" fixed time--> nCnt:" + (nCnt++));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}},3000,2000,TimeUnit.MILLISECONDS);try {executor.awaitTermination(100000,TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}log.debug(" fixed time--> end." );}public static void main(String[] args) {ScheduleThreadPoolTest scheduleThreadPoolTest = new ScheduleThreadPoolTest();scheduleThreadPoolTest.testScheduleThread();} }

二、通用線程池ThreadPoolExecutor執(zhí)行原理

1.構(gòu)造函數(shù)和成員變量定義

corePoolSize

線程池的基本大小,即在沒(méi)有任務(wù)需要執(zhí)行的時(shí)候線程池的大小并且只有在工作隊(duì)列滿了的情況下才會(huì)創(chuàng)建超出這個(gè)數(shù)量的線程。這里需要注意的是:在剛剛創(chuàng)建ThreadPoolExecutor的時(shí)候,線程并不會(huì)立即啟動(dòng),而是要等到有任務(wù)提交時(shí)才會(huì)啟動(dòng),除非調(diào)用了prestartCoreThread/prestartAllCoreThreads事先啟動(dòng)核心線程。再考慮到keepAliveTime和allowCoreThreadTimeOut超時(shí)參數(shù)的影響,所以沒(méi)有任務(wù)需要執(zhí)行的時(shí)候,線程池的大小不一定是corePoolSize。

maximumPoolSize

線程池中允許的最大線程數(shù),線程池中的當(dāng)前線程數(shù)目不會(huì)超過(guò)該值。如果隊(duì)列中任務(wù)已滿,并且當(dāng)前線程個(gè)數(shù)小于maximumPoolSize,那么會(huì)創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。這里值得一提的是largestPoolSize,該變量記錄了線程池在整個(gè)生命周期中曾經(jīng)出現(xiàn)的最大線程個(gè)數(shù)。為什么說(shuō)是曾經(jīng)呢?因?yàn)榫€程池創(chuàng)建之后,可以調(diào)用setMaximumPoolSize()改變運(yùn)行的最大線程的數(shù)目。

poolSize

線程池中當(dāng)前線程的數(shù)量,當(dāng)該值為0的時(shí)候,意味著沒(méi)有任何線程,線程池會(huì)終止;同一時(shí)刻,poolSize不會(huì)超過(guò)maximumPoolSize。

private final BlockingQueue<Runnable> workQueue; 阻塞任務(wù)隊(duì)列。 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

新提交一個(gè)任務(wù)時(shí)的處理流程很明顯:

1、如果當(dāng)前線程池的線程數(shù)還沒(méi)有達(dá)到基本大小(poolSize < corePoolSize),無(wú)論是否有空閑的線程新增一個(gè)線程處理新提交的任務(wù);

2、如果當(dāng)前線程池的線程數(shù)大于或等于基本大小(poolSize >= corePoolSize)?且任務(wù)隊(duì)列未滿時(shí),就將新提交的任務(wù)提交到阻塞隊(duì)列排隊(duì),等候處理workQueue.offer(command);

3、如果當(dāng)前線程池的線程數(shù)大于或等于基本大小(poolSize >= corePoolSize)?且任務(wù)隊(duì)列滿時(shí)

3.1、當(dāng)前poolSize<maximumPoolSize,那么就新增線程來(lái)處理任務(wù);

3.2、當(dāng)前poolSize=maximumPoolSize,那么意味著線程池的處理能力已經(jīng)達(dá)到了極限,此時(shí)需要拒絕新增加的任務(wù)。至于如何拒絕處理新增的任務(wù),取決于線程池的飽和策略RejectedExecutionHandler。

2.提交任務(wù)時(shí),創(chuàng)建ThreadPoolExecutor的Worker類對(duì)象(實(shí)現(xiàn)runnable接口),并運(yùn)行此線程,把此worker添加到works中。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}

3.添加WORK的代碼和流程,會(huì)新建一個(gè)worker對(duì)象,并且運(yùn)行線程。

private boolean addWorker(Runnable firstTask, boolean core) {boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {if (workerAdded) {t.start();workerStarted = true;}}} return workerStarted;}

4.循環(huán)從阻塞任務(wù)隊(duì)列取出任務(wù),然后執(zhí)行任務(wù)。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} finally {afterExecute(task, thrown);}} } finally {processWorkerExit(w, completedAbruptly);}}

?5.取任務(wù)的過(guò)程

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

三、ScheduledThreadPoolExecutor從隊(duì)列取任務(wù)和存任務(wù)的過(guò)程

1.DelayedWorkQueue為ScheduledThreadPoolExecutor的阻塞任務(wù)隊(duì)列。 static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture[16];private final ReentrantLock lock = new ReentrantLock();private int size = 0;private Thread leader = null;private final Condition available;DelayedWorkQueue() {this.available = this.lock.newCondition();}

2.取任務(wù)的過(guò)程

public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 自循環(huán),實(shí)現(xiàn)對(duì)隊(duì)列的監(jiān)控 保證返回根節(jié)點(diǎn)for (;;) {// 獲取根節(jié)點(diǎn)任務(wù)RunnableScheduledFuture<?> first = queue[0];// 如果隊(duì)列為空,則通知其他線程等待if (first == null)available.await();else {// 獲取根節(jié)點(diǎn)任務(wù)等待時(shí)間與系統(tǒng)時(shí)間的差值long delay = first.getDelay(NANOSECONDS);// 如果等待時(shí)間已經(jīng)到,則返回根節(jié)點(diǎn)任務(wù)并重排序隊(duì)列if (delay <= 0)return finishPoll(first);// 如果等待時(shí)間還沒(méi)有到,則繼續(xù)等待且不擁有任務(wù)的引用first = null; // don't retain ref while waiting// 如果此時(shí)等待根節(jié)點(diǎn)的leader線程不為空則通知其他線程繼續(xù)等待if (leader != null)available.await();else {// 如果此時(shí)leader線程為空,則把當(dāng)前線程置為leaderThread thisThread = Thread.currentThread();leader = thisThread;try {// 當(dāng)前線程等待延遲的時(shí)間available.awaitNanos(delay); } finally {// 延遲時(shí)間已到 則把當(dāng)前線程變成非leader線程// 當(dāng)前線程繼續(xù)用于執(zhí)行for循環(huán)的邏輯if (leader == thisThread)leader = null;}}}} } finally {// 如果leader為null 則喚醒一個(gè)線程成為leaderif (leader == null && queue[0] != null)available.signal();lock.unlock();} }

3.?finishPoll(RunnableScheduledFuture)-獲取根節(jié)點(diǎn)后重排序

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {// 因?yàn)槿〕龈?jié)點(diǎn) 所以隊(duì)列深度減1 并賦值給sint s = --size;// 獲取隊(duì)列最后一個(gè)任務(wù)RunnableScheduledFuture<?> x = queue[s];queue[s] = null; // 該位置元素置空// 如果s已經(jīng)根節(jié)點(diǎn)則直接返回,否則堆重排序if (s != 0)siftDown(0, x);// 取出來(lái)的任務(wù) 設(shè)置其堆索引為-1setIndex(f, -1);return f; // 返回任務(wù) }

4.siftDown(int,RunnableScheduledFuture)-移除元素后重排序

private void siftDown(int k, RunnableScheduledFuture<?> key) {// 取隊(duì)列當(dāng)前深度的一半 相當(dāng)于size / 2int half = size >>> 1;// 索引k(初值為0)的值大于half時(shí) 退出循環(huán)while (k < half) {// 獲取左節(jié)點(diǎn)的索引int child = (k << 1) + 1;// 獲取左節(jié)點(diǎn)的任務(wù)RunnableScheduledFuture<?> c = queue[child];// 獲取右節(jié)點(diǎn)的索引int right = child + 1;// 如果右節(jié)點(diǎn)在范圍內(nèi) 且 左節(jié)點(diǎn)大于右節(jié)點(diǎn),if (right < size && c.compareTo(queue[right]) > 0)// 更新child的值為右節(jié)點(diǎn)索引值 且更新c為右節(jié)點(diǎn)的任務(wù)c = queue[child = right];// 如果任務(wù)key小于任務(wù)c 則退出循環(huán)(最小堆)if (key.compareTo(c) <= 0)break;// 否則把任務(wù)c放到k上(較小的任務(wù)放到父節(jié)點(diǎn)上)queue[k] = c;// 設(shè)置任務(wù)c的堆索引setIndex(c, k);// 更新k的值為childk = child;}// 任務(wù)key插入k的位置queue[k] = key;// 設(shè)置任務(wù)key的堆索引ksetIndex(key, k); }

執(zhí)行的流程圖為:

5.入隊(duì)列的過(guò)程??offer(Runnable)-新增元素

public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();// 只能存放RunnableScheduledFuture任務(wù)RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;// 為了保證隊(duì)列的線程安全,offer()方法為線程安全方法final ReentrantLock lock = this.lock;lock.lock();try {// 當(dāng)前隊(duì)列實(shí)際深度,即隊(duì)列中任務(wù)個(gè)數(shù)int i = size;// 如果任務(wù)數(shù)已經(jīng)超過(guò)數(shù)組長(zhǎng)度,則擴(kuò)容為原來(lái)的1.5倍if (i >= queue.length)grow();// 隊(duì)列實(shí)際深度+1size = i + 1;// 如果是空隊(duì)列 新增任務(wù)插入到數(shù)組頭部;if (i == 0) {queue[0] = e;// 設(shè)置該任務(wù)在堆中的索引,便于后續(xù)取消或者刪除任務(wù);免于查找setIndex(e, 0);} else {// 如果不是空隊(duì)列 則調(diào)用siftUp()插入任務(wù)siftUp(i, e);}// 如果作為首個(gè)任務(wù)插入到數(shù)組頭部if (queue[0] == e) {// 置空當(dāng)前l(fā)eader線程leader = null;// 喚醒一個(gè)等待的線程 使其成為leader線程available.signal();}} finally {lock.unlock();}return true; }

?這個(gè)方法理解的難點(diǎn)在于leader線程。若新增任務(wù)插入空隊(duì)列中,首先清空l(shuí)eader線程,并喚醒等待線程中的某一個(gè)線程,把喚醒的線程作為leader線程;若新增任務(wù)插入前,隊(duì)列中已經(jīng)存在任務(wù),則說(shuō)明已經(jīng)有l(wèi)eader線程在等待獲取根節(jié)點(diǎn),此時(shí)無(wú)需設(shè)置leader線程。leader線程的作用就是用來(lái)監(jiān)聽(tīng)隊(duì)列的根節(jié)點(diǎn)任務(wù),如果leader線程沒(méi)有獲取到根節(jié)點(diǎn)任務(wù)則通知其他線程等待,這表明leader線程決定著等待線程的狀態(tài)。
用leader-before這種機(jī)制,可以減少線程的等待時(shí)間,而每一個(gè)等待的線程都有可能成為leader線程。注意:這里還不太清除哪些線程會(huì)等待。

6.siftUp(int,RunnableScheduledFuture)-新增任務(wù)后重排
新增任務(wù)插入隊(duì)列(數(shù)組),首先插入到數(shù)組的尾部,然后對(duì)比其與該位置的父節(jié)點(diǎn)的大小,如果新增任務(wù)大于父節(jié)點(diǎn)任務(wù)(此處是最小堆),則新增任務(wù)位置不變,否則改變其與父節(jié)點(diǎn)的位置,并再比較父節(jié)點(diǎn)與父父節(jié)點(diǎn)的大小,直到根節(jié)點(diǎn)。插入的過(guò)程可以結(jié)合上面堆的二叉樹(shù)變化過(guò)程圖一起理解。
插入流程圖:

private void siftUp(int k, RunnableScheduledFuture<?> key) {// 循環(huán),當(dāng)k為根節(jié)點(diǎn)時(shí)結(jié)束循環(huán)while (k > 0) {// 獲取k的父節(jié)點(diǎn)索引,相當(dāng)于(k-1)/2int parent = (k - 1) >>> 1;// 獲取父節(jié)點(diǎn)位置的任務(wù)RunnableScheduledFuture<?> e = queue[parent];// 判斷key任務(wù)與父節(jié)點(diǎn)任務(wù)time屬性的大小,即延遲時(shí)間if (key.compareTo(e) >= 0)break; // 父節(jié)點(diǎn)任務(wù)延遲時(shí)間小于key任務(wù)延遲時(shí)間,則退出循環(huán)// 否則交換父節(jié)點(diǎn)parent與節(jié)點(diǎn)k的任務(wù)queue[k] = e;// 更新任務(wù)e在堆中的索引值setIndex(e, k);// 更新k的值 比較其與父父節(jié)點(diǎn)的大小k = parent;}// 任務(wù)key放入數(shù)組k的位置,k的值是不斷更新的queue[k] = key;// 設(shè)置任務(wù)key在堆中的索引值setIndex(key, k); }

7.scheduleWithFixedDelay方法:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t; }

任務(wù)添加到隊(duì)列后,工作線程會(huì)從隊(duì)列獲取并移除到期的元素,然后執(zhí)行run方法,所以下面看看ScheduledFutureTask的run方法如何實(shí)現(xiàn)定時(shí)調(diào)度的。

其中ScheduledFutureTask封裝定時(shí)任務(wù)內(nèi)部類,重點(diǎn)關(guān)注其run方法。

ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);//僅執(zhí)行一次else if (!periodic)ScheduledFutureTask.super.run();//定時(shí)任務(wù)else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();//重新加入該任務(wù)到delay隊(duì)列reExecutePeriodic(outerTask);}}

定時(shí)調(diào)度是先從隊(duì)列獲取任務(wù)然后執(zhí)行,然后在重新設(shè)置任務(wù)時(shí)間,在把任務(wù)放入隊(duì)列實(shí)現(xiàn)的。

如果任務(wù)執(zhí)行時(shí)間大于delay時(shí)間則等任務(wù)執(zhí)行完畢后的delay時(shí)間后在次調(diào)用任務(wù),不會(huì)同一個(gè)任務(wù)并發(fā)執(zhí)行。

四、上面的delayWorkQueue使用了堆的數(shù)據(jù)結(jié)構(gòu),

堆的一些屬性
堆都是滿二叉樹(shù).因?yàn)闈M二叉樹(shù)會(huì)充分利用數(shù)組的內(nèi)存空間;
最小堆是指父節(jié)點(diǎn)比左節(jié)點(diǎn)和右節(jié)點(diǎn)都小的結(jié)構(gòu),所以整個(gè)最小堆中,根節(jié)點(diǎn)是最小的節(jié)點(diǎn);
最大堆是指父節(jié)點(diǎn)比左節(jié)點(diǎn)和右節(jié)點(diǎn)都大的結(jié)構(gòu),所以整個(gè)最大堆中,根節(jié)點(diǎn)是最大的節(jié)點(diǎn);
最大堆和最小堆的左節(jié)點(diǎn)和右節(jié)點(diǎn)沒(méi)有關(guān)系,只能判斷父節(jié)點(diǎn)和左右兩節(jié)點(diǎn)的大小關(guān)系;
基于堆的這些屬性,堆適用于找到集合中的最大或者最小值;另外,堆結(jié)構(gòu)記錄任務(wù)及其索引的關(guān)系,便于插入數(shù)據(jù)或者刪除數(shù)據(jù)后重新排序,所以堆適用于優(yōu)先隊(duì)列。

參考鏈接:https://blog.csdn.net/nobody_1/article/details/99684009

總結(jié)

以上是生活随笔為你收集整理的ScheduledThreadPoolExecutor定时任务线程池执行原理分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。