RabbitMQ(五)死信队列和延迟队列
1.1 概念
先從概念解釋上搞清楚這個定義,死信,顧名思義就是無法被消費的消息,字面意思可以這樣理解,一般來說,producer 將消息投遞到 broker 或者直接到 queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由于特定的原因導致 queue 中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信,有死信自然就有了死信隊列。
應用場景:為了保證訂單業務的消息數據不丟失,需要使用到 RabbitMQ 的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中.還有比如說: 用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效。
1.2 死信的來源
-
消息 TTL 過期
-
隊列達到最大長度(隊列滿了,無法再添加數據到 mq 中)
-
消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false
1.3 死信實戰
1.3.1 代碼架構圖
1.3.2 ttl過期
生產者代碼
public class Producer {private static final String NORMAL_EXCHANGE = "normal_exchange"; ?public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//設置消息的 TTL 時間AMQP.BasicProperties properties = newAMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,message.getBytes());System.out.println("生產者發送消息:" + message);}}} }消費者代碼
public class Consumer01 {//普通交換機名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機名稱private static final String DEAD_EXCHANGE = "dead_exchange"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊列綁定死信交換機與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊列綁定死信隊列信息Map<String, Object> params = new HashMap<>();//正常隊列設置死信交換機 參數 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊列設置死信 routing-key 參數 key 是固定值params.put("x-dead-letter-routing-key", "lisi"); ?String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Consumer01 接收到消息" + message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});} }死信消費者
public class Consumer02 {private static final String DEAD_EXCHANGE = "dead_exchange"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");System.out.println("等待接收死信隊列消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Consumer02 接收死信隊列的消息" + message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});} }1.3.3 隊列達到最大長度
1)生產者發送消息去掉ttl屬性
2)普通消費者增加如下屬性
params.put("x-max-length",6);此時我們發送10條,就會有4條消息被發送到死信隊列。
重啟Consumer01,另外6條也會被消費。
1.3.4 消息被拒
public class Consumer03 {//普通交換機名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機名稱private static final String DEAD_EXCHANGE = "dead_exchange"; ?public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();//聲明死信和普通交換機 類型為 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//聲明死信隊列String deadQueue = "dead-queue";channel.queueDeclare(deadQueue, false, false, false, null);//死信隊列綁定死信交換機與 routingkeychannel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");//正常隊列綁定死信隊列信息Map<String, Object> params = new HashMap<>();//正常隊列設置死信交換機 參數 key 是固定值params.put("x-dead-letter-exchange", DEAD_EXCHANGE);//正常隊列設置死信 routing-key 參數 key 是固定值params.put("x-dead-letter-routing-key", "lisi");String normalQueue = "normal-queue";channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.equals("info5")) {System.out.println("Consumer01 接收到消息" + message + "并拒絕簽收該消息");//requeue 設置為 false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發送到死信隊列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println("Consumer01 接收到消息" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck = false;channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {});} }2. 延遲隊列
2.1概念
延時隊列,隊列內部是有序的,最重要的特性就體現在它的延時屬性上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。
2.2 使用場景
1.訂單在十分鐘之內未支付則自動取消。
2.新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
3.用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
4.用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
5.預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議。
總結
以上是生活随笔為你收集整理的RabbitMQ(五)死信队列和延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Typo: In word 拼写检查
- 下一篇: 蠕虫病毒查杀