日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)

發布時間:2025/3/19 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、首先是rabbitmq的配置文件:

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"><!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都沒有,要用跟jar包匹配的版本 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><rabbit:connection-factory id="connectionFactory"host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"publisher-confirms="true" /><rabbit:admin connection-factory="connectionFactory" /><!-- 給模板指定轉換器 --><!-- mandatory必須設置true,return callback才生效 --><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener" mandatory="true" /><rabbit:queue name="CONFIRM_TEST" /><rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" ><rabbit:bindings><rabbit:binding queue="CONFIRM_TEST" /></rabbit:bindings></rabbit:direct-exchange><!-- 配置consumer, 監聽的類和queue的對應關系 --><rabbit:listener-containerconnection-factory="connectionFactory" acknowledge="manual" ><rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" /></rabbit:listener-container></beans>

2、發送方:

import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;@Service("publishService") public class PublishService {@Autowired private AmqpTemplate amqpTemplate; public void send(String exchange, String routingKey, Object message) { amqpTemplate.convertAndSend(exchange, routingKey, message);} }

3、消費方:

import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;@Service("receiveConfirmTestListener") public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { @Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch(Exception e){e.printStackTrace();//TODO 業務處理channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);}} }

4、確認后回調方:

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service;@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);} }

5、失敗后return回調:

import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.stereotype.Service;@Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);} }

6、測試類:

import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.dingcheng.confirms.publish.PublishService; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:application-context.xml"}) public class TestConfirm { @Autowired private PublishService publishService; private static String exChange = "DIRECT_EX";@Test public void test1() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test1---message:"+message);//exchange,queue 都正確,confirm被回調, ack=truepublishService.send(exChange,"CONFIRM_TEST",message); Thread.sleep(1000);} @Test public void test2() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test2---message:"+message);//exchange 錯誤,queue 正確,confirm被回調, ack=falsepublishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000);} @Test public void test3() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test3---message:"+message);//exchange 正確,queue 錯誤 ,confirm被回調, ack=true; return被回調 replyText:NO_ROUTEpublishService.send(exChange,"",message); // Thread.sleep(1000); } @Test public void test4() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test4---message:"+message);//exchange 錯誤,queue 錯誤,confirm被回調, ack=falsepublishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000);} }

7、測試結果:

test1---message:currentTime:1483786948506 test2---message:currentTime:1483786948532 consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506 test3---message:currentTime:1483786948536 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) confirm--:correlationData:null,ack:false,cause:Channel closed by application [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey: confirm--:correlationData:null,ack:true,cause:null test4---message:currentTime:1483786948546 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)

8、總結如下:

如果消息沒有到exchange,則confirm回調,ack=false

如果消息到達exchange,則confirm回調,ack=true

exchange到queue成功,則不回調return

exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)

備注:需要說明,spring-rabbit和原生的rabbit-client ,表現是不一樣的。測試的時候,原生的client,exchange錯誤的話,直接就報錯了,是不會到confirmListener和returnListener的

?

源碼地址:https://github.com/qq315737546/spring-rabbit

全文地址請點擊:https://blog.csdn.net/qq315737546/article/details/54176560

總結

以上是生活随笔為你收集整理的使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。