Redis之时间轮机制(五)
🚀 優(yōu)質(zhì)資源分享 🚀
| 🧡 Python實(shí)戰(zhàn)微信訂餐小程序 🧡 | 進(jìn)階級(jí) | 本課程是python flask+微信小程序的完美結(jié)合,從項(xiàng)目搭建到騰訊云部署上線,打造一個(gè)全棧訂餐系統(tǒng)。 |
| 💛Python量化交易實(shí)戰(zhàn)💛 | 入門級(jí) | 手把手帶你打造一個(gè)易擴(kuò)展、更安全、效率更高的量化交易系統(tǒng) |
一、什么是時(shí)間輪
時(shí)間輪這個(gè)技術(shù)其實(shí)出來很久了,在kafka、zookeeper等技術(shù)中都有時(shí)間輪使用的方式。 時(shí)間輪是一種高效利用線程資源進(jìn)行批量化調(diào)度的一種調(diào)度模型。把大批量的調(diào)度任務(wù)全部綁定到同一個(gè)調(diào)度器上,使用這一個(gè)調(diào)度器來進(jìn)行所有任務(wù)的管理、觸發(fā)、以及運(yùn)行。所以時(shí)間輪的模型能夠高效管理各種延時(shí)任務(wù)、周期任務(wù)、通知任務(wù)。 以后大家在工作中遇到類似的功能,可以采用時(shí)間輪機(jī)制。如下圖所示,時(shí)間輪,從圖片上來看,就和手表的表圈是一樣,所以稱為時(shí)間輪,是因?yàn)樗且詴r(shí)間作為刻度組成的一個(gè)環(huán)形隊(duì)列,這個(gè)環(huán)形隊(duì)列采用數(shù)組來實(shí)現(xiàn),數(shù)組的每個(gè)元素稱為槽,每個(gè)槽可以放一個(gè)定時(shí)任務(wù)列表,叫HashedWheelBucket,它是一個(gè)雙向鏈表,量表的每一項(xiàng)表示一個(gè)定時(shí)任務(wù)項(xiàng)(HashedWhellTimeout),其中封裝了真正的定時(shí)任務(wù)TimerTask。時(shí)間輪是由多個(gè)時(shí)間格組成,下圖中有8個(gè)時(shí)間格,每個(gè)時(shí)間格代表當(dāng)前時(shí)間輪的基本時(shí)間跨度(tickDuration),其中時(shí)間輪的時(shí)間格的個(gè)數(shù)是固定的。在下圖中,有8個(gè)時(shí)間格(槽),假設(shè)每個(gè)時(shí)間格的單位為1s,那么整個(gè)時(shí)間輪走完一圈需要8s鐘。每秒鐘指針會(huì)沿著順時(shí)針方向移動(dòng)一個(gè),這個(gè)單位可以設(shè)置,比如以秒為單位,可以以一小時(shí)為單位,這個(gè)單位可以代表時(shí)間精度。通過指針移動(dòng),來獲得每個(gè)時(shí)間格中的任務(wù)列表,然后遍歷這一個(gè)時(shí)間格中的雙向鏈表來執(zhí)行任務(wù),以此循環(huán)。
二、時(shí)間輪案例使用
這里使用的時(shí)間輪是Netty這個(gè)包中提供的,使用方法比較簡(jiǎn)單。
- 先構(gòu)建一個(gè)HashedWheelTimer時(shí)間輪。
- tickDuration: 100 ,表示每個(gè)時(shí)間格代表當(dāng)前時(shí)間輪的基本時(shí)間跨度,這里是100ms,也就是指針100ms跳動(dòng)一次,每次跳動(dòng)一個(gè)窗格
- ticksPerWheel:1024,表示時(shí)間輪上一共有多少個(gè)窗格,分配的窗格越多,占用內(nèi)存空間就越大
- leakDetection:是否開啟內(nèi)存泄漏檢測(cè)。
- maxPendingTimeouts[可選參數(shù)],最大允許等待的任務(wù)數(shù),默認(rèn)沒有限制。
- 通過newTimeout()把需要延遲執(zhí)行的任務(wù)添加到時(shí)間輪中
三、時(shí)間輪的原理解析
時(shí)間輪的整體原理,分為幾個(gè)部分。
創(chuàng)建時(shí)間輪
時(shí)間輪本質(zhì)上是一個(gè)環(huán)狀數(shù)組,比如我們初始化時(shí)間輪時(shí):ticksPerWheel=8,那么意味著這個(gè)環(huán)狀數(shù)組的長(zhǎng)度是8,如圖3-12所示。
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];添加任務(wù)
- 當(dāng)通過newTimeout()方法添加一個(gè)延遲任務(wù)時(shí),該任務(wù)首先會(huì)加入到一個(gè)阻塞隊(duì)列中中。然后會(huì)有一個(gè)定時(shí)任務(wù)從該隊(duì)列獲取任務(wù),添加到時(shí)間輪的指定位置,計(jì)算方法如下。
任務(wù)執(zhí)行
- Worker線程按照每次間隔時(shí)間轉(zhuǎn)動(dòng)后,得到該時(shí)間窗格中的任務(wù)鏈表,然后從鏈表的head開始逐個(gè)取出任務(wù),有兩個(gè)判斷條件
- 當(dāng)前任務(wù)需要轉(zhuǎn)動(dòng)的圈數(shù)為0,表示任務(wù)是當(dāng)前圈開始執(zhí)行
- 當(dāng)前任務(wù)達(dá)到了delay時(shí)間,也就是timeout.deadline <= deadline
- 最終調(diào)用timeout.expire()方法執(zhí)行任務(wù)。
四、時(shí)間輪的源碼分析
HashedWheelTimer的構(gòu)造
- 調(diào)用 createWheel 創(chuàng)建一個(gè)時(shí)間輪,時(shí)間輪數(shù)組一定是 2 的冪次方,比如傳入的 ticksPerWheel=6,那么初始化的 wheel 長(zhǎng)度一定是 8,這樣是便于時(shí)間格的計(jì)算。
- tickDuration,表示時(shí)間輪的跨度,代表每個(gè)時(shí)間格的時(shí)間精度,以納秒的方式來表現(xiàn)。
- 把工作線程 Worker 封裝成 WorkerThread,從名字可以知道,它就是最終那個(gè)負(fù)責(zé)干活的線程。
- 對(duì)傳入的 ticksPerWheel 進(jìn)行整形
- 初始化固定長(zhǎng)度的 HashedWheelBucket
添加任務(wù)到時(shí)間輪
調(diào)用 newTimeout 方法,把任務(wù)添加進(jìn)來。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}//統(tǒng)計(jì)任務(wù)個(gè)數(shù)long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//判斷最大任務(wù)數(shù)量是否超過限制if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}//如果時(shí)間輪沒有啟動(dòng),則通過start方法進(jìn)行啟動(dòng)start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.//計(jì)算任務(wù)的延遲時(shí)間,通過當(dāng)前的時(shí)間+當(dāng)前任務(wù)執(zhí)行的延遲時(shí)間-時(shí)間輪啟動(dòng)的時(shí)間。long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;//在delay為正數(shù)的情況下,deadline是不可能為負(fù)數(shù)//如果為負(fù)數(shù),那么說明超過了long的最大值if (delay > 0 && deadline < 0) {deadline = Long.MAX\_VALUE;}//創(chuàng)建一個(gè)Timeout任務(wù),理論上來說,這個(gè)任務(wù)應(yīng)該要加入到時(shí)間輪的時(shí)間格子中,但是這里并不是先添加到時(shí)間格,而是先//加入到一個(gè)阻塞隊(duì)列,然后等到時(shí)間輪執(zhí)行到下一個(gè)格子時(shí),再從隊(duì)列中取出最多100000個(gè)任務(wù)添加到指定的時(shí)間格(槽)中。HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout; }start
任務(wù)添加到阻塞隊(duì)列之后,我們?cè)賮砜磫?dòng)方法。
start 方法會(huì)根據(jù)當(dāng)前的 workerState 狀態(tài)來啟動(dòng)時(shí)間輪。并且用了 startTimeInitialized 來控制線程的運(yùn)行,如果 workerThread 沒有啟動(dòng)起來,那么 newTimeout 方法會(huì)一直阻塞在運(yùn)行 start 方法中。如果不阻塞,newTimeout 方法會(huì)獲取不到 startTime。
public void start() {//workerState一開始的時(shí)候是0(WORKER\_STATE\_INIT),然后才會(huì)設(shè)置為1(WORKER\_STATE\_STARTED)switch (WORKER\_STATE\_UPDATER.get(this)) {case WORKER\_STATE\_INIT:if (WORKER\_STATE\_UPDATER.compareAndSet(this, WORKER\_STATE\_INIT, WORKER\_STATE\_STARTED)) {workerThread.start();}break;case WORKER\_STATE\_STARTED:break;case WORKER\_STATE\_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// 等待worker線程初始化時(shí)間輪的啟動(dòng)時(shí)間while (startTime == 0) {try {//這里使用countDownLauch來確保調(diào)度的線程已經(jīng)被啟動(dòng)startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}} }啟動(dòng)時(shí)間輪
調(diào)用 start()方法, 會(huì)調(diào)用?workerThread.start();?來啟動(dòng)一個(gè)工作線程,這個(gè)工作線程是在構(gòu)造方法中初始化的,包裝的是一個(gè) Worker 內(nèi)部線程類。
所以直接進(jìn)入到 Worker 這個(gè)類的 run 方法,了解下它的設(shè)計(jì)邏輯:
public void run() {// 初始化startTime,表示時(shí)間輪的啟動(dòng)時(shí)間startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;} // 喚醒被阻塞的start()方法。startTimeInitialized.countDown();do {//返回每tick一次的時(shí)間間隔final long deadline = waitForNextTick();if (deadline > 0) {//計(jì)算時(shí)間輪的槽位int idx = (int) (tick & mask);//移除掉CancelledTaskprocessCancelledTasks();//得到當(dāng)前指針位置的時(shí)間槽HashedWheelBucket bucket =wheel[idx];//將newTimeout()方法中加入到待處理定時(shí)任務(wù)隊(duì)列中的任務(wù)加入到指定的格子中transferTimeoutsToBuckets();//運(yùn)行目前指針指向的槽中的bucket鏈表中的任務(wù)bucket.expireTimeouts(deadline);tick++;}} while (WORKER\_STATE\_UPDATER.get(HashedWheelTimer.this) == WORKER\_STATE\_STARTED);//如果Worker\_State一只是started狀態(tài),就一直循環(huán)// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket : wheel) {bucket.clearTimeouts(unprocessedTimeouts); //清除時(shí)間輪中不需要處理的任務(wù)}for (; ; ) {//遍歷任務(wù)隊(duì)列,發(fā)現(xiàn)如果有任務(wù)被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊(duì)列中。HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}//處理被取消的任務(wù).processCancelledTasks(); }時(shí)間輪指針跳動(dòng)
這個(gè)方法的主要作用就是返回下一個(gè)指針指向的時(shí)間間隔,然后進(jìn)行 sleep 操作。
大家可以想象一下,一個(gè)鐘表上秒與秒之間是有時(shí)間間隔的,那么 waitForNextTick 就是根據(jù)當(dāng)前時(shí)間計(jì)算出跳動(dòng)到下個(gè)時(shí)間的時(shí)間間隔,然后進(jìn)行 sleep,然后再返回當(dāng)前時(shí)間距離時(shí)間輪啟動(dòng)時(shí)間的時(shí)間間隔。
說得再直白一點(diǎn):,假設(shè)當(dāng)前的 tickDuration 的間隔是 1s,tick 默認(rèn) = 0, 此時(shí)第一次進(jìn)來,得到的 deadline=1,也就是下一次跳動(dòng)的時(shí)間間隔是 1s。假設(shè)當(dāng)前處于:
private long waitForNextTick() {//tick表示總的tick數(shù)//tickDuration表示每個(gè)時(shí)間格的跨度,所以deadline返回的是下一次時(shí)間輪指針跳動(dòng)的時(shí)間long deadline = tickDuration * (tick + 1);for (; ; ) {//計(jì)算當(dāng)前時(shí)間距離啟動(dòng)時(shí)間的時(shí)間間隔final long currentTime = System.nanoTime() - startTime;//通過下一次指針跳動(dòng)的延遲時(shí)間距離當(dāng)前時(shí)間的差額,這個(gè)作為sleep時(shí)間使用。// 其實(shí)線程是以睡眠一定的時(shí)候再來執(zhí)行下一個(gè)ticket的任務(wù)的long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//sleepTimeMs小于零表示走到了下一個(gè)時(shí)間槽位置if (sleepTimeMs <= 0) {if (currentTime == Long.MIN\_VALUE) {return -Long.MAX\_VALUE;} else {return currentTime;}}if (isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}//進(jìn)入到這里進(jìn)行sleep,表示當(dāng)前時(shí)間距離下一次tick時(shí)間還有一段距離,需要sleep。try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER\_STATE\_UPDATER.get(HashedWheelTimer.this) == WORKER\_STATE\_SHUTDOWN) {return Long.MIN\_VALUE;}}} }transferTimeoutsToBuckets
轉(zhuǎn)移任務(wù)到時(shí)間輪中,前面我們講過,任務(wù)添加進(jìn)來時(shí),是先放入到阻塞隊(duì)列。
而在現(xiàn)在這個(gè)方法中,就是把阻塞隊(duì)列中的數(shù)據(jù)轉(zhuǎn)移到時(shí)間輪的指定位置。
在這個(gè)轉(zhuǎn)移方法中,寫死了一個(gè)循環(huán),每次都只轉(zhuǎn)移 10 萬個(gè)任務(wù)。
然后根據(jù) HashedWheelTimeout 的 deadline 延遲時(shí)間計(jì)算出時(shí)間輪需要運(yùn)行多少次才能運(yùn)行當(dāng)前的任務(wù),如果當(dāng)前的任務(wù)延遲時(shí)間大于時(shí)間輪跑一圈所需要的時(shí)間,那么就計(jì)算需要跑幾圈才能到這個(gè)任務(wù)運(yùn)行。
最后計(jì)算出該任務(wù)在時(shí)間輪中的槽位,添加到時(shí)間輪的鏈表中。
private void transferTimeoutsToBuckets() {// 循環(huán)100000次,也就是每次轉(zhuǎn)移10w個(gè)任務(wù)for (int i = 0; i < 100000; i++) {//從阻塞隊(duì)列中獲得具體的任務(wù)HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST\_CANCELLED) {// Was cancelled in the meantime.continue;}//計(jì)算tick次數(shù),deadline表示當(dāng)前任務(wù)的延遲時(shí)間,tickDuration表示時(shí)間槽的間隔,兩者相除就可以計(jì)算當(dāng)前任務(wù)需要tick幾次才能被執(zhí)行l(wèi)ong calculated = timeout.deadline / tickDuration;// 計(jì)算剩余的輪數(shù), 只有 timer 走夠輪數(shù), 并且到達(dá)了 task 所在的 slot, task 才會(huì)過期.(被執(zhí)行)timeout.remainingRounds = (calculated - tick) / wheel.length;//如果任務(wù)在timeouts隊(duì)列里面放久了, 以至于已經(jīng)過了執(zhí)行時(shí)間, 這個(gè)時(shí)候就使用當(dāng)前tick, 也就是放到當(dāng)前bucket, 此方法調(diào)用完后就會(huì)被執(zhí)行final long ticks = Math.max(calculated, tick);// 算出任務(wù)應(yīng)該插入的 wheel 的 slot, stopIndex = tick 次數(shù) & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);//把timeout任務(wù)插入到指定的bucket鏈中。HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);} }運(yùn)行時(shí)間輪中的任務(wù)
當(dāng)指針跳動(dòng)到某一個(gè)時(shí)間槽中時(shí),會(huì)就觸發(fā)這個(gè)槽中的任務(wù)的執(zhí)行。該功能是通過 expireTimeouts 來實(shí)現(xiàn)
這個(gè)方法的主要作用是: 過期并執(zhí)行格子中到期的任務(wù)。也就是當(dāng) tick 進(jìn)入到指定格子時(shí),worker 線程會(huì)調(diào)用這個(gè)方法
HashedWheelBucket 是一個(gè)鏈表,所以我們需要從 head 節(jié)點(diǎn)往下進(jìn)行遍歷。如果鏈表沒有遍歷到鏈表尾部那么就繼續(xù)往下遍歷。
獲取的 timeout 節(jié)點(diǎn)節(jié)點(diǎn),如果剩余輪數(shù) remainingRounds 大于 0,那么就說明要到下一圈才能運(yùn)行,所以將剩余輪數(shù)減一;
如果當(dāng)前剩余輪數(shù)小于等于零了,那么就將當(dāng)前節(jié)點(diǎn)從 bucket 鏈表中移除,并判斷一下當(dāng)前的時(shí)間是否大于 timeout 的延遲時(shí)間,如果是則調(diào)用 timeout 的 expire 執(zhí)行任務(wù)。
void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;// 遍歷當(dāng)前時(shí)間槽中的所有任務(wù)while (timeout != null) {HashedWheelTimeout next = timeout.next;//如果當(dāng)前任務(wù)要被執(zhí)行,那么remainingRounds應(yīng)該小于或者等于0if (timeout.remainingRounds <= 0) {//從bucket鏈表中移除當(dāng)前timeout,并返回鏈表中下一個(gè)timeoutnext = remove(timeout);//如果timeout的時(shí)間小于當(dāng)前的時(shí)間,那么就調(diào)用expire執(zhí)行taskif (timeout.deadline <= deadline) {timeout.expire();} else {//不可能發(fā)生的情況,就是說round已經(jīng)為0了,deadline卻>當(dāng)前槽的deadline// The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {//因?yàn)楫?dāng)前的槽位已經(jīng)過了,說明已經(jīng)走了一圈了,把輪數(shù)減一timeout.remainingRounds--;}//把指針放置到下一個(gè)timeouttimeout = next;} }總結(jié)
以上是生活随笔為你收集整理的Redis之时间轮机制(五)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Yolov5训练自己的数据集+Tenso
- 下一篇: 【云和恩墨业务介绍】之数据库性能优化服务