使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)
生活随笔
收集整理的這篇文章主要介紹了
使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、首先是rabbitmq的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"><!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都沒有,要用跟jar包匹配的版本 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><rabbit:connection-factory id="connectionFactory"host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"publisher-confirms="true" /><rabbit:admin connection-factory="connectionFactory" /><!-- 給模板指定轉換器 --><!-- mandatory必須設置true,return callback才生效 --><rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener" mandatory="true" /><rabbit:queue name="CONFIRM_TEST" /><rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" ><rabbit:bindings><rabbit:binding queue="CONFIRM_TEST" /></rabbit:bindings></rabbit:direct-exchange><!-- 配置consumer, 監聽的類和queue的對應關系 --><rabbit:listener-containerconnection-factory="connectionFactory" acknowledge="manual" ><rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" /></rabbit:listener-container></beans>2、發送方:
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;@Service("publishService") public class PublishService {@Autowired private AmqpTemplate amqpTemplate; public void send(String exchange, String routingKey, Object message) { amqpTemplate.convertAndSend(exchange, routingKey, message);} }3、消費方:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;@Service("receiveConfirmTestListener") public class ReceiveConfirmTestListener implements ChannelAwareMessageListener { @Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch(Exception e){e.printStackTrace();//TODO 業務處理channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);}} }4、確認后回調方:
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service;@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);} }5、失敗后return回調:
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.stereotype.Service;@Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);} }6、測試類:
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.dingcheng.confirms.publish.PublishService; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:application-context.xml"}) public class TestConfirm { @Autowired private PublishService publishService; private static String exChange = "DIRECT_EX";@Test public void test1() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test1---message:"+message);//exchange,queue 都正確,confirm被回調, ack=truepublishService.send(exChange,"CONFIRM_TEST",message); Thread.sleep(1000);} @Test public void test2() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test2---message:"+message);//exchange 錯誤,queue 正確,confirm被回調, ack=falsepublishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000);} @Test public void test3() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test3---message:"+message);//exchange 正確,queue 錯誤 ,confirm被回調, ack=true; return被回調 replyText:NO_ROUTEpublishService.send(exChange,"",message); // Thread.sleep(1000); } @Test public void test4() throws InterruptedException{ String message = "currentTime:"+System.currentTimeMillis();System.out.println("test4---message:"+message);//exchange 錯誤,queue 錯誤,confirm被回調, ack=falsepublishService.send(exChange+"NO","CONFIRM_TEST",message); Thread.sleep(1000);} }7、測試結果:
test1---message:currentTime:1483786948506 test2---message:currentTime:1483786948532 consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506 test3---message:currentTime:1483786948536 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) confirm--:correlationData:null,ack:false,cause:Channel closed by application [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey: confirm--:correlationData:null,ack:true,cause:null test4---message:currentTime:1483786948546 confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40) [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)8、總結如下:
如果消息沒有到exchange,則confirm回調,ack=false
如果消息到達exchange,則confirm回調,ack=true
exchange到queue成功,則不回調return
exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
備注:需要說明,spring-rabbit和原生的rabbit-client ,表現是不一樣的。測試的時候,原生的client,exchange錯誤的話,直接就報錯了,是不會到confirmListener和returnListener的
?
源碼地址:https://github.com/qq315737546/spring-rabbit
全文地址請點擊:https://blog.csdn.net/qq315737546/article/details/54176560
總結
以上是生活随笔為你收集整理的使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Qt之QSlider
- 下一篇: 洛谷 P2048 [NOI2010]超级