日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用 Redis 如何实现延迟队列?

發布時間:2025/3/11 数据库 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 Redis 如何实现延迟队列? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

延遲消息隊列在我們的日常工作中經常會被用到,比如支付系統中超過 30 分鐘未支付的訂單,將會被取消,這樣就可以保證此商品庫存可以釋放給其他人購買,還有外賣系統如果商家超過 5 分鐘未接單的訂單,將會被自動取消,以此來保證用戶可以更及時的吃到自己點的外賣,等等諸如此類的業務場景都需要使用到延遲消息隊列,又因為它在業務中比較常見,因此這個知識點在面試中也會經常被問到。

我們本文的面試題是,使用 Redis 如何實現延遲消息隊列?

典型回答

延遲消息隊列的常見實現方式是通過 ZSet 的存儲于查詢來實現,它的核心思想是在程序中開啟一個一直循環的延遲任務的檢測器,用于檢測和調用延遲任務的執行,如下圖所示: ZSet 實現延遲任務的方式有兩種,第一種是利用 zrangebyscore 查詢符合條件的所有待處理任務,循環執行隊列任務;第二種實現方式是每次查詢最早的一條消息,判斷這條信息的執行時間是否小于等于此刻的時間,如果是則執行此任務,否則繼續循環檢測。

方式一:zrangebyscore 查詢所有任務 此實現方式是一次性查詢出所有的延遲任務,然后再進行執行,實現代碼如下:

import redis.clients.jedis.Jedis; import utils.JedisUtils;import java.time.Instant; import java.util.Set;/*** 延遲隊列*/ public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延遲 30s 執行(30s 后的時間)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 繼續添加測試數據jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 開啟延遲隊列doDelayQueue(jedis);}/*** 延遲隊列消費* @param jedis Redis 客戶端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 當前時間Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間long nowSecond = nowInstant.getEpochSecond();// 查詢當前時間的所有任務Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消費任務System.out.println("消費:" + item);}// 刪除已經執行的任務jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒輪詢一次}} }

以上程序執行結果如下:

消費:order2 消費:order3 消費:order4 消費:order5 消費:order_1

方式二:判斷最早的任務 此實現方式是每次查詢最早的一條任務,再與當前時間進行判斷,如果任務執行時間大于當前時間則表示應該立即執行延遲任務,實現代碼如下:

import redis.clients.jedis.Jedis; import utils.JedisUtils;import java.time.Instant; import java.util.Set;/*** 延遲隊列*/ public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延遲 30s 執行(30s 后的時間)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 繼續添加測試數據jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 開啟延遲隊列doDelayQueue2(jedis);}/*** 延遲隊列消費(方式 2)* @param jedis Redis 客戶端*/public static void doDelayQueue2(Jedis jedis) throws InterruptedException {while (true) {// 當前時間long nowSecond = Instant.now().getEpochSecond();// 每次查詢一條消息,判斷此消息的執行時間Set<String> data = jedis.zrange(_KEY, 0, 0);if (data.size() == 1) {String firstValue = data.iterator().next();// 消息執行時間Double score = jedis.zscore(_KEY, firstValue);if (nowSecond >= score) {// 消費消息(業務功能處理)System.out.println("消費消息:" + firstValue);// 刪除已經執行的任務jedis.zrem(_KEY, firstValue);}}Thread.sleep(100); // 執行間隔}} }

以上程序執行結果和實現方式一相同,結果如下:

消費:order2 消費:order3 消費:order4 消費:order5 消費:order_1

其中,執行間隔代碼 Thread.sleep(100) 可根據實際的業務情況刪減或配置。

考點分析

延遲消息隊列的實現方法有很多種,不同的公司可能使用的技術也是不同的,我上面是從 Redis 的角度出發來實現了延遲消息隊列,但一般面試官不會就此罷休,會借著這個問題來問關于更多的延遲消息隊列的實現方法,因此除了 Redis 實現延遲消息隊列的方式,我們還需要具備一些其他的常見的延遲隊列的實現方法。

和此知識點相關的面試題還有以下這些:

  • 使用 Java 語言如何實現一個延遲消息隊列?
  • 你還知道哪些實現延遲消息隊列的方法?

知識擴展

Java 中的延遲消息隊列

我們可以使用 Java 語言中自帶的 DelayQueue 數據類型來實現一個延遲消息隊列,實現代碼如下:

public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){System.out.println(delayQueue.take());}System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延遲截止時間(單面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 獲取剩余時間public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 隊列里元素的排序依據public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}} }

以上程序執行的結果如下:

開始時間:2019-6-13 20:40:38 2019-6-13 20:40:39 2019-6-13 20:40:41 2019-6-13 20:40:43 結束時間:2019-6-13 20:40:43

此實現方式的優點是開發比較方便,可以直接在代碼中使用,實現代碼也比較簡單,但它缺點是數據保存在內存中,因此可能存在數據丟失的風險,最大的問題是它無法支持分布式系統。

使用 MQ 實現延遲消息隊列

我們使用主流的 MQ 中間件也可以方便的實現延遲消息隊列的功能,比如 RabbitMQ,我們可以通過它的 rabbitmq-delayed-message-exchange 插件來實現延遲隊列。

首先我們需要配置并開啟 rabbitmq-delayed-message-exchange 插件,然后再通過以下代碼來實現延遲消息隊列。

配置消息隊列:

import com.example.rabbitmq.mq.DirectConfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map;@Configuration public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默認的交換機@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//參數二為類型:必須是 x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 綁定隊列到交換器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();} }

發送者實現代碼如下:

import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發送時間:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});} }

從上述代碼我們可以看出,我們配置 3s 之后再進行任務執行。

消費者實現代碼如下:

import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component @RabbitListener(queues = "delayed.goods.order") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:" + sdf.format(new Date()));System.out.println("消息內容:" + msg);} }

測試代碼如下:

import com.example.rabbitmq.RabbitmqApplication; import com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat; import java.util.Date;@RunWith(SpringRunner.class) @SpringBootTest public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序執行之后,再退出測試} }

以上程序的執行結果為:

發送時間:2020-06-11 20:47:51 接收時間:2018-06-11 20:47:54 消息內容:Hi Admin.

從上述結果中可以看出,當消息進入延遲隊列 3s 之后才被正常消費,執行結果符合我的預期,RabbitMQ 成功的實現了延遲消息隊列。

總結

本文我們講了延遲消息隊列的兩種使用場景:支付系統中的超過 30 分鐘未支付的訂單,將會被自動取消,以此來保證此商品的庫存可以正常釋放給其他人購買,還有外賣系統如果商家超過 5 分鐘未接單的訂單,將會被自動取消,以此來保證用戶可以更及時的吃到自己點的外賣。并且我們講了延遲隊列的 4 種實現方式,使用 ZSet 的 2 種實現方式,以及 Java 語言中的 DelayQueue 的實現方式,還有 RabbitMQ 的插件 rabbitmq-delayed-message-exchange 的實現方式。

總結

以上是生活随笔為你收集整理的使用 Redis 如何实现延迟队列?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。