日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制

發(fā)布時(shí)間:2024/4/13 javascript 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
引入了spring-boot-starter-amqp模塊,他引入了spring-messaging模塊,包括引入了spring-rabbit模塊,怎么配置使用呢,<dependency><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId> </dependency>SpringBoot主要是引入了相關(guān)場(chǎng)景依賴,一切都是自動(dòng)配置的,我們可以來分析一下原理,rabbitmq給我們自動(dòng)配置了哪些 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.learn</groupId><artifactId>springboot-02-amqp</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.12.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><thymeleaf.version>3.0.9.RELEASE</thymeleaf.version><thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><!-- 這個(gè)插件,可以將應(yīng)用打包成一個(gè)可執(zhí)行的jar包 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project> 我們就找一下和rabbitmq相關(guān)的類,有一個(gè)叫RabbitAutoConfiguration,這個(gè)自動(dòng)配置類里面幫我們做了什么,我們?cè)谶@記錄一下,自動(dòng)配置類,我們配了哪些,進(jìn)來看,這一塊我們配了連接工廠,這是人家?guī)臀覀兣浜玫倪B接工廠CachingConnectionFactory,有自動(dòng)配置了連接工廠,從這里可以獲取,Rabbitmq的連接,比如我們的主機(jī)地址,用戶名,密碼,還有VisualHost等等@Configuration @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator {@Beanpublic CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)throws Exception {RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();if (config.determineHost() != null) {factory.setHost(config.determineHost());}factory.setPort(config.determinePort());if (config.determineUsername() != null) {factory.setUsername(config.determineUsername());}if (config.determinePassword() != null) {factory.setPassword(config.determinePassword());}if (config.determineVirtualHost() != null) {factory.setVirtualHost(config.determineVirtualHost());}if (config.getRequestedHeartbeat() != null) {factory.setRequestedHeartbeat(config.getRequestedHeartbeat());}RabbitProperties.Ssl ssl = config.getSsl();if (ssl.isEnabled()) {factory.setUseSSL(true);if (ssl.getAlgorithm() != null) {factory.setSslAlgorithm(ssl.getAlgorithm());}factory.setKeyStore(ssl.getKeyStore());factory.setKeyStorePassphrase(ssl.getKeyStorePassword());factory.setTrustStore(ssl.getTrustStore());factory.setTrustStorePassphrase(ssl.getTrustStorePassword());}if (config.getConnectionTimeout() != null) {factory.setConnectionTimeout(config.getConnectionTimeout());}factory.afterPropertiesSet();CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory.getObject());connectionFactory.setAddresses(config.determineAddresses());connectionFactory.setPublisherConfirms(config.isPublisherConfirms());connectionFactory.setPublisherReturns(config.isPublisherReturns());if (config.getCache().getChannel().getSize() != null) {connectionFactory.setChannelCacheSize(config.getCache().getChannel().getSize());}if (config.getCache().getConnection().getMode() != null) {connectionFactory.setCacheMode(config.getCache().getConnection().getMode());}if (config.getCache().getConnection().getSize() != null) {connectionFactory.setConnectionCacheSize(config.getCache().getConnection().getSize());}if (config.getCache().getChannel().getCheckoutTimeout() != null) {connectionFactory.setChannelCheckoutTimeout(config.getCache().getChannel().getCheckoutTimeout());}return connectionFactory;}}都是從config里面得到的,config就是我們的RabbitProperties,我們點(diǎn)進(jìn)來@ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties {這里面封裝了Rabbitmq的所有配置,那我們就來配置他們,找到我們的配置文件,spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.port=5672 #spring.rabbitmq.virtual-host=首先rabbitmq的主機(jī)地址,我們寫上我們的ip地址,http不用寫,http是web的訪問地址,還有rabbitmq的用戶名,我們也寫上,guest,guest用戶,還有我們的密碼,也叫g(shù)uest,還有rabbitmq的端口,這個(gè)端口呢,不寫默認(rèn)就是5672,這個(gè)可以不寫,virtual-host屬性我們之前有說過,是一定要指定的,我們也可以不指定,virtual-host如果我們不寫,如果是空字符串也是訪問"/"/*** If addresses have been set and the first address has a virtual host it is returned.* Otherwise returns the result of calling {@code getVirtualHost()}.* @return the virtual host or {@code null}* @see #setAddresses(String)* @see #getVirtualHost()*/ public String determineVirtualHost() {if (CollectionUtils.isEmpty(this.parsedAddresses)) {return getVirtualHost();}Address address = this.parsedAddresses.get(0);return address.virtualHost == null ? getVirtualHost() : address.virtualHost; }都可以通過配置文件來知道,這個(gè)配置我們基本寫完,但是我們?nèi)绾谓orabbitmq發(fā)送以及接收消息呢,我們可以看自動(dòng)配置,這個(gè)自動(dòng)配置除了看工廠,它還會(huì)給我們放一個(gè)RabbitTemplate,給容器中加一個(gè)他,給RabbitMQ接收和發(fā)送消息的,就像以前的JdbcTemplate,RedisTemplate,那么他還有什么呢,還可以來這里繼續(xù)來看,除了RabbitTemplate,他還放了AmqpAdmin,這AmqpAdmin呢,它是Rabbitmq的管理組件,Rabbitmq系統(tǒng)管理組件,他不來發(fā)消息和收消息,可以幫我們聲明一個(gè)隊(duì)列,創(chuàng)建一個(gè)交換器,這個(gè)是自動(dòng)配置給我們放的組件,我們就用RabbitTemplate,我在這個(gè)測(cè)試類里面 我們嘗試給消息隊(duì)列里面發(fā)送一些內(nèi)容,我們來測(cè)試幾個(gè)消息,第一個(gè)單播下的點(diǎn)對(duì)點(diǎn)消息,我們將一個(gè)消息發(fā)送隊(duì)列里面,我們可以使用rabbitTemplate,兩個(gè)方法,send傳入exchange,就是交換器,消息先給交換器,根據(jù)路由件放到哪個(gè)隊(duì)列里面,后面這個(gè)就是我們要發(fā)的消息,這個(gè)是需要我們自己來編輯這個(gè)消息,第一個(gè)傳一個(gè)交換器,第二個(gè)是傳一個(gè)路由件routekey,第三個(gè)傳一個(gè)message,他的好處呢就是,特別是message需要自己定義,Message需要自己定義,需要自己構(gòu)造一個(gè),自己構(gòu)造的時(shí)候呢,我們來看一下這個(gè)Message,我們要發(fā)的消息內(nèi)容,amqp這個(gè)核心包里面org.springframework.amqp.core.Message你要把你發(fā)的消息序列號(hào)成一個(gè)數(shù)組,還有你消息的頭信息,public Message(byte[] body, MessageProperties messageProperties) { //NOSONARthis.body = body; //NOSONARthis.messageProperties = messageProperties; }可以定制消息體內(nèi)容和消息頭,這是rabbitTemplate.send,我們常見的是convertAndSend,轉(zhuǎn)化并發(fā)送,我們只需要傳入交換器,路由件,還有數(shù)據(jù)對(duì)象,這個(gè)對(duì)象默認(rèn)就會(huì)被當(dāng)成消息體,而且會(huì)自動(dòng)轉(zhuǎn)換,我們只需要傳入要發(fā)送的對(duì)象,只需要傳入發(fā)送的對(duì)象,自動(dòng)序列化給rabbitmq,這個(gè)object是被當(dāng)成消息體的,默認(rèn)當(dāng)成消息體,我們就可以用這個(gè)方法,我們就直接用下面這個(gè)方法,我們要發(fā)點(diǎn)對(duì)點(diǎn)消息,我們就要按照交換器的規(guī)則,direct能夠直接派發(fā)給一個(gè)隊(duì)列,我們就連上這個(gè)交換器,我們就把消息給exchange.direct,我們連上這個(gè)交換器,這個(gè)交換器我們綁定了很多的路由件,比如china,路由件是他,交給這個(gè)隊(duì)列,我們就用china.news,我們用這個(gè)路由件,要發(fā)送的內(nèi)容是什么,就隨便寫一個(gè)map,我可以寫一個(gè)map,;里面存一些數(shù)據(jù)放進(jìn)去,比如msg存一些提示消息,這是我們要發(fā)的數(shù)據(jù),我來發(fā)送測(cè)試一下,我們看這個(gè)數(shù)據(jù)能不能到達(dá),這個(gè)運(yùn)行完成了,我們來我們的消息隊(duì)列來看一下localhost:15672我們剛才發(fā)的是china.news

消息是這個(gè)樣子的

是因?yàn)樾蛄刑?hào)默認(rèn)是JAVA的序列化方式,給我們序列化的對(duì)象,解碼以后的樣子,對(duì)象唄默認(rèn)序列化以后,發(fā)送出去,發(fā)送點(diǎn)對(duì)點(diǎn)消息,我們要接收到這個(gè)消息,怎么辦呢,可以用這個(gè)API,我們同樣用rabbitTemplate,有這個(gè)receive方法可以接收,包括還有receiveAndConverter,接收并轉(zhuǎn)化,如果用這個(gè)接收,你收來自哪個(gè)消息隊(duì)列的消息,會(huì)給你把這個(gè)消息轉(zhuǎn)成Messager,里面就只有消息頭和消息體,如果用receiveAndConverter,可以把消息體轉(zhuǎn)成我們想要的對(duì)象,我們最終的消息是發(fā)到china.news,這個(gè)里面了,所以我們要收數(shù)據(jù),也是收這個(gè)消息隊(duì)列的,在這里寫上消息隊(duì)列的名稱,然后把收過來的數(shù)據(jù)拿過來,這個(gè)數(shù)據(jù)的類型是什么,再來打印的數(shù)據(jù)是什么 package com.learn.springboot;import java.util.Arrays; import java.util.HashMap; import java.util.Map;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class) @SpringBootTest public class SpringBootAmqpApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;/*** 1.單播(點(diǎn)對(duì)點(diǎn))*/@Testpublic void contextLoads() {// Message需要自己構(gòu)造一個(gè),定義消息體內(nèi)容和消息頭 // rabbitTemplate.send(exchange, routingKey, message);// object默認(rèn)當(dāng)成消息體,只需要傳入要發(fā)送的對(duì)象,自動(dòng)序列化發(fā)送給rabbitmq // rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);Map<String,Object> map = new HashMap<String,Object>();map.put("msg", "這是第一個(gè)消息");map.put("data", Arrays.asList("helloworld",1231,true)); // 對(duì)象被默認(rèn)序列化以后發(fā)送出去rabbitTemplate.convertAndSend("exchange.direct", "china.news",map);}@Testpublic void receive() {Object o = rabbitTemplate.receiveAndConvert("china.news");System.out.println(o.getClass());System.out.println(o);} } class java.util.HashMap {msg=這是第一個(gè)消息, data=[helloworld, 1231, true]} 數(shù)據(jù)收到的類型是HashMap,拿到這些數(shù)據(jù)沒什么問題,所以我們也能準(zhǔn)確的收到數(shù)據(jù),但是這個(gè)數(shù)據(jù)收到以后,來看這個(gè)消息隊(duì)列里面,這個(gè)消息隊(duì)列就沒有了

確實(shí)已經(jīng)沒有了,剛才已經(jīng)收到了,接收數(shù)據(jù),有時(shí)候我想把數(shù)據(jù)序列化成JSON的格式,那種序列化的結(jié)果有點(diǎn)不好看,如何將數(shù)據(jù)自動(dòng)的轉(zhuǎn)為JSON發(fā)送出去,其實(shí)非常的簡(jiǎn)單,我們到RabbitTemplate這里來看,為什么剛才是那種序列化結(jié)果,原因我們來往下翻,原因有一個(gè)消息轉(zhuǎn)換器private volatile MessageConverter messageConverter = new SimpleMessageConverter();這個(gè)轉(zhuǎn)換器呢,默認(rèn)是SimpleMessageConverter,這個(gè)和MessageConverter呢,我們來序列化數(shù)據(jù)的時(shí)候,就是按照J(rèn)DK的序列化規(guī)則,content = SerializationUtils.deserialize(createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));我們可以給他換一個(gè)MessageConverterorg.springframework.amqp.support.converter.MessageConverter我們來寫上配置,比如我們?cè)赾onfig包下寫一個(gè)MyAMQPConfig,這個(gè)Config我們來寫一個(gè)@Configuration,我們就自己來放一個(gè)MessageConverter,這是我們的MessageConverter,來放哪個(gè)MessageConverter呢,就來看他有哪些,抽象AbstractJsonMessageConverter有跟JSON有關(guān)的,Jackson2JsonMessageConverter,我們換成他,我們自動(dòng)配置也會(huì)生效,當(dāng)我們來配置RabbitTemplate的時(shí)候,如果我們有自己定義的MessageConverter,也會(huì)給我們?cè)O(shè)置進(jìn)來,@Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitTemplate.class) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);MessageConverter messageConverter = this.messageConverter.getIfUnique();if (messageConverter != null) {rabbitTemplate.setMessageConverter(messageConverter);}我們?cè)賮頊y(cè)試這個(gè)消息,我們把消息發(fā)出去,來看還是不是原來的消息 package com.learn.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class MyAMQPConfig {@Beanpublic MessageConverter messageConvert() {return new Jackson2JsonMessageConverter();} }

消息就是我們的JSON數(shù)據(jù),而且還有定義的消息頭,消息頭有我們key的類型,我們整個(gè)消息的類型,application/json,序列化能過去,存起來,能不能取出消息,我們看到這塊也是沒有問題的,class java.util.HashMap {msg=這是第一個(gè)消息, data=[helloworld, 1231, true]}比如我們發(fā)送Book對(duì)象,接下來我們來發(fā)一個(gè)圖書的對(duì)象,我們來new一個(gè)book package com.learn.bean;public class Book {private String bookName;private String author;public Book() {super();}public Book(String bookName, String author) {super();this.bookName = bookName;this.author = author;}public String getBookName() {return bookName;}public void setBookName(String bookName) {this.bookName = bookName;}public String getAuthor() {return author;}public void setAuthor(String author) {this.author = author;}@Overridepublic String toString() {return "Book [bookName=" + bookName + ", author=" + author + "]";}} package com.learn.springboot;import java.util.Arrays; import java.util.HashMap; import java.util.Map;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import com.learn.bean.Book;@RunWith(SpringRunner.class) @SpringBootTest public class SpringBootAmqpApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;/*** 1.單播(點(diǎn)對(duì)點(diǎn))*/@Testpublic void contextLoads() {// Message需要自己構(gòu)造一個(gè),定義消息體內(nèi)容和消息頭 // rabbitTemplate.send(exchange, routingKey, message);// object默認(rèn)當(dāng)成消息體,只需要傳入要發(fā)送的對(duì)象,自動(dòng)序列化發(fā)送給rabbitmq // rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);Map<String,Object> map = new HashMap<String,Object>();map.put("msg", "這是第一個(gè)消息");map.put("data", Arrays.asList("helloworld",1231,true)); // 對(duì)象被默認(rèn)序列化以后發(fā)送出去rabbitTemplate.convertAndSend("exchange.direct", "china.news",new Book("西游記","吳承恩"));}@Testpublic void receive() {Object o = rabbitTemplate.receiveAndConvert("china.news");System.out.println(o.getClass());System.out.println(o);} }

能不能反序列化獲取出來呢,我們先用Object對(duì)象來接收,class com.learn.bean.Book Book [bookName=西游記, author=吳承恩]這個(gè)Class拿到的是Book對(duì)象,強(qiáng)轉(zhuǎn)也可以在這里強(qiáng)轉(zhuǎn),消息序列化和反序列化,我們測(cè)試的是一個(gè)單播,那如果想廣播,不管是單播還是廣播,我們發(fā)給對(duì)應(yīng)的exchange,我們?cè)賮頊y(cè)試一下,廣播類型的轉(zhuǎn)換器,然后路由件就不用寫了,因?yàn)閺V播是不用管路由件的,我們來測(cè)試一下廣播 /*** 廣播*/@Testpublic void sendMsg() {rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三國演義","羅貫中"));}

?

總結(jié)

以上是生活随笔為你收集整理的SpringBoot高级-消息-RabbitTemplate发送接受消息序列化机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。