RabbitMQ大揭秘
RabbitMQ大揭秘
歡迎關(guān)注H尋夢人公眾號
通過SpringBoot整合RabbitMQ的案例來說明,RabbitMQ相關(guān)的各個(gè)屬性以及使用方式;并通過相關(guān)源碼深刻理解。
Queue(消息隊(duì)列)
Queue(消息隊(duì)列) 用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
RabbitMQ 中消息只能存儲在 隊(duì)列 中,這一點(diǎn)和 Kafka 這種消息中間件相反。Kafka 將消息存儲在 topic(主題) 這個(gè)邏輯層面,而相對應(yīng)的隊(duì)列邏輯只是topic實(shí)際存儲文件中的位移標(biāo)識。 RabbitMQ 的生產(chǎn)者生產(chǎn)消息并最終投遞到隊(duì)列中,消費(fèi)者可以從隊(duì)列中獲取消息并消費(fèi)。
多個(gè)消費(fèi)者可以訂閱同一個(gè)隊(duì)列,這時(shí)隊(duì)列中的消息會被平均分?jǐn)?#xff08;Round-Robin,即輪詢)給多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理,這樣避免的消息被重復(fù)消費(fèi)。
RabbitMQ 不支持隊(duì)列層面的廣播消費(fèi),如果有廣播消費(fèi)的需求,需要在其上進(jìn)行二次開發(fā),這樣會很麻煩,不建議這樣做。
交換機(jī)的類型:
交換機(jī)主要包括如下4種類型:
1. 基本配置
1.1 配置文件屬性說明
RabbitMQ最基本的基礎(chǔ)配置如下:
server:port: 11000spring:application:name: rabbitmq-testrabbitmq: # 單機(jī)IP配置 # host: rabbitmq-host # port: 5672# 集群IP配置addresses: rabbitmq01-host:5672,rabbitmq02-host:5672# 用戶名和密碼,默認(rèn)都是guestusername: xiongminpassword: xiongmin# 交換器名可以不設(shè)置默認(rèn) 【"" --> /】 交換器virtual-host: /rabbitmq_testpublisher-returns: true # 發(fā)送者開啟 return 確認(rèn)機(jī)制publisher-confirm-type: correlated # 發(fā)送者開啟 confirm 確認(rèn)機(jī)制 等價(jià)于 spring.rabbitmq.publisher-returns=truedefault-exchange: default_exchangelistener:simple:acknowledge-mode: manual # 設(shè)置消費(fèi)端手動 ackretry:enabled: true # 支持重試----------------------------------------------------------------------------------------------server:port: 11000spring:application:name: rabbitmq-testrabbitmq: # 單機(jī)IP配置 # host: rabbitmq-host # port: 5672# 集群IP配置addresses: rabbitmq01-host:5672,rabbitmq02-host:5672# 用戶名和密碼,默認(rèn)都是guestusername: xiongminpassword: xiongmin# 交換器名可以不設(shè)置默認(rèn) 【"" --> /】 交換器virtual-host: /rabbitmq_testpublisher-returns: true # 發(fā)送者開啟 return 確認(rèn)機(jī)制publisher-confirm-type: correlated # 發(fā)送者開啟 confirm 確認(rèn)機(jī)制 等價(jià)于 spring.rabbitmq.publisher-returns=true#連接超時(shí)時(shí)間connection-timeout: 15000# 使用return-callback時(shí)必須設(shè)置mandatory為truetemplate:mandatory: truedefault-exchange: default_exchange# 消費(fèi)端配置listener:simple:retry:enabled: true # 支持重試#消費(fèi)端concurrency: 5#最大消費(fèi)端數(shù)max-concurrency: 10#自動簽收auto 手動 manualacknowledge-mode: manual # 設(shè)置消費(fèi)端手動 ack#限流(海量數(shù)據(jù),同時(shí)只能過來一條)prefetch: 11.2 配置類說明
package com.cli.springboot_rabbitmq.config;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @Author xiongmin* @Description* @Date 2021/2/25 18:31* @Version 1.0**/ @Configuration public class RabbitMQConfig {@Value("${spring.rabbitmq.addresses}")private String addresses;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.default-exchange}")private String defaultExchange;@Autowiredprivate ConfirmCallbackService confirmCallbackService;@Autowiredprivate ReturnCallbackService returnCallbackService;public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost); // connectionFactory.setPublisherConfirms(rabbitmqProps.isPublisherConfirms()); //消息回調(diào),必須要設(shè)置return connectionFactory;}/*** 使用的自己的創(chuàng)建的RabbitMQ 完全沒有使用到RabbitMQ相關(guān)的默認(rèn)配置,即使在yaml文件中配置了消費(fèi)者手動確認(rèn),使用如下的rabbitMQ 也是無效的,* 因?yàn)镽abbitTemplate相關(guān)的ConnectionFactory 沒有設(shè)置消費(fèi)者手動確認(rèn)消息,這里不會使用到y(tǒng)aml的配置* @return*/@Bean(value = "rabbitTemplateMessaging")public RabbitTemplate rabbitTemplateMessaging() {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());// 消息類型轉(zhuǎn)換器 -- 數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列rabbitTemplate.setMessageConverter(jackson2MessageConverter());// 設(shè)置默認(rèn)的交換機(jī),如果發(fā)送的消息沒有指定交換機(jī),則使用默認(rèn)的交換機(jī)rabbitTemplate.setExchange(defaultExchange);/*** mandatory:交換器無法根據(jù)自身類型和路由鍵找到一個(gè)符合條件的隊(duì)列時(shí)的處理方式* true:RabbitMQ會調(diào)用Basic.Return命令將消息返回給生產(chǎn)者* false:RabbitMQ會把消息直接丟棄*/rabbitTemplate.setMandatory(true);/*** 消費(fèi)者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理*/rabbitTemplate.setConfirmCallback(confirmCallbackService);/*** 消息投遞到隊(duì)列失敗回調(diào)處理*/rabbitTemplate.setReturnsCallback(returnCallbackService);// 服務(wù)端響應(yīng)發(fā)送到的隊(duì)列 reply-address 格式: exchange/routingKeyrabbitTemplate.setReplyAddress("messaging/messaging-response");// 設(shè)置回復(fù)和接受消息的時(shí)間,單位為毫秒rabbitTemplate.setReplyTimeout(20000);rabbitTemplate.setReceiveTimeout(20000);return rabbitTemplate;}/*** @param connectionFactory connectionFactory屬性信息會直接是用yaml配置文件中配置的* @return*/@Bean(value = "rabbitTemplateInit")public RabbitTemplate rabbitTemplateInit(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 消息類型轉(zhuǎn)換器 -- 數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列rabbitTemplate.setMessageConverter(jackson2MessageConverter());// 設(shè)置默認(rèn)的交換機(jī),如果發(fā)送的消息沒有指定交換機(jī),則使用默認(rèn)的交換機(jī)rabbitTemplate.setExchange(defaultExchange);/*** mandatory:交換器無法根據(jù)自身類型和路由鍵找到一個(gè)符合條件的隊(duì)列時(shí)的處理方式* true:RabbitMQ會調(diào)用Basic.Return命令將消息返回給生產(chǎn)者* false:RabbitMQ會把消息直接丟棄*/rabbitTemplate.setMandatory(true);/*** 消費(fèi)者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理*/rabbitTemplate.setConfirmCallback(confirmCallbackService);/*** 消息投遞到隊(duì)列失敗回調(diào)處理*/rabbitTemplate.setReturnsCallback(returnCallbackService);// 服務(wù)端響應(yīng)發(fā)送到的隊(duì)列 reply-address 格式: exchange/routingKeyrabbitTemplate.setReplyAddress("messaging/messaging-response");// 設(shè)置回復(fù)和接受消息的時(shí)間,單位為毫秒rabbitTemplate.setReplyTimeout(20000);rabbitTemplate.setReceiveTimeout(20000);return rabbitTemplate;}/*** 的作用解釋如下:* 數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列* [[RabbitMQ]Jackson2JsonMessageConverter轉(zhuǎn)換實(shí)體類常的問題](https://blog.csdn.net/qq_31897023/article/details/103875594)* [Springboot Rabbitmq 使用Jackson2JsonMessageConverter 消息傳遞后轉(zhuǎn)對象](https://www.cnblogs.com/timseng/p/11688019.html)* @return*/@Beanpublic Jackson2JsonMessageConverter jackson2MessageConverter() {Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();return converter;}//--------------------------配置相關(guān)的Exchange和相關(guān)的Queue----------------------------------/*** 創(chuàng)建topic模式的交換器* @return*/@BeanTopicExchange exchange() {return new TopicExchange("topicExchange",true,false);}/*** 創(chuàng)建fanout模式的交換器* 發(fā)布訂閱模式* 發(fā)布訂閱是交換機(jī)針對隊(duì)列來說的,一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列* 注意:多個(gè)消費(fèi)者可以訂閱同一個(gè)隊(duì)列,這時(shí)隊(duì)列中的消息會被平均分?jǐn)?#xff08;Round-Robin,即輪詢)給多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理,這樣避免的消息被重復(fù)消費(fèi)。* @return*/@BeanFanoutExchange fanoutExchange() {// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效; 如果持久性,則RabbitMQ重啟后,交換機(jī)還存在// autoDelete:是否自動刪除,當(dāng)所有與之綁定的消息隊(duì)列都完成了對此交換機(jī)的使用后,刪掉它return new FanoutExchange("fanoutExchange",true,false);}//Direct交換機(jī) 起名:TestDirectExchange@BeanDirectExchange TestDirectExchange() {// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。return new DirectExchange("TestDirectExchange",true,false);}//隊(duì)列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue() {// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效// exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。// return new Queue("TestDirectQueue",true,true,false);//一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)falsereturn new Queue("TestDirectQueue",true);}//綁定 將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}// 默認(rèn)的 Direct交換機(jī) 起名:DefaultDirectExchange@BeanDirectExchange DefaultDirectExchange() {// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。return new DirectExchange(defaultExchange,true,false);}//隊(duì)列 起名:DefaultDirectQueue@Beanpublic Queue DefaultDirectQueue() {// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會被存儲在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效// exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable// autoDelete:是否自動刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會自動刪除。// return new Queue("TestDirectQueue",true,true,false);//一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)falsereturn new Queue("DefaultDirectQueue",true);}//綁定 將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:DefaultDirectRouting@BeanBinding bindingDefaultDirect() {return BindingBuilder.bind(DefaultDirectQueue()).to(DefaultDirectExchange()).with("DefaultDirectRouting");}/*** 創(chuàng)建三個(gè)隊(duì)列 :fanout.A fanout.B fanout.C* 將三個(gè)隊(duì)列都綁定在交換機(jī) fanoutExchange 上* 因?yàn)槭巧刃徒粨Q機(jī), 路由鍵無需配置,配置也不起作用*/@Beanpublic Queue queueA() {return new Queue("fanout.A");}@Beanpublic Queue queueB() {return new Queue("fanout.B");}@Beanpublic Queue queueC() {return new Queue("fanout.C");}@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}//綁定鍵public final static String MAN = "topic.MAN";public final static String WOMAN = "topic.WOMAN";@Beanpublic Queue firstQueue() {return new Queue(RabbitMQConfig.MAN);}@Beanpublic Queue secondQueue() {return new Queue(RabbitMQConfig.WOMAN);}//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.MAN//這樣只要是消息攜帶的路由鍵是topic.MAN,才會分發(fā)到該隊(duì)列@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);}//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊(duì)列@BeanBinding bindingExchangeMessage2() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");}}交換機(jī)的屬性
除交換機(jī)類型外,在聲明交換機(jī)時(shí)還可以附帶許多其他的屬性,其中最重要的幾個(gè)分別是:
- Name:交換機(jī)名稱
- Durability:是否持久化。如果持久性,則RabbitMQ重啟后,交換機(jī)還存在
- Auto-delete:當(dāng)所有與之綁定的消息隊(duì)列都完成了對此交換機(jī)的使用后,刪掉它
- Arguments:擴(kuò)展參數(shù)
2. 消息轉(zhuǎn)換器
可以注意到的是上面的配置中RabbitTemplate設(shè)置的消息轉(zhuǎn)換器是Jackson2JsonMessageConverter;下面將對消息轉(zhuǎn)換器說明
消息轉(zhuǎn)換器接口源碼:
/** Copyright 2002-2019 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.springframework.amqp.support.converter;import java.lang.reflect.Type;import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.lang.Nullable;/*** Message converter interface.** @author Mark Fisher* @author Mark Pollack* @author Gary Russell*/ public interface MessageConverter {/*** Convert a Java object to a Message. 將消息對象轉(zhuǎn)換成java對象。* @param object the object to convert* @param messageProperties The message properties.* @return the Message* @throws MessageConversionException in case of conversion failure*/Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;/*** Convert a Java object to a Message. 將java對象和屬性對象轉(zhuǎn)換成Message對象。* The default implementation calls {@link #toMessage(Object, MessageProperties)}.* @param object the object to convert* @param messageProperties The message properties.* @param genericType the type to use to populate type headers.* @return the Message* @throws MessageConversionException in case of conversion failure* @since 2.1*/default Message toMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType)throws MessageConversionException {return toMessage(object, messageProperties);}/*** Convert from a Message to a Java object.* @param message the message to convert* @return the converted Java object* @throws MessageConversionException in case of conversion failure*/Object fromMessage(Message message) throws MessageConversionException;}可以通過實(shí)現(xiàn)MessageConverter接口,實(shí)現(xiàn)自定義的消息轉(zhuǎn)換器,如下:
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter;public class TestMessageConverter implements MessageConverter {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {System.out.println("=======toMessage=========");return new Message(object.toString().getBytes(),messageProperties);}//消息類型轉(zhuǎn)換器中fromMessage方法返回的類型就是消費(fèi)端處理器接收的類型@Overridepublic Object fromMessage(Message message) throws MessageConversionException {System.out.println("=======fromMessage=========");return new String(message.getBody());} }簡介Jackson2JsonMessageConverter消息轉(zhuǎn)換器:
使用Jackson2JsonMessageConverter后,反序列化時(shí)要求發(fā)送的類和接受的類完全一樣(字段,類名,包路徑)。【也就是消息的生產(chǎn)的消息類型和消息的消費(fèi)方法的消息參數(shù)類型一致】
- Springboot Rabbitmq 使用Jackson2JsonMessageConverter 消息傳遞后轉(zhuǎn)對象
- [RabbitMQ]Jackson2JsonMessageConverter轉(zhuǎn)換實(shí)體類常的問題
- RabbitMQ筆記十:MessageConverter詳解
3. 生產(chǎn)者消費(fèi)者使用案例
3.1 相關(guān)注解
- RabbitMQ筆記十三:使用@RabbitListener注解消費(fèi)消息
3.2 消息的發(fā)送處理
package com.cli.springboot_rabbitmq.config;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import javax.annotation.Resource; import java.util.UUID;/*** @Author xiongmin* @Description* @Date 2021/2/26 10:35* @Version 1.0**/ @Slf4j @Service public class RabbitMQService {// @Resource(name = "rabbitTemplateMessaging") // private RabbitTemplate rabbitTemplate;/*** 換成這個(gè)RabbitTemplate 之后,所有的消費(fèi)者都要手動Ack消息確認(rèn)被消費(fèi)*/@Resource(name = "rabbitTemplateInit")private RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitMQConfig amqpConfig;// 發(fā)送消息的后置處理器,MessagePostProcessor類的postProcessMessage方法得到的Message就是將參數(shù)Object內(nèi)容轉(zhuǎn)換成Message對象// 沒有指定具體的Exchange就會使用默認(rèn)的Exchangepublic void messageDeliver(String routineKey, Object o) {rabbitTemplate.convertAndSend(routineKey, o, message -> {System.out.println("-------處理前message-------------");System.out.println(message);// 設(shè)置message的一些頭部信息message.getMessageProperties().setMessageId(UUID.randomUUID().toString());message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());return message;});}// 發(fā)送消息的后置處理器,MessagePostProcessor類的postProcessMessage方法得到的Message就是將參數(shù)Object內(nèi)容轉(zhuǎn)換成Message對象public void messageDeliver(String exchange, String routineKey, Object o) {rabbitTemplate.convertAndSend(exchange, routineKey, o, message -> {System.out.println("-------處理前message-------------");System.out.println(message);// 設(shè)置message的一些頭部信息message.getMessageProperties().setMessageId(UUID.randomUUID().toString());message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println("-------處理后message-------------");System.out.println(message);return message;},new CorrelationData(UUID.randomUUID().toString()));}// 發(fā)送消息的后置處理器,MessagePostProcessor類的postProcessMessage方法得到的Message就是將參數(shù)Object內(nèi)容轉(zhuǎn)換成Message對象public void messageDeliver(String exchange, String routineKey, User user) {rabbitTemplate.convertAndSend(exchange, routineKey, user, message -> {System.out.println("-------處理前message-------------");System.out.println(message);// 設(shè)置message的一些頭部信息/*** 消息在消費(fèi)時(shí),先根據(jù)消息投中給的messageId找到對飲給的User, 在判斷User的狀態(tài)是否已經(jīng)被消費(fèi)過* 感覺這也是一種防止消息重復(fù)消費(fèi)的方式,即使同一個(gè)消息投遞多次,也你能防止消息重復(fù)消費(fèi)*/message.getMessageProperties().setMessageId(user.getName());message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println("-------處理后message-------------");System.out.println(message);return message;},new CorrelationData(UUID.randomUUID().toString()));}/*** 防止同一時(shí)間段有好多個(gè)同步配置的消息發(fā)送,避免多個(gè)重復(fù)消息* @param deviceId*/public void deliveConfigSyncMsgBeforeCheck(String deviceId) {if (connect == null) {connect = rabbitTemplate.getConnectionFactory().createConnection();// connect = amqpConfig.connectionFactory().createConnection();}if (channel == null) {try {channel = connect.createChannel(false);} catch (Exception e) {connect = amqpConfig.connectionFactory().createConnection();channel = connect.createChannel(false);}}try {long messageCount = channel.messageCount("f5-config");// 如果f5-ltm-state為空那我就發(fā)一條消息,否者就不發(fā),防止MQ消息堆積if (messageCount == 0) {messageDeliver("f5.config", deviceId);} else {boolean a = false;int i;for(i=0; i < messageCount; i++) {channel.basicQos(1);GetResponse response = channel.basicGet("f5-config", true);if (response == null) {continue;}AMQP.BasicProperties props = response.getProps();byte[] body = response.getBody();String message = new String(body);channel.basicPublish("messaging","f5.config", props, message.getBytes("UTF-8"));if (message.contains(deviceId)) {a = true;break;}}if (!a && i >= messageCount) {messageDeliver("f5.config", deviceId);}}} catch (Exception e) {logger.info("Retrieve Message fail " + e.getMessage());}}}發(fā)送消息的后置處理器,MessagePostProcessor類的postProcessMessage方法得到的Message就是將參數(shù)Object內(nèi)容轉(zhuǎn)換成Message對象
rabbitTemplate.convertAndSend(exchange, routineKey, o, message -> {System.out.println("-------處理前message-------------");System.out.println(message);// 設(shè)置message的一些頭部信息message.getMessageProperties().setMessageId(UUID.randomUUID().toString());message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println("-------處理后message-------------");System.out.println(message);return message;},new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate的convertAndSend的相關(guān)源碼:@Overridepublic void convertAndSend(String exchange, String routingKey, final Object message,final MessagePostProcessor messagePostProcessor,@Nullable CorrelationData correlationData) throws AmqpException {Message messageToSend = convertMessageIfNecessary(message);messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData,nullSafeExchange(exchange), nullSafeRoutingKey(routingKey));send(exchange, routingKey, messageToSend, correlationData);}- 消息模板-RabbitTemplate
- 給RabbitMQ發(fā)送消息時(shí),設(shè)置請求頭Header。
3.3 消息的消費(fèi)處理
@RabbitListener和@RabbitHandler搭配使用
@RabbitListener可以標(biāo)注在類上面,當(dāng)使用在類上面的時(shí)候,需要配合@RabbitHandler注解一起使用,@RabbitListener標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給帶有@RabbitHandler的方法處理,具體找哪個(gè)方法處理,需要跟進(jìn)MessageConverter轉(zhuǎn)換后的java對象。
package com.cli.springboot_rabbitmq.consumer;import com.cli.springboot_rabbitmq.model.User; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;import java.util.Map;/*** @Author xiongmin* @Description* @Date 2021/2/26 11:54* @Version 1.0**/ @Component @Slf4j @RabbitListener(queues = "topic.WOMAN") //監(jiān)聽的隊(duì)列名稱 topic.WOMAN public class TopicConsumerListenerTwo {// @RabbitHandler // public void receive(@Payload String message, @Headers Map<String, Object> headers) { // try { // log.info("TopicReceiver消費(fèi)者收到消息 : "); // log.info("message = " + message); // // log.info("TopicReceiver消費(fèi)者收到消息頭部信息 : "); // if (null != headers && !headers.isEmpty()) { // headers.forEach((key, value) -> { // log.info(key + ": " + value + "\n"); // }); // } else { // log.info("headers is empty"); // } // } catch (Exception e) { // log.error(e.getMessage(), e); // } // }/*** 如果生產(chǎn)者生產(chǎn)的消息類型為String,那么就會執(zhí)行該方法處理消息* @param message*/@RabbitHandlerpublic void receive(@Payload String message) {try {log.info("TopicReceiver消費(fèi)者收到消息 : ");log.info("message = " + message);} catch (Exception e) {log.error(e.getMessage(), e);}}// @RabbitHandler // public void receive(@Payload User user, @Headers Map<String, Object> headers) { // try { // log.info("TopicReceiver消費(fèi)者收到消息 : "); // log.info("user = " + user); // // log.info("TopicReceiver消費(fèi)者收到消息頭部信息 : "); // if (null != headers && !headers.isEmpty()) { // headers.forEach((key, value) -> { // log.info(key + ": " + value + "\n"); // }); // } else { // log.info("headers is empty"); // } // } catch (Exception e) { // log.error(e.getMessage(), e); // } // }/*** 如果生產(chǎn)者生產(chǎn)的消息類型為User,那么就會執(zhí)行該方法處理消息* @param user*/@RabbitHandlerpublic void receive(@Payload User user) {try {log.info("TopicReceiver消費(fèi)者收到消息 : ");log.info("user = " + user);} catch (Exception e) {log.error(e.getMessage(), e);}}/*** 如果生產(chǎn)者生產(chǎn)的消息類型為Map,那么就會執(zhí)行該方法處理消息* @param message*/@RabbitHandlerpublic void receive(@Payload Map message) {log.info("TopicReceiver {topic.WOMAN}消費(fèi)者收到消息 : ");if (null != message && !message.isEmpty()) {message.forEach((key, value) -> {log.info(key + ": " + value + "\n");});} else {log.info("message is empty");}}/*** @DO 總結(jié)* @RabbitListener標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給帶有@RabbitHandler的方法處理,具體找哪個(gè)方法處理,需要更具M(jìn)essageConverter轉(zhuǎn)換后的java對象。* 注意,如果需要消息的頭部信息,由于頭部信息是一個(gè)MAP數(shù)據(jù)結(jié)構(gòu),那么Payload的數(shù)據(jù)類型不能為MAP類型,否者會報(bào)錯(cuò),且即使其他不是Map類型的Payload,要獲取消息的頭部信息也會報(bào)錯(cuò)* 因?yàn)橄M(fèi)者消息Payload是MAP的類型的消息時(shí),會查看那個(gè)方法中有MAP, 當(dāng)有多個(gè)方法中具有MAP參數(shù)時(shí),此時(shí)程序也不知道該使用哪個(gè)方法來處理這個(gè)消息,就會拋出異常*/}@RabbitListener標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給帶有@RabbitHandler的方法處理,具體找哪個(gè)方法處理,需要更具M(jìn)essageConverter轉(zhuǎn)換后的java對象。
注意: 如果需要消息的頭部信息,由于頭部信息是一個(gè)MAP數(shù)據(jù)結(jié)構(gòu),那么Payload的數(shù)據(jù)類型不能為MAP類型,否者會報(bào)錯(cuò),且即使其他不是Map類型的Payload,要獲取消息的頭部信息也會報(bào)錯(cuò);
因?yàn)橄M(fèi)者消息Payload是MAP的類型的消息時(shí),會查看那個(gè)方法中有MAP, 當(dāng)有多個(gè)方法中具有MAP參數(shù)時(shí),此時(shí)程序也不知道該使用哪個(gè)方法來處理這個(gè)消息,就會拋出異常
4. 消息確認(rèn)機(jī)制
yaml配置打開confirmCallback 和returnCallback
server:port: 11000spring:application:name: rabbitmq-testrabbitmq: # 單機(jī)IP配置 # host: rabbitmq-host # port: 5672# 集群IP配置addresses: rabbitmq01-host:5672,rabbitmq02-host:5672# 用戶名和密碼,默認(rèn)都是guestusername: xiongminpassword: xiongmin# 交換器名可以不設(shè)置默認(rèn) 【"" --> /】 交換器virtual-host: /rabbitmq_testpublisher-returns: true # 發(fā)送者開啟 return 確認(rèn)機(jī)制publisher-confirm-type: correlated # 發(fā)送者開啟 confirm 確認(rèn)機(jī)制 等價(jià)于 spring.rabbitmq.publisher-returns=true#連接超時(shí)時(shí)間connection-timeout: 15000# 使用return-callback時(shí)必須設(shè)置mandatory為truetemplate:mandatory: truedefault-exchange: default_exchange# 消費(fèi)端配置listener:simple:retry:enabled: true # 支持重試#消費(fèi)端concurrency: 5#最大消費(fèi)端數(shù)max-concurrency: 10#自動簽收auto 手動 manualacknowledge-mode: manual # 設(shè)置消費(fèi)端手動 ack#限流(海量數(shù)據(jù),同時(shí)只能過來一條)prefetch: 1分別實(shí)現(xiàn)confirmCallback和returnCallback回調(diào)的類接口
ConfirmCallbackService.java
package com.cli.springboot_rabbitmq.config;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;/*** @Author xiongmin* @Description 監(jiān)聽消息是否發(fā)送交換機(jī)回調(diào) 只有投遞失敗的時(shí)候才會執(zhí)行* @Date 2021/2/27 11:16* @Version 1.0**/ @Component @Slf4j public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("消息發(fā)送異常!");} else {log.info("發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);}} }ReturnCallbackService.java
package com.cli.springboot_rabbitmq.config;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;/*** @Author xiongmin* @Description 消息為路由到隊(duì)列監(jiān)聽類 只有投遞失敗的時(shí)候才會執(zhí)行* @Date 2021/2/27 11:20* @Version 1.0* 如果消息未能投遞到目標(biāo) queue 里將觸發(fā)回調(diào) returnCallback ,一旦向 queue 投遞消息未成功,這里一般會記錄下當(dāng)前消息的詳細(xì)投遞數(shù)據(jù),方便后續(xù)做重發(fā)或者補(bǔ)償?shù)炔僮鳌?*/ @Component @Slf4j public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("Fail... message:{},從交換機(jī)exchange:{},以路由鍵routingKey:{}," +"未找到匹配隊(duì)列,replyCode:{},replyText:{}",returned.getMessage(), returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText());} }設(shè)置RabbitTemplate
/*** @param connectionFactory connectionFactory屬性信息會直接是用yaml配置文件中配置的* @return*/@Bean(value = "rabbitTemplateInit")public RabbitTemplate rabbitTemplateInit(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);// 消息類型轉(zhuǎn)換器 -- 數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列rabbitTemplate.setMessageConverter(jackson2MessageConverter());// 設(shè)置默認(rèn)的交換機(jī),如果發(fā)送的消息沒有指定交換機(jī),則使用默認(rèn)的交換機(jī)rabbitTemplate.setExchange(defaultExchange);/*** mandatory:交換器無法根據(jù)自身類型和路由鍵找到一個(gè)符合條件的隊(duì)列時(shí)的處理方式* true:RabbitMQ會調(diào)用Basic.Return命令將消息返回給生產(chǎn)者* false:RabbitMQ會把消息直接丟棄*/rabbitTemplate.setMandatory(true);/*** 消費(fèi)者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理*/rabbitTemplate.setConfirmCallback(confirmCallbackService);/*** 消息投遞到隊(duì)列失敗回調(diào)處理*/rabbitTemplate.setReturnsCallback(returnCallbackService);// 服務(wù)端響應(yīng)發(fā)送到的隊(duì)列 reply-address 格式: exchange/routingKeyrabbitTemplate.setReplyAddress("messaging/messaging-response");// 設(shè)置回復(fù)和接受消息的時(shí)間,單位為毫秒rabbitTemplate.setReplyTimeout(20000);rabbitTemplate.setReceiveTimeout(20000);return rabbitTemplate;}消費(fèi)者消費(fèi)消息
package com.cli.springboot_rabbitmq.consumer;import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.HashMap;/*** @Author xiongmin* @Description* @Date 2021/2/26 10:51* @Version 1.0**/ @Component @Slf4j @RabbitListener(queues = "TestDirectQueue") //監(jiān)聽的隊(duì)列名稱 TestDirectQueue,監(jiān)聽多個(gè)隊(duì)列需要用單號分隔 public class DirectConsumerListener {@RabbitHandlerpublic void receive(@Payload HashMap msg, Channel channel, Message message) throws IOException {try {log.info("DirectReceiver消費(fèi)者收到消息 : ");if (null != msg && !msg.isEmpty()) {msg.forEach((key, value) -> {log.info(key + ": " + value + "\n");});} else {log.info("msg is empty");}// 消費(fèi)者手動ACK確認(rèn)消息被消費(fèi), 生產(chǎn)者會執(zhí)行ConfirmCallback回調(diào)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error(e.getMessage(),e);try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重復(fù)處理失敗,拒絕再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息} else {log.error("消息即將返回隊(duì)列再次處理");channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}} catch (Exception ex) {log.error("消息消費(fèi)異常時(shí)處理失敗",ex.getMessage(),ex);}}} }消費(fèi)者回執(zhí)方法
消費(fèi)消息有三種回執(zhí)方法,我們來分析一下每種方法的含義。
1、basicAck
basicAck:表示成功確認(rèn),使用此回執(zhí)方法后,消息會被rabbitmq broker 刪除。
void basicAck(long deliveryTag, boolean multiple) 復(fù)制代碼deliveryTag:表示消息投遞序號,每次消費(fèi)消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認(rèn)模式下,我們可以對指定deliveryTag的消息進(jìn)行ack、nack、reject等操作。
multiple:是否批量確認(rèn),值為 true 則會一次性 ack所有小于當(dāng)前消息 deliveryTag 的消息。
舉個(gè)栗子: 假設(shè)我先發(fā)送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認(rèn),當(dāng)我發(fā)第四條消息此時(shí)deliveryTag為8,multiple設(shè)置為 true,會將5、6、7、8的消息全部進(jìn)行確認(rèn)。
2、basicNack
basicNack :表示失敗確認(rèn),一般在消費(fèi)消息業(yè)務(wù)異常時(shí)用到此方法,可以將消息重新投遞入隊(duì)列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) 復(fù)制代碼deliveryTag:表示消息投遞序號。
multiple:是否批量確認(rèn)。
requeue:值為 true 消息將重新入隊(duì)列。
3、basicReject
basicReject:拒絕消息,與basicNack區(qū)別在于不能進(jìn)行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue) 復(fù)制代碼deliveryTag:表示消息投遞序號。
requeue:值為 true 消息將重新入隊(duì)列。
ConfirmCallBack和returnBackCall回調(diào)執(zhí)行時(shí)機(jī)如下:
先從總體的情況分析,推送消息存在四種情況:
①消息推送到server,但是在server里找不到交換機(jī)
②消息推送到server,找到交換機(jī)了,但是沒找到隊(duì)列
③消息推送到sever,交換機(jī)和隊(duì)列啥都沒找到
④消息推送成功
那么我先寫幾個(gè)接口來分別測試和認(rèn)證下以上4種情況,消息確認(rèn)觸發(fā)回調(diào)函數(shù)的情況:
①消息推送到server,但是在server里找不到交換機(jī)
結(jié)論: ①這種情況觸發(fā)的是 ConfirmCallback 回調(diào)函數(shù)。
②消息推送到server,找到交換機(jī)了,但是沒找到隊(duì)列
結(jié)論:②這種情況觸發(fā)的是 ConfirmCallback和RetrunCallback兩個(gè)回調(diào)函數(shù)。
③消息推送到sever,交換機(jī)和隊(duì)列啥都沒找到
這種情況其實(shí)一看就覺得跟①很像,沒錯(cuò) ,③和①情況回調(diào)是一致的,所以不做結(jié)果說明了。
結(jié)論: ③這種情況觸發(fā)的是 ConfirmCallback 回調(diào)函數(shù)。
④消息推送成功
結(jié)論: ④這種情況觸發(fā)的是 ConfirmCallback 回調(diào)函數(shù)。
- SpringBoot2.x整合RabbitMQ(完整版)
- Springboot 整合RabbitMq ,用心看完這一篇就夠了
- 用了 springboot + rabbitmq 消息確認(rèn)機(jī)制,我感覺掉坑里了
- RabbitMQ 可靠投遞
相關(guān)鏈接
- Spring 實(shí)戰(zhàn)(第 4 版)-電子書
- 中間件系列三 RabbitMQ之交換機(jī)的四種類型和屬性
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ大揭秘的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: H5微信jsapi支付流程
- 下一篇: 六、MSP432飞控软件框架分析