日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

手把手实现一条延时消息

發布時間:2025/3/16 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 手把手实现一条延时消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

近期在維護公司的調度平臺,其中有個關鍵功能那就是定時任務;定時任務大家平時肯定接觸的不少,比如?JDK?中的?Timer、ScheduledExecutorService、調度框架?Quartz?等。

通常用于實現 XX 時間后的延時任務,或周期性任務;

比如一個常見的業務場景:用戶下單 N 分鐘未能支付便自動取消訂單。

實現這類需求通常有兩種方式:

  • 輪詢定時任務:給定周期內掃描所有未支付的訂單,查看時間是否到期。

  • 延時消息:訂單創建的時候發送一條 N 分鐘到期的信息,一旦消息消費后便可判斷訂單是否可以取消。

先看第一種,這類方式實現較為簡單,只需要啟動一個定時任務即可;但缺點同樣也很明顯,這個間隔掃描的時間不好控制。

給短了會造成很多無意義的掃描,增大數據庫壓力,給長了又會使得誤差較大。

當然最大的問題還是效率較低,隨著訂單增多耗時會呈線性增長,最差的情況甚至會出現上一波輪詢還沒有掃描完,下一波調度又來了。


這時第二種方案就要顯得靠譜多了,通過延時消息可以去掉不必要的訂單掃描,實時性也比較高。

延時消息

這里我們不過多討論這類需求如何實現;重點聊聊這個延時消息,看它是如何實現的,基于實現延時消息的數據結構還能實現定時任務。

我在之前的開源 IM 項目中也加入了此類功能,可以很直觀的發送一條延時消息,效果如下:

使用?:delay hahah 2?發送了一條兩秒鐘的延時消息,另外一個客戶端將會在兩秒鐘之后收到該消息。

具體的實現步驟會在后文繼續分析。

時間輪

要實現延時消息就不得不提到一種數據結構【時間輪】,時間輪聽這名字可以很直觀的抽象出它的數據結構。

其實本質上它就是一個環形的數組,如圖所示,假設我們創建了一個長度為 8 的時間輪。


task0?= 當我們需要新建一個 5s 延時消息,則只需要將它放到下標為 5 的那個槽中。

task1?= 而如果是一個 10s 的延時消息,則需要將它放到下標為 2 的槽中,但同時需要記錄它所對應的圈數,不然就和 2 秒的延時消息重復了。

task2= 當創建一個 21s 的延時消息時,它所在的位置就和?task0?相同了,都在下標為 5 的槽中,所以為了區別需要為他加上圈數為 2。

通過這張圖可以更直觀的理解。

當我們需要取出延時消息時,只需要每秒往下移動這個指針,然后取出該位置的所有任務即可。

當然取出任務之前還得判斷圈數是否為 0 ,不為 0 時說明該任務還得再輪幾圈,同時需要將圈數 -1 。

這樣就可避免輪詢所有的任務,不過如果時間輪的槽比較少,導致某一個槽上的任務非常多那效率也比較低,這就和?HashMap?的?hash?沖突是一樣的。

編碼實現

理論講完后我們來看看實際的編碼實現,為此我創建了一個?RingBufferWheel?類。

它的主要功能如下:

  • 可以添加指定時間的延時任務,在這個任務中可以實現自己的業務邏輯。

  • 停止運行(包含強制停止和所有任務完成后停止)。

  • 查看待執行任務數量。

首先直接看看這個類是如何使用的。

我在這里創建了 65 個延時任務,每個任務都比前一個延后 1s 執行;同時自定義了一個?Job?類來實現自己的業務邏輯,最后調用?stop(false)?會在所有任務執行完畢后退出。

構造函數

先來看看其中的構造函數,這里一共有兩個構造函數,用于接收一個線程池及時間輪的大小。

線程池的作用會在后面講到。

這里的時間輪大小也是有講究的,它的長度必須得是?2∧n,至于為什么有這個要求后面也會講到。

默認情況下會初始化一個長度為 64 的數組。

添加任務

下面來看看添加任務的邏輯,根據我們之前的那張抽象圖其實很容易實現。


首先我們要定義一個?Task?類,用于抽象任務;它本身也是一個線程,一旦延時到期便會執行其中的 run 函數,所以使用時便可繼承該類,將業務邏輯寫在?run()?中即可。

它其中還有兩個成員變量,也很好理解。

  • cycleNum?用于記錄該任務所在時間輪的圈數。

  • key?在這里其實就是延時時間。

//通過 key 計算應該存放的位置private Set<Task> get(int key) {int index = mod(key, bufferSize);return (Set<Task>) ringBuffer[index];}private int mod(int target, int mod) {// equals target % modtarget = target + tick.get() ;return target & (mod - 1);}

首先是根據延時時間 (key) 計算出所在的位置,其實就和?HashMap?一樣的取模運算,只不過這里使用了位運算替代了取模,同時效率會高上不少。

這樣也解釋了為什么數組長度一定得是?2∧n。

然后查看該位置上是否存在任務,不存在就新建一個;存在自然就是將任務寫入這個集合并更新回去。

private int cycleNum(int target, int mod) {//equals target/modreturn target >> Integer.bitCount(mod - 1);}

其中的?cycleNum()?自然是用于計算該任務所處的圈數,也是考慮到效率問題,使用位運算替代了除法。

private void put(int key, Set<Task> tasks) {int index = mod(key, bufferSize);ringBuffer[index] = tasks;}

而?put()?函數就非常簡單了,就是將任務寫入指定數組下標即可。

啟動時間輪

任務寫進去后下一步便是啟動這個時間輪了,我這里定義了一個?start()?函數。

其實本質上就是開啟了一個后臺線程來做這個事情:

它會一直從時間輪中取出任務來運行,而運行這些任務的線程便是我們在初始化時傳入的線程池;所以所有的延時任務都是由自定義的線程池調度完成的,這樣可以避免時間輪的阻塞。

這里調用的?remove(index)?很容易猜到是用于獲取當前數組中的所有任務。

邏輯很簡單就不再贅述,不過其中的?size2Notify()?倒是值得說一下。

他是用于在停止任務時,主線程等待所有延時任務執行完畢的喚醒條件。這類用法幾乎是所有線程間通信的常規套路,值得收入技能包。

停止時間輪

剛才提到的喚醒主線程得配合這里的停止方法使用:

如果是強制停止那便什么也不管,直接更新停止標志,同時關閉線程池即可。

但如果是軟停止(等待所有任務執行完畢)時,那就得通過上文提到的方式阻塞主線程,直到任務執行完畢后被喚醒。

CIM 中的應用

介紹了核心原理和基本?API?后,我們來看看實際業務場景如何結合使用(背景是一個即時通訊項目)。

我這里所使用的場景在文初也提到了,就是真的發送一條延時消息;

現有的消息都是實時消息,所以要實現一個延時消息便是在現有的發送客戶端處將延時消息放入到這個時間輪中,在任務到期時再執行真正的消息發送邏輯。

由于項目本身結合了?Spring,所以第一步自然是配置?bean。

bean?配置好后其實就可以使用了。

每當發送的是延時消息時,只需要將這個消息封裝為一個?Job?放到時間輪中,然后在自己的業務類中完成業務即可。

后續可以優化下?api,不用每次新增任務都要調用?start()?方法。

這樣一個延時消息的應用便完成了。

總結

時間輪這樣的應用還非常多,比如?Netty?中的?HashedWheelTimer?工具原理也差不多,可以用于維護長連接心跳信息。

甚至?Kafka?在這基礎上還優化出了層級時間輪,這些都是后話了,大家感興趣的話可以自行搜索資料或者抽時間我再完善一次。

這篇文章從前期準備到擼碼實現還是花了不少時間,如果對你有幫助的話還請點贊轉發。

本文的所有源碼都可在此處查閱:

https://github.com/crossoverJie/cim

有道無術,術可成;有術無道,止于術

歡迎大家關注Java之道公眾號

好文章,我在看??

總結

以上是生活随笔為你收集整理的手把手实现一条延时消息的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。