日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringACK对RabbitMQ消息的确认(消费)

發布時間:2023/12/31 javascript 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringACK对RabbitMQ消息的确认(消费) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

SpringAMQP對RabbitMQ消息的確認(消費)

之前已經簡單介紹了基本是從發送方去確認的,我們需要在配置文件當中開啟發送方確認模式,共育兩種,一種是相對于交換機一個是相對于隊列。

本次的介紹是基于消費者對消息的確認,也就是基本的邏輯是消費者對消息處理的確認。

基本上生產者這邊的代碼是不需要去改變的,但是我們需要讓消費者去正確的人發送到消息。我們按照什么形式都可以,確認與不確認都可以,因為本次主要是為了測試消費端對消息的處理確認。

首先生產者的配置和相關的代碼

spring: # profiles: # active: devrabbitmq:host: #遠程主機外網地址username: shabi #遠程用戶名password: #密碼virtual-host: shabi #虛擬機名稱port: 5672 #遠程主機端口名稱publisher-confirm-type: correlated #開啟確認模式publisher-returns: true

然后就是之前我們在測試類當中寫的一些發送的各種模式,包括一般的默認發送,以及發送者確認,以及發送者回執。
然后具體的配置類就是真不要進行了隊列和交換機的聲明和創建,然后進行了具體綁定。

package com.jgdabc.rabbitconfig;import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {//交換機public static final String Exchange_Name = "boot_rabbit_topic_ee";public static final String Queue_Name = "boot_rabbit_topic_qqq";@Bean("bootExchange") //交換機的創建public Exchange bootExchange(){return ExchangeBuilder.topicExchange(Exchange_Name).durable(true).build(); //綁定一個topic類型的交換機,持久化并構建}@Bean("bootQueue") //隊列的創建public Queue bootQueue(){return QueueBuilder.durable(Queue_Name).build();} // 隊列和交換機的綁定關系 // 哪個隊列 // 哪個交換機 // routing key // 這里不寫的話會按照方法名注入@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();}} package com.jgdabc;import com.jgdabc.rabbitconfig.RabbitConfig;import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith;import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Import; import org.springframework.test.context.junit4.SpringRunner;import java.util.*; import java.util.stream.IntStream; @Slf4j @SpringBootTest @RunWith(SpringRunner.class) public class DemoApplicationTests {// 注入RabbitTemplate@Autowiredprivate RabbitTemplate template;@Testpublic void testSend() {template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi");}/*** 在yml配置文件當中開啟去人模式* 在RabbitTemplate定義ConfirmCallBack回調函數*/@Testpublic void testConfirm() {//定義回調template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(b);System.out.println("confirm 方法被執行了");if (!b) {//接收成功System.out.println("消息成功接收");} else {System.out.println("消息接受失敗," + b);}}});//發送一條消息template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "你好,我的小寶貝");} // 回退模式,當消息發送給Exchange后,Exchange路由到Queue失敗后才會執行ReturnCallBack/*** 回退模式* 1:在yml文件當中開啟回退模式* 2:設置ReturnCallBack* 3:設置Exchange處理消息的模式* <1:如果消息沒有路由到Queue,那么丟棄掉消息(默認)* <2:如果路由沒有回退到Queue,返回給消息發送方*/@Testpublic void testReturn() { // 設置交換機處理消息的模式template.setMandatory(true);//設置為true交換機會將路由到隊列失敗的消息再返回給發送者template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息對象:" + returnedMessage.getMessage());System.out.println("錯誤碼:" + returnedMessage.getReplyCode());System.out.println("錯誤信息:" + returnedMessage.getReplyText());System.out.println("交換機:" + returnedMessage.getExchange());System.out.println("路由鍵:" + returnedMessage.getRoutingKey());System.out.println("return執行了...");}});template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi");}}

然后是這次主要介紹的消費端。

先看配置

spring:rabbitmq:host: username: password: virtual-host:port: 5672 # publisher-confirm-type: correlated # publisher-returns: true # 開啟ack也就是手動消息確認listener: # 設置手動確認simple:acknowledge-mode: manual

具體的類,

package com.jgdabc.boot_rabbit_consumer;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.List;/*** consumer ack 機制* 設置手動簽收,acknowledge = “manual”* 如果消息成功處理,則調用channel的basicAck簽收* // * 如果消息處理失敗,則調用channel的basicNack拒絕簽收,broker重新發送給consumer*/ @Component public class ConsumerSpringbootApplication implements ChannelAwareMessageListener {@RabbitListener(queues = "boot_rabbit_topic_qqq") //指定要消費消息的隊public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("接收轉換消息:" + new String(message.getBody())); // 手動簽收channel.basicAck(deliveryTag, true);} catch (IOException e) {channel.basicNack(deliveryTag, true, true);} // 第二個參數代表運行多條消息被簽收 // 拒絕簽收,第三個參數重回隊列,如果設置為true,則消息重新回到隊列}public void onMessage(Message message) {// System.out.println(message);}}

這個方法具體沒有用,之所以寫上,是因為我實現上邊那個類的時候,如果不實現這個方法的話,那么啟動就會報錯。所以就寫上了。

然后主要在說明一些參數

long deliveryTag = message.getMessageProperties().getDeliveryTag();
message.getMessageProperties ().getMessageId () 獲取 MessageID,獲取的 MessageID 可以用來判斷是否已經被消費者消費過了,如果已經消費則取消再次消費。

下面這里加了一個異常的捕獲,因為可能消費者這個處理消息出錯,所以進行了異常的捕獲。首先一定是接收了具體的消息。然后會進行一個簽收

channel.basicAck (long deliveryTag, boolean multiple)為消息確認,參數1:消息的id;參數2:是否批量應答。

basic.nack方法為不確認deliveryTag對應的消息,第二個參數是否應用于多消息,第三個參數是否requeue,與basic.reject區別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。

try {System.out.println("接收轉換消息:" + new String(message.getBody())); // 手動簽收channel.basicAck(deliveryTag, true);} catch (IOException e) {channel.basicNack(deliveryTag, true, true);}

這里只是列舉一些方法的使用,當然還有其他的方法,后面慢慢來熟悉好了。打開這個管理面板,可以看到沒有隊列,這里提前已經刪除掉之前的創建好的隊列和交換機了,為的是為了是運行展示后的效果比較明顯一些。
交換機和隊列都是可以在程序中創建和綁定的。

現在我們在生產者測試類去生產一條消息。可以隨便去用一個方法就可以了。
我們就運行這個方法

因為沒有做錯誤,所以不會有錯誤信息輸出的。

現在我們去面板看,可以看到這里就自動創建出來隊列和生產了一條消息,當然交換機的創建和隊列的綁定也是執行了。


現在我們在消費者去消費,執行的話,我們就去執行啟動類就好。

因為我們這個類加上了這個注解,其實就是已經實例化給spring了。表明了已經成為spring的一個組件,所以直接去啟動啟動運行類就好了。

你看這里就接收到消息了,并且會處于一個持續運行的等待過程。

同時消費處理成功驗證。

現在我們可以去讓程序出錯,來驗證消息處理失敗情況。
我們在簽收之前讓代碼出一個錯。

哦對了,這個異常是算數異常,我們之前捕獲一個大的異常算了。

下面那段改成這樣。

現在重新開始之前的步驟。然后這里器是會一直打印這段話,主要是因為我們設置basic.nack方法為不確認deliveryTag對應的消息,第二個參數是否應用于多消息,第三個參數是否requeue。我們這里出現異常,第二個參數為true,代表不確認,第三個代表重新讓它回到隊列,設置為true該行消息重新回到隊列,但是我們這里會持續接收進行接收消費,于是來來回回就形成了死循環。


同時驗證我們這里設置的重回隊列確實生效。

大概就是這樣的一個模式,當熱這種處理模式并不是合適的,主要是舉個例子,其他的方法處理模式順著這個模板來就行了。

主要是為了忘記后好回顧,必要的時候直接就地取材。

總結

以上是生活随笔為你收集整理的SpringACK对RabbitMQ消息的确认(消费)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。