Netty时间轮调度原理分析,再不了解你就out啦
一、時間輪介紹
之前公司內部搭建的延遲隊列服務有用到時間輪,但是一直沒有了解過它的實現原理。
最近有個和支付寶對接的項目,支付寶接口有流量控制,一定的時間內只允許 N 次接口調用,針對一些業務我們需要頻繁調用支付寶開放平臺接口,如果不對請求做限制,很容易觸發流控告警。
為了避免這個問題,我們按照一定延遲規則將任務加載進時間輪內,通過時間輪的調度來實現接口異步調用。
很多開源框架都實現了時間輪算法,這里以 Netty 為例,看下 Netty 中時間輪是怎么實現的。
1.1 快速入門
下面是一個 API 使用例子。
public class WheelTimerSamples {private static final HashedWheelTimerInstance INSTANCE = HashedWheelTimerInstance.INSTANCE;public static void main(String[] args) throws IOException {INSTANCE.getWheelTimer().newTimeout(new PrintTimerTask(), 3, TimeUnit.SECONDS);System.in.read();}static class PrintTimerTask implements TimerTask {@Overridepublic void run(Timeout timeout) {System.out.println("Hello world");}}enum HashedWheelTimerInstance {INSTANCE;private final HashedWheelTimer wheelTimer;HashedWheelTimerInstance() {wheelTimer = new HashedWheelTimer(r -> {Thread t = new Thread(r);t.setUncaughtExceptionHandler((t1, e) -> System.out.println(t1.getName() + e.getMessage()));t.setName("-HashedTimerWheelInstance-");return t;}, 100, TimeUnit.MILLISECONDS, 64);}public HashedWheelTimer getWheelTimer() {return wheelTimer;}} }上面的例子中我們自定義了一個 HashedWheelTimer,然后自定義了一個 TimerTask,將一個任務加載進時間輪,3s 后執行這個任務,怎么樣是不是很簡單。
在定義時間輪時建議按照業務類型進行區分,將時間輪定義為多個單例對象。
PS:因為時間輪是異步執行的,在任務執行之前 JVM 不能退出,所以 System.in.read(); 這一行代碼不能刪除。
1.2 原理圖解
二、原理分析
2.1 時間輪狀態
時間輪有以下三種狀態:
- WORKER_STATE_INIT:初始化狀態,此時時間輪內的工作線程還沒有開啟
- WORKER_STATE_STARTED:運行狀態,時間輪內的工作線程已經開啟
- WORKER_STATE_SHUTDOWN:終止狀態,時間輪停止工作
狀態轉換如下,轉換原理會在下面講到:
2.2 構造函數
public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}if (unit == null) {throw new NullPointerException("unit");}if (tickDuration <= 0) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);}if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}// 初始化時間輪數組,時間輪大小為大于等于 ticksPerWheel 的第一個 2 的冪,和 HashMap 類似wheel = createWheel(ticksPerWheel);// 取模用,用來定位數組中的槽mask = wheel.length - 1;// 為了保證精度,時間輪內的時間單位為納秒long duration = unit.toNanos(tickDuration);// 時間輪內的時鐘撥動頻率不宜太大也不宜太小if (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}if (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}// 創建工作線程workerThread = threadFactory.newThread(worker);// 非守護線程且 leakDetection 為 true 時檢測內存是否泄漏leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;// 初始化最大等待任務數this.maxPendingTimeouts = maxPendingTimeouts;// 如果創建的時間輪實例大于 64,打印日志,并且這個日志只會打印一次if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}構造函數中的參數相當重要,當自定義時間輪時,我們應該根據業務的范圍設置合理的參數:
- threadFactory:創建時間輪任務線程的工廠,通過這個工廠可以給我們的線程自定義一些屬性(線程名、異常處理等)
- tickDuration:時鐘多長時間撥動一次,值越小,時間輪精度越高
- unit:tickDuration 的單位
- ticksPerWheel:時間輪數組大小
- leakDetection:是否檢測內存泄漏
- maxPendingTimeouts:時間輪內最大等待的任務數
時間輪的時鐘撥動時長應該根據業務設置恰當的值,如果設置的過大,可能導致任務觸發時間不準確。如果設置的過小,時間輪轉動頻繁,任務少的情況下加載不到任務,屬于一直空轉的狀態,會占用 CPU 線程資源。
為了防止時間輪占用過多的 CPU 資源,當創建的時間輪對象大于 64 時會以日志的方式提示。
構造函數中只是初始化了輪線程,并沒有開啟,當第一次往時間輪內添加任務時,線程才會開啟。
2.3 往時間輪內添加任務
@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}// 等待的任務數 +1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();// 如果時間輪內等待的任務數大于最大值,任務會被拋棄if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}// 開啟時間輪內的線程start();// 計算當前添加任務的執行時間long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}// 將任務加入隊列HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}任務會先保存在隊列中,當時間輪的時鐘撥動時才會判斷是否將隊列中的任務加載進時間輪。
public void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:// 這里存在并發,通過 CAS 操作保證最終只有一個線程能開啟時間輪的工作線程if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}while (startTime == 0) {try {// startTimeInitialized 是一個 CountDownLatch,目的是為了保證工作線程的 startTime 屬性初始化startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}這里通過 CAS 加鎖的方式保證線程安全,避免多次開啟。
工作線程開啟后,start() 方法會被阻塞,等工作線程的 startTime 屬性初始化完成后才被喚醒。為什么只有等 startTime 初始化后才能繼續執行呢?因為上面的 newTimeout 方法在線程開啟后,需要計算當前添加進來任務的執行時間,而這個執行時間是根據 startTime 計算的。
2.4 時間輪調度
@Overridepublic void run() {// 初始化 startTime.startTime = System.nanoTime();if (startTime == 0) {startTime = 1;}// 用來喚醒被阻塞的 HashedWheelTimer#start() 方法,保證 startTime 初始化startTimeInitialized.countDown();do {// 時鐘撥動final long deadline = waitForNextTick();if (deadline > 0) {int idx = (int) (tick & mask);// 處理過期的任務processCancelledTasks();HashedWheelBucket bucket =wheel[idx];// 將任務加載進時間輪transferTimeoutsToBuckets();// 執行當前時間輪槽內的任務bucket.expireTimeouts(deadline);tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// 時間輪關閉,將還未執行的任務以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去// 還未執行的任務可能會在兩個地方,一:時間輪數組內,二:隊列中for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}// 處理過期的任務processCancelledTasks();}時間輪每撥動一次 tick 就會 +1,根據這個值與(時間輪數組長度 - 1)進行 & 運算,可以定位時間輪數組內的槽。因為 tick 值一直在增加,所以時間輪數組看起來就像一個不斷循環的圓。
- 先初始化 startTime 值,因為后面任務執行的時間是根據 startTime 計算的
- 時鐘撥動,如果時間未到,則 sleep 一會兒
- 處理過期的任務
- 將任務加載進時間輪
- 執行當前時鐘對應時間輪內的任務
- 時間輪關閉,將所有未執行的任務封裝到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
- 處理過期的任務
上面簡單羅列了下 run 方法的大概執行步驟,下面是具體方法的分析。
2.5 時鐘撥動
如果時間輪設置的 tickDuration 為 100ms 撥動一次,當時鐘撥動一次后,應該計算下一次時鐘撥動的時間,如果還沒到就 sleep 一會兒,等到撥動時間再醒來。
private long waitForNextTick() {// 計算時鐘下次撥動的相對時間long deadline = tickDuration * (tick + 1);for (;;) {// 獲取當前時間的相對時間final long currentTime = System.nanoTime() - startTime;// 計算距離時鐘下次撥動的時間// 這里之所以加 999999 后再除 10000000, 是為了保證足夠的 sleep 時間// 例如:當 deadline - currentTime = 2000002 的時候,如果不加 999999,則只睡了 2ms// 而 2ms 其實是未到達 deadline 時間點的,所以為了使上述情況能 sleep 足夠的時間,加上 999999 后,會多睡 1mslong sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// <=0 說明可以撥動時鐘了if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// 這里是為了兼容 Windows 平臺,因為 Windows 平臺的調度最小單位為 10ms,如果不是 10ms 的倍數,可能會引起 sleep 時間不準確// See https://github.com/Netty/Netty/issues/356if (PlatformDependent.isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {// sleep 到下次時鐘撥動Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}如果時間不到就 sleep 等待一會兒,為了使任務時鐘準確,可以從上面的代碼中看出 Netty 做了一些優化,比如 sleepTimeMs 的計算,Windows 平臺的處理等。
2.6 將任務從隊列加載進時間輪
private void transferTimeoutsToBuckets() {// 一次最多只處理隊列中的 100000 個任務for (int i = 0; i < 100000; i++) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}// 過濾已經取消的任務if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {continue;}// 計算當前任務到執行還需要經過幾次時鐘撥動// 假設時間輪數組大小是 10,calculated 為 12,需要時間輪轉動一圈加兩次時鐘撥動后后才能執行這個任務,因此還需要計算一下圈數long calculated = timeout.deadline / tickDuration;// 計算當前任務到執行還需要經過幾圈時鐘撥動timeout.remainingRounds = (calculated - tick) / wheel.length;// 有的任務可能在隊列里很長時間,時間過期了也沒有被調度,將這種情況的任務放在當前輪次內執行final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.// 計算任務在時間輪數組中的槽int stopIndex = (int) (ticks & mask);HashedWheelBucket bucket = wheel[stopIndex];// 將任務放到時間輪的數組中,多個任務可能定位時間輪的同一個槽,這些任務通過以鏈表的形式鏈接bucket.addTimeout(timeout);}}void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;// 任務構成雙向鏈表timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}}在上面也提到過,任務剛加進來不會立即到時間輪中去,而是暫時保存到一個隊列中,當時間輪時鐘撥動時,會將任務從隊列中加載進時間輪內。
時間輪每次最大處理 100000 個任務,因為任務的執行時間是用戶自定義的,所以需要計算任務到執行需要經過多少次時鐘撥動,并計算時間輪撥動的圈數。接著將任務加載進時間輪對應的槽內,可能有多個任務經過 hash 計算后定位到同一個槽,這些任務會以雙向鏈表的結構保存,有點類似 HashMap 處理碰撞的情況。
2.7 執行任務
public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;while (timeout != null) {HashedWheelTimeout next = timeout.next;// 任務執行的圈數 > 0,表示任務還需要經過 remainingRounds 圈時鐘循環才能執行if (timeout.remainingRounds <= 0) {// 從鏈表中移除當前任務,并返回鏈表中下一個任務next = remove(timeout);if (timeout.deadline <= deadline) {// 執行任務timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {// 過濾取消的任務next = remove(timeout);} else {// 圈數 -1timeout.remainingRounds --;}timeout = next;}}public void expire() {// 任務狀態校驗if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}}時間輪槽內的任務以鏈表形式存儲,這些任務執行的時間可能會不一樣,有的在當前時鐘執行,有的在下一圈或者下兩圈對應的時鐘執行。當任務在當前時鐘執行時,需要將這個任務從鏈表中刪除,重新維護鏈表關系。
2.8 終止時間輪
@Overridepublic Set<Timeout> stop() {// 終止時間輪的線程不能是時間輪的工作線程if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() +".stop() cannot be called from " +TimerTask.class.getSimpleName());}// 將時間輪的狀態修改為 WORKER_STATE_SHUTDOWN,這里有兩種情況// 一:時間輪是 WORKER_STATE_INIT 狀態,表明時間輪從創建到終止一直沒有任務進來// 二:時間輪是 WORKER_STATE_STARTED 狀態,多個線程嘗試終止時間輪,只有一個操作成功if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {// 代碼走到這里,時間輪只能是兩種狀態中的一個,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN// 為 WORKER_STATE_INIT 表示時間輪沒有任務,因此不用返回未處理的任務,但是需要將時間輪實例 -1// 為 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失敗,什么都不用做,因為 CAS 成功的線程會處理if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {// 時間輪實例對象 -1INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}// CAS 操作失敗,或者時間輪沒有處理過任務,返回空的任務列表return Collections.emptySet();}try {boolean interrupted = false;while (workerThread.isAlive()) {// 中斷時間輪工作線程workerThread.interrupt();try {// 終止時間輪的線程等待時間輪工作線程 100ms,這個過程主要是為了時間輪工作線程處理未執行的任務workerThread.join(100);} catch (InterruptedException ignored) {interrupted = true;}}if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}// 返回未處理的任務return worker.unprocessedTimeouts();}當終止時間輪時,時間輪狀態有兩種情況:
- WORKER_STATE_INIT:時間輪初始化,前面我們說過,當初始化時間輪對象時并不會立即開啟時間輪工作線程,而是第一次添加任務時才開啟,為 WORKER_STATE_INIT 表示時間輪沒有處理過任務
- WORKER_STATE_STARTED:時間輪在工作,這里也有兩種情況,存在并發與不存在并發,如果多個線程都嘗試終止時間輪,肯定只能有一個成功
時間輪停止運行后會將未執行的任務返回出去,至于怎么處理這些任務,由業務方自己定義,這個流程和線程池的 shutdownNow 方法是類似的。
如果時間輪在運行,怎么才能獲取到未執行的任務呢,答案就在上面的 run() 方法中,如果時間輪處于非運行狀態,會把時間輪數組與隊列中未執行且未取消的任務保存到 unprocessedTimeouts 集合中。而終止時間輪成功的線程只需要等待一會兒即可,這個等待是通過 workerThread.join(100); 實現的。
取消時間輪內的任務相對比較簡單,這里就不概述了,想要了解的自行查看即可。
上面就是時間輪運行的基本原理了。
三、總結
這里以問答的形式進行總結,大家也可以看下這些問題,自己能不能很好的回答出來?
3.1 時間輪是不是在初始化完成后就啟動了?
不是,初始化完成時間輪的狀態是 WORKER_STATE_INIT,此時時間輪內的工作線程還沒有運行,只有第一次往時間輪內添加任務時,才會開啟時間輪內的工作線程。時間輪線程開啟后會初始化 startTime,任務的執行時間會根據這個字段計算,而且時間輪中時間的概念是相對的。
3.2 如果時間輪內還有任務未執行,服務重啟了怎么辦?
時間輪內的任務都在內存中,服務重啟數據肯定都丟了,所以當服務重啟時需要業務方自己做兼容處理。
3.3 如何自定義合適的時間輪參數?
自定義時間輪時有兩個比較重要的參數需要我們注意:
- tickDuration:時鐘撥動頻率,假設一個任務在 10s 后執行,tickDuration 設置為 3min 那肯定是不行的,tickDuration 值越小,任務觸發的精度越高,但是沒有任務時,工作線程會一直自旋嘗試從隊列中拿任務,比較消耗 CPU 資源
- ticksPerWheel:時間輪數組大小,假設當時間輪時鐘撥動時,有 10000 個任務處理,但是我們定義時間輪數組的大小為 8,這時平均一個時間輪槽內有 1250 個任務,如果這 1250 個任務都在當前時鐘執行,任務執行是同步的,由于每個任務執行都會消耗時間,可能會導致后面的任務觸發時間不準確。反之如果數組長度設置的過大,任務比較少的情況下,時間輪數組很多槽都是空的
所以當使用自定義時間輪時,一定要評估自己的業務后再設置參數。
3.4 Netty 的時間輪有什么缺陷?
Netty 中的時間輪是通過單線程實現的,如果在執行任務的過程中出現阻塞,會影響后面任務執行。除此之外,Netty 中的時間輪并不適合創建延遲時間跨度很大的任務,比如往時間輪內丟成百上千個任務并設置 10 天后執行,這樣可能會導致鏈表過長 round 值很大,而且這些任務在執行之前會一直占用內存。
3.5 時間輪要設置成單例的嗎?
強烈建議按照業務模塊區分,每個模塊都創建一個單例的時間輪對象。在上面的代碼中我們看到了,當時間輪對象大于 64 時會以日志的形式提示。如果時間輪是非單例對象,那時間輪算法完全就失去了作用。
3.6 時間輪與 ScheduledExecutorService 的區別?
ScheduledExecutorService 中的任務維護了一個堆,當有大量任務時,需要調整堆結構導致性能下降,而時間輪通過時鐘調度,可以不受任務量的限制。
當任務量比較少時時間輪會一直自旋空轉撥動時鐘,相比 ScheduledExecutorService 會占用一定 CPU 資源。
參考
netty源碼解讀之時間輪算法實現-HashedWheelTimer
HashedWheelTimer 使用及源碼分析創建
定時器的幾種實現方式
總結
以上是生活随笔為你收集整理的Netty时间轮调度原理分析,再不了解你就out啦的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 营业外收支净额怎么计算
- 下一篇: 业务总结004:检验项目时间轮实践与库存