日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

学习Spring Boot:(二十六)使用 RabbitMQ 消息队列

發布時間:2025/3/12 javascript 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 学习Spring Boot:(二十六)使用 RabbitMQ 消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

前面學習了 RabbitMQ 基礎,現在主要記錄下學習 Spring Boot 整合 RabbitMQ ,調用它的 API ,以及中間使用的相關功能的記錄。

相關的可以去[我的博客/RabbitMQ]

正文

我這里測試都是使用的是 topic 交換器,Spring Boot 2.0.0, jdk 1.8

配置

Spring Boot 版本 2.0.0
在 pom.xml 文件中引入 AMQP 的依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

在系統配置文件中加入連接屬性

spring:application:name: RabbitMQ-Demorabbitmq:host: k.wuwii.comport: 5672username: kronchanpassword: 123456#virtual-host: testpublisher-confirms: true # 開啟確認消息是否到達交換器,需要設置 truepublisher-returns: true # 開啟確認消息是否到達隊列,需要設置 true

基本的使用

消費者

新增一個消費者類:

@Log public class MessageReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {byte[] body = message.getBody();log.info(">>>>>>> receive: " + new String(body));} finally {// 確認成功消費,否則消息會轉發給其他的消費者,或者進行重試channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} }
配置類

新增 RabbitMQ 的配置類,主要是對消費者的隊列,交換器,路由鍵的一些設置:

@Configuration public class RabbitMQConfig {public final static String QUEUE_NAME = "springboot.demo.test1";public final static String ROUTING_KEY = "route-key";public final static String EXCHANGES_NAME = "demo-exchanges";@Beanpublic Queue queue() {// 是否持久化boolean durable = true;// 僅創建者可以使用的私有隊列,斷開后自動刪除boolean exclusive = false;// 當所有消費客戶端連接斷開后,是否自動刪除隊列boolean autoDelete = false;return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);}/*** 設置交換器,這里我使用的是 topic exchange*/@Beanpublic TopicExchange exchange() {// 是否持久化boolean durable = true;// 當所有消費客戶端連接斷開后,是否自動刪除隊列boolean autoDelete = false;return new TopicExchange(EXCHANGES_NAME, durable, autoDelete);}/*** 綁定路由*/@Beanpublic Binding binding(Queue queue, TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}@Beanpublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(QUEUE_NAME);container.setMessageListener(receiver());//container.setMaxConcurrentConsumers(1);//container.setConcurrentConsumers(1); 默認為1//container.setExposeListenerChannel(true);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 設置為手動,默認為 AUTO,如果設置了手動應答 basicack,就要設置manualreturn container;}@Beanpublic MessageReceiver receiver() {return new MessageReceiver();}}
生產者
@Component public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** logger*/private static final Logger log = LoggerFactory.getLogger(MessageSender.class);public void send() {// public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)// exchange: 交換機名稱// routingKey: 路由關鍵字// object: 發送的消息內容// correlationData:消息IDCorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());// ConfirmListener是當消息無法發送到Exchange被觸發,此時Ack為False,這時cause包含發送失敗的原因,例如exchange不存在時// 需要在系統配置文件中設置 publisher-confirms: trueif (!rabbitTemplate.isConfirmListener()) {rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info(">>>>>>> 消息id:{} 發送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 發送失敗", correlationData.getId());}});}// ReturnCallback 是在交換器無法將路由鍵路由到任何一個隊列中,會觸發這個方法。// 需要在系統配置文件中設置 publisher-returns: truerabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息id:{} 發送失敗", message.getMessageProperties().getCorrelationId());});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);log.info("Already sent message.");}}
測試發送消息

先啟動系統啟動類,消費者開始訂閱,啟動測試類發送消息。

@RunWith(SpringRunner.class) @SpringBootTest public class SpringbootRabbitmqApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void testReceiver() {sender.send();} }

可以在消費者接收到信息,并且發送端將打出日志 成功發送消息的記錄,也可以測試下 Publisher Confirms and Returns機制 主要是測試 ConfirmCallback 和 ReturnCallback 這兩個方法。

  • ConfirmCallback ,確認消息是否到達交換器,例如我們發送一個消息到一個你沒有創建過的 交換器上面去,看看情況,
  • ReturnCallback,確認消息是否到達隊列,我們可以這樣測試,定義一個路由鍵,不會被任何隊列訂閱到,最后查看結果就可以了。

使用注解的方式

引入依賴和連接參數

跟文章第一步的配置一樣的。

消費者
@Component @Log public class MessageReceiver {/*** 無返回消息的** @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),key = Constant.ROUTING_KEY))public void receive(byte[] message) {log.info(">>>>>>>>>>> receive:" + new String(message));}/*** 設置有返回消息的* 需要注意的是,* 1. 在消息的在生產者(發送消息端)一定要使用 SendAndReceive(……) 這種帶有 receive 的方法,否則會拋異常,不捕獲會死循環。* 2. 該方法調用時會鎖定當前線程,并且有可能會造成MQ的性能下降或者服務端/客戶端出現死循環現象,請謹慎使用。** @param message* @return*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),key = Constant.ROUTING_REPLY_KEY))public String receiveAndReply(byte[] message) {log.info(">>>>>>>>>>> receive:" + new String(message));return ">>>>>>>> I got the message";}}

主要是使用到 @RabbitListener,雖然看起來參數很多,仔細的你會發現這個和寫配置類里面的基本屬性是一摸一樣的,沒有任何區別。

需要注意的是我在這里多做了個有返回值的消息,這個使用異常的話,會不斷重試消息,從而阻塞了線程。而且使用它的時候只能使用帶有 receive 的方法給它發送消息。

生產者

生產者沒什么變化。

@Component public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {/*** logger*/private static final Logger log = LoggerFactory.getLogger(MessageSender.class);private RabbitTemplate rabbitTemplate;/*** 注入 RabbitTemplate*/@Autowiredpublic MessageSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);}/*** 測試無返回消息的*/public void send() {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(Constant.EXCHANGES_NAME, Constant.ROUTING_KEY, ">>>>>> Hello World".getBytes(), correlationData);log.info(">>>>>>>>>> Already sent message");}/*** 測試有返回消息的,需要注意一些問題*/public void sendAndReceive() {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Object o = rabbitTemplate.convertSendAndReceive(Constant.EXCHANGES_NAME, Constant.ROUTING_REPLY_KEY, ">>>>>>>> Hello World Second".getBytes(), correlationData);log.info(">>>>>>>>>>> {}", Objects.toString(o));}/*** Confirmation callback.** @param correlationData correlation data for the callback.* @param ack true for ack, false for nack* @param cause An optional cause, for nack, when available, otherwise null.*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(">>>>>>> 消息id:{} 發送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 發送失敗", correlationData.getId());}}/*** Returned message callback.** @param message the returned message.* @param replyCode the reply code.* @param replyText the reply text.* @param exchange the exchange.* @param routingKey the routing key.*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息id:{} 發送失敗", message.getMessageProperties().getCorrelationId());} }
測試
@RunWith(SpringRunner.class) @SpringBootTest public class SpringbootAnnotationApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void send() {sender.send();}@Testpublic void sendAndReceive() {sender.sendAndReceive();} }

總結

以上是生活随笔為你收集整理的学习Spring Boot:(二十六)使用 RabbitMQ 消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。