Netty源码学习9——从Timer到ScheduledThreadPoolExecutor到HashedWheelTimer
系列文章目錄和關(guān)于我
一丶前言
之前在學(xué)習(xí)netty源碼的時候,經(jīng)常看netty hash時間輪(HashedWheelTimer)的出現(xiàn),時間輪作為一種定時調(diào)度機(jī)制,在jdk中還存在Timer和ScheduledThreadPoolExecutor。那么為什么netty要重復(fù)造*昵,HashedWheelTimer又是如何實(shí)現(xiàn)的,解決了什么問題?
這一篇將從Timer-->ScheduledThreadPoolExecutor-->HashedWheelTimer 依次進(jìn)行講解,學(xué)習(xí)其中優(yōu)秀的設(shè)計(jì)。
二丶Timer
1.基本結(jié)構(gòu)
Timer 始于java 1.3,原理和內(nèi)部結(jié)構(gòu)也相對簡單,
如上圖所示,Timer內(nèi)部存在一個線程(TimerThread實(shí)例)和一個數(shù)組實(shí)現(xiàn)的堆
TimerThread運(yùn)行時會不斷從數(shù)組中拿deadline最早的任務(wù),進(jìn)行執(zhí)行。為了更快的拿到dealline最早的任務(wù),Timer使用數(shù)組構(gòu)建了一個堆,堆排序的依據(jù)便是任務(wù)的執(zhí)行時間。
Timer中只存在一個線程TimerThread來執(zhí)行定時任務(wù),因此如果一個任務(wù)耗時太長會延后其他任務(wù)的執(zhí)行
并且TimerThread不會catch任務(wù)執(zhí)行產(chǎn)生的異常,也就是說如果一個任務(wù)執(zhí)行失敗了,那么TimerThread的執(zhí)行會終止
2.源碼
2.1 TimerThread 的執(zhí)行
如下是TimerThread 執(zhí)行的源碼
- 基于等待喚醒機(jī)制,避免無意義自旋
- 每次都拿任務(wù)隊(duì)列中ddl最早的任務(wù)
- 如果周期任務(wù),會計(jì)算下一次執(zhí)行時間,重新塞到任務(wù)隊(duì)列中
- 巧妙的使用了 period 等于0,小于0,大于0進(jìn)行非周期運(yùn)行任務(wù),fixed delay,fixed rate的區(qū)分
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
// 對隊(duì)列上鎖,也就是提交任務(wù)和拿任務(wù)是串行的
synchronized(queue) {
// 如果Timer被關(guān)閉newTasksMayBeScheduled會為false
// 這里使用等待喚醒機(jī)制來阻塞TimerThread直到存在任務(wù)
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
// 說明newTasksMayBeScheduled 為false 且沒任務(wù),那么TimerTask的死循環(huán)被break,
if (queue.isEmpty())
break;
long currentTime, executionTime;
task = queue.getMin();
// 對任務(wù)上鎖,避免并發(fā)執(zhí)行,TimerTask 使用state記錄任務(wù)狀態(tài)
synchronized(task.lock) {
// 任務(wù)取消
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue;
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
// 需要執(zhí)行
if (taskFired = (executionTime<=currentTime)) {
// task.period == 0 說明不是周期執(zhí)行的任務(wù)
if (task.period == 0) {
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else {
// task.period 小于0 那么是fixed-delay ,
// task.period 大于0 那么是fixed-rate
// 如果是周期性的,會再次塞到任務(wù)隊(duì)列中
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
// 沒到執(zhí)行的時間,那么等待
if (!taskFired)
queue.wait(executionTime - currentTime);
}
// 到這里會釋放鎖 ,因?yàn)槿蝿?wù)的執(zhí)行不需要鎖
// 任務(wù)執(zhí)行
if (taskFired)
task.run();
} catch(InterruptedException e) {
}
}
}
這段代碼筆者認(rèn)為有一點(diǎn)可以優(yōu)化的,那就是在判斷任務(wù)是否需要執(zhí)行,根據(jù)period計(jì)算執(zhí)行時間的時候,會在持有任務(wù)隊(duì)列鎖的情況下,拿任務(wù)鎖執(zhí)行——但是判斷任務(wù)是否需要執(zhí)行,根據(jù)period計(jì)算執(zhí)行時間 這段時間其實(shí)是可以釋放隊(duì)列鎖的!這樣并發(fā)的能力可以更強(qiáng)一點(diǎn),可能Timer的定位也不是應(yīng)用在高并發(fā)任務(wù)提交執(zhí)行的場景,畢竟內(nèi)部也只有一個線程,所以也無傷大雅。
2.2 任務(wù)的提交
任務(wù)的提交最終都調(diào)用到sched(TimerTask task, long time, long period)方法
這里比較有趣的是,加入到隊(duì)列后,會判斷當(dāng)前任務(wù)是不是調(diào)度時間最早的任務(wù),如果是那么進(jìn)行喚醒!這么處理的原因可見下圖解釋:
同樣我不太理解為什么,Timer的作者要拿到隊(duì)列鎖,后拿任務(wù)鎖,去復(fù)制TimerTask的屬性,完全可以將TimerTask的修改放在隊(duì)列鎖的外面,如下
2.3 隊(duì)列實(shí)現(xiàn)的堆
可以看到新增任務(wù)需要進(jìn)行fixUp,調(diào)整數(shù)組中的元素,實(shí)現(xiàn)小根堆,這里時間復(fù)雜度是logN
3.Timer的不足
- 單線程:如果存在多個定時任務(wù),那么后面的定時任務(wù)會由于前面任務(wù)的執(zhí)行而delay
- 錯誤傳播:一個定時任務(wù)執(zhí)行失敗,那么會導(dǎo)致Timer的結(jié)束
- 不友好的API:使用Timer執(zhí)行延遲任務(wù),需要程序員將任務(wù)保證為TimerTask,并且TimerTask無法獲取延遲任務(wù)結(jié)果
三丶ScheduledThreadPoolExecutor
java 1.5引入的juc工具包,其中ScheduledThreadPoolExecutor就提供了定時調(diào)度的能力
- 其繼承了ThreadPoolExecutor,具備多線程并發(fā)執(zhí)行任務(wù)的能力。
- 更強(qiáng)的錯誤恢復(fù):如果一個任務(wù)拋出異常,并不會影響調(diào)度器本身和其他任務(wù)
- 更友好的API:支持傳入Runnable,和Callable,調(diào)度線程將返回ScheduledFuture,我們可以通過ScheduledFuture來查看任務(wù)執(zhí)行狀態(tài),以及獲取任務(wù)結(jié)果
由于ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,其中執(zhí)行任務(wù)的線程運(yùn)行邏輯同ThreadPoolExecutor(《JUC源碼學(xué)習(xí)筆記5——1.5w字和你一起刨析線程池ThreadPoolExecutor源碼,全網(wǎng)最細(xì)doge》)
1.基本結(jié)構(gòu)
ScheduleThreadPoolExecutor內(nèi)部結(jié)構(gòu)和ThreadPoolExecutor類似,不同的是內(nèi)部的阻塞隊(duì)列是DelayedWorkQueue——基于數(shù)組實(shí)現(xiàn)的堆,依據(jù)延遲時間進(jìn)行排序,堆頂,依據(jù)Condition等待喚醒機(jī)制實(shí)現(xiàn)的阻塞隊(duì)列;另外堆中的元素是ScheduledFuture
2.源碼
2.1 ScheduledFutureTask的執(zhí)行
public void run() {
// 是否周期性,就是判斷period是否為0。
boolean periodic = isPeriodic();
// 檢查任務(wù)是否可以被執(zhí)行。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果非周期性任務(wù)直接調(diào)用run運(yùn)行即可。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果成功runAndRest,則設(shè)置下次運(yùn)行時間并調(diào)用reExecutePeriodic。
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
// 需要重新將任務(wù)放到工作隊(duì)列中
reExecutePeriodic(outerTask);
}
}
可以看到任務(wù)實(shí)現(xiàn)周期執(zhí)行的關(guān)鍵在于任務(wù)執(zhí)行完后會再次被放到延遲阻塞隊(duì)列中,ScheduledFutureTask的父類是FutureTask,其內(nèi)部使用volatile修飾的狀態(tài)字段來記錄任務(wù)運(yùn)行狀態(tài),使用cas避免任務(wù)重復(fù)執(zhí)行(詳細(xì)可看《JUC源碼學(xué)習(xí)筆記7——FutureTask源碼解析》)
2.2 DelayedWorkQueue
交給ScheduledThreadPoolExecutor執(zhí)行的任務(wù),都放在DelayedWorkQueue中,下面我們看看DelayedWorkQueue是如何接收任務(wù),以及獲取任務(wù)的邏輯
2.2.1 offer接收任務(wù)
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 容量擴(kuò)增50%。
grow();
size = i + 1;
// 第一個元素
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 插入堆尾。
siftUp(i, e);
}
// 如果新加入的元素成為了堆頂,則原先的leader就無效了。
if (queue[0] == e) {
leader = null;
// 那么進(jìn)行喚醒,因?yàn)榧尤氲娜蝿?wù)延遲時間是最短的,可能之前隊(duì)列存在一個延遲時間更長的任務(wù),導(dǎo)致有線程block了,這時候需要進(jìn)行喚醒
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
可以看到大致原理和Timer中的阻塞隊(duì)列類似,但是其中出現(xiàn)了leader(DelayedWorkQueue中的Thread類型屬性)目前我們還不直到此屬性的作用,需要我們結(jié)合take源碼進(jìn)行分析
2.2.2 take獲取任務(wù)
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 上鎖
lock.lockInterruptibly();
try {
for (;;) {
// 堆頂元素,也就是延遲最小的元素,馬上就要執(zhí)行的任務(wù)
RunnableScheduledFuture<?> first = queue[0];
// 如果當(dāng)前隊(duì)列無元素,則在available條件上無限等待直至有任務(wù)通過offer入隊(duì)并喚醒。
if (first == null)
available.await();
else {
// 延遲最小任務(wù)的延遲
long delay = first.getDelay(NANOSECONDS);
// 如果delay小于0說明任務(wù)該立刻執(zhí)行了。
if (delay <= 0)
// 從堆中移除元素并返回結(jié)果。
return finishPoll(first);
first = null;
// 如果目前有l(wèi)eader的話,當(dāng)前線程作為follower在available條件上無限等待直至喚醒。
if (leader != null)
available.await();
else {
// 如果沒用leader 那么當(dāng)前線程設(shè)置為leader,
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 進(jìn)行超時等待喚醒 ,等待直到可以執(zhí)行,or存在其他需要更早的任務(wù)被add進(jìn)行隊(duì)列
available.awaitNanos(delay);
} finally {
// 如果喚醒之后leader 還是自己那么設(shè)置為null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader為null ,隊(duì)列頭部存在任務(wù),那么喚醒一個線程來獲取任務(wù)
if (leader == null && queue[0] != null)
available.signal();
// 如果leader 不為null,或者隊(duì)列沒用元素,那么直接釋放鎖
lock.unlock();
}
}
整個原理看下來并不復(fù)雜,無非是以及Condition提供的等待喚醒機(jī)制實(shí)現(xiàn)任務(wù)的延遲的執(zhí)行。
但是代碼中存在leader相關(guān)的操作,這才是DelayedWorkQueue的精華,下面我們對這個leader機(jī)制進(jìn)行學(xué)習(xí)
2.2.3 Leader-Follower 模式
DelayedWorkQueue中的leader是一個Thread類型的屬性,它指向了用于在隊(duì)列頭等待任務(wù)的線程。用于最小化不必要的定時等待
當(dāng)一個線程成為leader線程時,它只等待下一個延遲過去,而其他線程則無限期地等待。在leader從take或poll返回之前,leader線程必須向其他線程發(fā)出信號,除非其他線程在此期間成為引導(dǎo)線程。每當(dāng)隊(duì)列的頭被一個過期時間較早的任務(wù)替換時,leader字段就會通過重置為null而無效,并向一些等待線程(但不一定是當(dāng)前的leader)發(fā)出信號。
這么說可能不是很好理解,我們結(jié)合代碼進(jìn)行分析,如下是take中的一段:
-
如果leader 不為null,讓前來拿任務(wù)的線程無限期等待
-
為什么要這么做——減少無意義的鎖競爭,最早執(zhí)行的任務(wù)已經(jīng)分配給leader了,
follower只需要等著即可
-
follower等什么?——等leader拿到任務(wù)后進(jìn)行喚醒,leader拿到任務(wù),那么接下來follower需要執(zhí)行后續(xù)的任務(wù)了;或者堆中插入了另外一個延遲時間更小的任務(wù)
-
-
如果leader為null,那么當(dāng)前線程成為leader
-
這意味著堆頂延遲時間最短的任務(wù)交由當(dāng)前線程執(zhí)行,當(dāng)前線程只需要等待堆頂任務(wù)延遲時間結(jié)束即可
-
leader什么時候被喚醒:
延遲時間到,或者堆中插入了另外一個延遲時間更小的任務(wù)
-
這里就可以看出Leader-Follower是怎么減少無意義的鎖競爭的,leader是身先士卒的將第一個任務(wù)攔在身上,讓自己的Follower可以進(jìn)行永久的睡眠(超時等待),只有l(wèi)eader拿到任務(wù)準(zhǔn)備執(zhí)行了,才會喚醒自己的Follower——太溫柔了,我哭死。下面我們看看leader喚醒Follower的代碼
上面展示了leader任務(wù)到時間后的代碼邏輯,可以看到leader任務(wù)到期后會設(shè)置leader為null(這象征了leader的交接,leader去執(zhí)行任務(wù)了,找一個follower做副手),然后如果堆中有任務(wù),那么喚醒一個follower,緊接著前l(fā)eader就可以執(zhí)行任務(wù)了
其實(shí)還存在另外一種case,那就是leader在awaitNanos的中途,存在另外一個更加緊急的任務(wù)被塞到堆中
可以看到這里的leader-follower模式,可以有效的減少鎖競爭,因?yàn)閘eader在拿到任務(wù)后會喚醒一個線程,從而讓follower可以await,而不是無意義的獲取DelayedWorkQueue的鎖,看有沒有任務(wù)需要執(zhí)行!
-
優(yōu)點(diǎn)
- 減少鎖競爭:通過減少同時嘗試獲取下一個到期任務(wù)的線程數(shù)量,降低了鎖競爭,從而提高了并發(fā)性能。
- 節(jié)省資源:避免多個線程在相同的時間點(diǎn)上喚醒,減少了因競爭而造成的資源浪費(fèi)。
- 更好的響應(yīng)性:由于 leader 線程是唯一等待到期任務(wù)的線程,因此它能夠快速響應(yīng)任務(wù)的到期并執(zhí)行它,而無需從多個等待線程中選擇一個來執(zhí)行任務(wù)。
-
缺點(diǎn)
- 潛在的延遲:如果 leader 線程因?yàn)槠渌虮蛔枞蛘邎?zhí)行緩慢,它可能會延遲其他任務(wù)的執(zhí)行,因?yàn)闆]有其他線程在等待那個特定的任務(wù)到期(比如leader倒霉的很久沒用獲得cpu時間片)。
- 復(fù)雜性增加:實(shí)現(xiàn) leader-follower 模式需要更多的邏輯來跟蹤和管理 leader 狀態(tài),這增加了代碼的復(fù)雜性。(代碼初看,完全看球不同)
- 故障點(diǎn):leader 線程可能成為單點(diǎn)故障。如果 leader 線程異常退出或被中斷,必須有機(jī)制來確保另一個線程能夠取代它成為新的 leader。(這里使用的finally關(guān)鍵字)
最后,在DelayQueue中也使用了leader-follower來進(jìn)行性能優(yōu)化
3.ScheduledThreadPoolExecutor優(yōu)缺點(diǎn)
-
優(yōu)點(diǎn)
- 任務(wù)調(diào)度: ScheduledThreadPoolExecutor 允許開發(fā)者調(diào)度一次性或重復(fù)執(zhí)行的任務(wù),這些任務(wù)可以基于固定的延遲或固定的頻率來運(yùn)行。
- 線程復(fù)用: 它維護(hù)了一個線程池,這樣線程就可以被復(fù)用來執(zhí)行多個任務(wù),避免了為每個任務(wù)創(chuàng)建新線程的開銷。
- 并發(fā)控制: 線程池提供了一個限制并發(fā)線程數(shù)量的機(jī)制,這有助于控制資源使用,提高系統(tǒng)穩(wěn)定性。
- 性能優(yōu)化: 使用內(nèi)部 DelayedWorkQueue 來管理延遲任務(wù),可以減少不必要的線程喚醒,從而提高性能。
- 任務(wù)同步: ScheduledThreadPoolExecutor 提供了一種機(jī)制來獲取任務(wù)的結(jié)果或取消任務(wù),通過返回的 ScheduledFuture對象可以控制任務(wù)的生命周期。
- 異常處理: 它提供了鉤子方法(如 afterExecute),可以用來處理任務(wù)執(zhí)行過程中未捕獲的異常。
-
缺點(diǎn)
-
資源限制: 如果任務(wù)執(zhí)行時間過長或者任務(wù)提交速度超過線程池的處理能力,那么線程池可能會飽和,導(dǎo)致性能下降或新任務(wù)被拒絕。
DelayedWorkQueue是*隊(duì)列,因此任務(wù)都會由核心線程執(zhí)行,大量提交的時候沒用辦法進(jìn)行線程的增加
-
存在大量定時任務(wù)提交的時候,性能較低:基于數(shù)組實(shí)現(xiàn)的堆,調(diào)整的時候需要logN的時間復(fù)雜度完成
-
四丶HashedWheelTimer 時間輪
1.引入
筆者學(xué)習(xí)HashedWheelTimer的時候,問chatgpt netty在哪里使用了時間輪,chatgpt說在IdleStateHandler(當(dāng)通道有一段時間未執(zhí)行讀取、寫入時,觸發(fā)IdleStateEvent,也就是空閑檢測機(jī)制),但是其實(shí)在netty的IdleStateHandler并不是使用HashedWheelTimer實(shí)現(xiàn)的空閑檢測,依舊是類似ScheduledThreadPoolExecutor的機(jī)制(內(nèi)部使用基于數(shù)組實(shí)現(xiàn)的堆)
筆者就質(zhì)問chagpt:"你放屁.jpg"
chatgpt承認(rèn)了錯誤,然后說它推薦這么做,因?yàn)镠ashedWheelTimer在處理大量延遲任務(wù)的時候性能優(yōu)于基于數(shù)組實(shí)現(xiàn)的堆。
下面我們就來學(xué)習(xí)為什么時間輪在處理大量延遲任務(wù)的時候性能優(yōu)于基于數(shù)組實(shí)現(xiàn)的堆。
2.時間輪算法
時間輪算法(Timewheel Algorithm)是一種用于管理定時任務(wù)的高效數(shù)據(jù)結(jié)構(gòu),它的核心思想是將時間劃分為一系列的槽(slots),每個槽代表時間輪上的一個基本時間單位。時間輪算法的主要作用是優(yōu)化計(jì)時器任務(wù)調(diào)度的性能,尤其是在處理大量短時任務(wù)時,相比于傳統(tǒng)的數(shù)據(jù)結(jié)構(gòu)(如最小堆),它能顯著降低任務(wù)調(diào)度的復(fù)雜度。
如下是時間輪的簡單示意,可以看到多個任務(wù)使用雙向鏈表進(jìn)行連接
還存在多層次的時間輪(模擬時針分針秒針)對于周期性很長的定時任務(wù),單層時間輪可能會導(dǎo)致槽的數(shù)量過多。為了解決這個問題,可以使用多層時間輪,即每個槽代表的時間跨度越來越大,較低層級代表短時間跨度,較高層級代表長時間跨度
從這里可以看出時間輪為什么在存在大量延遲任務(wù)的時候性能比堆更好: 時間輪的插入操作通常是常數(shù)時間復(fù)雜度(O(1)),因?yàn)樗ㄟ^計(jì)算定時任務(wù)的執(zhí)行時間與當(dāng)前時間的差值,將任務(wù)放入相應(yīng)的槽中,這個操作與定時任務(wù)的總數(shù)無關(guān)。 在堆結(jié)構(gòu)中,插入操作的時間復(fù)雜度是O(log N),其中N是堆中元素的數(shù)量。這是因?yàn)椴迦胄略睾螅枰ㄟ^上浮(或下沉)操作來維持堆的性質(zhì)
3.HashedWheelTimer基本結(jié)構(gòu)
-
時間輪(Wheel):
時間輪是一個固定大小的數(shù)組,數(shù)組中的每個元素都是一個槽(bucket)。
每個槽對應(yīng)一個時間間隔,這個間隔是時間輪的基本時間單位。
所有的槽合起來構(gòu)成了整個時間輪的范圍,例如,如果每個槽代表一個毫秒,那么一個大小為1024的時間輪可以表示1024毫秒的時間范圍。 -
槽(Bucket):每個槽是一個鏈表,用于存儲所有計(jì)劃在該槽時間到期的定時任務(wù)。
任務(wù)通過計(jì)算它們的延遲時間來確定應(yīng)該放入哪個槽中。
-
指針(Cursor or Hand):
時間輪中有一個指針,代表當(dāng)前的時間標(biāo)記。這個指針會周期性地移動到下一個槽,模擬時間的前進(jìn)。每次指針移動都會檢查相應(yīng)槽中的任務(wù),執(zhí)行到期的任務(wù)。
-
任務(wù)(TimerTask):
任務(wù)通常是實(shí)現(xiàn)了TimerTask接口的對象,其中包含了到期執(zhí)行的邏輯。
任務(wù)還包含了延遲時間和周期性信息,這些信息使得時間輪可以正確地調(diào)度每個任務(wù) -
工作線程(Worker Thread):
HashedWheelTimer通常包含一個工作線程,它負(fù)責(zé)推進(jìn)時間輪的指針,并處理到期的定時任務(wù)。
4.使用demo
public class HashedWheelTimerDemo {
public static void main(String[] args) {
// 創(chuàng)建HashedWheelTimer
HashedWheelTimer timer = new HashedWheelTimer();
// 提交一個延時任務(wù),將在3秒后執(zhí)行
TimerTask task1 = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("Task 1 executed after 3 seconds");
}
};
timer.newTimeout(task1, 3, TimeUnit.SECONDS);
// 提交一個周期性執(zhí)行的任務(wù),每5秒執(zhí)行一次
TimerTask task2 = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("Task 2 executed periodically every 5 seconds");
// 重新提交任務(wù),實(shí)現(xiàn)周期性執(zhí)行
timer.newTimeout(this, 5, TimeUnit.SECONDS);
}
};
timer.newTimeout(task2, 5, TimeUnit.SECONDS);
// 注意:在實(shí)際應(yīng)用中,不要忘記最終停止計(jì)時器,釋放資源
// timer.stop();
}
}
5.源碼
5.1 創(chuàng)建時間輪
HashedWheelTimer構(gòu)造方法參數(shù)有
- threadFactory:負(fù)責(zé)new一個thread,這個thread負(fù)責(zé)推動時鐘指針旋轉(zhuǎn)。
- taskExecutor:Executor負(fù)責(zé)任務(wù)到期后任務(wù)的執(zhí)行
- tickDuration 和 timeUnit 定義了一格的時間長度,默認(rèn)的就是 100ms。
- ticksPerWheel 定義了一圈有多少格,默認(rèn)的就是 512;
- leakDetection:用于追蹤內(nèi)存泄漏。
- maxPendingTimeouts:最大允許等待的 Timeout 實(shí)例數(shù),也就是我們可以設(shè)置不允許太多的任務(wù)等待。如果未執(zhí)行任務(wù)數(shù)達(dá)到閾值,那么再次提交任務(wù)會拋出RejectedExecutionException 異常。默認(rèn)不限制。
構(gòu)造方法主要的工作:
-
創(chuàng)建HashedWheelBucket數(shù)組
每一個元素都是一個雙向鏈表,鏈表中的元素是HashedWheelTimeout
默認(rèn)情況下HashedWheelTimer中有512個這樣的元素
-
創(chuàng)建workerThread,此Thread負(fù)責(zé)推動時鐘的旋轉(zhuǎn),但是并沒用啟動該線程,當(dāng)?shù)谝粋€提交任務(wù)的時候會進(jìn)行workerThread線程的啟動
5.2 提交延時任務(wù)到HashedWheelTimer
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 統(tǒng)計(jì)等待的任務(wù)數(shù)量
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 大于閾值,拋出異常
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 啟動workerThread ,只啟動一次
start();
// 計(jì)算任務(wù)ddl
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// new一個Timeout 加入到timeouts
// timeouts 是PlatformDependent.newMpscQueue()————多生產(chǎn),單消費(fèi)者的阻塞隊(duì)列
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
其中workerThread的啟動如下
至此我們直到延時任務(wù)被加入到timeouts,timeouts是一個mpsc隊(duì)列,之所以使用mpsc,是因?yàn)榭赡艽嬖诙鄠€生產(chǎn)者提交任務(wù),但是消費(fèi)任務(wù)的只有workerThread,mpsc在這種場景下性能更好。
那么workerThread的工作邏輯是什么昵
5.3 workerThread工作
-
waitForNextTick類似于模擬時鐘上指針的走動,依賴Thread#sleep
-
當(dāng)?shù)较乱粋€刻度的時候,會先處理下取消的任務(wù),其實(shí)就是對應(yīng)bucket中刪除(雙向鏈表的刪除)
-
然后將mpsc隊(duì)列中的任務(wù)都放到buckets中去
這里使用了mpsc主要是考慮如果沒加一個任務(wù)都直接放到時間輪,那么鎖競爭太激烈了,可能會導(dǎo)致?lián)屾i阻塞了一段時間導(dǎo)致任務(wù)超時。有點(diǎn)消息隊(duì)列削峰的意思。
-
接下來就是找到當(dāng)tick對應(yīng)的bucket的,然后執(zhí)行這個bucket中所有需要執(zhí)行的任務(wù)
可以看到其實(shí)就是遍歷雙向鏈表,找到需要執(zhí)行任務(wù),任務(wù)的執(zhí)行調(diào)用expire方法,邏輯如下:
直接交給線程池執(zhí)行,之前之前還會嘗試修改狀態(tài),這里其實(shí)和用戶取消任務(wù)由競爭關(guān)系,也就是說如果任務(wù)提交到線程池,那么取消也無濟(jì)于事了。
6.品一品優(yōu)秀的設(shè)計(jì)
筆者認(rèn)為這里優(yōu)秀的設(shè)計(jì)主要是在于MPSC的應(yīng)用
- 線程安全: HashedWheelTimer通常由一個工作線程來管理時間輪的推進(jìn)和執(zhí)行任務(wù)。如果允許多個線程直接在時間輪的桶(bucket)中添加任務(wù),就必須處理并發(fā)修改的問題,這將大大增加復(fù)雜性和性能開銷。MPSC隊(duì)列允許多個生產(chǎn)者線程安全地添加任務(wù),而消費(fèi)者線程(也就是HashedWheelTimer的工作線程)則負(fù)責(zé)將這些任務(wù)從隊(duì)列中取出并放入正確的時間槽中。
- 性能優(yōu)化: 使用MPSC隊(duì)列可以減少鎖的競爭,從而提高性能。由于任務(wù)首先被放入隊(duì)列中,工作線程可以在合適的時間批量處理這些任務(wù),這減少了對時間輪數(shù)據(jù)結(jié)構(gòu)的頻繁鎖定和同步操作。
7.時間輪的優(yōu)點(diǎn)和缺點(diǎn)
7.1優(yōu)點(diǎn)
- 高效的插入和過期檢查: 添加新任務(wù)到時間輪的操作是常數(shù)時間復(fù)雜度(O(1)),而檢查過期任務(wù)也是常數(shù)時間復(fù)雜度,因?yàn)橹恍枰獧z查當(dāng)前槽位的任務(wù)列表。
- 可配置的時間粒度: 時間輪的槽數(shù)量(時間粒度)是可配置的,可以根據(jù)應(yīng)用程序的需要調(diào)整定時器的精度和資源消耗。
- 處理大量定時任務(wù): HashedWheelTimer尤其適合于需要處理大量定時任務(wù)的場景,例如網(wǎng)絡(luò)應(yīng)用中的超時監(jiān)測。
7.2缺點(diǎn)
- 有限的時間精度: 由于時間輪是以固定的時間間隔來劃分的,所以它的時間精度受到槽數(shù)量和槽間隔的限制,不能提供非常高精度的定時(如毫秒級以下)。這是小根堆優(yōu)于時間輪的地方
- 槽位溢出: 單個槽位可能會有多個任務(wù)同時過期,如果過期任務(wù)的數(shù)量非常大,可能會導(dǎo)致任務(wù)處理的延遲。這里netty使用線程去執(zhí)行任務(wù),但是線程池可能存在沒用可用線程帶來的延遲
- 系統(tǒng)負(fù)載敏感: 當(dāng)系統(tǒng)負(fù)載較高時,定時器的準(zhǔn)確性可能會降低,因?yàn)镠ashedWheelTimer的工作線程可能無法準(zhǔn)確地按照預(yù)定的時間間隔推進(jìn)時間輪。
- 任務(wù)延遲執(zhí)行: 如果任務(wù)在其預(yù)定的執(zhí)行時間點(diǎn)添加到時間輪,可能會出現(xiàn)任務(wù)執(zhí)行時間稍微延后的情況,因?yàn)闀热組PSC然后等下一個tick才被放到bucket然后才能被執(zhí)行。
在選擇使用HashedWheelTimer時,需要根據(jù)應(yīng)用場景的具體需求權(quán)衡這些優(yōu)缺點(diǎn)。對于需要處理大量網(wǎng)絡(luò)超時檢測的場景,HashedWheelTimer常常是一個合適的選擇。然而,如果應(yīng)用程序需要高度精確的定時器,或者對任務(wù)執(zhí)行的實(shí)時性有嚴(yán)格的要求,可能需要考慮ScheduledThreadPoolExecutor(Timer就是個垃圾doge)。
五丶思考
ScheduledThreadPoolExecutor和HashedWheelTimer 各有優(yōu)劣,需要根據(jù)使用場景進(jìn)行權(quán)衡
- 關(guān)注任務(wù)調(diào)度的及時性:選擇ScheduledThreadPoolExecutor
- 存在大量調(diào)度任務(wù):選擇HashedWheelTimer
二者的特性又是由其底層數(shù)據(jù)結(jié)構(gòu)決定
- 為了維持小根堆的特性,每次向ScheduledThreadPoolExecutor中新增任務(wù)都需要進(jìn)行調(diào)整,在存在大量任務(wù)的時候,這個調(diào)整的開銷maybe很大(都是內(nèi)存操作,感覺應(yīng)該還好)
- 為了讓任務(wù)的新增時間復(fù)雜度是o(1),HashedWheelTimer 利用hash和數(shù)組o(1)的尋址能力,但是也是因?yàn)閿?shù)組的設(shè)計(jì),導(dǎo)致任務(wù)的執(zhí)行需要依賴workerThread每隔一個tick進(jìn)行調(diào)度,喪失了一點(diǎn)任務(wù)執(zhí)行的及時性
這一篇最大的收獲還是ScheduleThreadPoolExecutor中使用的leader-follower模式,以及HashedWheelTimer中mpsc 運(yùn)用,二者都是在減少無意義的鎖競爭!
總結(jié)
以上是生活随笔為你收集整理的Netty源码学习9——从Timer到ScheduledThreadPoolExecutor到HashedWheelTimer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 财会类期刊发表论文收费标准
- 下一篇: Golang实现JAVA虚拟机-运行时数