日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列

發布時間:2024/4/15 javascript 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

搭建SpringBoot項目,用于演示

springboot版本

<!-- spring boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.2.RELEASE</version><type>pom</type><!-- import 導入父工程的配置--><scope>import</scope></dependency>

消費與提供方的pom.xml

<dependencies><!-- spring-boot-starter-web spring-boot-starter-actuator綁定在一塊 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

提供端的application.yml

# 配置RabbitMQ的基本信息 ip 端口 username password 虛擬機.. spring:rabbitmq:host: 192.168.93.132 # ipport: 5672username: xiaofupassword: xiaofuvirtual-host: /springboot# 舊版本 開啟 confirm 確認模式 # publisher-confirms: true# 新版的開啟 confirm 確認模式publisher-confirm-type: correlated# 開啟 return 退回模式publisher-returns: true

消費端的application.yml

spring:rabbitmq:host: 192.168.93.132 #主機ipport: 5672 #端口username: xiaofupassword: xiaofuvirtual-host: /springboot# 舊版本 開啟 confirm 確認模式# publisher-confirms: true# 新版的開啟 confirm 確認模式publisher-confirm-type: correlated# 開啟 return 退回模式publisher-returns: truelistener:# RabbitMQ模式使用simple simple支持事務的simple:# Consumer ACK機制:設置為手動簽收acknowledge-mode: manualprefetch: 1 # 限流,配置1 表示消費端每次向MQ拉取最大一條消息# direct 是不支持事務的 # direct: # acknowledge-mode: manual # ACK機制:設置為手動簽收 # retry: # enabled: true # 是否支持重試 # max-attempts: 3 # 重試機制,3次

1.消息的可靠投遞

在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提
供了兩種方式用來控制消息的投遞可靠性模式。
? confirm 確認模式
? return 退回模式
rabbitmq 整個消息投遞的路徑為:
producer—>rabbitmq broker—>exchange—>queue—>consumer
? 消息從 producer 到 exchange 則會返回一個 confirmCallback 。
?== 消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback 。==
我們將利用這兩個 callback 控制消息的可靠性投遞

  • 持久化
    ? exchange要持久化
    ? queue要持久化
    ? message要持久化
  • 生產方確認Confirm
  • 消費方確認Ack
  • Broker高可用
  • 1.1配置confirm 確認模式與return 退回模式

    # 舊版本 開啟 confirm 確認模式 # publisher-confirms: true# 新版的開啟 confirm 確認模式publisher-confirm-type: correlated# 開啟 return 退回模式publisher-returns: true

    1.2創建用于測試消息的可靠投遞的交換機與隊列

    package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /*消息的可靠投遞創建測試交換機與隊列測試交換機可靠性記得在application.yml中配置#開啟 confirm 確認模式 設置為默認的自動確認模式publisher-confirm-type: none*/ @Configuration public class RabbitMQConfigConfirmAndReturn {//創建交換機@Beanpublic Exchange exchangeConfirm(){//創建一個Direct:定向,把消息交給符合指定routing key 的隊列的交換機return ExchangeBuilder.directExchange("test_Exchange_Confirm").build();}//創建一個隊列@Beanpublic Queue queueConfirm(){//創建一個隊列而且是持久的return QueueBuilder.durable("test_Queue_Confirm").build();}// 隊列和交換機綁定關系 Binding/*1. 指定哪個隊列2. 指定哪個交換機3. routing key*/@Beanpublic Binding bindingConfirm(@Qualifier("exchangeConfirm") Exchange exchange,@Qualifier("queueConfirm") Queue queue){//把隊列綁定在交換機上指定routingKey沒有參數return BindingBuilder.bind(queue).to(exchange).with("testConfirm").noargs();}// return 退回模式//使用test_Exchange_Confirm這個交換機}

    1.3 ProducerTest測試類中編寫測試confirm與return測試方法

    1.3.1首先在測試類中注入RabbitTemplate

    @SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest {//1.注入RabbitTemplate@Autowiredprivate RabbitTemplate rabbitTemplate; }

    1.3.2測試方法

    /*** 確認模式: 該模式是來校驗消息是否發送成功到交換機中* 步驟:* 1.確認開啟: publisher-confirm-type: none* 2.在rabbitTemplate定義一個confirmCallBack回調函數*/@Testpublic void testConfirm(){//使用rabbitTemplate的確認回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 相關的配置信息* @param ack 代表了Exchange交換機是否收到了消息,true表示收到了消息,false表示交換機沒有收到消息* @param cause 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("發送消息后,回調方法執行了~~~");if (ack){System.out.println("發送消息成功,啟動成功方案:"+cause);}else {System.out.println("發送消息失敗,啟動失敗方案:"+cause);}}});//發送消息,假設寫錯交換機的名稱,肯定會發送到Exchange失敗,就會執行我們的confirmCallBack回調方法rabbitTemplate.convertAndSend("test_Exchange_Confirm","testConfirm","測試Confirm確認模式~~~");}/*** 回退模式: 該模式是用來校驗該消息是否從Exchange交換機成功路由到了queue隊列中* 當Exchange路由到queue失敗后,就會執行這個ReturnCallBack方法** 步驟:* 1.開啟回退模式: publisher-returns: true* 2.設置ReturnCallBack* 3,設置Exchange處理的消息的模式* 1.如果消息沒有路由到Queue中,則丟棄消息(默認)* 2.如果消息沒有路由到Queue中,返回給消息到發送方的ReturnCallBack方法*/@Testpublic void testReturn() {//設置ReturnCallbackrabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/**** @param message 消息對象* @param replyCode 錯誤碼* @param replyText 錯誤信息* @param exchange 交換機* @param routingKey 路由鍵*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//當我們的消息發送從Exchange交換機發送到Queue錯誤后就會執行這個回調方法System.out.println("ReturnCallBack 執行了~~~");System.out.println(message);System.out.println(replyCode);System.out.println(replyText);System.out.println(exchange);System.out.println(routingKey);}});//發送消息,/*測試1:使用正確的Exchange與routingKey執行成功,不會執行我們的ReturnCallBack回退方法測試2:使用正確的Exchange與錯誤的不存在的routingKey,就會執行我們的ReturnCallBack回退方法*/rabbitTemplate.convertAndSend("test_Exchange_Confirm", "testConfirm111", "testConfirm~~~發送消息,測試回退模式");}

    1.3.3 測試消息可靠投遞

    confirm測試

    return測試

    2.Consumer Ack

    ack指Acknowledge,確認。 表示消費端收到消息后的確認方式。

    有三種確認方式:
    ? 自動確認:acknowledge=“none”
    ? 手動確認:acknowledge=“manual”
    ? 根據異常情況確認:acknowledge=“auto”,(這種方式使用麻煩,不作講解)

    其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應 message 從 RabbitMQ 的消息緩存中移除。但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失。如果設置了手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用channel.basicNack()方法,讓其自動重新發送消息。

    2.1 消費端application.yml中配置

    需要開啟手動簽收消息

    listener:# RabbitMQ模式使用simple simple支持事務的simple:# Consumer ACK機制:設置為手動簽收acknowledge-mode: manualprefetch: 1 # 限流,配置1 表示消費端每次向MQ拉取最大一條消息

    2.2 在消費端創建監聽類

    在方法上使用下面的注解,監聽的隊列
    @RabbitListener(queues = “隊列名稱”)

    下面的代碼監聽的是我們上面測試confirm的隊列

    package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/*** Consumer ACK機制:默認自動簽收* 1. 設置手動簽收。acknowledge="manual"* 2. 讓監聽器類實現ChannelAwareMessageListener接口* 3. 如果消息成功處理,則調用channel的 basicAck()簽收* 4. 如果消息處理失敗,則調用channel的basicNack()拒絕簽收,broker重新發送給consumer*/ @Component public class AckListener {@RabbitListener(queues = "test_Queue_Confirm")public void testAck(Message message, Channel channel) throws IOException {//得到消息的唯一deliveryTaglong deliveryTag = message.getMessageProperties().getDeliveryTag();//模擬接收到消息消費的邏輯try{//接收到消息進行消費System.out.println(new String(message.getBody()));System.out.println("消息到了ACK機制中~~~");//模擬執行邏輯錯誤 // int i = 1/0;//手動簽收消息/*deliveryTag:表示收到的消息的參數標簽(消息的唯一id)第二個參數:是否簽收多條消息(批量簽收消息)*/channel.basicAck(deliveryTag,true);}catch (Exception e){//當我們上面的邏輯出現錯誤,就不會簽收消息,我們在catch中就執行拒絕簽收System.out.println("消費邏輯出現異常~~~消息被Ack機制重回隊列");//拒絕簽收/*第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue的尾部,broker會重新發送該消息給消費端,false為丟棄改消息,若設置了死信隊列,就會交給死信隊列*/channel.basicNack(deliveryTag,true,false);}}}

    2.3 測試ACK

    啟動主啟動:ConsumerSpringbootApplication
    在提供方發送消息
    在消費方查看消息被消費

    3.TTL 全稱 Time To Live(存活時間/過期時間)

    ? TTL 全稱 Time To Live(存活時間/過期時間)。
    ? 當消息到達存活時間后,還沒有被消費,會被自動清除。
    ? RabbitMQ可以對消息設置過期時間,也可以對整個隊列(Queue)設置過期時間。

    ? 設置隊列過期時間使用參數:x-message-ttl,單位:ms(毫秒),會對整個隊列消息統一過期。
    ? 設置消息過期時間使用參數:expiration。單位:ms(毫秒),當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。
    ? 如果兩者都進行了設置,以時間短的為準。

    3.1 在提供方編寫TTL的交換機與隊列的創建代碼

    package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /* 測試RabbitMQ的TTL*/ @Configuration public class RabbitMQConfigTTL {//創建交換機@Beanpublic Exchange exchangeTtl(){//創建一個Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列 的交換機return ExchangeBuilder.topicExchange("test_Exchange_TTL").build();}//創建隊列@Beanpublic Queue queueTtl(){//創建一個隊列,設置消息過期時間為10秒return QueueBuilder.durable("test_Queue_TTL").withArgument("x-message-ttl",10000).build();}//綁定交換機與隊列@Beanpublic Binding bindingTtl(@Qualifier("exchangeTtl") Exchange exchange, @Qualifier("queueTtl") Queue queue){//將隊列綁定在topic通配符交換機上設置路由規則routingKey,沒有參數return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();} }

    3.2 提供方測試代碼編寫

    /*** TTL:過期時間* 1. 隊列統一過期** 2. 消息單獨過期* 結果:* 如果設置了消息的過期時間,也設置了隊列的過期時間,它以時間短的為準。* 隊列過期后,會將隊列所有消息全部移除。* 消息過期后,只有消息在隊列頂端,才會判斷其是否過期(移除掉)*/@Testpublic void testTTL(){//* 1. 隊列統一過期//發送10條消息,不去消費,查看web控制臺10秒后這10條消息是否會被丟棄 // for (int i = 0; i < 10; i++) { // //調用方法 // rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hehe.xf","測試TTL超時時間隊列消息發送~~~"+i); // }//* 2. 消息單獨過期// 消息后處理對象,設置一些消息的參數信息,發送消息的時候傳遞該參數,那么這些消息就會具有該參數//該對象是一個接口,使用匿名類部內來創建實現類MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {//設置發送消息的參數@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");//設置消息過期時間為5秒return message;}};//再次發生一條消息,使用我們設置好的消息參數對消息進行封裝//發送成功后,去看我們的隊列中的這條消息是否是5秒過期,因為我們這個消息是在隊列的頂端,等待被消費,而且過期時間短于隊列統一時間,所以優先我們這單條消息過期時間 // rabbitTemplate.convertAndSend("test_Exchange_TTL","ttl.hehe.fs","我被使用了消息參數,5秒后過期~~~",messagePostProcessor);//我們再次極端的測試,讓我們這條消息不在隊列的頂端//這條i==5的消息設置5秒過期,但是他在隊列的中間,5秒后已經過期,但是不會被隊列移除掉,當隊列統一的過期時間到了,就會隨著統一被隊列丟棄 或者交給死信交換機//因為隊列只會移除隊列頂端的過期消息,例如當有消費者來消費這10條消息后,但是i=5這條消息//已經過期,當消費到這條消息時,它就在隊列的頂端,就會判斷該消息是否過期,//若過期,者就會移除,或者交給 死信交換機//不會發送給消費者消費的for (int i = 0; i < 10; i++) {if (i == 5) {//消息單獨過期rabbitTemplate.convertAndSend("test_Exchange_TTL", "ttl.hehe.xf", "我被使用了消息參數,5秒后過期~~~而且在隊列的中間,我會不會5秒后過期呢?", messagePostProcessor);} else {//不過期的消息rabbitTemplate.convertAndSend("test_Exchange_TTL", "ttl.hehe.xf", "我發送了消息....");}}}

    3.3 測試發送,查詢queue隊列中的消息存活時間

    3.3.1 測試 隊列統一過期

    將1. 隊列統一過期這段代碼注釋放開,把其余代碼注釋,然后點擊運行

    3.3.2 測試 消息單獨過期

    使用這個類MessagePostProcessor來封裝我們發生消息的屬性參數

    3.3.3 測試 ,讓我們這條消息不在隊列的頂端

    4.死信隊列 DLX 。Dead Letter Exchange(死信交換機)

    死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以
    被重新發送到另一個交換機,這個交換機就是DLX。

    消息成為死信的三種情況:

  • 隊列消息長度到達限制;
  • 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;
  • 原隊列存在消息過期設置,消息到達超時時間未被消費;
  • 隊列綁定死信交換機:
    給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key

    4.1 創建用于測試死信隊列的交換機與隊列

    死信隊列:
    1. 聲明正常的隊列(test_queue_dlx)和正常交換機(test_exchange_dlx)
    2. 聲明死信隊列(queue_dlx)和死信交換機(exchange_dlx)
    3. 正常隊列綁定死信交換機
    設置兩個參數:
    * x-dead-letter-exchange:死信交換機名稱
    * x-dead-letter-routing-key:發送給死信交換機的routingkey

    package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;/* 死信隊列 死信隊列:1. 聲明正常的隊列(test_queue_dlx)和正常交換機(test_exchange_dlx)2. 聲明死信隊列(queue_dlx)和死信交換機(exchange_dlx)3. 正常隊列綁定死信交換機設置兩個參數:* x-dead-letter-exchange:死信交換機名稱* x-dead-letter-routing-key:發送給死信交換機的routingkey*/ @Configuration public class RabbitMQDeadMessageConfig {//創建自定義 死信交換機 邏輯認為是用來做死信服務的@Beanpublic Exchange exchangeDlx(){return ExchangeBuilder.topicExchange("exchange_del").build();}//創建自定義 死信隊列 邏輯認為是用來做死信服務的@Beanpublic Queue queueDlx(){return QueueBuilder.durable("queue_dlx").build();}//將自定義的死信隊列綁定在一塊@Beanpublic Binding bindingDlx(@Qualifier("exchangeDlx") Exchange exchange,@Qualifier("queueDlx") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.*").noargs();}//創建正常接收消息的交換機@Beanpublic Exchange exchangeNormalDlx(){return ExchangeBuilder.topicExchange("exchange_Normal_DLX").build();}//創建正常接收消息的隊列,綁定我們的死信交換機@Beanpublic Queue queueNormalDlx(){return QueueBuilder.durable("queue_Normal_DLX")//正常隊列的名稱.withArgument("x-dead-letter-exchange","exchange_del")//設置改隊列的死信交換機.withArgument("x-dead-letter-routing-key","dlx.xf")//設置該隊列的發送消息時指定的routingkey.withArgument("x-message-ttl",10000)//設置隊列中消息的過期時間.withArgument("x-max-length",10).build();//設置隊列的最大容量}// @Bean // public Queue queueNormalDlx(@Qualifier("exchangeDlx") Exchange exchange,@Qualifier("queueDlx") Queue queue){ // Map<String, Object> args = new HashMap<>(); // // set the queue with a dead letter feature // args.put("x-dead-letter-exchange", exchange);//設置該隊列的死信交換機 // args.put("x-dead-letter-routing-key", "dlx.xf");//設置該隊列的發送消息時指定的routingkey // args.put("x-message-ttl",10000);//設置隊列中消息的過期時間 // args.put("x-max-length",10);//設置隊列的最大容量 // return new Queue("queue_Normal_DLX", true, false, false, args); // }//將正常的交換機與隊列綁定@Beanpublic Binding bindingNormalDlx(@Qualifier("exchangeNormalDlx") Exchange exchange,@Qualifier("queueNormalDlx") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();} }

    4.2 編寫測試類發送測試死信消息

    * 發送測試死信消息:* 1. 過期時間* 2. 長度限制* 3. 消息拒收 /*** 發送測試死信消息:* 1. 過期時間* 2. 長度限制* 3. 消息拒收*/@Testpublic void testDlx() throws InterruptedException {//測試過期時間,死信消息,首先發送給了正常的交換機,交換機路由到正常的隊列,然后該隊列的消息由于設置了10秒過期,10秒內沒有被消費// 過期后就交給死信交換機,然后由死信交換機路由到死信隊列,然后被消費掉 // rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發送了一條10秒后就過期的消息~~~");//測試隊列長度,當一次性發送超過隊列長度的消息,隊列就會將多余的消息交給死信交換機//由于我們創建隊列的時候,改隊列的長度為10,那么就有10 條消息被第一時間交給死信交換機,然后在等10秒,10秒后隊列中的10條消息沒有被消費,也會交給死信交換機//由執行控制臺結果得知,隊列是先進先出的原則先進的0-9會被后進的10-19擠出來,所以0-9先變成死信消息,而10-19是10秒過期后未被消費成的死信消息 // for (int i = 0; i < 20; i++) { // Thread.sleep(10); // rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發送了多條10秒后就過期的消息~~~"+i); // }//測試消費端拒收消息,拒收的消息也不返回發送的隊列,就會變成死信消息,就交給死信交換機處理rabbitTemplate.convertAndSend("exchange_Normal_DLX","test.dlx.xf","我發送了消費端出錯不消費的消息~~~");}

    4.3 編寫消費端的監聽隊列類 DlxListener 與 TestDlxListener

    DlxListener

    package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/* 監聽死信隊列中的消息*/ @Component public class DlxListener {//監聽死信隊列@RabbitListener(queues = "queue_dlx")public void testDlx(Message message, Channel channel) throws IOException {//得到消息唯一標識long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消費死信隊列中的消息System.out.println(new String(message.getBody()));//手動關閉channel.basicAck(deliveryTag,true);}catch (Exception e){//上面代碼邏輯出現錯誤e.printStackTrace();//拒絕接收,從新發送channel.basicNack(deliveryTag,true,true);}} }

    TestDlxListener

    package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;/*監聽正常隊列的消息,然后異常拒收,也不返回給發送隊列,使消息成為死信消息,交給死信交換機*/ @Component public class TestDlxListener {@RabbitListener(queues = "queue_Normal_DLX")public void testDlxListener(Message message, Channel channel) throws IOException {//得到笑嘻嘻唯一標識long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//消費消息//模擬消費出錯int i = 1/0;//手動提交channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消費出現異常,拒絕簽收");//拒絕接收消息//出錯后將消息丟棄,不返回給發送隊列 拒絕簽收,不重回隊列 requeue=falsechannel.basicNack(deliveryTag,true,false);}} }

    4.4 測試

    啟動消費端的主啟動

    4.4.1 測試 過期時間 消息過期后交給死信交換機被消費掉

    先將TestDlxListener類中的@Component注釋掉 ,將注釋掉的測試代碼打開,后面的代碼注釋掉,在run

    4.4.2 測試隊列 長度限制

    將測試長度限制的代碼放開,其余代碼注釋,點擊run


    4.4.3 消息拒收

    模擬業務錯誤,啟動消費端主啟動

    4.5 死信隊列小結

  • 死信交換機和死信隊列和普通的沒有區別
  • 當消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
  • 消息成為死信的三種情況:
    3. 隊列消息長度到達限制;
    4. 消費者拒接消費消息,并且不重回隊列;
    5. 原隊列存在消息過期設置,消息到達超時時間未被消費;

    5 延遲隊列

    延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
    很可惜,在RabbitMQ中并未提供延遲隊列功能。
    但是可以使用:TTL+死信隊列 組合實現延遲隊列的效果。

    5.1 提供方創建用于測試延遲隊列的交換機與隊列

    package com.fs.rabbitmq.config;import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/* RabbitMQ是沒有實現延遲隊列的延遲隊列的正常queue是沒有消費者的,否則生產的消息會被立馬消費掉,就不會交給死信交換機,達不到延遲隊列效果但是我們可以通過使用TTL 加上(DLX)死信隊列組合實現延遲隊列的效果延遲隊列:1. 定義正常交換機(order_exchange)和隊列(order_queue)2. 定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)3. 綁定,設置正常隊列過期時間為30分鐘*/ @Configuration public class RabbitMQDelayQueueConfig {//定義死信交換機(order_exchange_dlx)和隊列(order_queue_dlx)@Beanpublic Exchange orderExchangeDlx(){return ExchangeBuilder.topicExchange("order_exchange_dlx").build();}//定義死信隊列@Beanpublic Queue orderQueueDlx(){return QueueBuilder.durable("order_queue_dlx").build();}//將死信交換機與死信隊列相互綁定@Beanpublic Binding orderBindingDlx(@Qualifier("orderExchangeDlx") Exchange exchange,@Qualifier("orderQueueDlx") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();}//定義正常交換機(order_exchange)和隊列(order_queue)@Beanpublic Exchange orderExchange(){return ExchangeBuilder.topicExchange("order_exchange").build();}//定義隊列@Beanpublic Queue orderQueue(){return QueueBuilder.durable("order_queue").withArgument("x-dead-letter-exchange","order_exchange_dlx")//綁定死信交換機.withArgument("x-dead-letter-routing-key","dlx.order.xf")//綁定routingKey value路由規則dlx.order.#.withArgument("x-message-ttl",10000)//給這個隊列添加過期時間 測試就使用10秒過期時間.build();}//將正常交換機與隊列相互綁定@Beanpublic Binding orderBinding(@Qualifier("orderExchange") Exchange exchange,@Qualifier("orderQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();} }

    5.2 消費端編寫監聽測試延遲隊列的隊列

    package com.fs.Queuelistener;import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;/* 測試 延遲隊列效果實現 消費死信隊列中的消息*/ @Component public class OrderListener implements ChannelAwareMessageListener {/*監聽死信隊列的消息,并消費*/@RabbitListener(queues = "order_queue_dlx")public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收轉換消息System.out.println(new String(message.getBody()));//2. 模擬處理業務邏輯System.out.println("處理業務邏輯...");System.out.println("根據訂單id查詢其狀態...");System.out.println("判斷狀態是否為支付成功");System.out.println("未支付,取消訂單,回滾庫存....");//3. 手動簽收channel.basicAck(deliveryTag,true);} catch (Exception e) {//e.printStackTrace();System.out.println("出現異常,拒絕接受");//4.拒絕簽收,不重回隊列 requeue=falsechannel.basicNack(deliveryTag,true,false);}} }

    5.3 編寫測試代碼,發送消息

    /* 發送消息到隊列中,消息10秒到期,然后消費端監聽死信隊列,并消費*/@Testpublic void testDelay() throws InterruptedException {//1.發送訂單消息。 將來是在訂單系統中,下單成功后,發送消息rabbitTemplate.convertAndSend("order_exchange", "order.msg", "訂單信息:id=1,time=2019年8月17日16:41:47");//2.打印倒計時10秒,模擬消息等待10秒后消息過期后,在消費端消費死信for (int i = 10; i > 0 ; i--) {System.out.println(i+"...");Thread.sleep(1000);}} }

    5.4 測試

    啟動消費端主啟動

    run測試方法

    6 日志與監控

    RabbitMQ默認日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
    日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務節點名稱、cookie的hash值、
    RabbitMQ配置文件地址、內存限制、磁盤限制、默認賬戶guest的創建以及權限配置等等。

    6.1 命令

    7 消息追蹤

    在使用任何消息中間件的過程中,難免會出現某條消息異常丟失的情況。對于RabbitMQ而言,可能
    是因為生產者或消費者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認機制;也
    有可能是因為交換器與隊列之間不同的轉發策略;甚至是交換器并沒有與任何隊列進行綁定,生產者
    又不感知或者沒有采取相應的措施;另外RabbitMQ本身的集群策略也可能導致消息的丟失。這個時
    候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協助開發和運維人員進行問題的定位。
    在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現消息追蹤。

    7.1 消息追蹤-Firehose

    firehose的機制是將生產者投遞給rabbitmq的消息,rabbitmq投遞給消費者的消息按照指定的格式
    發送到默認的exchange上。這個默認的exchange的名稱為amq.rabbitmq.trace,它是一個topic類
    型的exchange。發送到這個exchange上的消息的routing key為 publish.exchangename 和
    deliver.queuename。其中exchangename和queuename為實際exchange和queue的名稱,分別
    對應生產者投遞到exchange的消息,和消費者從queue上獲取的消息。
    注意:打開 trace 會影響消息寫入功能,適當打開后請關閉。
    rabbitmqctl trace_on:開啟Firehose命令
    rabbitmqctl trace_off:關閉Firehose命令

    7.2 消息追蹤-rabbitmq_tracing

    rabbitmq_tracing和Firehose在實現上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一
    層GUI的包裝,更容易使用和管理。
    啟用插件:rabbitmq-plugins enable rabbitmq_tracing

    8 消息可靠性保障

    100%確保消息發送成功

    8.1 消息可靠性保障–消息補償

    8.2 消息冪等性保障–樂觀鎖機制

    冪等性指一次和多次請求某一個資源,對于資源本身應該具有同樣的結果。也就是說,其任
    意多次執行對資源本身所產生的影響均與一次執行的影響相同。
    在MQ中指,消費多條相同的消息,得到與消費該消息一次相同的結果。

    總結

    以上是生活随笔為你收集整理的RabbitMQ,Springboot整合RabbitMQ实现 消息可靠性投递,Consumer ACK,TTL,死信队列,使用TTL+死信队列=延迟队列的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。