javascript
Spring Boot:使用Rabbit MQ消息队列
?
綜合概述
消息隊列
消息隊列就是一個消息的鏈表,可以把消息看作一個記錄,具有特定的格式以及特定的優先級。對消息隊列有寫權限的進程可以向消息隊列中按照一定的規則添加新消息,對消息隊列有讀權限的進程則可以從消息隊列中讀走消息,而消息隊列就是在消息的傳輸過程中保存消息的容器,你可以簡單的把消息隊列理解為類似快遞柜,快遞員(消息發布者)往快遞柜(消息隊列)投遞物件(消息),接受者(消息訂閱者)從快遞柜(消息隊列)接收物件(消息),當然消息隊列往往還包含一些特定的消息傳遞和接收機制。
消息隊列作為分布式系統中重要的組件,可以有效解決應用耦合,異步消息,流量削鋒等系列問題,有利于實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,各種消息隊列也都各有特點,比如Kafka提供高性能、高吞吐量,但可靠性有所欠缺,所以比較適合像日志處理這類對性能要求高但對可靠性要求沒那么嚴格的業務,再比如RabbitMQ支持了各種協議,實現較為臃腫,性能和吞吐量都一般,但卻提供了很好的可靠性,比較適合像銀行金融一類對可靠性要求較高的業務。
應用場景
以下簡單介紹幾個消息隊列在實際應用中的使用場景(以下場景資料引用自網絡)。
1 異步處理
場景說明:用戶注冊后,需要發注冊郵件和注冊短信。傳統的做法有兩種 1.串行的方式;2.并行方式
(1)串行方式:將注冊信息寫入數據庫成功后,發送注冊郵件,再發送注冊短信。以上三個任務全部完成后,返回給客戶端
?
(2)并行方式:將注冊信息寫入數據庫成功后,發送注冊郵件的同時,發送注冊短信。以上三個任務完成后,返回給客戶端。與串行的差別是,并行的方式可以提高處理的時間
?
假設三個業務節點每個使用50毫秒鐘,不考慮網絡等其他開銷,則串行方式的時間是150毫秒,并行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則串行方式1秒內CPU可處理的請求量是7次(1000/150)。并行方式處理的請求量是10次(1000/100)
小結:如以上案例描述,傳統的方式系統的性能(并發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入消息隊列,將不是必須的業務邏輯,異步處理。改造后的架構如下:
?
按照以上約定,用戶的響應時間相當于是注冊信息寫入數據庫的時間,也就是50毫秒。注冊郵件,發送短信寫入消息隊列后,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變后,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了兩倍
2 應用解耦
場景說明:用戶下單后,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口。如下圖
?
傳統模式的缺點:
-
假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗
-
訂單系統與庫存系統耦合
如何解決以上問題呢?引入應用消息隊列后的方案,如下圖:
?
-
訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功
-
庫存系統:訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作
-
假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了。實現訂單系統與庫存系統的應用解耦
3 流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
-
可以控制活動的人數
-
可以緩解短時間內高流量壓垮應用
?
-
用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面
-
秒殺業務根據消息隊列中的請求信息,再做后續處理
4 日志處理
日志處理是指將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸的問題。架構簡化如下
?
-
日志采集客戶端,負責日志數據采集,定時寫受寫入Kafka隊列
-
Kafka消息隊列,負責日志數據的接收,存儲和轉發
-
日志處理應用:訂閱并消費kafka隊列中的日志數據
以下是新浪kafka日志處理應用案例:
?
(1)Kafka:接收用戶日志的消息隊列
(2)Logstash:做日志解析,統一成JSON輸出給Elasticsearch
(3)Elasticsearch:實時日志分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能
(4)Kibana:基于Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因
5 消息通訊
消息通訊是指,消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等
點對點通訊:
?
客戶端A和客戶端B使用同一隊列,進行消息通訊。
聊天室通訊:
?
客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發布和接收。實現類似聊天室效果。
以上實際是消息隊列的兩種消息模式,點對點或發布訂閱模式。模型為示意圖,供參考。
Rabbit MQ
AMQP,即 Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦和通訊。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性和安全。
RabbitMQ是一個開源的AMQP實現,服務器端用 Erlang 語言編寫,支持多種客戶端,如:Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統中存儲轉發消息,具有很高的易用性和可用性。
?
接下來,我們先來了解幾個相關概念(以下相關介紹資料引用自網絡)。
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。ConnectionFactory為Connection的制造工廠。 Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
Queue
Queue(隊列)是RabbitMQ的內部對象,用于存儲消息。
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息并最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息并消費。
生產者Send Message “A”被傳送到Queue中,消費者發現消息隊列Queue中有訂閱的消息,就會將這條消息A讀取出來進行一些列的業務操作。這里只是一個消費正對應一個隊列Queue,也可以多個消費者訂閱同一個隊列Queue,當然這里就會將Queue里面的消息平分給其他的消費者,但是會存在一個一個問題就是如果每個消息的處理時間不同,就會導致某些消費者一直在忙碌中,而有的消費者處理完了消息后一直處于空閑狀態,因為前面已經提及到了Queue會平分這些消息給相應的消費者。這里我們就可以使用prefetchCount來限制每次發送給消費者消息的個數。詳情見下圖所示:
這里的prefetchCount=1是指每次從Queue中發送一條消息來。等消費者處理完這條消息后Queue會再發送一條消息給消費者。
Message acknowledgment
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息后發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒有收到回執并檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這里不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。 這里會產生另外一個問題,如果我們的開發人員在處理完業務邏輯后,忘記發送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的消息會越來越多;消費者重啟后會重復消費這些消息并重復執行業務邏輯…
另外pub message是沒有ack的。
Message durability
如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失消息,我們可以將Queue與Message都設置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),如果我們需要對這種小概率事件也要管理起來,那么我們要用到事務。由于這里僅為RabbitMQ的簡單介紹,所以這里將不講解RabbitMQ相關的事務。
Exchange
首先明確一點就是生產者產生的消息并不是直接發送給消息隊列Queue的,而是要經過Exchange(交換器),由Exchange再將消息路由到一個或多個Queue,當然這里還會對不符合路由規則的消息進行丟棄掉,這里指的是后續要談到的Exchange Type。那么Exchange是怎樣將消息準確的推送到對應的Queue的呢?那么這里的功勞最大的當屬Binding,RabbitMQ是通過Binding將Exchange和Queue鏈接在一起,這樣Exchange就知道如何將消息準確的推送到Queue中去。簡單示意圖如下所示:
? ? ? ??
在綁定(Binding)Exchange和Queue的同時,一般會指定一個Binding Key,生產者將消息發送給Exchange的時候,一般會產生一個Routing Key,當Routing Key和Binding Key對應上的時候,消息就會發送到對應的Queue中去。那么Exchange有四種類型,不同的類型有著不同的策略。也就是表明不同的類型將決定綁定的Queue不同,換言之就是說生產者發送了一個消息,Routing Key的規則是A,那么生產者會將Routing Key=A的消息推送到Exchange中,這時候Exchange中會有自己的規則,對應的規則去篩選生產者發來的消息,如果能夠對應上Exchange的內部規則就將消息推送到對應的Queue中去。那么接下來就來詳細講解下Exchange里面類型。
Exchange Types
- fanout
? ? ? ? fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
? ? ? ?
??? 上圖所示,生產者(P)生產消息1將消息1推送到Exchange,由于Exchange Type=fanout這時候會遵循fanout的規則將消息推送到所有與它綁定Queue,也就是圖上的兩個Queue最后兩個消費者消費。
- direct
? ? ? ? direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中
? ? ? ? ?
? ???當生產者(P)發送消息時Rotuing key=booking時,這時候將消息傳送給Exchange,Exchange獲取到生產者發送過來消息后,會根據自身的規則進行與匹配相應的Queue,這時發現Queue1和Queue2都符合,就會將消息傳送給這兩個隊列,如果我們以Rotuing key=create和Rotuing key=confirm發送消息時,這時消息只會被推送到Queue2隊列中,其他Routing Key的消息將會被丟棄。
- topic
? ? ? 前面提到的direct規則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候才將消息傳送給Queue,那么topic這個規則就是模糊匹配,可以通過通配符滿足一部分規則就可以傳送。它的約定是:
? ? ??
當生產者發送消息Routing Key=F.C.E的時候,這時候只滿足Queue1,所以會被路由到Queue中,如果Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B時,這里只會發送一條消息到Queue2中。
- headers
????????headers類型的Exchange不依賴于routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。
實現案例
首先,需要安裝Rabbit MQ,可以直接安裝,也可以用Docker安裝,這個網上教程很多,這里就不再贅述了。
生成項目模板
為方便我們初始化項目,Spring Boot給我們提供一個項目模板生成網站。
1.? 打開瀏覽器,訪問:https://start.spring.io/
2.? 根據頁面提示,選擇構建工具,開發語言,項目信息等。
3.? 點擊 Generate the project,生成項目模板,生成之后會將壓縮包下載到本地。
4.? 使用IDE導入項目,我這里使用Eclipse,通過導入Maven項目的方式導入。
添加相關依賴
清理掉不需要的測試類及測試依賴,添加 rabbitmq相關依賴。
<!-- rabbitmq --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>下面給出完整的POM文件。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.louis.springboot</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>demo</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- swagger --><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!-- rabbitmq --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>添加相關配置
添加一個swagger 配置類,在工程下新建 config 包并添加一個 SwaggerConfig 配置類。
SwaggerConfig.java
package com.louis.springboot.demo.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2;@Configuration @EnableSwagger2 public class SwaggerConfig {@Beanpublic Docket createRestApi(){return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select().apis(RequestHandlerSelectors.any()).paths(PathSelectors.any()).build();}private ApiInfo apiInfo(){return new ApiInfoBuilder().title("Swagger API Doc").description("This is a restful api document of Swagger.").version("1.0").build();}}修改application.properties文件名為application.yml,在其中添加RabbitMQ配置信息,根據自己安裝的RabbitMQ配置。
application.yml
# rabbitmq配置 spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest普通隊列模式
新建一個RabbitMQ配置類,并添加一個demoQueue隊列。
RabbitConfig.java
package com.louis.springboot.demo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {/*** 定義demoQueue隊列* @return*/@Beanpublic Queue demoString() {return new Queue("demoQueue");}}編寫一個消息發布者,并編寫一個發送方法,通過AmqpTemplate往"demoQueue"發送消息。
RabbitProducer.java
package com.louis.springboot.demo.mq; import java.text.SimpleDateFormat; import java.util.Date;import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class RabbitProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;public void sendDemoQueue() {Date date = new Date();String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);System.out.println("[demoQueue] send msg: " + dateString); // 第一個參數為剛剛定義的隊列名稱this.rabbitTemplate.convertAndSend("demoQueue", dateString);} }編寫一個消息消費者,通過@RabbitListener(queues = "demoQueue")注解監聽"demoQueue"隊列,并用@RabbitHandler注解相關方法,這樣在在隊列收到消息之后,交友@RabbitHandler注解的方法進行處理。
DemoQueueConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "demoQueue") public class DemoQueueConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[demoQueue] recieved message: " + msg);}}編寫一個控制器,注入RabbitProducer調用相關消息發送方法,方便通過接口觸發消息發送。
RabbitMqController.java
package com.louis.springboot.demo.controller;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.louis.springboot.demo.mq.RabbitProducer;@RestController public class RabbitMqController {@Autowiredprivate RabbitProducer rabbitProducer;@GetMapping("/sendDemoQueue")public Object sendDemoQueue() {rabbitProducer.sendDemoQueue();return "success";} }編譯并啟動應用,打開瀏覽器,訪問:http://localhost:8080/swagger-ui.html,進入swagger接口文檔界面。
調用兩次sendDemoQueue接口,在控制臺可以看到我們輸出的信息,說明消息已經成功發送并被消費。
[demoQueue] send msg: 2019-58-183 04:07:38 [demoQueue] recieved message: 2019-58-183 04:07:38 [demoQueue] send msg: 2019-01-183 05:07:05 [demoQueue] recieved message: 2019-01-183 05:07:05Fanout廣播模式
Fanout其實就是廣播模式,只要跟它綁定的隊列都會通知并且接受到消息。修改配置類,在RabbitConfig中添加如下fanout模式的隊列跟交換機信息。在代碼中我們配置了三個隊列名、一個fanout交換機,并且將這三個隊列綁定到了fanout交換器上。只要我們往這個交換機生產新的消息,那么這三個隊列都會收到。
RabbitConfig.java
//=================== fanout廣播模式 ====================@Beanpublic Queue fanoutA() {return new Queue("fanout.a");}@Beanpublic Queue fanoutB() {return new Queue("fanout.b");}@Beanpublic Queue fanoutC() {return new Queue("fanout.c");}/*** 定義個fanout交換器* @return*/@BeanFanoutExchange fanoutExchange() {// 定義一個名為fanoutExchange的fanout交換器return new FanoutExchange("fanoutExchange");}/*** 將定義的fanoutA隊列與fanoutExchange交換機綁定* @return*/@Beanpublic Binding bindingExchangeWithA() {return BindingBuilder.bind(fanoutA()).to(fanoutExchange());}/*** 將定義的fanoutB隊列與fanoutExchange交換機綁定* @return*/@Beanpublic Binding bindingExchangeWithB() {return BindingBuilder.bind(fanoutB()).to(fanoutExchange());}/*** 將定義的fanoutC隊列與fanoutExchange交換機綁定* @return*/@Beanpublic Binding bindingExchangeWithC() {return BindingBuilder.bind(fanoutC()).to(fanoutExchange());}然后我們在RabbitProducer中添加一個sendFanout方法,用來向fanout隊列發送消息。
RabbitProducer.java
public void sendFanout() {Date date = new Date();String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);System.out.println("[fanout] send msg:" + dateString);// 注意 第一個參數是我們交換機的名稱 ,第二個參數是routerKey 我們不用管空著就可以,第三個是你要發送的消息this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString); }同樣的,在控制器里添加一個訪問接口。
RabbitMqController.java
@GetMapping("/sendFanout") public Object sendFanout() {rabbitProducer.sendFanout();return "success"; }接著針對三個廣播隊列分別編寫一個消息消費者,指定隊列和處理函數。
FanoutAConsumer.java
package com.louis.springboot.demo.mq;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "fanout.a") public class FanoutAConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[fanout.a] recieved message: " + msg);} }FanoutBConsumer.java
package com.louis.springboot.demo.mq;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "fanout.b") public class FanoutBConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[fanout.b] recieved message: " + msg);} }FanoutCConsumer.java
package com.louis.springboot.demo.mq;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "fanout.c") public class FanoutCConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[fanout.c] recieved message: " + msg);} }重新啟動應用,調用sendFanout接口,通過控制臺可以看到消息發送之后,a, b, c三個隊列都收到了消息。
[fanout] send msg:2019-47-183 05:07:12 [fanout.c] recieved message: 2019-47-183 05:07:12 [fanout.b] recieved message: 2019-47-183 05:07:12 [fanout.a] recieved message: 2019-47-183 05:07:12Topic主題模式
利用topic模式可以實現模糊匹配,同樣的,在RabbitConfig中配置topic隊列跟交換器,注意的是這里需要多配置一個bindingKey。
RabbitConfig.java
//=================== topic主題模式 ====================@Beanpublic Queue topiocA() {return new Queue("topic.a");}@Beanpublic Queue topicB() {return new Queue("topic.b");}@Beanpublic Queue topicC() {return new Queue("topic.c");}/*** 定義個topic交換器* @return*/@BeanTopicExchange topicExchange() {// 定義一個名為fanoutExchange的fanout交換器return new TopicExchange("topicExchange");}/*** 將定義的topicA隊列與topicExchange交換機綁定* @return*/@Beanpublic Binding bindingTopicExchangeWithA() {return BindingBuilder.bind(topiocA()).to(topicExchange()).with("topic.msg");}/*** 將定義的topicB隊列與topicExchange交換機綁定* @return*/@Beanpublic Binding bindingTopicExchangeWithB() {return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#");}/*** 將定義的topicC隊列與topicExchange交換機綁定* @return*/@Beanpublic Binding bindingTopicExchangeWithC() {return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.*.z");}上述配置中:
topicA的key為topic.msg 那么他只會接收包含topic.msg的消息
topicB的key為topic.#那么他只會接收topic開頭的消息
topicC的key為topic.*.z那么他只會接收topic.x.z這樣格式的消息
然后修改RabbitProducer,在其中添加如下三個方法,如方法名所示,分別根據匹配規則發送到A\B,B,B\C隊列。
RabbitProducer.java
public void sendTopicTopicAB() {Date date = new Date();String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);dateString = "[topic.msg] send msg:" + dateString;System.out.println(dateString);// 注意 第一個參數是我們交換機的名稱 ,第二個參數是routerKey topic.msg,第三個是你要發送的消息// 這條信息將會被 topic.a topic.b接收this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString); }public void sendTopicTopicB() {Date date = new Date();String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);dateString = "[topic.good.msg] send msg:" + dateString;System.out.println(dateString);// 注意 第一個參數是我們交換機的名稱 ,第二個參數是routerKey ,第三個是你要發送的消息// 這條信息將會被topic.b接收this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString); }public void sendTopicTopicBC() {Date date = new Date();String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date);dateString = "[topic.m.z] send msg:" + dateString;System.out.println(dateString);// 注意 第一個參數是我們交換機的名稱 ,第二個參數是routerKey ,第三個是你要發送的消息// 這條信息將會被topic.b、topic.c接收this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString); }同樣的,在控制器里面添加發送服務對應的接口。
RabbitMqController.java
@GetMapping("/sendTopicTopicAB") public Object sendTopicTopicAB() {rabbitProducer.sendTopicTopicAB();return "success"; }@GetMapping("/sendTopicTopicB") public Object sendTopicTopicB() {rabbitProducer.sendTopicTopicB();return "success"; }@GetMapping("/sendTopicTopicBC") public Object sendTopicTopicBC() {rabbitProducer.sendTopicTopicBC();return "success"; }接著針對三個主題隊列編寫對應的消息消費者。
TopicAConsumer.java
package com.louis.springboot.demo.mq;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "topic.a") public class TopicAConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[topic.a] recieved message:" + msg);} }TopicBConsumer.java
package com.louis.springboot.demo.mq;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "topic.b") public class TopicBConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[topic.b] recieved message:" + msg);} }TopicCConsumer.java
package com.louis.springboot.demo.mq;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "topic.c") public class TopicCConsumer {/*** 消息消費* @RabbitHandler 代表此方法為接受到消息后的處理方法*/@RabbitHandlerpublic void recieved(String msg) {System.out.println("[topic.c] recieved message:" + msg);} }重啟應用,調用sendTopicTopicAB接口,經過匹配,route key為“topic.msg”的消息被發送到了topic.a和topic.b。
[topic.msg] send msg:2019-12-183 06:07:22 [topic.b] recieved message:[topic.msg] send msg:2019-12-183 06:07:22 [topic.a] recieved message:[topic.msg] send msg:2019-12-183 06:07:22調用sendTopicTopicB接口,經過匹配,route key為“topic.good.msg”的消息被發送到了topic.b。
[topic.good.msg] send msg:2019-15-183 06:07:23 [topic.b] recieved message:[topic.good.msg] send msg:2019-15-183 06:07:23調用sendTopicTopicBC接口,經過匹配,route key為“topic.m.z”的消息被發送到了topic.b和topic.c。
[topic.m.z] send msg:2019-16-183 06:07:09 [topic.b] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09 [topic.c] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09?
總結
以上是生活随笔為你收集整理的Spring Boot:使用Rabbit MQ消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .如何在Linux上安装Postman应
- 下一篇: springboot高级——消息队列相关