日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 延迟队列详解

發布時間:2023/12/20 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 延迟队列详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、延遲隊列概念

延遲隊列存儲的對象是對應的延遲消息,所謂“延遲消息”是指當消息被發送以后,并不想讓消費者立刻拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。

二、延遲隊列使用場景

1、訂單在十分鐘之內未支付則自動取消
2、新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
3、用戶注冊成功后,如果三天內沒有登錄則進行短信提醒。
4、用戶發起退款后,如果三天內沒有得到處理則通知相關運營人員。
5、預定會議后,需要在預定時間點前十分鐘通知各個與會人員參加會議。

這些場景都有一個特點,需要在某個時間發生之后或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鐘之后檢查該訂單支付狀態,然后將未支付的訂單進行關閉。看起來似乎使用定時任務,一直輪詢數據,每秒查一次,然后取出需要被處理的數據進行處理就可以了。如果數據量比較少,確實可以這樣做,比如:對于“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案。但對于數據量比較大,并且時效性較強的場景,如:“訂單十分鐘內未支付則關閉”,短期內未支付的訂單數據可能會很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的數據量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給數據庫帶來很大壓力,無法滿足業務要求而且性能低下。

三、RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。目前有兩種方法可以設置消息的 TTL。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息本身進行單獨設置,每條消息的 TTL 可以不同。如果兩種方法一起使用,則消息的 TTL 以兩者之間較小的那個數值為準。消息在隊列中的生存時間一旦超過設置的 TTL 值時,就會變成“死信”。

3.1 消息設置 TTL

針對每條消息設置 TTL 的方法時在 channel.basicPublish 方法中加入 expiration 的屬性參數,單位為毫秒。

代碼示例:

// 設置消息 TTL 過期時間為 10s AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); String message = "info"; channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());

3.2 設置隊列 TTL

通過 channel.queueDeclare 方法中的 x-expires 參數可以控制隊列被自動刪除前處于未使用狀態的時間。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,并且在過期時間段內也未調用過 Basic.get 命令。

RabbitMQ 會確保在過期時間到達后將隊列刪除,但是不保障刪除的動作有多及時。在 RabbitMQ 重啟后,持久化的隊列的過期時間會被重新計算。

用于表示過期時間的 x-expires 參數以毫秒為單位,并且服從和 x-message-ttl 一樣的約束條件,不過不能設置為 0。比如該參數設置為 1000,則表示該隊列如果在 1 秒鐘之內未使用則會被刪除。

代碼示例:

Map<String,Object> args = new HashMap<>(); args.put("x-expires", 1800000); channel.queueDeclare("myqueue",false, false, false, args);

3.3 兩者的區別

如果設置了隊列的 TTL 屬性,那么一旦消息過期,就會被隊列丟棄(如果配置了死信隊列則會被丟到死信隊列中),而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需要注意一點是,如果不設置 TTL,表示消息永遠不會過期,如果將 TTL 設置為 0,則表示除非此時可以直接投遞該消息到消費者,否則該消息將會別丟棄。

四、SpringBoot 整合 RabbitMQ

4.1 添加依賴

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency> </dependencies>

4.2 修改配置文件

spring:rabbitmq:host: IP地址username: adminpassword: adminport: 5672virtual-host: /test

五、隊列 TTL

5.1 代碼框架圖

創建兩個隊列 QA 和 QB,兩個隊列 TTL 分別設置為 10s 和 40s,然后再創建一個交換機 X 和死信交換機 Y,它們的類型都是 direct,創建一個死信隊列 QD,它們的綁定關系如下:

5.2 配置文件類代碼

@Configuration public class TtlQueueConfig {/*** 普通交換機名稱*/public static final String X_EXCHANGE = "X";/*** 死信交換機名稱*/public static final String Y_DEAD_LETTER_EXCHANGE = "Y";/*** 普通隊列名稱*/public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";/*** 死信隊列名稱*/public static final String DEAD_LETTER_QUEUE = "QD";/*** 聲明 XExchange*/@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}/*** 聲明 yExchange*/@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}/*** 聲明隊列QA*/@Beanpublic Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 設置死信交換機arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 設置死信路由鍵arguments.put("x-dead-letter-routing-key", "YD");// 設置過期時間arguments.put("x-message-ttl", 10000);return new Queue(QUEUE_A, true, false, false, arguments);}/*** 聲明隊列QB*/@Beanpublic Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 設置死信交換機arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 設置死信路由鍵arguments.put("x-dead-letter-routing-key", "YD");// 設置過期時間arguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}/*** 死信隊列QD*/@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}/*** 綁定*/@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(){return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");} }

5.3 消息生產者代碼

@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public String sendMsg(@PathVariable String message){log.info("當前時間:{}發送一條消息{}給兩個隊列", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息來自TTL為10s隊列QA:"+message);rabbitTemplate.convertAndSend("X", "XB", "消息來自TTL為40s隊列QB:"+message);return "發送成功";} }

5.4 消息消費者代碼

@Slf4j @Component public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message){String msg = new String(message.getBody());log.info("當前時間{},收到死信隊列的消息:{}", new Date(), msg);} }

發送一個請求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻

第一條消息在 10s 后變成了死信消息,然后被消費者消費掉了,第二條消息在 40s 之后變成了死信消息,然后被消費掉,這樣一個延時隊列就完成了。

不過,如果這樣使用的話,豈不是每增加一個新的時間需求,就要新增一個隊列,這里只有 10s 和 40s 兩個時間選項,如果需要一個小時后處理,那么就需要增加 TTL 為一個小時的隊列,如果是預定會議室,然后提前通知這樣的場景,豈不是要增加無數個隊列才能滿足需求?

六、延時隊列優化

6.1 代碼架構圖

在這里新增了一個隊列 QC,綁定關系如下,該隊列不設置 TTL 時間

6.2 配置類文件代碼

@Component public class MsgTtlQueueConfig {private static final String Y_DEAD_LETTER_EXCHANGE = "Y";private static final String QUEUE_C = "QC";@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>(2);// 聲明當前隊列綁定的死信交換機arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 聲明當前隊列的私信路由keyarguments.put("x-dead-letter-routing-key", "YD");return new Queue(QUEUE_C, false, false, false, arguments);}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");} }

6.3 消息生產者代碼

@GetMapping("/sendMsg/{message}/{ttlTime}") public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(ttlTime);return message;}};rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);return "發送成功"; }

可以改為 Lambda 表達式:

@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){rabbitTemplate.convertAndSend("X", "XC", message, (messagePostProcessor) -> {messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return "發送成功";}

MessagePostProcessor 是一個函數式接口,通常使用 lambda 表達式來實現:

amqpTemplate.convertAndSend(routingKey, m -> {m.getMessageProperties().setDeliveryMode(DeliveryMode.NON_PERSISTENT);return m; });

該接口會在框架中的幾個地方使用,例如 AmqpTemplateconvertAndSend(Object, MessagePostProcessor) ,它可用于在執行消息轉換后添加或者修改標頭或屬性。它還可用于在監聽器容器和 AmqpTemplate 中接收消息時修改入站消息。

@FunctionalInterface public interface MessagePostProcessor {/*** 用于修改或替換消息*/Message postProcessMessage(Message message) throws AmqpException;/*** 用于修改或替換消息,也可修改消息的相關數據。僅適用于出站消息*/default Message postProcessMessage(Message message, Correlation correlation) {return postProcessMessage(message);}/*** 用于修改或替換消息,也可修改消息的相關數據。僅適用于出站消息*/default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {return postProcessMessage(message, correlation);}}

將程序執行,然后發送請求:
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000


兩條消息的過期時間一致,過期時間短的那條消息,在過期時間到了以后并沒有立即被消費,而是和過期時間長的那條消息一起被消費了。所以,如果使用在消息屬性上設置 TTL 的方式,消息可能并不會按時“死亡”,因為 RabbitMQ 只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優先得到執行。

七、Rabbitmq 插件實現延遲隊列

上面提到的問題,確實是一個問題,如果不能實現在消息粒度上的 TTL,并使其在設置的 TTL 時間及時死亡,就無法設計成一個通用的延時隊列。

7.1 Docker 安裝延時隊列插件

安裝教程可以參照這位大佬的文章:docker 安裝 rabbitmq并添加延遲隊列插件

注意:我在安裝完成延時隊列插件后無法登錄 RabbitMQ 的后臺管理系統,一直提示不是私密連接,懷疑是安裝插件后,賬戶被清除了,重新創建一下就可以了。

進入docker:docker exec -it rabbitmq /bin/bash添加賬號:rabbitmqctl add_user admin admin設置角色:rabbitmqctl set_user_tags admin administrator設置權限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

登錄后就可以看到了:

7.2 代碼架構圖

在這里新增了一個隊列 delayed.queue,一個自定義交換機 delayed.exchange,綁定關系如下:

7.3 配置文件類代碼

在我們自定義的交換機中,這是一種新的交換機類型,該類型消息支持延遲投遞機制,消息傳遞后并不會立即投遞到目標隊列中,而是存儲在 mnesia(一個分布式數據系統)表中,當達到投遞時間時,才投遞到目標隊列中。

@Configuration public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@Bean("delayedQueue")public Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}/*** 自定義交換機 定義一個延遲交換機* 不需要死信交換機和死信隊列,支持消息延遲投遞,消息投遞之后沒有到達投遞時間,是不會投遞給隊列* 而是存儲在一個分布式表,當投遞時間到達,才會投遞到目標隊列* @return*/@Bean("delayedExchange")public CustomExchange delayedExchange(){Map<String, Object> args = new HashMap<>(1);// 自定義交換機的類型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }

7.4 消息生產者代碼

@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@GetMapping("/sendDelayMsg/{message}/{delayTime}")public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;});log.info("當前時間:{},發送一條延遲{}毫秒的信息給隊列delay.queue:{}", new Date(), delayTime, message);return "發送成功";} }

7.5 消息消費者代碼

@Slf4j @Component public class DeadLetterConsumer {public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("當前時間:{},收到延時隊列的消息:{}", new Date(), msg);} }

發起請求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

第二條消息被先消費掉了,符合預期

八、總結

延時隊列在需要延時處理的場景下非常有用,使用 RabbitMQ 來實現延時隊列可以很好地利用 RabbitMQ 的特性,如:消息可靠發送、消息可靠投遞、死信隊列,來保證消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好要的解決單點故障問題,不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。

總結

以上是生活随笔為你收集整理的RabbitMQ 延迟队列详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。