延时消息_手把手实现一条延时消息
前言
近期在維護公司的調度平臺,其中有個關鍵功能那就是定時任務;定時任務大家平時肯定接觸的不少,比如 JDK 中的 Timer、ScheduledExecutorService、調度框架 Quartz 等。
通常用于實現 XX 時間后的延時任務,或周期性任務;
比如一個常見的業務場景:用戶下單 N 分鐘未能支付便自動取消訂單。
實現這類需求通常有兩種方式:
- 輪詢定時任務:給定周期內掃描所有未支付的訂單,查看時間是否到期。
- 延時消息:訂單創建的時候發送一條 N 分鐘到期的信息,一旦消息消費后便可判斷訂單是否可以取消。
先看第一種,這類方式實現較為簡單,只需要啟動一個定時任務即可;但缺點同樣也很明顯,這個間隔掃描的時間不好控制。
給短了會造成很多無意義的掃描,增大數據庫壓力,給長了又會使得誤差較大。
當然最大的問題還是效率較低,隨著訂單增多耗時會呈線性增長,最差的情況甚至會出現上一波輪詢還沒有掃描完,下一波調度又來了。
這時第二種方案就要顯得靠譜多了,通過延時消息可以去掉不必要的訂單掃描,實時性也比較高。
延時消息
這里我們不過多討論這類需求如何實現;重點聊聊這個延時消息,看它是如何實現的,基于實現延時消息的數據結構還能實現定時任務。
我在之前的開源 IM 項目中也加入了此類功能,可以很直觀的發送一條延時消息,效果如下:
使用 :delay hahah 2 發送了一條兩秒鐘的延時消息,另外一個客戶端將會在兩秒鐘之后收到該消息。
具體的實現步驟會在后文繼續分析。
時間輪
要實現延時消息就不得不提到一種數據結構【時間輪】,時間輪聽這名字可以很直觀的抽象出它的數據結構。
其實本質上它就是一個環形的數組,如圖所示,假設我們創建了一個長度為 8 的時間輪。
task0 = 當我們需要新建一個 5s 延時消息,則只需要將它放到下標為 5 的那個槽中。
task1 = 而如果是一個 10s 的延時消息,則需要將它放到下標為 2 的槽中,但同時需要記錄它所對應的圈數,不然就和 2 秒的延時消息重復了。
task2= 當創建一個 21s 的延時消息時,它所在的位置就和 task0 相同了,都在下標為 5 的槽中,所以為了區別需要為他加上圈數為 2。
通過這張圖可以更直觀的理解。
當我們需要取出延時消息時,只需要每秒往下移動這個指針,然后取出該位置的所有任務即可。
當然取出任務之前還得判斷圈數是否為 0 ,不為 0 時說明該任務還得再輪幾圈,同時需要將圈數 -1 。
這樣就可避免輪詢所有的任務,不過如果時間輪的槽比較少,導致某一個槽上的任務非常多那效率也比較低,這就和 HashMap 的 hash 沖突是一樣的。
編碼實現
理論講完后我們來看看實際的編碼實現,為此我創建了一個 RingBufferWheel 類。
它的主要功能如下:
- 可以添加指定時間的延時任務,在這個任務中可以實現自己的業務邏輯。
- 停止運行(包含強制停止和所有任務完成后停止)。
- 查看待執行任務數量。
首先直接看看這個類是如何使用的。
我在這里創建了 65 個延時任務,每個任務都比前一個延后 1s 執行;同時自定義了一個 Job 類來實現自己的業務邏輯,最后調用 stop(false) 會在所有任務執行完畢后退出。
構造函數
先來看看其中的構造函數,這里一共有兩個構造函數,用于接收一個線程池及時間輪的大小。
線程池的作用會在后面講到。
這里的時間輪大小也是有講究的,它的長度必須得是 2∧n,至于為什么有這個要求后面也會講到。
默認情況下會初始化一個長度為 64 的數組。
添加任務
下面來看看添加任務的邏輯,根據我們之前的那張抽象圖其實很容易實現。
首先我們要定義一個 Task 類,用于抽象任務;它本身也是一個線程,一旦延時到期便會執行其中的 run 函數,所以使用時便可繼承該類,將業務邏輯寫在 run() 中即可。
它其中還有兩個成員變量,也很好理解。
- cycleNum 用于記錄該任務所在時間輪的圈數。
- key 在這里其實就是延時時間。
//通過 key 計算應該存放的位置 private Set get(int key) { int index = mod(key, bufferSize); return (Set) ringBuffer[index]; } private int mod(int target, int mod) { // equals target % mod target = target + tick.get() ; return target & (mod - 1); }
首先是根據延時時間 (key) 計算出所在的位置,其實就和 HashMap 一樣的取模運算,只不過這里使用了位運算替代了取模,同時效率會高上不少。
這樣也解釋了為什么數組長度一定得是 2∧n。
然后查看該位置上是否存在任務,不存在就新建一個;存在自然就是將任務寫入這個集合并更新回去。
private int cycleNum(int target, int mod) { //equals target/mod return target >> Integer.bitCount(mod - 1); }其中的 cycleNum() 自然是用于計算該任務所處的圈數,也是考慮到效率問題,使用位運算替代了除法。
private void put(int key, Set tasks) { int index = mod(key, bufferSize); ringBuffer[index] = tasks; }而 put() 函數就非常簡單了,就是將任務寫入指定數組下標即可。
啟動時間輪
任務寫進去后下一步便是啟動這個時間輪了,我這里定義了一個 start() 函數。
其實本質上就是開啟了一個后臺線程來做這個事情:
它會一直從時間輪中取出任務來運行,而運行這些任務的線程便是我們在初始化時傳入的線程池;所以所有的延時任務都是由自定義的線程池調度完成的,這樣可以避免時間輪的阻塞。
這里調用的 remove(index) 很容易猜到是用于獲取當前數組中的所有任務。
邏輯很簡單就不再贅述,不過其中的 size2Notify() 倒是值得說一下。
他是用于在停止任務時,主線程等待所有延時任務執行完畢的喚醒條件。這類用法幾乎是所有線程間通信的常規套路,值得收入技能包。
停止時間輪
剛才提到的喚醒主線程得配合這里的停止方法使用:
如果是強制停止那便什么也不管,直接更新停止標志,同時關閉線程池即可。
但如果是軟停止(等待所有任務執行完畢)時,那就得通過上文提到的方式阻塞主線程,直到任務執行完畢后被喚醒。
CIM 中的應用
介紹了核心原理和基本 API 后,我們來看看實際業務場景如何結合使用(背景是一個即時通訊項目)。
我這里所使用的場景在文初也提到了,就是真的發送一條延時消息;
現有的消息都是實時消息,所以要實現一個延時消息便是在現有的發送客戶端處將延時消息放入到這個時間輪中,在任務到期時再執行真正的消息發送邏輯。
由于項目本身結合了 Spring,所以第一步自然是配置 bean。
bean 配置好后其實就可以使用了。
每當發送的是延時消息時,只需要將這個消息封裝為一個 Job 放到時間輪中,然后在自己的業務類中完成業務即可。
后續可以優化下 api,不用每次新增任務都要調用 start() 方法。
這樣一個延時消息的應用便完成了。
總結
時間輪這樣的應用還非常多,比如 Netty 中的 HashedWheelTimer 工具原理也差不多,可以用于維護長連接心跳信息。
甚至 Kafka 在這基礎上還優化出了層級時間輪,這些都是后話了,大家感興趣的話可以自行搜索資料或者抽時間我再完善一次。
這篇文章從前期準備到擼碼實現還是花了不少時間,如果對你有幫助的話還請點贊轉發。
本文的所有源碼都可在此處查閱:
https://github.com/crossoverJie/cim
你的點贊與分享是對我最大的支持
原文:https://my.oschina.net/crossoverjie/blog/3111640
總結
以上是生活随笔為你收集整理的延时消息_手把手实现一条延时消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: apache camel 相关配置_My
- 下一篇: YYUC输出联动select标签