javascript
SpringACK对RabbitMQ消息的确认(消费)
SpringAMQP對RabbitMQ消息的確認(消費)
之前已經簡單介紹了基本是從發送方去確認的,我們需要在配置文件當中開啟發送方確認模式,共育兩種,一種是相對于交換機一個是相對于隊列。
本次的介紹是基于消費者對消息的確認,也就是基本的邏輯是消費者對消息處理的確認。
基本上生產者這邊的代碼是不需要去改變的,但是我們需要讓消費者去正確的人發送到消息。我們按照什么形式都可以,確認與不確認都可以,因為本次主要是為了測試消費端對消息的處理確認。
首先生產者的配置和相關的代碼
spring: # profiles: # active: devrabbitmq:host: #遠程主機外網地址username: shabi #遠程用戶名password: #密碼virtual-host: shabi #虛擬機名稱port: 5672 #遠程主機端口名稱publisher-confirm-type: correlated #開啟確認模式publisher-returns: true然后就是之前我們在測試類當中寫的一些發送的各種模式,包括一般的默認發送,以及發送者確認,以及發送者回執。
然后具體的配置類就是真不要進行了隊列和交換機的聲明和創建,然后進行了具體綁定。
然后是這次主要介紹的消費端。
先看配置
spring:rabbitmq:host: username: password: virtual-host:port: 5672 # publisher-confirm-type: correlated # publisher-returns: true # 開啟ack也就是手動消息確認listener: # 設置手動確認simple:acknowledge-mode: manual具體的類,
package com.jgdabc.boot_rabbit_consumer;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.List;/*** consumer ack 機制* 設置手動簽收,acknowledge = “manual”* 如果消息成功處理,則調用channel的basicAck簽收* // * 如果消息處理失敗,則調用channel的basicNack拒絕簽收,broker重新發送給consumer*/ @Component public class ConsumerSpringbootApplication implements ChannelAwareMessageListener {@RabbitListener(queues = "boot_rabbit_topic_qqq") //指定要消費消息的隊public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("接收轉換消息:" + new String(message.getBody())); // 手動簽收channel.basicAck(deliveryTag, true);} catch (IOException e) {channel.basicNack(deliveryTag, true, true);} // 第二個參數代表運行多條消息被簽收 // 拒絕簽收,第三個參數重回隊列,如果設置為true,則消息重新回到隊列}public void onMessage(Message message) {// System.out.println(message);}}這個方法具體沒有用,之所以寫上,是因為我實現上邊那個類的時候,如果不實現這個方法的話,那么啟動就會報錯。所以就寫上了。
然后主要在說明一些參數
long deliveryTag = message.getMessageProperties().getDeliveryTag();
message.getMessageProperties ().getMessageId () 獲取 MessageID,獲取的 MessageID 可以用來判斷是否已經被消費者消費過了,如果已經消費則取消再次消費。
下面這里加了一個異常的捕獲,因為可能消費者這個處理消息出錯,所以進行了異常的捕獲。首先一定是接收了具體的消息。然后會進行一個簽收
channel.basicAck (long deliveryTag, boolean multiple)為消息確認,參數1:消息的id;參數2:是否批量應答。
basic.nack方法為不確認deliveryTag對應的消息,第二個參數是否應用于多消息,第三個參數是否requeue,與basic.reject區別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。
try {System.out.println("接收轉換消息:" + new String(message.getBody())); // 手動簽收channel.basicAck(deliveryTag, true);} catch (IOException e) {channel.basicNack(deliveryTag, true, true);}這里只是列舉一些方法的使用,當然還有其他的方法,后面慢慢來熟悉好了。打開這個管理面板,可以看到沒有隊列,這里提前已經刪除掉之前的創建好的隊列和交換機了,為的是為了是運行展示后的效果比較明顯一些。
交換機和隊列都是可以在程序中創建和綁定的。
現在我們在生產者測試類去生產一條消息。可以隨便去用一個方法就可以了。
我們就運行這個方法
因為沒有做錯誤,所以不會有錯誤信息輸出的。
現在我們去面板看,可以看到這里就自動創建出來隊列和生產了一條消息,當然交換機的創建和隊列的綁定也是執行了。
現在我們在消費者去消費,執行的話,我們就去執行啟動類就好。
因為我們這個類加上了這個注解,其實就是已經實例化給spring了。表明了已經成為spring的一個組件,所以直接去啟動啟動運行類就好了。
你看這里就接收到消息了,并且會處于一個持續運行的等待過程。
同時消費處理成功驗證。
現在我們可以去讓程序出錯,來驗證消息處理失敗情況。
我們在簽收之前讓代碼出一個錯。
哦對了,這個異常是算數異常,我們之前捕獲一個大的異常算了。
下面那段改成這樣。
現在重新開始之前的步驟。然后這里器是會一直打印這段話,主要是因為我們設置basic.nack方法為不確認deliveryTag對應的消息,第二個參數是否應用于多消息,第三個參數是否requeue。我們這里出現異常,第二個參數為true,代表不確認,第三個代表重新讓它回到隊列,設置為true該行消息重新回到隊列,但是我們這里會持續接收進行接收消費,于是來來回回就形成了死循環。
同時驗證我們這里設置的重回隊列確實生效。
大概就是這樣的一個模式,當熱這種處理模式并不是合適的,主要是舉個例子,其他的方法處理模式順著這個模板來就行了。
主要是為了忘記后好回顧,必要的時候直接就地取材。
總結
以上是生活随笔為你收集整理的SpringACK对RabbitMQ消息的确认(消费)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第十章:MATLAB:矩阵分析(特征值与
- 下一篇: gradle idea java ssm