javascript
RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列
搭建SpringBoot項目,用于演示
springboot版本
<!-- spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.2.RELEASE</version><type>pom</type><!-- import 導入父工程的配置--><scope>import</scope></dependency>消費與提供方的pom.xml
<dependencies><!-- spring-boot-starter-web spring-boot-starter-actuator綁定在一塊 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>提供端的application.yml
# 配置RabbitMQ的基本信息 ip 端口 username password 虛擬機.. spring:rabbitmq:host: 192.168.93.132 # ipport: 5672username: xiaofupassword: xiaofuvirtual-host: /springboot# 舊版本 開啟 confirm 確認模式 # publisher-confirms: true# 新版的開啟 confirm 確認模式publisher-confirm-type: correlated# 開啟 return 退回模式publisher-returns: true消費端的application.yml
spring:rabbitmq:host: 192.168.93.132 #主機ipport: 5672 #端口username: xiaofupassword: xiaofuvirtual-host: /springboot# 舊版本 開啟 confirm 確認模式# publisher-confirms: true# 新版的開啟 confirm 確認模式publisher-confirm-type: correlated# 開啟 return 退回模式publisher-returns: truelistener:# RabbitMQ模式使用simple simple支持事務的simple:# Consumer ACK機制:設置為手動簽收acknowledge-mode: manualprefetch: 1 # 限流,配置1 表示消費端每次向MQ拉取最大一條消息# direct 是不支持事務的 # direct: # acknowledge-mode: manual # ACK機制:設置為手動簽收 # retry: # enabled: true # 是否支持重試 # max-attempts: 3 # 重試機制,3次1.消息的可靠投遞
在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提
供了兩種方式用來控制消息的投遞可靠性模式。
? confirm 確認模式
? return 退回模式
rabbitmq 整個消息投遞的路徑為:
producer—>rabbitmq broker—>exchange—>queue—>consumer
? 消息從 producer 到 exchange 則會返回一個 confirmCallback 。
?== 消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback 。==
我們將利用這兩個 callback 控制消息的可靠性投遞
? exchange要持久化
? queue要持久化
? message要持久化
1.1配置confirm 確認模式與return 退回模式
# 舊版本 開啟 confirm 確認模式 # publisher-confirms: true# 新版的開啟 confirm 確認模式publisher-confirm-type: correlated# 開啟 return 退回模式publisher-returns: true1.2創建用于測試消息的可靠投遞的交換機與隊列
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /*消息的可靠投遞創建測試交換機與隊列測試交換機可靠性記得在application.yml中配置#開啟 confirm 確認模式 設置為默認的自動確認模式publisher-confirm-type: none*/ @Configuration public class RabbitMQConfigConfirmAndReturn {//創建交換機@Beanpublic Exchange exchangeConfirm(){//創建一個Direct:定向,把消息交給符合指定routing key 的隊列的交換機return ExchangeBuilder.directExchange("test_Exchange_Confirm").build();}//創建一個隊列@Beanpublic Queue queueConfirm(){//創建一個隊列而且是持久的return QueueBuilder.durable("test_Queue_Confirm").build();}// 隊列和交換機綁定關系 Binding/*1. 指定哪個隊列2. 指定哪個交換機3. routing key*/@Beanpublic Binding bindingConfirm(@Qualifier("exchangeConfirm") Exchange exchange,@Qualifier("queueConfirm") Queue queue){//把隊列綁定在交換機上指定routingKey沒有參數return BindingBuilder.bind(queue).to(exchange).with("testConfirm").noargs();}// return 退回模式//使用test_Exchange_Confirm這個交換機}1.3 ProducerTest測試類中編寫測試confirm與return測試方法
1.3.1首先在測試類中注入RabbitTemplate
@SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate; }1.3.2測試方法
/*** 確認模式: 該模式是來校驗消息是否發送成功到交換機中* 步驟:* 1.確認開啟: publisher-confirm-type: none* 2.在rabbitTemplate定義一個confirmCallBack回調函數*/@Testpublic void testConfirm(){//使用rabbitTemplate的確認回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 相關的配置信息* @param ack 代表了Exchange交換機是否收到了消息,true表示收到了消息,false表示交換機沒有收到消息* @param cause 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("發送消息后,回調方法執行了~~~");if (ack){System.out.println("發送消息成功,啟動成功方案:"+cause);}else {System.out.println("發送消息失敗,啟動失敗方案:"+cause);}}});//發送消息,假設寫錯交換機的名稱,肯定會發送到Exchange失敗,就會執行我們的confirmCallBack回調方法rabbitTemplate.convertAndSend("test_Exchange_Confirm","testConfirm","測試Confirm確認模式~~~");}/*** 回退模式: 該模式是用來校驗該消息是否從Exchange交換機成功路由到了queue隊列中* 當Exchange路由到queue失敗后,就會執行這個ReturnCallBack方法** 步驟:* 1.開啟回退模式: publisher-returns: true* 2.設置ReturnCallBack* 3,設置Exchange處理的消息的模式* 1.如果消息沒有路由到Queue中,則丟棄消息(默認)* 2.如果消息沒有路由到Queue中,返回給消息到發送方的ReturnCallBack方法*/@Testpublic void testReturn() {//設置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/**** @param message 消息對象* @param replyCode 錯誤碼* @param replyText 錯誤信息* @param exchange 交換機* @param routingKey 路由鍵*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//當我們的消息發送從Exchange交換機發送到Queue錯誤后就會執行這個回調方法System.out.println("ReturnCallBack 執行了~~~");System.out.println(message);System.out.println(replyCode);System.out.println(replyText);System.out.println(exchange);System.out.println(routingKey);}});//發送消息,/*測試1:使用正確的Exchange與routingKey執行成功,不會執行我們的ReturnCallBack回退方法測試2:使用正確的Exchange與錯誤的不存在的routingKey,就會執行我們的ReturnCallBack回退方法*/rabbitTemplate.convertAndSend("test_Exchange_Confirm", "testConfirm111", "testConfirm~~~發送消息,測試回退模式");}1.3.3 測試消息可靠投遞
confirm測試
return測試
2.Consumer Ack
ack指Acknowledge,確認。 表示消費端收到消息后的確認方式。
有三種確認方式:
? 自動確認:acknowledge=“none”
? 手動確認:acknowledge=“manual”
? 根據異常情況確認:acknowledge=“auto”,(這種方式使用麻煩,不作講解)
其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應 message 從 RabbitMQ 的消息緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用channel.basicNack()方法,讓其自動重新發送消息。
2.1 消費端application.yml中配置
需要開啟手動簽收消息
listener:# RabbitMQ模式使用simple simple支持事務的simple:# Consumer ACK機制:設置為手動簽收acknowledge-mode: manualprefetch: 1 # 限流,配置1 表示消費端每次向MQ拉取最大一條消息2.2 在消費端創建監聽類
在方法上使用下面的注解,監聽的隊列
@RabbitListener(queues = “隊列名稱”)
下面的代碼監聽的是我們上面測試confirm的隊列
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/*** Consumer ACK機制:默認自動簽收* 1. 設置手動簽收。acknowledge="manual"* 2. 讓監聽器類實現ChannelAwareMessageListener接口* 3. 如果消息成功處理,則調用channel的 basicAck()簽收* 4. 如果消息處理失敗,則調用channel的basicNack()拒絕簽收,broker重新發送給consumer*/ @Component public class AckListener {@RabbitListener(queues = "test_Queue_Confirm")public void testAck(Message message, Channel channel) throws IOException {//得到消息的唯一deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();//模擬接收到消息消費的邏輯try{//接收到消息進行消費System.out.println(new String(message.getBody()));System.out.println("消息到了ACK機制中~~~");//模擬執行邏輯錯誤 // int i = 1/0;//手動簽收消息/*deliveryTag:表示收到的消息的參數標簽(消息的唯一id)第二個參數:是否簽收多條消息(批量簽收消息)*/channel.basicAck(deliveryTag,true);}catch (Exception e){//當我們上面的邏輯出現錯誤,就不會簽收消息,我們在catch中就執行拒絕簽收System.out.println("消費邏輯出現異常~~~消息被Ack機制重回隊列");//拒絕簽收/*第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue的尾部,broker會重新發送該消息給消費端,false為丟棄改消息,若設置了死信隊列,就會交給死信隊列*/channel.basicNack(deliveryTag,true,false);}}}2.3 測試ACK
啟動主啟動:ConsumerSpringbootApplication
在提供方發送消息
在消費方查看消息被消費
3.TTL 全稱 Time To Live(存活時間/過期時間)
? TTL 全稱 Time To Live(存活時間/過期時間)。
? 當消息到達存活時間后,還沒有被消費,會被自動清除。
? RabbitMQ可以對消息設置過期時間,也可以對整個隊列(Queue)設置過期時間。
? 設置隊列過期時間使用參數:x-message-ttl,單位:ms(毫秒),會對整個隊列消息統一過期。
? 設置消息過期時間使用參數:expiration。單位:ms(毫秒),當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。
? 如果兩者都進行了設置,以時間短的為準。
3.1 在提供方編寫TTL的交換機與隊列的創建代碼
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* 測試RabbitMQ的TTL*/ @Configuration public class RabbitMQConfigTTL {//創建交換機@Beanpublic Exchange exchangeTtl(){//創建一個Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列 的交換機return ExchangeBuilder.topicExchange("test_Exchange_TTL").build();}//創建隊列@Beanpublic Queue queueTtl(){//創建一個隊列,設置消息過期時間為10秒return QueueBuilder.durable("test_Queue_TTL").withArgument("x-message-ttl",10000).build();}//綁定交換機與隊列@Beanpublic Binding bindingTtl(@Qualifier("exchangeTtl") Exchange exchange, @Qualifier("queueTtl") Queue queue){//將隊列綁定在topic通配符交換機上設置路由規則routingKey,沒有參數return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();} }3.2 提供方測試代碼編寫
/*** TTL:過期時間* 1. 隊列統一過期** 2. 消息單獨過期* 結果:* 如果設置了消息的過期時間,也設置了隊列的過期時間,它以時間短的為準。* 隊列過期后,會將隊列所有消息全部移除。* 消息過期后,只有消息在隊列頂端,才會判斷其是否過期(移除掉)*/@Testpublic void testTTL(){//* 1. 隊列統一過期//發送10條消息,不去消費,查看web控制臺10秒后這10條消息是否會被丟棄 // for (int i = 0; i < 10; i++) { // //調用方法 // rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hehe.xf","測試TTL超時時間隊列消息發送~~~"+i); // }//* 2. 消息單獨過期// 消息后處理對象,設置一些消息的參數信息,發送消息的時候傳遞該參數,那么這些消息就會具有該參數//該對象是一個接口,使用匿名類部內來創建實現類MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {//設置發送消息的參數@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");//設置消息過期時間為5秒return message;}};//再次發生一條消息,使用我們設置好的消息參數對消息進行封裝//發送成功后,去看我們的隊列中的這條消息是否是5秒過期,因為我們這個消息是在隊列的頂端,等待被消費,而且過期時間短于隊列統一時間,所以優先我們這單條消息過期時間 // rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hehe.fs","我被使用了消息參數,5秒后過期~~~",messagePostProcessor);//我們再次極端的測試,讓我們這條消息不在隊列的頂端//這條i==5的消息設置5秒過期,但是他在隊列的中間,5秒后已經過期,但是不會被隊列移除掉,當隊列統一的過期時間到了,就會隨著統一被隊列丟棄 或者交給死信交換機//因為隊列只會移除隊列頂端的過期消息,例如當有消費者來消費這10條消息后,但是i=5這條消息//已經過期,當消費到這條消息時,它就在隊列的頂端,就會判斷該消息是否過期,//若過期,者就會移除,或者交給 死信交換機//不會發送給消費者消費的for (int i = 0; i < 10; i++) {if (i == 5) {//消息單獨過期rabbitTemplate.convertAndSend("test_Exchange_TTL", "ttl.hehe.xf", "我被使用了消息參數,5秒后過期~~~而且在隊列的中間,我會不會5秒后過期呢?", messagePostProcessor);} else {//不過期的消息rabbitTemplate.convertAndSend("test_Exchange_TTL", "ttl.hehe.xf", "我發送了消息....");}}}3.3 測試發送,查詢queue隊列中的消息存活時間
3.3.1 測試 隊列統一過期
將1. 隊列統一過期這段代碼注釋放開,把其余代碼注釋,然后點擊運行
3.3.2 測試 消息單獨過期
使用這個類MessagePostProcessor來封裝我們發生消息的屬性參數
3.3.3 測試 ,讓我們這條消息不在隊列的頂端
4.死信隊列 DLX 。Dead Letter Exchange(死信交換機)
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以
被重新發送到另一個交換機,這個交換機就是DLX。
消息成為死信的三種情況:
隊列綁定死信交換機:
給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key
4.1 創建用于測試死信隊列的交換機與隊列
死信隊列:
1. 聲明正常的隊列(test_queue_dlx)和正常交換機(test_exchange_dlx)
2. 聲明死信隊列(queue_dlx)和死信交換機(exchange_dlx)
3. 正常隊列綁定死信交換機
設置兩個參數:
* x-dead-letter-exchange:死信交換機名稱
* x-dead-letter-routing-key:發送給死信交換機的routingkey
4.2 編寫測試類發送測試死信消息
* 發送測試死信消息:* 1. 過期時間* 2. 長度限制* 3. 消息拒收 /*** 發送測試死信消息:* 1. 過期時間* 2. 長度限制* 3. 消息拒收*/@Testpublic void testDlx() throws InterruptedException {//測試過期時間,死信消息,首先發送給了正常的交換機,交換機路由到正常的隊列,然后該隊列的消息由于設置了10秒過期,10秒內沒有被消費// 過期后就交給死信交換機,然后由死信交換機路由到死信隊列,然后被消費掉 // rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發送了一條10秒后就過期的消息~~~");//測試隊列長度,當一次性發送超過隊列長度的消息,隊列就會將多余的消息交給死信交換機//由于我們創建隊列的時候,改隊列的長度為10,那么就有10 條消息被第一時間交給死信交換機,然后在等10秒,10秒后隊列中的10條消息沒有被消費,也會交給死信交換機//由執行控制臺結果得知,隊列是先進先出的原則先進的0-9會被后進的10-19擠出來,所以0-9先變成死信消息,而10-19是10秒過期后未被消費成的死信消息 // for (int i = 0; i < 20; i++) { // Thread.sleep(10); // rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發送了多條10秒后就過期的消息~~~"+i); // }//測試消費端拒收消息,拒收的消息也不返回發送的隊列,就會變成死信消息,就交給死信交換機處理rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發送了消費端出錯不消費的消息~~~");}4.3 編寫消費端的監聽隊列類 DlxListener 與 TestDlxListener
DlxListener
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/* 監聽死信隊列中的消息*/ @Component public class DlxListener {//監聽死信隊列@RabbitListener(queues = "queue_dlx")public void testDlx(Message message, Channel channel) throws IOException {//得到消息唯一標識long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費死信隊列中的消息System.out.println(new String(message.getBody()));//手動關閉channel.basicAck(deliveryTag,true);}catch (Exception e){//上面代碼邏輯出現錯誤e.printStackTrace();//拒絕接收,從新發送channel.basicNack(deliveryTag,true,true);}} }TestDlxListener
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/*監聽正常隊列的消息,然后異常拒收,也不返回給發送隊列,使消息成為死信消息,交給死信交換機*/ @Component public class TestDlxListener {@RabbitListener(queues = "queue_Normal_DLX")public void testDlxListener(Message message, Channel channel) throws IOException {//得到笑嘻嘻唯一標識long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//消費消息//模擬消費出錯int i = 1/0;//手動提交channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消費出現異常,拒絕簽收");//拒絕接收消息//出錯后將消息丟棄,不返回給發送隊列 拒絕簽收,不重回隊列 requeue=falsechannel.basicNack(deliveryTag,true,false);}} }4.4 測試
啟動消費端的主啟動
4.4.1 測試 過期時間 消息過期后交給死信交換機被消費掉
先將TestDlxListener類中的@Component注釋掉 ,將注釋掉的測試代碼打開,后面的代碼注釋掉,在run
4.4.2 測試隊列 長度限制
將測試長度限制的代碼放開,其余代碼注釋,點擊run
4.4.3 消息拒收
模擬業務錯誤,啟動消費端主啟動
4.5 死信隊列小結
消息成為死信的三種情況:
3. 隊列消息長度到達限制;
4. 消費者拒接消費消息,并且不重回隊列;
5. 原隊列存在消息過期設置,消息到達超時時間未被消費;
5 延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
很可惜,在RabbitMQ中并未提供延遲隊列功能。
但是可以使用:TTL+死信隊列 組合實現延遲隊列的效果。
5.1 提供方創建用于測試延遲隊列的交換機與隊列
package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/* RabbitMQ是沒有實現延遲隊列的延遲隊列的正常queue是沒有消費者的,否則生產的消息會被立馬消費掉,就不會交給死信交換機,達不到延遲隊列效果但是我們可以通過使用TTL 加上(DLX)死信隊列組合實現延遲隊列的效果延遲隊列:1. 定義正常交換機(order_exchange)和隊列(order_queue)2. 定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)3. 綁定,設置正常隊列過期時間為30分鐘*/ @Configuration public class RabbitMQDelayQueueConfig {//定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)@Beanpublic Exchange orderExchangeDlx(){return ExchangeBuilder.topicExchange("order_exchange_dlx").build();}//定義死信隊列@Beanpublic Queue orderQueueDlx(){return QueueBuilder.durable("order_queue_dlx").build();}//將死信交換機與死信隊列相互綁定@Beanpublic Binding orderBindingDlx(@Qualifier("orderExchangeDlx") Exchange exchange,@Qualifier("orderQueueDlx") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();}//定義正常交換機(order_exchange)和隊列(order_queue)@Beanpublic Exchange orderExchange(){return ExchangeBuilder.topicExchange("order_exchange").build();}//定義隊列@Beanpublic Queue orderQueue(){return QueueBuilder.durable("order_queue").withArgument("x-dead-letter-exchange","order_exchange_dlx")//綁定死信交換機.withArgument("x-dead-letter-routing-key","dlx.order.xf")//綁定routingKey value路由規則dlx.order.#.withArgument("x-message-ttl",10000)//給這個隊列添加過期時間 測試就使用10秒過期時間.build();}//將正常交換機與隊列相互綁定@Beanpublic Binding orderBinding(@Qualifier("orderExchange") Exchange exchange,@Qualifier("orderQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();} }5.2 消費端編寫監聽測試延遲隊列的隊列
package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;/* 測試 延遲隊列效果實現 消費死信隊列中的消息*/ @Component public class OrderListener implements ChannelAwareMessageListener {/*監聽死信隊列的消息,并消費*/@RabbitListener(queues = "order_queue_dlx")public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收轉換消息System.out.println(new String(message.getBody()));//2. 模擬處理業務邏輯System.out.println("處理業務邏輯...");System.out.println("根據訂單id查詢其狀態...");System.out.println("判斷狀態是否為支付成功");System.out.println("未支付,取消訂單,回滾庫存....");//3. 手動簽收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();System.out.println("出現異常,拒絕接受");//4.拒絕簽收,不重回隊列 requeue=falsechannel.basicNack(deliveryTag,true,false);}} }5.3 編寫測試代碼,發送消息
/* 發送消息到隊列中,消息10秒到期,然后消費端監聽死信隊列,并消費*/@Testpublic void testDelay() throws InterruptedException {//1.發送訂單消息。 將來是在訂單系統中,下單成功后,發送消息rabbitTemplate.convertAndSend("order_exchange", "order.msg", "訂單信息:id=1,time=2019年8月17日16:41:47");//2.打印倒計時10秒,模擬消息等待10秒后消息過期后,在消費端消費死信for (int i = 10; i > 0 ; i--) {System.out.println(i+"...");Thread.sleep(1000);}} }5.4 測試
啟動消費端主啟動
run測試方法
6 日志與監控
RabbitMQ默認日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務節點名稱、cookie的hash值、
RabbitMQ配置文件地址、內存限制、磁盤限制、默認賬戶guest的創建以及權限配置等等。
6.1 命令
7 消息追蹤
在使用任何消息中間件的過程中,難免會出現某條消息異常丟失的情況。對于RabbitMQ而言,可能
是因為生產者或消費者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認機制;也
有可能是因為交換器與隊列之間不同的轉發策略;甚至是交換器并沒有與任何隊列進行綁定,生產者
又不感知或者沒有采取相應的措施;另外RabbitMQ本身的集群策略也可能導致消息的丟失。這個時
候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協助開發和運維人員進行問題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現消息追蹤。
7.1 消息追蹤-Firehose
firehose的機制是將生產者投遞給rabbitmq的消息,rabbitmq投遞給消費者的消息按照指定的格式
發送到默認的exchange上。這個默認的exchange的名稱為amq.rabbitmq.trace,它是一個topic類
型的exchange。發送到這個exchange上的消息的routing key為 publish.exchangename 和
deliver.queuename。其中exchangename和queuename為實際exchange和queue的名稱,分別
對應生產者投遞到exchange的消息,和消費者從queue上獲取的消息。
注意:打開 trace 會影響消息寫入功能,適當打開后請關閉。
rabbitmqctl trace_on:開啟Firehose命令
rabbitmqctl trace_off:關閉Firehose命令
7.2 消息追蹤-rabbitmq_tracing
rabbitmq_tracing和Firehose在實現上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一
層GUI的包裝,更容易使用和管理。
啟用插件:rabbitmq-plugins enable rabbitmq_tracing
8 消息可靠性保障
100%確保消息發送成功
8.1 消息可靠性保障–消息補償
8.2 消息冪等性保障–樂觀鎖機制
冪等性指一次和多次請求某一個資源,對于資源本身應該具有同樣的結果。也就是說,其任
意多次執行對資源本身所產生的影響均與一次執行的影響相同。
在MQ中指,消費多條相同的消息,得到與消費該消息一次相同的結果。
總結
以上是生活随笔為你收集整理的RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ,RabbitMQ 的工
- 下一篇: ElasticSearch,docker