实现延时任务的 4 种实现方案!
一、應用場景
在需求開發過程中,我們經常會遇到一些類似下面的場景:
a. 外賣訂單超過15分鐘未支付,自動取消
b. 使用搶票軟件訂到車票后,1小時內未支付,自動取消
c. 待處理申請超時1天,通知審核人員經理,超時2天通知審核人員總監
d. 客戶預定自如房子后,24小時內未支付,房源自動釋放?
那么針對這類場景的需求應該如果實現呢,我們最先想到的一般是啟個定時任務,來掃描數據庫里符合條件的數據,并對其進行更新操作。一般來說spring-quartz 、elasticjob 就可以實現,甚至自己寫個 Timer 也可以。
但是這種方式有個弊端,就是需要不停的掃描數據庫,如果數據量比較大,并且任務執行間隔時間比較短,對數據庫會有一定的壓力。另外定時任務的執行間隔時間的粒度也不太好設置,設置長會影響時效性,設置太短又會增加服務壓力。我們來看一下有沒有更好的實現方式。
二、JDK 延時隊列實現
DelayQueue 是 JDK 中 java.util.concurrent 包下的一種無界阻塞隊列,底層是優先隊列 PriorityQueue。對于放到隊列中的任務,可以按照到期時間進行排序,只需要取已經到期的元素處理即可。
具體的步驟是,要放入隊列的元素需要實現 Delayed 接口并實現 getDelay 方法來計算到期時間,compare 方法來對比到期時間以進行排序。一個簡單的使用例子如下:
package com.lyqiang.delay.jdk;import java.time.LocalDateTime; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;/*** @author lyqiang*/ public class TestDelayQueue {public static void main(String[] args) throws InterruptedException {// 新建3個任務,并依次設置超時時間為 20s 10s 30sDelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 20000L);DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 10000L);DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 30000L);DelayQueue<DelayTask> queue = new DelayQueue<>();queue.add(d1);queue.add(d2);queue.add(d3);int size = queue.size();System.out.println("當前時間是:" + LocalDateTime.now());// 從延時隊列中獲取元素, 將輸出 d2 、d1 、d3for (int i = 0; i < size; i++) {System.out.println(queue.take() + " ------ " + LocalDateTime.now());}} }class DelayTask implements Delayed {private Integer taskId;private long exeTime;DelayTask(Integer taskId, long exeTime) {this.taskId = taskId;this.exeTime = exeTime;}@Overridepublic long getDelay(TimeUnit unit) {return exeTime - System.currentTimeMillis();}@Overridepublic int compareTo(Delayed o) {DelayTask t = (DelayTask) o;if (this.exeTime - t.exeTime <= 0) {return -1;} else {return 1;}}@Overridepublic String toString() {return "DelayTask{" +"taskId=" + taskId +", exeTime=" + exeTime +'}';} }代碼的執行結果如下:
使用 DelayQueue, 只需要有一個線程不斷從隊列中獲取數據即可,它的優點是不用引入第三方依賴,實現也很簡單,缺點也很明顯,它是內存存儲,對分布式支持不友好,如果發生單點故障,可能會造成數據丟失,無界隊列還存在 OOM 的風險。
三、時間輪算法實現
1996 年 George Varghese 和 Tony Lauck 的論文《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》中提出了一種時間輪管理 Timeout 事件的方式。其設計非常巧妙,并且類似時鐘的運行,如下圖的原始時間輪有 8 個格子,假定指針經過每個格子花費時間是 1 個時間單位,當前指針指向 0,一個 17 個時間單位后超時的任務則需要運轉 2 圈再通過一個格子后被執行,放在相同格子的任務會形成一個鏈表。
Netty 包里提供了一種時間輪的實現——HashedWheelTimer,其底層使用了數組+鏈表的數據結構,使用方式如下:
package com.lyqiang.delay.wheeltimer;import io.netty.util.HashedWheelTimer; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit;/*** @author lyqiang*/ public class WheelTimerTest {public static void main(String[] args) {//設置每個格子是 100ms, 總共 256 個格子HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);//加入三個任務,依次設置超時時間是 10s 5s 20sSystem.out.println("加入一個任務,ID = 1, time= " + LocalDateTime.now());hashedWheelTimer.newTimeout(timeout -> {System.out.println("執行一個任務,ID = 1, time= " + LocalDateTime.now());}, 10, TimeUnit.SECONDS);System.out.println("加入一個任務,ID = 2, time= " + LocalDateTime.now());hashedWheelTimer.newTimeout(timeout -> {System.out.println("執行一個任務,ID = 2, time= " + LocalDateTime.now());}, 5, TimeUnit.SECONDS);System.out.println("加入一個任務,ID = 3, time= " + LocalDateTime.now());hashedWheelTimer.newTimeout(timeout -> {System.out.println("執行一個任務,ID = 3, time= " + LocalDateTime.now());}, 20, TimeUnit.SECONDS);System.out.println("等待任務執行===========");} }代碼執行結果如下:
相比 DelayQueue 的數據結構,時間輪在算法復雜度上有一定優勢,但用時間輪來實現延時任務同樣避免不了單點故障。
四、Redis ZSet 實現
Redis 里有 5 種數據結構,最常用的是 String 和 Hash,而 ZSet 是一種支持按 score 排序的數據結構,每個元素都會關聯一個 double 類型的分數,Redis 通過分數來為集合中的成員進行從小到大的排序,借助這個特性我們可以把超時時間作為 score 來將任務進行排序。
使用?zadd key score member?命令向 redis 中放入任務,超時時間作為 score, 任務 ID 作為 member, 使用?zrange key start stop withscores?命令從 redis 中讀取任務,使用?zrem key member?命令從 redis 中刪除任務。代碼如下:
package com.lyqiang.delay.redis;import java.time.LocalDateTime; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;/*** @author lyqiang*/ public class TestRedisDelay {public static void main(String[] args) {TaskProducer taskProducer = new TaskProducer();//創建 3個任務,并設置超時間為 10s 5s 20staskProducer.produce(1, System.currentTimeMillis() + 10000);taskProducer.produce(2, System.currentTimeMillis() + 5000);taskProducer.produce(3, System.currentTimeMillis() + 20000);System.out.println("等待任務執行===========");//消費端從redis中消費任務TaskConsumer taskConsumer = new TaskConsumer();taskConsumer.consumer();} }class TaskProducer {public void produce(Integer taskId, long exeTime) {System.out.println("加入任務, taskId: " + taskId + ", exeTime: " + exeTime + ", 當前時間:" + LocalDateTime.now());RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));} }class TaskConsumer {public void consumer() {Executors.newSingleThreadExecutor().submit(new Runnable() {@Overridepublic void run() {while (true) {Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);if (taskIdSet == null || taskIdSet.isEmpty()) {//System.out.println("沒有任務");} else {taskIdSet.forEach(id -> {long result = RedisOps.getJedis().zrem(RedisOps.key, id);if (result == 1L) {System.out.println("從延時隊列中獲取到任務,taskId:" + id + " , 當前時間:" + LocalDateTime.now());}});}try {TimeUnit.MILLISECONDS.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}});} }執行結果如下:
相比前兩種實現方式,使用 Redis 可以將數據持久化到磁盤,規避了數據丟失的風險,并且支持分布式,避免了單點故障。
五、MQ 延時隊列實現
以 RabbitMQ 為例,它本身并沒有直接支持延時隊列的功能,但是通過一些特性,我們可以達到實現延時隊列的效果。
RabbitMQ 可以為 Queue 設置 TTL,,到了過期時間沒有被消費的消息將變為死信——Dead Letter。我們還可以為Queue 設置死信轉發 x-dead-letter-exchange,過期的消息可以被路由到另一個 Exchange。下圖說明了這個流程,生產者通過不同的 RoutingKey 發送不同過期時間的消息,多個隊列分別消費并產生死信后被路由到 exe-dead-exchange,再有一些隊列綁定到這個 exchange,從而進行不同業務邏輯的消費。
在 RabbitMQ 界面操作如下:
1、在?g_normal_exchange?發送測試消息
2. 隊列?g_queue_10s?綁定到?g_normal_exchange,并設置 x-message-ttl 為 10s 過期,x-dead-letter-exchange 為?g_exe_dead_exchange,可以看到消息到達后,過了 10s 之后消息被路由到g_exe_dead_exchange
3. 綁定到?g_exe_dead_exchange?的隊列?g_exe_10s_queue?消費到了這條消息
使用 MQ 實現的方式,支持分布式,并且消息支持持久化,在業內應用比較多,它的缺點是每種間隔時間的場景需要分別建立隊列。
六、總結
通過上面不同實現方式的比較,可以很明顯的看出各個方案的優缺點,在分布式系統中我們會優先考慮使用 Redis 和 MQ 的實現方式。
在需求開發中實現一個功能的方式多種多樣,需要我們進行多維度的比較,才能選擇出合理的、可靠的、高效的并且適合自己業務的解決方案。
總結
以上是生活随笔為你收集整理的实现延时任务的 4 种实现方案!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring 天天用,bean 懒加载原
- 下一篇: 大环境下瑟瑟发抖辞职的第二天,拿了两个