日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

springboot使用rabbitMQ(带回调)

發布時間:2025/6/16 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot使用rabbitMQ(带回调) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

springboot提供了各類東西的簡單集成,rabbitMQ也不例外,本文重點介紹如何集成rabbitMQ以及如何使用帶回調的rabbitMQ

萬年不變的第一步:pom

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

生產者

配置文件1:RabbitConfig

package com.mos.eboot.web.config;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope;import javax.annotation.Resource;/*** @author 小塵哥*/ @Configuration public class RabbitConfig {@Resourceprivate RabbitConstants rabbitConstants;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(rabbitConstants.getHost());connectionFactory.setUsername(rabbitConstants.getUsername());connectionFactory.setVirtualHost(rabbitConstants.getVirtualHost());connectionFactory.setPassword(rabbitConstants.getPassword()); // * 如果要進行消息回調,則這里必須要設置為trueconnectionFactory.setPublisherConfirms(rabbitConstants.getPublisherConfirms());return connectionFactory;}/*** 因為要設置回調類,所以應是prototype類型,如果是singleton類型,則回調類為最后一次設置*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}}

配置文件2:RabbitConstants(主要用于用戶名、密碼等值從配置文件獲取,也可以用@Value方式)

package com.mos.eboot.web.config;import org.springframework.boot.context.properties.ConfigurationProperties;/*** rabbit配置* @author 小塵哥*/ @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitConstants {public static final String EXCHANGE = "bootExchange";public static final String ROUTINGKEY = "routingkey";public static final String QUEUE = "bootQueue";private String host;private Integer port;private String username;private String password;private Boolean publisherConfirms;private String virtualHost;public String getHost() {return host;}public void setHost(String host) {this.host = host;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public Boolean getPublisherConfirms() {return publisherConfirms;}public void setPublisherConfirms(Boolean publisherConfirms) {this.publisherConfirms = publisherConfirms;}public String getVirtualHost() {return virtualHost;}public void setVirtualHost(String virtualHost) {this.virtualHost = virtualHost;} }

配置文件3:DemoSender,即實際的消息發送者

package com.mos.eboot.web.sender;import com.mos.eboot.tools.util.IDGen; import com.mos.eboot.web.config.RabbitConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** @author 小塵哥*/ @Component public class DemoSender implements RabbitTemplate.ConfirmCallback{private static final Logger LOGGER = LoggerFactory.getLogger(DemoSender.class);private RabbitTemplate rabbitTemplate;@Autowiredpublic DemoSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);}public void send(String msg) {CorrelationData correlationData = new CorrelationData(IDGen.genId());LOGGER.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE, RabbitConstants.ROUTINGKEY, msg, correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm: " + correlationData.getId());} }

測試:DemoController

package com.mos.eboot.web.controller;import com.mos.eboot.tools.controller.BaseController; import com.mos.eboot.tools.result.ResultModel; import com.mos.eboot.web.config.RabbitConstants; import com.mos.eboot.web.sender.DemoSender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource; import javax.servlet.http.HttpSession;/*** @author 小塵哥*/ @RestController @RequestMapping("demo") public class DemoController extends BaseController {private static final Logger LOGGER = LoggerFactory.getLogger(DemoController.class);@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate DemoSender demoSender;@RequestMapping("amqp")public ResultModel amqp(){rabbitTemplate.convertAndSend(RabbitConstants.QUEUE,"1message from web");rabbitTemplate.convertAndSend("exchange","topic.messages","2message from web for exchage");rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE,RabbitConstants.ROUTINGKEY,"3message from web for fanoutExchange");//主要是下面這個demoSender.send("message from web for fanoutExchange1234234");return ResultModel.defaultSuccess(null);} }

消費者

配置都相同,添加一個Listener,用來接收消息

package com.mos.eboot.consumer.config;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.Payload;/*** @author 小塵哥*/ @Configuration @RabbitListener(queues = RabbitConstants.QUEUE) public class Listener {/** 設置交換機類型 */@Beanpublic DirectExchange defaultExchange() {/*** DirectExchange:按照routingkey分發到指定隊列* TopicExchange:多關鍵字匹配* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念* HeadersExchange :通過添加屬性key-value匹配*/return new DirectExchange(RabbitConstants.EXCHANGE);}@Beanpublic Queue fooQueue() {return new Queue(RabbitConstants.QUEUE);}@Beanpublic Binding binding() {/** 將隊列綁定到交換機 */return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(RabbitConstants.ROUTINGKEY);}@RabbitHandlerpublic void process(@Payload String foo) {System.out.println("Listener: " + foo);} }

yml配置

spring:redis:database: 0# Redis服務器地址host: 127.0.0.1# Redis服務器連接端口port: 6379# Redis服務器連接密碼(默認為空)password: 123456789rabbitmq:host: 172.16.14.93port: 5672username: dreamerpassword: dreamervirtualHost: ebootpublisherConfirms: true

測試結果

訪問http://localhost:8881/demo/amqp(根據你的實際情況)

生產者

消費者

可以看到消費者接收到了所發送的三個消息,但是其中只有第三個demoSender.send()發送的有回調,而在DemoSender中重寫的confirm里也接收到了回調信息。

完整代碼已上傳碼云,戳【eboot】獲取源碼

總結

以上是生活随笔為你收集整理的springboot使用rabbitMQ(带回调)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。