日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

借助Redis完成延时任务

發布時間:2023/12/4 数据库 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 借助Redis完成延时任务 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景

相信我們或多或少的會遇到類似下面這樣的需求:

第三方給了一批數據給我們處理,我們處理好之后就通知他們處理結果。

大概就是下面這個圖說的。

本來在處理完數據之后,我們就會馬上把處理結果返回給對方,但是對方要求我們處理速度不能過快,要有一種人為處理的效果。

換句話就是說,就算是處理好了,也要晚一點再執行通知操作。

這就是一個典型的延時任務。

延時,那還不簡單,執行完之后,讓它Sleep一下就好了,這樣就達到目標了。

Sleep一下確定是最容易實現的一種方案,但是試想一下,數據的數量不斷的增加,這樣Sleep真的好嗎?答案是否定的。

延時隊列,是處理這個場景最為妥當的方案。

RabbitMQ,RocketMQ,Cmq等都可以直接或間接的達到相應的效果。

如果不具備隊列條件,又要怎么處理呢?還可以借助Redis來完成這項工作。

MQ不一定每個公司都會用,但Redis應該80%以上的都會用吧。

處理方案

Redis這邊,可用的方案有兩種,下面分別來介紹一下。

#1 鍵的過期時間

在設置緩存的時候,我們比較多情況下都會設置一個緩存的過期時間,這個時間過期后,會重新去數據源拿數據回來。

可以基于這個過期時間結合Redis的keyspace notifications共同完成。

keyspace notifications里面包含了非常多的事件,這里只關注EXPIRE,這個是和過期有關的。

只要訂閱了__keyevent@0__:expired這個主題,當有key過期的時候,就會收到對應的信息。

主題@后面的0,指的是db 0.

要想使用這個特性,必不可少的一步是修改Redis默認的配置,把notify-keyspace-events設置成Ex。

############################# Event notification ############################### Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # ......... # # By default all notifications are disabled because most users don't need # this feature and the feature has some overhead. Note that if you don't # specify at least one of K or E, no events will be delivered. notify-keyspace-events "Ex"

其中 E 指的是鍵事件通知,x 指的是過期事件。

根據這個特性,重新調整一下流程圖:

應該也比較好懂,下面通過簡單的代碼來實現一下這種方案。

首先是處理完數據及往Redis寫數據。

public async Task DoTaskAsync() {// 數據處理// ...// 后續操作要延時,把Id記錄下來var taskId = new Random().Next(1, 10000);// 要延遲的時間int sec = new Random().Next(1, 5);// 可以加個重試機制,預防單次執行失敗。await RedisHelper.SetAsync($"task:{taskId}", "1", sec); }

還需要回傳結果的后臺任務,這個任務就是去訂閱上面說的鍵過期事件,然后回傳結果。

這里可以借助BackgroundService來訂閱處理。

public class SubscribeTaskBgTask : BackgroundService {protected override Task ExecuteAsync(CancellationToken stoppingToken){stoppingToken.ThrowIfCancellationRequested();var keyPrefix = "task:";RedisHelper.Subscribe(("__keyevent@0__:expired", arg =>{var msg = arg.Body;Console.WriteLine($"recive {msg}");if (msg.StartsWith(keyPrefix)){// 取到任務Idvar val = msg.Substring(keyPrefix.Length);Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");// 回傳處理結果給第三方,這里可以考慮這個并發鎖,避免多實例都處理了這個任務。// ....}}));return Task.CompletedTask;} }

這里有一個要注意的地方,要在key里面包含任務的Id,因為訂閱處理的時候,只能拿到一個key,后續能做的操作也只是基于這個key。

上面的例子,是用了task:任務Id的形式,所以在訂閱處理的時候,只處理以task:開頭的那些key。

效果如下:

這種方案,直觀上是非常簡單的,不過這種方案會遇到一個小問題。

當一個key過期后,并不一定會馬上收到通知,這個也是會有一定的延時的,取決于Redis的內部機制。

Redis Keyspace Notifications文檔的最后一段也提到了這個問題。

所以用這種方案的時候,要考慮一下,你的延時是不是要及時~~

#2 有序集合

有序集合是Redis中一種十分有用的數據結構,它的本質其實就是集合加了一個排序的功能,每個集合里面的元素還會有一個分值的屬性。

它提供了一個可以獲取指定分值范圍內的元素,這個也就是我們的出發點。

在這個場景下,什么東西可能作為這個分值呢?現在只有一個處理任務的Id還有一個延遲的時間,Id肯定不行,那么也只能是延遲時間來作這個分值了。

延遲1秒,5秒,1分鐘,這個都是比較大粒度的時間,這里要轉化一下,用時間戳來代替這些延遲的時間。

假設現在的時間戳是 1584171520, 要延遲5秒執行,那么執行任務的時間就是 1584171525,在當前時間戳的基礎上加個5秒,就是最終要執行的了。

到時有序集合中存的元素就會是這樣的

任務Id-1 1584171525 任務Id-2 1584171528 任務Id-3 1584171530

接下來就是要怎么取出這些任務的問題了!

把當前時間戳當成是取數的最大分值,0作為最小分值,這個時候取出的元素就是應該要執行回傳的任務了。

根據這個方案,重新調整一下流程圖:

交代清楚了思路,再來點代碼,加深一下理解。

首先還是處理完數據后往Redis寫數據。

public async Task DoTaskAsync() {// 數據處理// ...// 后續操作要延時,把Id記錄下來var taskId = new Random().Next(1, 10000);var cacheKey = "task:delay";int sec = new Random().Next(1, 5);// 要執行這個任務的時間戳var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();await RedisHelper.ZAddAsync(cacheKey, (time, taskId));Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}"); }

后面就是輪訓有序集合里面的元素了,這里同樣是借助BackgroundService來處理。

public class SubscribeTaskBgTask : BackgroundService {protected override async Task ExecuteAsync(CancellationToken stoppingToken){stoppingToken.ThrowIfCancellationRequested();var cacheKey = "task:delay";while (true){// 先取,后刪,不具備原子性,可考慮用lua腳本來保證原子性。var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);if (vals != null && vals.Length > 0){var val = vals[0];var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);if (rmCount > 0){// 要把這個元素先刪除成功了,再執行任務,不然會重復Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");// 回傳處理結果給第三方,這里可以考慮這個并發鎖,避免多實例都處理了這個任務。// ....}}else{// 沒有數據,休眠500ms,避免CPU空轉await Task.Delay(500);}}} }

效果如下:

參考文章

https://redis.io/topics/notifications

https://zhuanlan.zhihu.com/p/87113913

總結

以上是生活随笔為你收集整理的借助Redis完成延时任务的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。