RabbitMQ自学之路(九)——RabbitMQ实现延时队列的两种方式
一、什么是延時隊列
延時隊列顧名思義,即放置在該隊列里面的消息是不需要立即消費的,而是等待一段時間之后取出消費。
二、延時隊列應用于什么場景
場景一:在訂單系統中,一個用戶下單之后通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那么這個訂單將進行一場處理。這是就可以使用延時隊列將訂單信息發送到延時隊列。
場景二:用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將用戶指令發送到延時隊列,當指令設定的時間到了再將指令推送到智能設備。
Rabbitmq實現延時隊列一般而言有兩種形式:
第一種方式:利用兩個特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
第二種方式:利用rabbitmq中的插件x-delay-message
三、第一種:利用TTL DLX實現延時隊列的方式
AMQP協議和RabbitMQ隊列本身沒有直接支持延遲隊列功能,但是可以通過以下特性模擬出延遲隊列的功能。
1、Time To Live(TTL)
RabbitMQ可以針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設置以最先到期的時間為準),則消息變為dead letter(死信)
A: 通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
B: 對消息進行單獨設置,每條消息TTL可以不同。
2、Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。
x-dead-letter-exchange:出現dead letter之后將dead letter重新發送到指定exchange
x-dead-letter-routing-key:出現dead letter之后將dead letter重新按照指定的routing-key發送
用一個具體案例來實現第一種方式:用戶下訂單后,如何在一分鐘沒有支付就取消訂單
package com.springboot.rabbitmq.example.demo5.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
/**
?* @method
?* @author Mr yi
?* @time 2019年6月23日
?*/
@Configuration
@Slf4j
public class RabbitConfigDemo5 ? ?{
?
?? ?//隊列名稱
?? ?final static String queue = "queue_demo5";
?? ?//交換機名稱
?? ?final static String exchangeName = "deom5Exchange";
?? ?
?? ?// routingKey
?? ?final static String routingKey ?= "keyDemo5";
?? ?
?? ?//死信消息隊列名稱
?? ?final static String deal_queue = "deal_queue_demo5";
?? ?//死信交換機名稱
?? ?final static String deal_exchangeName = "deal_deom5Exchange";
?? ?
?? ?//死信 routingKey
?? ?final static String dead_RoutingKey ?= "dead_routing_key";
?? ?
?? ?//死信隊列 交換機標識符
? ? public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
? ??
? ? //死信隊列交換機綁定鍵標識符
? ? public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
? ? @Autowired
? ? private CachingConnectionFactory connectionFactory;
? ??
? ? /**
? ? ?*?
? ? ?* @method 定義隊列(隊列 綁定一個死信交換機,并指定routing_key)
? ? ?* @author Mr yi
? ? ?* @time 2019年6月29日
? ? ?* @return
? ? ?*/
?? ?@Bean
?? ?public Queue queueDemo5() {
?? ??? ?// 將普通隊列綁定到死信隊列交換機上
? ? ? ? Map<String, Object> args = new HashMap<>(2);
? ? ? ? //args.put("x-message-ttl", 5 * 1000);//直接設置 Queue 延遲時間 但如果直接給隊列設置過期時間,這種做法不是很靈活
? ? ? ? //這里采用發送消息動態設置延遲時間,這樣我們可以靈活控制
? ? ? ? args.put(DEAD_LETTER_QUEUE_KEY, deal_exchangeName);
? ? ? ? args.put(DEAD_LETTER_ROUTING_KEY, dead_RoutingKey);
? ? ? ? return new Queue(RabbitConfigDemo5.queue, true, false, false, args);
?? ?}
?? ?//聲明一個direct類型的交換機
?? ?@Bean
?? ?DirectExchange exchangeDemo5() {
?? ??? ?return new DirectExchange(RabbitConfigDemo5.exchangeName);
?? ?}
?? ?//綁定Queue隊列到交換機,并且指定routingKey?
?? ?@Bean
?? ?Binding bindingDirectExchangeDemo5( ? ) {
?? ??? ?return BindingBuilder.bind(queueDemo5()).to(exchangeDemo5()).with(routingKey);
?? ?}
?? ?
?? ?//創建配置死信隊列
? ? @Bean
? ? public Queue deadQueue5() {
? ? ? ? Queue queue = new Queue(deal_queue, true);
? ? ? ? return queue;
? ? }
?? ?
? ? //創建死信交換機
? ? ?@Bean
? ? ?public DirectExchange deadExchange5() {
? ? ? ? ?return new DirectExchange(deal_exchangeName);
? ? ?}
?? ?
? ? ?//死信隊列與死信交換機綁定
? ? ? @Bean
? ? ? public Binding bindingDeadExchange5() {
? ? ? ? ? return BindingBuilder.bind(deadQueue5()).to(deadExchange5()).with(dead_RoutingKey);
? ? ? }
/** ? ? ?@Bean
? ? ? public RabbitTemplate rabbitTemplate(){
? ?? ??? ?//若使用confirm-callback ,必須要配置publisherConfirms 為true
? ?? ??? ?connectionFactory.setPublisherConfirms(true);
? ?? ??? ?//若使用return-callback,必須要配置publisherReturns為true
? ? ? ? ? connectionFactory.setPublisherReturns(true);
? ? ? ? ? RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
? ? ? ? ? //使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true
? ? ? ? ?// rabbitTemplate.setMandatory(true);
? ?
? ? ? ? ? // 如果消息沒有到exchange,則confirm回調,ack=false; 如果消息到達exchange,則confirm回調,ack=true
? ? ? ? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? ? ? ? ? ? if(ack){
? ? ? ? ? ? ? ? ? ? ? log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
? ? ? ? ? ? ? ? ? }else{
? ? ? ? ? ? ? ? ? ? ? log.info("消息發送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? }
? ? ? ? ? });
? ? ? ? ??
? ? ? ? ? //如果exchange到queue成功,則不回調return;如果exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
? ? ? ? ? rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? ? ? ? ? ? log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
? ? ? ? ? ? ? }
? ? ? ? ? });
? ? ? ? ? return rabbitTemplate;
? ? ? }
**/
?? ??
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
生產者產生訂單后,將訂單信息發送到rabbitmq 服務段,設置TTL 時間,如果超過了這個時間,還沒有消費這個消息,那么就變為死信,發送到死信隊列中。
這里利用死信的機制來巧妙的實現延時,我這里沒有設置正常消費者,即生產者發送消息后,消息不會被消費,那么在指定時間后,變為死信,有與死信隊列綁定的消費者來消費消息(判斷訂單是否已經成功支付)
package com.springboot.rabbitmq.example.demo5.producers;
import java.util.Date;
import java.util.UUID;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
/**
?*?
?* @method 生產者
?* @author Mr yi
?* @time 2019年6月19日
?*/
@Component
@Slf4j
public class ProducersDemo5 ?{
?
? ? @Autowired
? ? private AmqpTemplate rabbitTemplate;
? ??
?? ?/**
?? ? * @method 生產者發送消息,direct模式下需要傳遞一個routingKey
?? ? * @author Mr yi
?? ? * @time 2019年6月19日
?? ? * @throws Exception
?? ? */
?? ?public void send( ) throws Exception {
?? ??? ?
?? ??? ?log.info("【訂單生成時間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經支付】" ?);
? ? ? ??
? ? ? ? this.rabbitTemplate.convertAndSend("deom5Exchange", "keyDemo5", "訂單實體類對象信息", message -> {
? ? ? ? ? ? // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據業務需要是聲明 Queue 的時候就指定好延遲時間還是在發送自己控制時間
? ? ? ? ? ? message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
? ? ? ? ? ? return message;
? ? ? ? });
?? ??? ??
?? ?}
?? ??
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
死信消息消費者
package com.springboot.rabbitmq.example.demo5.consumers;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
?*?
?* @method ?死信消費者,消費從死信隊列傳來的消息
?* @author Mr yi
?* @time 2019年6月19日
?*/
@Component
@Slf4j
public class ConsumersDemo5Deal {
?? ?
?? ?@RabbitListener(queues = "deal_queue_demo5")
? ? public void process(String order, ?Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
?? ?
?? ??? ?log.info("【 監聽到延時隊列消息】 - 【消費時間】 - [{}]- 【訂單內容】 - [{}]", ?new Date(), order);?
?? ??? ?// 判斷訂單是否已經支付,如果支付則;否則,取消訂單(邏輯代碼省略)
?? ??? ?
? ? ? ? // 手動ack
? ? ? ? Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
? ? ? ? // 手動簽收
? ? ? ? channel.basicAck(deliveryTag, false);
? ? ? ? System.out.println("執行結束....");
? ? ? ??
? ? }
?? ?
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
測試
@Autowired
?? ?private ProducersDemo5 producers;
?? ?
?? ?@RequestMapping("/send")
? ? public String send() throws Exception {
? ? ?? ?producers.send();
? ? ?? ?return "success";
? ? }
1
2
3
4
5
6
7
8
啟動程序,測試
發現queue_demo5正常隊列有一條消息處于待續狀態
等待一分鐘后,控制臺輸出
發現queue_demo5 消息已經被消費(發送到deal_queue_demo5死信隊列了)
使用死信隊列實現延時消息的缺點:
1) 如果統一用隊列來設置消息的TTL,當延時時間梯度比較多的話,比如1分鐘,2分鐘,5分鐘,10分鐘,20分鐘,30分鐘……需要創建很多交換機和隊列來路由消息。
2) 如果單獨設置消息的TTL,則可能會造成隊列中的消息阻塞——前一條消息沒有出隊(沒有被消費),后面的消息無法投遞。
3) 可能存在一定的時間誤差。
四、第二種:利用rabbitmq-delayed-message-exchange插件來實現延遲隊列功能
插件下載地址:注意下載插件要和安裝的rabbitmq版本一致,我這里下載的是3.7的
https://www.rabbitmq.com/community-plugins.html
下載解壓后,得到一個.ez的壓縮文件,找到rabbitmq安裝目錄的plugins文件夾,將解壓的文件復制進去
重新啟動rabbitmq ,輸入命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
停止:net stop RabbitMQ
啟動:net start RabbitMQ
1
2
配置類
package com.springboot.rabbitmq.example.demo6.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import lombok.extern.slf4j.Slf4j;
/**
?* @method
?* @author Mr yi
?* @time 2019年6月23日
?*/
@Configuration
@Slf4j
public class RabbitConfigDemo6 ? ?{
?
?? ?//隊列名稱
?? ?final static String queue = "queue_demo6";
?? ?//交換機名稱
?? ?final static String exchangeName = "deom6Exchange";
?? ?
?? ?// routingKey
?? ?final static String routingKey ?= "keyDemo6";
?? ?
?? ??
? ? @Autowired
? ? private CachingConnectionFactory connectionFactory;
? ??
?? ?@Bean
?? ?public Queue queueDemo6() {
?? ??? ?// 第一個參數是創建的queue的名字,第二個參數是是否支持持久化
? ? ? ? return new Queue(RabbitConfigDemo6.queue, true);
?? ?}
?? ?@Bean
? ? public CustomExchange delayExchange6() {
? ? ? ? Map<String, Object> args = new HashMap<String, Object>();
? ? ? ? args.put("x-delayed-type", "direct");
? ? ? ? return new CustomExchange(RabbitConfigDemo6.exchangeName, "x-delayed-message", true, false, args);
? ? }
?
?? ?@Bean
? ? public Binding bindingNotify6() {
? ? ? ? return BindingBuilder.bind(queueDemo6()).to(delayExchange6()).with(RabbitConfigDemo6.routingKey).noargs();
? ? }
?
?? ??
/** ? ? ?@Bean
? ? ? public RabbitTemplate rabbitTemplate(){
? ?? ??? ?//若使用confirm-callback ,必須要配置publisherConfirms 為true
? ?? ??? ?connectionFactory.setPublisherConfirms(true);
? ?? ??? ?//若使用return-callback,必須要配置publisherReturns為true
? ? ? ? ? connectionFactory.setPublisherReturns(true);
? ? ? ? ? RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
? ? ? ? ? //使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true
? ? ? ? ?// rabbitTemplate.setMandatory(true);
? ?
? ? ? ? ? // 如果消息沒有到exchange,則confirm回調,ack=false; 如果消息到達exchange,則confirm回調,ack=true
? ? ? ? ? rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void confirm(CorrelationData correlationData, boolean ack, String cause) {
? ? ? ? ? ? ? ? ? if(ack){
? ? ? ? ? ? ? ? ? ? ? log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
? ? ? ? ? ? ? ? ? }else{
? ? ? ? ? ? ? ? ? ? ? log.info("消息發送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? }
? ? ? ? ? });
? ? ? ? ??
? ? ? ? ? //如果exchange到queue成功,則不回調return;如果exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
? ? ? ? ? rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
? ? ? ? ? ? ? ? ? log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
? ? ? ? ? ? ? }
? ? ? ? ? });
? ? ? ? ? return rabbitTemplate;
? ? ? }
**/
?? ??
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
生產者,設置setDelay(1 * 1000 * 60 ); 延時 1分鐘
package com.springboot.rabbitmq.example.demo6.producers;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
/**
?*?
?* @method 生產者
?* @author Mr yi
?* @time 2019年6月19日
?*/
@Component
@Slf4j
public class ProducersDemo6 {
?
? ? @Autowired
? ? private AmqpTemplate rabbitTemplate;
? ??
?? ?/**
?? ? * @method 生產者發送消息,direct模式下需要傳遞一個routingKey
?? ? * @author Mr yi
?? ? * @time 2019年6月19日
?? ? * @throws Exception
?? ? */
?? ?public void send( ) throws Exception {
?? ??? ?
?? ??? ?log.info("【訂單生成時間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經支付】" ?);
?? ??? ?
?? ??? ?this.rabbitTemplate.convertAndSend("deom6Exchange", "keyDemo6", "訂單實體類對象信息", new MessagePostProcessor() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public Message postProcessMessage(Message message) throws AmqpException {
? ? ? ? ? ? ? ? message.getMessageProperties().setDelay(1 * 1000 * 60 );
? ? ? ? ? ? ? ? return message;
? ? ? ? ? ? }
? ? ? ? });
?
?? ??? ??
?? ?}
?? ??
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
消費者,消費者一分鐘后得到生產者發送的消息
package com.springboot.rabbitmq.example.demo6.consumers;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
?*?
?* @method ?消費者
?* @author Mr yi
?* @time 2019年6月19日
?*/
@Component
@Slf4j
public class ConsumersDemo6 {
?? ?
?? ?@RabbitListener(queues = "queue_demo6")
? ? public void process(String order, ?Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
?? ?
?? ??? ?log.info("【 監聽到延時隊列消息】 - 【消費時間】 - [{}]- 【訂單內容】 - [{}]", ?new Date(), order);?
?? ??? ?// 判斷訂單是否已經支付,如果支付則;否則,取消訂單(邏輯代碼省略)
?? ??? ?
? ? ? ? // 手動ack
? ? ? ? Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
? ? ? ? // 手動簽收
? ? ? ? channel.basicAck(deliveryTag, false);
? ? ? ? System.out.println("執行結束....");
? ? ? ??
? ? }
?? ?
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
測試
@Autowired
?? ?private ProducersDemo6 producers;
?? ?
?? ?@RequestMapping("/send")
? ? public String send() throws Exception {
? ? ?? ?producers.send();
? ? ?? ?return "success";
? ? }
1
2
3
4
5
6
7
8
啟動程序,執行方法
控制臺輸出
rabbitmq服務端,queue_demo6 其中并沒有消息進入就緒狀態,這一點也是和第一種方式(使用死信)的區別優勢所在。
等待一分鐘后,消費者接受到消息控制臺
源碼下載:https://download.csdn.net/download/qq_29914837/11264460
如果你覺得本篇文章對你有所幫助的話,麻煩請點擊頭像右邊的關注按鈕,謝謝!
技術在交流中進步,知識在分享中傳播
————————————————
版權聲明:本文為CSDN博主「互聯網叫獸」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_29914837/article/details/94070677
總結
以上是生活随笔為你收集整理的RabbitMQ自学之路(九)——RabbitMQ实现延时队列的两种方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【RabbitMQ】一文带你搞定Rabb
- 下一篇: 新能源汽车充电电表,家用电也可以用吗新能