生产者消息确认
RabbitMQ提供了publisher confirm機制來避免消息發送到MQ過程中丟失。這種機制必須給每個消息指定一個唯一ID。消息發送到MQ以后,會返回一個結果給發送者,表示消息是否處理成功。
返回結果有兩種方式:
-
publisher-confirm,發送者確認
-
消息成功投遞到交換機,返回ack
-
消息未投遞到交換機,返回nack
-
-
publisher-return,發送者回執
-
消息投遞到交換機了,但是沒有路由到隊列。返回ACK,及路由失敗原因。
-
注意:
修改配置
首先,修改publisher服務中的application.yml文件,添加下面的內容:
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true說明:
-
publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
-
simple:同步等待confirm結果,直到超時
-
correlated:異步回調,定義ConfirmCallback,MQ返回結果時會回調這個ConfirmCallback
-
-
publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback
-
template.mandatory:定義消息路由失敗時的策略。true,則調用ReturnCallback;false:則直接丟棄消息
定義Return回調
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在項目加載時配置:
修改publisher服務,添加一個:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration;@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 獲取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 設置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投遞失敗,記錄日志log.info("消息發送失敗,應答碼{},原因{},交換機{},路由鍵{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有業務需要,可以重發消息});} }定義ConfirmCallback
ConfirmCallback可以在發送消息時指定,因為每個業務處理confirm成功或失敗的邏輯不一定相同。
在publisher服務的cn.leon.mq.spring.SpringAmqpTest類中,定義一個單元測試方法:
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息體String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封裝到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息發送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失敗log.error("消息發送失敗, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息發送異常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.發送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一會兒,等待ack回執Thread.sleep(2000); }總結