javascript
Spring cloud集成Rabbitmq
1、配置pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.0.RELEASE</version> </dependency>2、yml配置
spring:
? ?#rabbit mq
? ?rabbitmq:
? ? ?host: 192.168.1.146
? ? ?port: 5672
? ? ?username: guest
? ? ?password: guest
3、連接、交換器、隊(duì)列等設(shè)置
@Configuration public class RabbitMqConfig {/*** 組件發(fā)布EXCHANGE*/public static final String EXCHANGE_COMPONENT_PUBLISHED="exchange.component.published";public static final String EXCHANGE_COMPONENT_SYNCED="exchange.component.synced";public static final String EXCHANGE_EXTRACTION_TASK="exchange.extraction.task";/*** 組件發(fā)布消息隊(duì)列*/public static final String QUEUE_COMPONENT_PUBLISHED="queue.component.published";public static final String QUEUE_COMPONENT_SYNCED="queue.component.synced";public static final String QUEUE_EXTRACTION_TASK="queue.extraction.task";public static final String ROUTING_KEY_="";@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/**** @return*/@Beanpublic FanoutExchange exchangeComponentPublished(){return new FanoutExchange(EXCHANGE_COMPONENT_PUBLISHED);}@Beanpublic Queue queueComponentPublished(){return new Queue(QUEUE_COMPONENT_PUBLISHED,false);}@Beanpublic FanoutExchange exchangeComponentSynced(){return new FanoutExchange(EXCHANGE_COMPONENT_SYNCED);}@Beanpublic FanoutExchange exchangeExtractionTask(){return new FanoutExchange(EXCHANGE_EXTRACTION_TASK);}@Beanpublic Queue queueComponentSynced(){return new Queue(QUEUE_COMPONENT_SYNCED,false);}@Beanpublic Queue queueExtractionTask(){return new Queue(QUEUE_EXTRACTION_TASK,false);}@Beanpublic Binding componentSyncedBinding(){return BindingBuilder.bind(queueComponentSynced()).to(exchangeComponentSynced());}@Beanpublic Binding extractionTaskBinding(){return BindingBuilder.bind(queueExtractionTask()).to(exchangeExtractionTask());}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory){RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}}需要定義交換器,隊(duì)列,綁定,會自動注冊。
4、生產(chǎn)者
@Component public class ExtractionTaskMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(ExtractionTaskMessage msg){rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());ObjectMapper mapper = new ObjectMapper();Message m = null;try {m = MessageBuilder.withBody(mapper.writeValueAsBytes(msg)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();} catch (JsonProcessingException e) {e.printStackTrace();}rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_EXTRACTION_TASK,"",msg);} }使用RabbitTemplate發(fā)送消息。
5、消費(fèi)者
@Component @RabbitListener(queues = RabbitMqConfig.QUEUE_EXTRACTION_TASK) public class ExtractionTaskMessageReceiver {private static Logger logger = LoggerFactory.getLogger(ExtractionTaskMessageReceiver.class);@Autowiredprivate DataExtractorFactory dataExtractorFactory;@Autowiredprivate ExtractionTaskService extractionTaskService;@Autowiredprivate ExtractionDataService extractionDataService;@RabbitHandlerpublic void process(@Payload ExtractionTaskMessage msg) {try {} catch (DataExtractionException e){logger.error( "",e.getMessage());}catch (Exception e) {//異常處理logger.error(e.getMessage());}}} 使用 @RabbitListener 進(jìn)行監(jiān)聽,@RabbitHandler定義消息處理方法。在進(jìn)行監(jiān)聽時(shí),會查詢所有@RabbitHandler注解的消息處理方法,如果沒有參數(shù)類型匹配的方法,則異常。
@RabbitListener 注意
-
消息處理方法參數(shù)是由 MessageConverter 轉(zhuǎn)化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實(shí)例中去設(shè)置(默認(rèn) Spring 使用的實(shí)現(xiàn)是 SimpleRabbitListenerContainerFactory)
-
消息的 content_type 屬性表示消息 body 數(shù)據(jù)以什么數(shù)據(jù)格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應(yīng)類型接收消息 body 內(nèi)容,但若方法參數(shù)類型不正確會拋異常:
- application/octet-stream:二進(jìn)制字節(jié)數(shù)組存儲,使用 byte[]
- application/x-java-serialized-object:java 對象序列化格式存儲。使用 Object、相應(yīng)類型(反序列化時(shí)類型應(yīng)該同包同名,否者會拋出找不到類異常)
- text/plain:文本數(shù)據(jù)類型存儲。使用 String
- application/json:JSON 格式。使用 Object、相應(yīng)類型
注意:@RabbitListener注解在類上或方法上,行為不一樣。在類上,消費(fèi)方法參數(shù)類型不可以設(shè)置為Message。在方法上,方法類型可以為Message
MessageConvert
- 涉及網(wǎng)絡(luò)傳輸?shù)膽?yīng)用序列化不可避免,發(fā)送端以某種規(guī)則將消息轉(zhuǎn)成 byte 數(shù)組進(jìn)行發(fā)送,接收端則以約定的規(guī)則進(jìn)行 byte[] 數(shù)組的解析
- RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸?shù)膬?nèi)容,RabbitMQ 抽象出一個(gè) MessageConvert 接口處理消息的序列化,其實(shí)現(xiàn)有 SimpleMessageConverter(默認(rèn))、Jackson2JsonMessageConverter 等
- 當(dāng)調(diào)用了 convertAndSend 方法時(shí)會使用 MessageConvert 進(jìn)行消息的序列化
- SimpleMessageConverter 對于要發(fā)送的消息體 body 為 byte[] 時(shí)不進(jìn)行處理,如果是 String 則轉(zhuǎn)成字節(jié)數(shù)組,如果是 Java 對象,則使用 jdk 序列化將消息轉(zhuǎn)成字節(jié)數(shù)組,轉(zhuǎn)出來的結(jié)果較大,含class類名,類相應(yīng)方法等信息。因此性能較差
- 當(dāng)使用 RabbitMQ 作為中間件時(shí),數(shù)據(jù)量比較大,此時(shí)就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
設(shè)置MessageConvert
Json格式
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}?自定義
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new MessageConverter() {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {return null;}@Overridepublic Object fromMessage(Message message) throws MessageConversionException {try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){return (User)ois.readObject();}catch (Exception e){e.printStackTrace();return null;}}});return factory;}@Payload 與 @Headers
- 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
? ? System.out.println("body:"+body);
? ? System.out.println("Headers:"+headers);
}
- 也可以獲取單個(gè) Header 屬性
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
? ? System.out.println("body:"+body);
? ? System.out.println("token:"+token);
}
?
通過 @RabbitListener 注解聲明 Binding
- 通過 @RabbitListener 的 bindings 屬性聲明 Binding(若 RabbitMQ 中不存在該綁定所需要的 Queue、Exchange、RouteKey 則自動創(chuàng)建,若存在則拋出異常)
@RabbitListener 和 @RabbitHandler 搭配使用
- @RabbitListener 可以標(biāo)注在類上面,需配合 @RabbitHandler 注解一起使用
- @RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給 @RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型
?
?
?
總結(jié)
以上是生活随笔為你收集整理的Spring cloud集成Rabbitmq的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring boot添加 LocalD
- 下一篇: Spring注解编程基石(四)