8.springBoot消息服务
參考博客
https://blog.csdn.net/daqi1983/article/details/116144764 8-Spring Boot消息服務(wù)
視頻鏈接
https://www.bilibili.com/video/BV1nz4y1X7jh?p=12
1.消息服務(wù)概述
1.為什么使用消息服務(wù)
在多數(shù)應(yīng)用尤其是分布式系統(tǒng)中,消息服務(wù)是不可或缺的重要部分,它使用起來比較簡單,同時解決了不少難題,例如異步處理、應(yīng)用解耦、流量削鋒、分布式事務(wù)管理等,使用消息服務(wù)可以實現(xiàn)一個高性能、高可用、高擴(kuò)展的系統(tǒng)。
a.異步處理
b.應(yīng)用解耦
c.流量削峰
d.分布式事務(wù)管理
2.常用的消息中間件
1.ActiveMQ(性能相對較弱) 2.RabbitMQ 3.Kafka 4.RocketMQ2.RabbitMQ 消息中間件
1.RabbitMQ 簡介
RabbitMQ是基于AMQP協(xié)議的輕量級、可靠、可伸縮和可移植的消息代理,Spring使用RabbitMQ通過AMQP協(xié)議進(jìn)行通信,在Spirng Boot中對RabbitMQ進(jìn)行了集成管理。
2.消息代理過程
3.RabbitMQ 工作模式介紹
a.Work queues(工作隊列模式) b.Publish/Subscribe(發(fā)布訂閱模式) c.Routing(路由模式) d.Topics(通配符模式) e.RPC3.RabbitMQ安裝及整合環(huán)境搭建
Windows下RabbitMQ安裝及配置 https://blog.csdn.net/weixin_42673046/article/details/118442323
RabbitMQ的安裝(超詳細(xì))https://blog.csdn.net/qq_42498815/article/details/116139712
在Win環(huán)境下安裝Erlang https://blog.csdn.net/g6256613/article/details/80191402
Windows下RabbitMQ安裝及配置 https://blog.csdn.net/zhm3023/article/details/82217222 (效果極好)
4.Spring Boot與RabbitMQ整合實現(xiàn)
4.1、創(chuàng)建Spring Boot項目,添加Web依賴以及RabbitMQ依賴。
4.2、在pplication.properties配置文件中編寫需要設(shè)置的配置屬性。
# 配置RabbitMQ消息中間件連接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虛擬主機(jī)路徑/,默認(rèn)可以省略 spring.rabbitmq.virtual-host=/pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.3.7.RELEASE</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.7.RELEASE</version><configuration><mainClass>com.example.demo.DemoApplication</mainClass></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build> </project>5.Publish/Subscribe(發(fā)布訂閱模式)
1.基于API的方式及測試的搭建步驟
a.使用AmqpAdmin定制消息組件。
① 在測試類中先引入AmqpAdmin管理類,定制Publish/Subscribe工作模式所需的消息組件
@Autowiredprivate AmqpAdmin amqpAdmin;/*** 使用AmqpAdmin管理員API定制消息組件 import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue;*/@Testpublic void amqpAdmin() {// 1、定義fanout類型的交換器amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));// 2、定義兩個默認(rèn)持久化隊列,分別處理email和smsamqpAdmin.declareQueue(new Queue("fanout_queue_email"));amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));// 3、將隊列分別與交換器進(jìn)行綁定amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null));amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));} //注意Queue會報紅,這時候要導(dǎo)一下AmQp的包 //運行的使用要用rabbitmq-server.bat指令開啟服務(wù),否則會報無法連接rabbitMQ的錯誤② 執(zhí)行上述單元測試方法amqpAdmin(),驗證RabbitMQ消息組件的定制效果,通過RabbitMQ可視化管理頁面的Exchanges面板查看效果。
③ 在RabbitMQ可視化管理頁面的Exchanges面板中新出現(xiàn)了一個名稱為fanout_exchange的交換器,且其類型是設(shè)置的fanout類型。單擊fanout_exchange交換器進(jìn)入查看。
④ 切換到Queues面板頁面,查看定制生成的消息隊列信息
b.消息發(fā)送者發(fā)送消息
① 創(chuàng)建一個實體類User,將User對象作為消息的內(nèi)容
public class User {private Integer id;private String username;//自己補(bǔ)充其他//idea右鍵自動生成get和set以及toString方法 }② 在項目測試類中使用Spring框架提供的RabbitTemplate模板類實現(xiàn)消息發(fā)送。
@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 1、Publish/Subscribe工作模式消息發(fā)送端*/@Testpublic void psubPublisher() {User user=new User();user.setId(1);user.setUsername("石頭");rabbitTemplate.convertAndSend("fanout_exchange","",user);}3、執(zhí)行上述消息發(fā)送的測試方法psubPublisher(),將出現(xiàn)異常,控制執(zhí)行效果如圖
這個轉(zhuǎn)換器只支持Spring、byte[]和Serializable序列化后的消息
異常的兩種解決方式:
將實體類實現(xiàn)JDK自帶的Serializable序列化接口,不推薦,RabbitMQ管理頁面顯示消息不夠有好
定制其他類型的消息轉(zhuǎn)換器
創(chuàng)建一個RabbitMQ消息配置類RabbitMQConfig,并在該配置類中通過@Bean注解自定義了一個Jackson2JsonMessageConverter類型的消息轉(zhuǎn)換器組件,該組件的返回值必須為MessageConverter類型
執(zhí)行上述測試方法,執(zhí)行成功,查看RabbitMQ可視化管理頁面Queues面板信息,兩個消息隊列中各自擁有一條待接收的消息。
單擊某個隊列的詳情頁面,查看具體的信息
c.消息消費者接收消息
消息消費者接受消息,創(chuàng)建一個針對RabbitMQ消息中間件進(jìn)行消息接收和處理的業(yè)務(wù)類RabbitMQService,在該類中編寫如下方法
啟動項目來監(jiān)聽并接收消息隊列中的消息。程序啟動成功后,立即查看控制臺打印結(jié)果
2.基于配置類的方式
① 打開RabbitMQ消息配置類RabbitMQConfig,定義fanout類型的交換器、不同名稱的消息隊列以及將不同名稱的消息隊列與交換器綁定。
/*** 使用基于配置類的方式定制消息組件* @return*/// 1、定義fanout類型的交換器@Beanpublic Exchange fanout_exchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } // 2、定義兩個不同名稱的消息隊列@Beanpublic Queue fanout_queue_email(){ return new Queue("fanout_queue_email"); } @Beanpublic Queue fanout_queue_sms(){ return new Queue("fanout_queue_sms"); } // 3、將兩個不同名稱的消息隊列與交換器進(jìn)行綁定@Beanpublic Binding bindingEmail(){ return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();}@Beanpublic Binding bindingSms(){return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();}② 重新運行測試類的psubPublisher方法,來發(fā)送信息,之前定義的消息消費者會消費這個消息。
6.Routing(路由模式)
1.使用基于注解的方式定制消息組件和消息消費者
打開業(yè)務(wù)類RabbitMQService,在該類中使用@RabbitListener注解及其相關(guān)屬性定制Routing路由模式的消息組件,并模擬編寫消息消費者接收的方法。
/*** 2.1、路由模式消息接收,處理error級別日志信息* @param message*/@RabbitListener(bindings =@QueueBinding(value =@Queue("routing_queue_error"),exchange =@Exchange(value = "routing_exchange",type = "direct"),key = "error_routing_key"))public void routingConsumerError(String message) {System.out.println("接收到error級別日志消息: "+message);}/*** 2.2、路由模式消息接收,處理info、error、warning級別日志信息* @param message*/@RabbitListener(bindings =@QueueBinding(value =@Queue("routing_queue_all"),exchange =@Exchange(value = "routing_exchange",type = "direct"),key = {"error_routing_key","info_routing_key","warning_routing_key"}))public void routingConsumerAll(String message) {System.out.println("接收到info、error、warning等級別日志消息: "+message);}2.消息發(fā)送者發(fā)送消息
在測試類中使用RabbitTemplate模板類實現(xiàn)Routing路由模式下的消息發(fā)送
/*** 2、Routing工作模式消息發(fā)送端*/@Testpublic void routingPublisher() {rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send error message");}將測試方法routingPublisher()中進(jìn)行消息發(fā)送的參數(shù)進(jìn)行修改,調(diào)整發(fā)送info級別的日志信息(注意同時修改info_routing_key路由鍵)
再次啟動該測試方法,查看控制臺執(zhí)行效果,如圖
通過RabbitMQ可視化管理頁面查看Routing模式的消息組件,如圖所示
7.Topics(通配符模式)
1.使用基于注解的方式定制消息組件和消息消費者
打開業(yè)務(wù)類RabbitMQService,在該類中使用@RabbitListener注解及其相關(guān)屬性定制Topics通配符模式的消息組件,并模擬編寫消息消費者接收的方法。
/*** 3.1、通配符模式消息接收,進(jìn)行郵件業(yè)務(wù)訂閱處理* @param message*/@RabbitListener(bindings =@QueueBinding(value =@Queue("topic_queue_email"),exchange =@Exchange(value = "topic_exchange",type = "topic"),key = "info.#.email.#"))public void topicConsumerEmail(String message) {System.out.println("接收到郵件訂閱需求處理消息: "+message);}/*** 3.2、通配符模式消息接收,進(jìn)行短信業(yè)務(wù)訂閱處理* @param message*/@RabbitListener(bindings =@QueueBinding(value =@Queue("topic_queue_sms"),exchange =@Exchange(value = "topic_exchange",type = "topic"),key = "info.#.sms.#"))public void topicConsumerSms(String message) {System.out.println("接收到短信訂閱需求處理消息: "+message);}2.消息發(fā)送者發(fā)送消息
在項目測試類Chapter08ApplicationTests中使用RabbitTemplate模板類實現(xiàn)Routing路由模式下的消息發(fā)送。
/*** 3、Topcis工作模式消息發(fā)送端*/@Testpublic void topicPublisher() {// 1、只發(fā)送郵件訂閱用戶消息 // rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message");// 2、只發(fā)送短信訂閱用戶消息 // rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send sms message");// 3、發(fā)送同時訂閱郵件和短信的用戶消息rabbitTemplate.convertAndSend("topic_exchange","info.email.sms","topics send email and sms message");}執(zhí)行測試方法topicPublisher(),先進(jìn)行郵件訂閱用戶的消息發(fā)送
進(jìn)行短信訂閱用戶的消息發(fā)送
同時進(jìn)行郵件和短信訂閱用戶的消息發(fā)送方法
通過RabbitMQ可視化管理頁面查看自動定制的Topics通配符模式的消息組件,使用基于注解的方式自動生成了Topics通配符模式下的消息組件,并進(jìn)行了自動綁定。
作業(yè) :Publish/Subscribe(發(fā)布訂閱模式)下,使用注解方式實現(xiàn)消息服務(wù)
附錄
https://blog.csdn.net/weixin_37641832/article/details/83270778 深入理解AMQP協(xié)議
總結(jié)
以上是生活随笔為你收集整理的8.springBoot消息服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 真的很大:一大波iPad Pro高清上手
- 下一篇: 在proteus自带的stm32f401