日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列

發(fā)布時(shí)間:2024/8/23 数据库 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、延遲隊(duì)列

延遲隊(duì)列,也就是一定時(shí)間之后將消息體放入隊(duì)列,然后消費(fèi)者才能正常消費(fèi)。比如1分鐘之后發(fā)送短信,發(fā)送郵件,檢測(cè)數(shù)據(jù)狀態(tài)等。

二、Redisson Delayed Queue

如果你項(xiàng)目中使用了redisson,那么恭喜你,使用延遲隊(duì)列將非常的簡(jiǎn)單。

基于Redis的Redisson分布式延遲隊(duì)列(Delayed Queue)結(jié)構(gòu)的RDelayedQueue Java對(duì)象在實(shí)現(xiàn)了RQueue接口的基礎(chǔ)上提供了向隊(duì)列按要求延遲添加項(xiàng)目的功能。該功能可以用來(lái)實(shí)現(xiàn)消息傳送延遲按幾何增長(zhǎng)或幾何衰減的發(fā)送策略。

RQueue<String> distinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue); // 10秒鐘以后將消息發(fā)送到指定隊(duì)列 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 一分鐘以后將消息發(fā)送到指定隊(duì)列 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在該對(duì)象不再需要的情況下,應(yīng)該主動(dòng)銷毀。僅在相關(guān)的Redisson對(duì)象也需要關(guān)閉的時(shí)候可以不用主動(dòng)銷毀。

三、Java DelayQueue

DelayQueue它本質(zhì)上是一個(gè)隊(duì)列,而這個(gè)隊(duì)列里也只有存放Delayed的子類才有意義。

延遲隊(duì)列demo

public class DelayTask implements Delayed {private long startDate;public DelayTask(Long delayMillions) {this.startDate = System.currentTimeMillis() + delayMillions;}@Overridepublic int compareTo(Delayed o) {Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));}@Overridepublic long getDelay(TimeUnit unit) {return this.startDate - System.currentTimeMillis();}public static void main(String[] args) throws Exception {BlockingQueue<DelayTask> queue = new DelayQueue<>();DelayTask delayTask = new DelayTask(1000 * 5L);queue.put(delayTask);while (queue.size()>0){queue.take();}} }

延遲隊(duì)列消費(fèi)原理

源碼中出現(xiàn)了三次await字眼:

  • 第一次是當(dāng)隊(duì)列為空時(shí),等待;
  • 第二次等待是因?yàn)?#xff0c;發(fā)現(xiàn)有任務(wù),沒(méi)有到執(zhí)行時(shí)間,并且有準(zhǔn)備執(zhí)行的線程(leader),那不好意思,還得接續(xù)等待直到下一個(gè)可執(zhí)行的任務(wù)。
  • 第三次是真正延時(shí)的地方了,available.awaitNanos(delay),此時(shí)也沒(méi)有別的線程要執(zhí)行,也就是我將要執(zhí)行,等待剩下的延遲時(shí)間即可。

延遲隊(duì)列生產(chǎn)原理

為保證消費(fèi)者正常消費(fèi),如果優(yōu)先隊(duì)列頭元素和當(dāng)前放入元素相等,則說(shuō)明當(dāng)前元素消費(fèi)的優(yōu)先級(jí)高,重置準(zhǔn)備消費(fèi)的線程(leader)為null,喚醒消費(fèi)者線程重新執(zhí)行take方法邏輯。

四、手寫一個(gè)Redis延遲隊(duì)列

Redis延遲隊(duì)列設(shè)計(jì)

延遲消息體設(shè)計(jì)

延遲消息體Message實(shí)現(xiàn)了Delayed接口,這樣Java DelayQueue就知道什么時(shí)候取出消息體。

Redis延遲隊(duì)列實(shí)現(xiàn)

RedisDelayQueue構(gòu)造函數(shù)依賴redis操作緩存服務(wù)對(duì)象和目標(biāo)隊(duì)列名稱(redis key)。

offer方法傳入member(具體消息),delay(延遲時(shí)間),timeUnit(時(shí)間單位),然后封裝成延遲消息體Message對(duì)象,放入Java DelayQueue中。

run方法是一個(gè)循環(huán)體,不斷的從Java DelayQueue對(duì)象中獲取消息體,然后放入redis對(duì)應(yīng)的目標(biāo)隊(duì)列里。

延遲隊(duì)列測(cè)試demo

控制臺(tái)打印效果

五、思考

這種方案實(shí)現(xiàn)比較簡(jiǎn)單,使用的時(shí)候一定要謹(jǐn)慎,應(yīng)用于延遲小,消息量不大的場(chǎng)景是沒(méi)問(wèn)題的,畢竟Java DelayQueue是占用內(nèi)存的。另外也可以考慮利用Redis的sorted set 結(jié)構(gòu)實(shí)現(xiàn)延遲隊(duì)列,使用TimeStamp作為score,比如你的任務(wù)是要延遲5分鐘,那么就在當(dāng)前時(shí)間上加5分鐘作為 score ,輪詢?nèi)蝿?wù)每秒只輪詢 score 小于等于 當(dāng)前時(shí)間的 key即可,如果任務(wù)支持有誤差,那么當(dāng)沒(méi)有掃描到有效數(shù)據(jù)的時(shí)候可以休眠對(duì)應(yīng)時(shí)間再繼續(xù)輪詢。

總結(jié)

以上是生活随笔為你收集整理的redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。