redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列
一、延遲隊(duì)列
延遲隊(duì)列,也就是一定時(shí)間之后將消息體放入隊(duì)列,然后消費(fèi)者才能正常消費(fèi)。比如1分鐘之后發(fā)送短信,發(fā)送郵件,檢測(cè)數(shù)據(jù)狀態(tài)等。
二、Redisson Delayed Queue
如果你項(xiàng)目中使用了redisson,那么恭喜你,使用延遲隊(duì)列將非常的簡(jiǎn)單。
基于Redis的Redisson分布式延遲隊(duì)列(Delayed Queue)結(jié)構(gòu)的RDelayedQueue Java對(duì)象在實(shí)現(xiàn)了RQueue接口的基礎(chǔ)上提供了向隊(duì)列按要求延遲添加項(xiàng)目的功能。該功能可以用來(lái)實(shí)現(xiàn)消息傳送延遲按幾何增長(zhǎng)或幾何衰減的發(fā)送策略。
RQueue<String> distinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue); // 10秒鐘以后將消息發(fā)送到指定隊(duì)列 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 一分鐘以后將消息發(fā)送到指定隊(duì)列 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);在該對(duì)象不再需要的情況下,應(yīng)該主動(dòng)銷毀。僅在相關(guān)的Redisson對(duì)象也需要關(guān)閉的時(shí)候可以不用主動(dòng)銷毀。
三、Java DelayQueue
DelayQueue它本質(zhì)上是一個(gè)隊(duì)列,而這個(gè)隊(duì)列里也只有存放Delayed的子類才有意義。
延遲隊(duì)列demo
public class DelayTask implements Delayed {private long startDate;public DelayTask(Long delayMillions) {this.startDate = System.currentTimeMillis() + delayMillions;}@Overridepublic int compareTo(Delayed o) {Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));}@Overridepublic long getDelay(TimeUnit unit) {return this.startDate - System.currentTimeMillis();}public static void main(String[] args) throws Exception {BlockingQueue<DelayTask> queue = new DelayQueue<>();DelayTask delayTask = new DelayTask(1000 * 5L);queue.put(delayTask);while (queue.size()>0){queue.take();}} }延遲隊(duì)列消費(fèi)原理
源碼中出現(xiàn)了三次await字眼:
- 第一次是當(dāng)隊(duì)列為空時(shí),等待;
- 第二次等待是因?yàn)?#xff0c;發(fā)現(xiàn)有任務(wù),沒(méi)有到執(zhí)行時(shí)間,并且有準(zhǔn)備執(zhí)行的線程(leader),那不好意思,還得接續(xù)等待直到下一個(gè)可執(zhí)行的任務(wù)。
- 第三次是真正延時(shí)的地方了,available.awaitNanos(delay),此時(shí)也沒(méi)有別的線程要執(zhí)行,也就是我將要執(zhí)行,等待剩下的延遲時(shí)間即可。
延遲隊(duì)列生產(chǎn)原理
為保證消費(fèi)者正常消費(fèi),如果優(yōu)先隊(duì)列頭元素和當(dāng)前放入元素相等,則說(shuō)明當(dāng)前元素消費(fèi)的優(yōu)先級(jí)高,重置準(zhǔn)備消費(fèi)的線程(leader)為null,喚醒消費(fèi)者線程重新執(zhí)行take方法邏輯。
四、手寫一個(gè)Redis延遲隊(duì)列
Redis延遲隊(duì)列設(shè)計(jì)
延遲消息體設(shè)計(jì)
延遲消息體Message實(shí)現(xiàn)了Delayed接口,這樣Java DelayQueue就知道什么時(shí)候取出消息體。
Redis延遲隊(duì)列實(shí)現(xiàn)
RedisDelayQueue構(gòu)造函數(shù)依賴redis操作緩存服務(wù)對(duì)象和目標(biāo)隊(duì)列名稱(redis key)。
offer方法傳入member(具體消息),delay(延遲時(shí)間),timeUnit(時(shí)間單位),然后封裝成延遲消息體Message對(duì)象,放入Java DelayQueue中。
run方法是一個(gè)循環(huán)體,不斷的從Java DelayQueue對(duì)象中獲取消息體,然后放入redis對(duì)應(yīng)的目標(biāo)隊(duì)列里。
延遲隊(duì)列測(cè)試demo
控制臺(tái)打印效果
五、思考
這種方案實(shí)現(xiàn)比較簡(jiǎn)單,使用的時(shí)候一定要謹(jǐn)慎,應(yīng)用于延遲小,消息量不大的場(chǎng)景是沒(méi)問(wèn)題的,畢竟Java DelayQueue是占用內(nèi)存的。另外也可以考慮利用Redis的sorted set 結(jié)構(gòu)實(shí)現(xiàn)延遲隊(duì)列,使用TimeStamp作為score,比如你的任務(wù)是要延遲5分鐘,那么就在當(dāng)前時(shí)間上加5分鐘作為 score ,輪詢?nèi)蝿?wù)每秒只輪詢 score 小于等于 當(dāng)前時(shí)間的 key即可,如果任務(wù)支持有誤差,那么當(dāng)沒(méi)有掃描到有效數(shù)據(jù)的時(shí)候可以休眠對(duì)應(yīng)時(shí)間再繼續(xù)輪詢。
總結(jié)
以上是生活随笔為你收集整理的redis延迟队列 实现_灵感来袭,基于Redis的分布式延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 搜索引擎技术之网络爬虫
- 下一篇: opencv2 取二进制数据_百亿数据量