日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

springboot 集成rabbitmq 实例

發(fā)布時(shí)間:2025/3/8 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot 集成rabbitmq 实例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

springboot 集成rabbitmq 實(shí)例

個(gè)人在學(xué)習(xí)rabbitmq時(shí)發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個(gè)人調(diào)研過程,整理此篇文章。

本文章共分為以下部分:

  • rabbitmq簡介
  • springboot配置
  • rabbitmq生產(chǎn)者配置
  • rabbitmq消費(fèi)者配置
  • 問題補(bǔ)充

一、rabbitmq簡介

目前流程的消息隊(duì)列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的應(yīng)用場景,關(guān)于各個(gè)框架的介紹,大家可自行百度,網(wǎng)上很多文章介紹~其中rabbit因?yàn)槠鋋ck特性以及還算不錯(cuò)的性能被大多數(shù)公司采用。

概念:

  • 生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列
  • 消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽隊(duì)列中的對應(yīng)消息,消費(fèi)消息
  • 隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息
  • 交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息
  • 綁定 完成交換機(jī)和隊(duì)列之間的綁定

模式:

  • direct
    直連模式,用于實(shí)例間的任務(wù)分發(fā)
  • topic
    話題模式,通過可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列
  • headers
    適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則
  • fanout
    分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key

安裝

單機(jī)版安裝很簡單,大概步驟如下:

# 安裝erlang包yum install erlang # 安裝socatyum install socat # 安裝rabbit? ? rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm? # 啟動(dòng)服務(wù)rabbitmq-server start # 增加管理控制功能rabbitmq-plugins?enable?rabbitmq_management # 增加用戶:sudo rabbitmqctl add_user root passwordrabbitmqctl?set_user_tags root?administrator?rabbitmqctl?set_permissions?-p?/ root '.*' '.*' '.*'

集群安裝,可參考以下博客:
? ? ?
rabbitmq集群安裝

二、springboot配置

廢話少說直接上代碼:
配置參數(shù)
application.yml:

spring:rabbitmq:addresses: 192.168.1.1:5672username: usernamepassword: passwordpublisher-confirms: truevirtual-host: /

java config讀取參數(shù)

/*** RabbitMq配置文件讀取類** @author chenhf* @create 2017-10-23 上午9:31**/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMqConfig {@Value("${spring.rabbitmq.addresses}")private String addresses;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.publisher-confirms}")private Boolean publisherConfirms;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;// 構(gòu)建mq實(shí)例工廠@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){return new RabbitAdmin(connectionFactory);}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;} }

三、rabbitmq生產(chǎn)者配置

主要配置了直連和話題模式,其中話題模式設(shè)置兩個(gè)隊(duì)列(queueTopicTest1、queueTopicTest2),此兩個(gè)隊(duì)列在和交換機(jī)綁定時(shí)分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來驗(yàn)證匹配模式。

/*** 用于配置交換機(jī)和隊(duì)列對應(yīng)關(guān)系* 新增消息隊(duì)列應(yīng)該按照如下步驟* 1、增加queue bean,參見queueXXXX方法* 2、增加queue和exchange的binding* @author chenhf* @create 2017-10-23 上午10:33**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class RabbitMqExchangeConfig {/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);/*** @Author:chenhf* @Description: 主題型交換機(jī)* @Date:下午5:49 2017/10/23* @param* @return*/@BeanTopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());rabbitAdmin.declareExchange(contractTopicExchange);logger.debug("完成主題型交換機(jī)bean實(shí)例化");return contractTopicExchange;}/*** 直連型交換機(jī)*/@BeanDirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());rabbitAdmin.declareExchange(contractDirectExchange);logger.debug("完成直連型交換機(jī)bean實(shí)例化");return contractDirectExchange;}//在此可以定義隊(duì)列@BeanQueue queueTest(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());rabbitAdmin.declareQueue(queue);logger.debug("測試隊(duì)列實(shí)例化完成");return queue;}//topic 1@BeanQueue queueTopicTest1(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());rabbitAdmin.declareQueue(queue);logger.debug("話題測試隊(duì)列1實(shí)例化完成");return queue;}//topic 2@BeanQueue queueTopicTest2(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());rabbitAdmin.declareQueue(queue);logger.debug("話題測試隊(duì)列2實(shí)例化完成");return queue;}//在此處完成隊(duì)列和交換機(jī)綁定@BeanBinding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());rabbitAdmin.declareBinding(binding);logger.debug("測試隊(duì)列與直連型交換機(jī)綁定完成");return binding;}//topic binding1@BeanBinding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());rabbitAdmin.declareBinding(binding);logger.debug("測試隊(duì)列與話題交換機(jī)1綁定完成");return binding;}//topic binding2@BeanBinding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());rabbitAdmin.declareBinding(binding);logger.debug("測試隊(duì)列與話題交換機(jī)2綁定完成");return binding;}}

在這里用到枚舉類:RabbitMqEnum

/*** 定義rabbitMq需要的常量** @author chenhf* @create 2017-10-23 下午4:07**/ public class RabbitMqEnum {/*** @param* @Author:chenhf* @Description:定義數(shù)據(jù)交換方式* @Date:下午4:08 2017/10/23* @return*/public enum Exchange {CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"),CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),CONTRACT_DIRECT("CONTRACT_DIRECT", "點(diǎn)對點(diǎn)");private String code;private String name;Exchange(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定義隊(duì)列名稱* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueName {TESTQUEUE("TESTQUEUE", "測試隊(duì)列"),TOPICTEST1("TOPICTEST1", "topic測試隊(duì)列"),TOPICTEST2("TOPICTEST2", "topic測試隊(duì)列");private String code;private String name;QueueName(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定義routing_key* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueEnum {TESTQUEUE("TESTQUEUE1", "測試隊(duì)列key"),TESTTOPICQUEUE1("*.TEST.*", "topic測試隊(duì)列key"),TESTTOPICQUEUE2("lazy.#", "topic測試隊(duì)列key");private String code;private String name;QueueEnum(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}}

以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口
測試時(shí)直接調(diào)用此工具類,testUser類需自己實(shí)現(xiàn)

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser); /*** rabbitmq發(fā)送消息工具類** @author chenhf* @create 2017-10-26 上午11:10**/@Component public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);private RabbitTemplate rabbitTemplate;@Autowiredpublic RabbitMqSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {logger.info("confirm: " + correlationData.getId());}/*** 發(fā)送到 指定routekey的指定queue* @param routeKey* @param obj*/public void sendRabbitmqDirect(String routeKey,Object obj) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);}/*** 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上* @param routeKey* @param obj*/public void sendRabbitmqTopic(String routeKey,Object obj) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);} }

四、rabbitmq消費(fèi)者配置

springboot注解方式監(jiān)聽隊(duì)列,無法手動(dòng)指定回調(diào),所以采用了實(shí)現(xiàn)ChannelAwareMessageListener接口,重寫onMessage來進(jìn)行手動(dòng)回調(diào),詳見以下代碼,詳細(xì)介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀

直連消費(fèi)者
通過設(shè)置TestUser的name來測試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺中隊(duì)列中消息是否被消費(fèi)

/*** 消費(fèi)者配置** @author chenhf* @create 2017-10-30 下午3:14**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class ExampleAmqpConfiguration {@Bean("testQueueContainer")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TESTQUEUE");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("testQueueListener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());//通過設(shè)置TestUser的name來測試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺中隊(duì)列中消息是否被消費(fèi)if ("2".equals(testUser.getUserName())){System.out.println(testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}if ("1".equals(testUser.getUserName())){System.out.println(testUser.toString());channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}};}}

topic消費(fèi)者1

/*** 消費(fèi)者配置** @author chenhf* @create 2017-10-30 下午3:14**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration {@Bean("topicTest1Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST1");container.setMessageListener(exampleListener1());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest1Listener")public ChannelAwareMessageListener exampleListener1(){return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST1:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}

topic消費(fèi)者2

/*** 消費(fèi)者配置** @author chenhf* @create 2017-10-30 下午3:14**/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration2 {@Bean("topicTest2Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST2");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest2Listener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST2:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}

問題補(bǔ)充

使用過程中可能出現(xiàn)的坑參考此篇文章
https://segmentfault.com/a/11...

總結(jié)

以上是生活随笔為你收集整理的springboot 集成rabbitmq 实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。