日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring cloud集成Rabbitmq

發(fā)布時(shí)間:2024/4/13 javascript 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring cloud集成Rabbitmq 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1、配置pom

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.0.RELEASE</version> </dependency>

2、yml配置

spring:
? ?#rabbit mq
? ?rabbitmq:

? ? ?host: 192.168.1.146
? ? ?port: 5672
? ? ?username: guest
? ? ?password: guest

3、連接、交換器、隊(duì)列等設(shè)置

@Configuration public class RabbitMqConfig {/*** 組件發(fā)布EXCHANGE*/public static final String EXCHANGE_COMPONENT_PUBLISHED="exchange.component.published";public static final String EXCHANGE_COMPONENT_SYNCED="exchange.component.synced";public static final String EXCHANGE_EXTRACTION_TASK="exchange.extraction.task";/*** 組件發(fā)布消息隊(duì)列*/public static final String QUEUE_COMPONENT_PUBLISHED="queue.component.published";public static final String QUEUE_COMPONENT_SYNCED="queue.component.synced";public static final String QUEUE_EXTRACTION_TASK="queue.extraction.task";public static final String ROUTING_KEY_="";@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/**** @return*/@Beanpublic FanoutExchange exchangeComponentPublished(){return new FanoutExchange(EXCHANGE_COMPONENT_PUBLISHED);}@Beanpublic Queue queueComponentPublished(){return new Queue(QUEUE_COMPONENT_PUBLISHED,false);}@Beanpublic FanoutExchange exchangeComponentSynced(){return new FanoutExchange(EXCHANGE_COMPONENT_SYNCED);}@Beanpublic FanoutExchange exchangeExtractionTask(){return new FanoutExchange(EXCHANGE_EXTRACTION_TASK);}@Beanpublic Queue queueComponentSynced(){return new Queue(QUEUE_COMPONENT_SYNCED,false);}@Beanpublic Queue queueExtractionTask(){return new Queue(QUEUE_EXTRACTION_TASK,false);}@Beanpublic Binding componentSyncedBinding(){return BindingBuilder.bind(queueComponentSynced()).to(exchangeComponentSynced());}@Beanpublic Binding extractionTaskBinding(){return BindingBuilder.bind(queueExtractionTask()).to(exchangeExtractionTask());}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory){RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}}

需要定義交換器,隊(duì)列,綁定,會自動注冊。

4、生產(chǎn)者

@Component public class ExtractionTaskMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(ExtractionTaskMessage msg){rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());ObjectMapper mapper = new ObjectMapper();Message m = null;try {m = MessageBuilder.withBody(mapper.writeValueAsBytes(msg)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();} catch (JsonProcessingException e) {e.printStackTrace();}rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_EXTRACTION_TASK,"",msg);} }

使用RabbitTemplate發(fā)送消息。

5、消費(fèi)者

@Component @RabbitListener(queues = RabbitMqConfig.QUEUE_EXTRACTION_TASK) public class ExtractionTaskMessageReceiver {private static Logger logger = LoggerFactory.getLogger(ExtractionTaskMessageReceiver.class);@Autowiredprivate DataExtractorFactory dataExtractorFactory;@Autowiredprivate ExtractionTaskService extractionTaskService;@Autowiredprivate ExtractionDataService extractionDataService;@RabbitHandlerpublic void process(@Payload ExtractionTaskMessage msg) {try {} catch (DataExtractionException e){logger.error( "",e.getMessage());}catch (Exception e) {//異常處理logger.error(e.getMessage());}}} 使用 @RabbitListener 進(jìn)行監(jiān)聽,@RabbitHandler定義消息處理方法。

在進(jìn)行監(jiān)聽時(shí),會查詢所有@RabbitHandler注解的消息處理方法,如果沒有參數(shù)類型匹配的方法,則異常。

@RabbitListener 注意

  • 消息處理方法參數(shù)是由 MessageConverter 轉(zhuǎn)化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實(shí)例中去設(shè)置(默認(rèn) Spring 使用的實(shí)現(xiàn)是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 屬性表示消息 body 數(shù)據(jù)以什么數(shù)據(jù)格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應(yīng)類型接收消息 body 內(nèi)容,但若方法參數(shù)類型不正確會拋異常:

    • application/octet-stream:二進(jìn)制字節(jié)數(shù)組存儲,使用 byte[]
    • application/x-java-serialized-object:java 對象序列化格式存儲。使用 Object、相應(yīng)類型(反序列化時(shí)類型應(yīng)該同包同名,否者會拋出找不到類異常)
    • text/plain:文本數(shù)據(jù)類型存儲。使用 String
    • application/json:JSON 格式。使用 Object、相應(yīng)類型

注意:@RabbitListener注解在類上或方法上,行為不一樣。在類上,消費(fèi)方法參數(shù)類型不可以設(shè)置為Message。在方法上,方法類型可以為Message

MessageConvert

  • 涉及網(wǎng)絡(luò)傳輸?shù)膽?yīng)用序列化不可避免,發(fā)送端以某種規(guī)則將消息轉(zhuǎn)成 byte 數(shù)組進(jìn)行發(fā)送,接收端則以約定的規(guī)則進(jìn)行 byte[] 數(shù)組的解析
  • RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸?shù)膬?nèi)容,RabbitMQ 抽象出一個(gè) MessageConvert 接口處理消息的序列化,其實(shí)現(xiàn)有 SimpleMessageConverter(默認(rèn))、Jackson2JsonMessageConverter 等
  • 當(dāng)調(diào)用了 convertAndSend 方法時(shí)會使用 MessageConvert 進(jìn)行消息的序列化
  • SimpleMessageConverter 對于要發(fā)送的消息體 body 為 byte[] 時(shí)不進(jìn)行處理,如果是 String 則轉(zhuǎn)成字節(jié)數(shù)組,如果是 Java 對象,則使用 jdk 序列化將消息轉(zhuǎn)成字節(jié)數(shù)組,轉(zhuǎn)出來的結(jié)果較大,含class類名,類相應(yīng)方法等信息。因此性能較差
  • 當(dāng)使用 RabbitMQ 作為中間件時(shí),數(shù)據(jù)量比較大,此時(shí)就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

設(shè)置MessageConvert

Json格式

public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}

?自定義

public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new MessageConverter() {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {return null;}@Overridepublic Object fromMessage(Message message) throws MessageConversionException {try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){return (User)ois.readObject();}catch (Exception e){e.printStackTrace();return null;}}});return factory;}

@Payload 與 @Headers

  • 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
? ? System.out.println("body:"+body);
? ? System.out.println("Headers:"+headers);
}

  • 也可以獲取單個(gè) Header 屬性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
? ? System.out.println("body:"+body);
? ? System.out.println("token:"+token);
}
?

通過 @RabbitListener 注解聲明 Binding

  • 通過 @RabbitListener 的 bindings 屬性聲明 Binding(若 RabbitMQ 中不存在該綁定所需要的 Queue、Exchange、RouteKey 則自動創(chuàng)建,若存在則拋出異常)
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),value = @Queue(value = "consumer_queue",durable = "true"),key = "key.#" )) public void processMessage1(Message message) {System.out.println(message); }

@RabbitListener 和 @RabbitHandler 搭配使用

  • @RabbitListener 可以標(biāo)注在類上面,需配合 @RabbitHandler 注解一起使用
  • @RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給 @RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型
@Component @RabbitListener(queues = "consumer_queue") public class Receiver {@RabbitHandlerpublic void processMessage1(String message) {System.out.println(message);}@RabbitHandlerpublic void processMessage2(byte[] message) {System.out.println(new String(message));}}

?

?

?

總結(jié)

以上是生活随笔為你收集整理的Spring cloud集成Rabbitmq的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。