日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud —— 消息队列与 RocketMQ

發(fā)布時(shí)間:2025/3/12 javascript 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud —— 消息队列与 RocketMQ 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

導(dǎo)航

  • 一、什么是 MQ
  • 二、常見的 MQ 產(chǎn)品
  • 三、RocketMQ 概念與架構(gòu)設(shè)計(jì)
    • 3.1 基本概念
      • 1、消息模型(Message Model)
      • 2、生產(chǎn)者與消費(fèi)者(Producer & Consumer)
      • 3、主題(Topic)
      • 4、代理服務(wù)器與名稱服務(wù)(Broker Server & Name Server)
      • 5、拉取式與推送式消費(fèi)
      • 6、生產(chǎn)者組與消費(fèi)者組
      • 7、集群消費(fèi)與廣播消費(fèi)
      • 8、消息(Message)
      • 9、標(biāo)簽(Tag)
      • 10、順序消息(Ordered Message)
    • 3.2 架構(gòu)設(shè)計(jì)
    • 3.3 部署架構(gòu)
    • 3.4 集群工作流程
  • 四、RocketMQ 搭建
    • 4.1 基礎(chǔ)環(huán)境搭建
    • 4.2 控制臺安裝
  • 五、Java 實(shí)現(xiàn) MQ 消息發(fā)送與接收
  • 六、在 Spring 中使用 RocketMQ
    • 6.1 消息生產(chǎn)端
    • 6.2 消息消費(fèi)端

一、什么是 MQ

MQ 是 Message Queue 的縮寫,譯為 “消息隊(duì)列”。

MQ 的主要職責(zé)就是轉(zhuǎn)發(fā)生產(chǎn)者的消息給消費(fèi)者,并具備一定的消息緩存能力,在分布式系統(tǒng)中,常常用于各個(gè)應(yīng)用進(jìn)程之間的通訊行為。

在傳統(tǒng)的應(yīng)用間通訊手段上,往往大多采用直接訪問對方URL等同步的數(shù)據(jù)傳輸方式,客戶端與服務(wù)端的消息耦合,這在某些要求實(shí)時(shí)性和必要性的業(yè)務(wù)場景下是必需的,但對于某些業(yè)務(wù)場景,例如短信通知、郵件通知等,本身并不是主業(yè)務(wù)流程中必要的關(guān)鍵環(huán)節(jié),實(shí)時(shí)性也要求不高,因此,完全可以采用異步的方式來完成,MQ的一個(gè)重要作用就是基于這種情況,實(shí)現(xiàn)應(yīng)用間、業(yè)務(wù)間的異步解耦是將比較耗時(shí)且不需要即時(shí)響應(yīng)的的操作作為消息放入消息隊(duì)列,同時(shí),由于使用了MQ,只要保證消息格式不變,消息的發(fā)送方和接收方并不需要聯(lián)系彼此,也不需要受對方處理速度的影響,即解耦合。

流量削峰也是MQ 的常用場景,一般在秒殺或團(tuán)購活動(dòng)中使用廣泛。

二、常見的 MQ 產(chǎn)品

目前業(yè)界有很多MQ產(chǎn)品,比較出名的有以下這些:

1、ZeroMQ
號稱最快的消息隊(duì)列系統(tǒng),尤其針對大吞吐量的需求場景。擴(kuò)展性好,開發(fā)比較靈活,采用C語言實(shí)現(xiàn),實(shí)際上只是一個(gè)socket庫的重新封裝,如果做為消息隊(duì)列使用,需要開發(fā)大量的代碼。ZeroMQ僅提供非持久性的隊(duì)列,也就是說如果down機(jī),數(shù)據(jù)將會(huì)丟失。

2、RabbitMQ
使用 erlang 語言開發(fā),性能較好,適合企業(yè)級開發(fā),但不利于做二次開發(fā)和維護(hù)。

3、ActiveMQ
歷史悠久的 Apache 開源項(xiàng)目。已經(jīng)在很多產(chǎn)品中得到應(yīng)用,實(shí)現(xiàn)了JMS 1.1 規(guī)范,可以和 spring-jms 輕松融合,實(shí)現(xiàn)了多種協(xié)議,支持持久化,對隊(duì)列數(shù)較多的情況支持不好。

4、RocketMQ
阿里開源的 MQ 組件,由Java 開發(fā),性能很好,使用簡單。

5、Kafka
Apache 下的一個(gè)子項(xiàng)目,是一個(gè)高性能跨語言分布式 Publish/Subscribe 消息隊(duì)列系統(tǒng),相對于ActiveMQ 是一個(gè)非常輕量級的消息系統(tǒng),除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)。

三、RocketMQ 概念與架構(gòu)設(shè)計(jì)

本節(jié)內(nèi)容取材自官方文檔:基本概念、架構(gòu)設(shè)計(jì)

3.1 基本概念

Rocket MQ 中重要角色的比喻:

Producer 寄件人、Consumer 收件人、NameServer 郵局、Broker 郵遞員。

1、消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。這三者共同組成 RocketMQ 的消息模型。Broker 在實(shí)際部署過程中對應(yīng)一臺服務(wù)器,每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于不同的 Broker。Message Queue 用于存儲(chǔ)消息的物理地址,每個(gè)Topic中的消息地址存儲(chǔ)于多個(gè) Message Queue 中。ConsumerGroup 由多個(gè)Consumer 實(shí)例構(gòu)成。

2、生產(chǎn)者與消費(fèi)者(Producer & Consumer)

生產(chǎn)者負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)生產(chǎn)消息。一個(gè)消息生產(chǎn)者會(huì)把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息,單向發(fā)送不需要。Rocket MQ 支持分布式集群方式部署,Producer 通過MQ的負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊(duì)列進(jìn)行消息投遞,投遞過程支持快速失敗并且低延遲。
消費(fèi)者負(fù)責(zé)消費(fèi)消息,一般是后臺系統(tǒng)負(fù)責(zé)異步消費(fèi)。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。從用戶應(yīng)用的角度而言提供了pull 和 push 兩種消費(fèi)形式。RocketMQ 支持分布式集群方式部署,同時(shí)支持實(shí)時(shí)消息訂閱機(jī)制和集群廣播等消費(fèi)模式。

3、主題(Topic)

表示一類消息的集合。每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。

4、代理服務(wù)器與名稱服務(wù)(Broker Server & Name Server)

Broker Server 消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。代理服務(wù)器在RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來的消息并存儲(chǔ)、同時(shí)為消費(fèi)者的拉取請求作準(zhǔn)備。代理服務(wù)器也存儲(chǔ)消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等。
Name Server 充當(dāng)路由消息的提供者。生產(chǎn)者或消費(fèi)者能夠通過名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個(gè)Namesrv實(shí)例組成集群,但相互獨(dú)立,沒有信息交換。

5、拉取式與推送式消費(fèi)

拉取式:應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動(dòng)權(quán)由應(yīng)用控制。一旦獲取了批量消息,應(yīng)用就會(huì)啟動(dòng)消費(fèi)過程。
推送式:該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端,該消費(fèi)模式一般實(shí)時(shí)性較高。

6、生產(chǎn)者組與消費(fèi)者組

同一類Producer或Consumer的集合,發(fā)送或消費(fèi)同一類消息且發(fā)送或消費(fèi)邏輯一致。

7、集群消費(fèi)與廣播消費(fèi)

這是消費(fèi)者組的兩種消息模式。
集群消費(fèi)模式,相同Consumer Group的每個(gè)Consumer實(shí)例平均分?jǐn)傁ⅰ?br /> 廣播消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例都接收全量的消息。

8、消息(Message)

消息系統(tǒng)所傳輸信息的物理載體,每條消息必須屬于一個(gè)主題。每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。

9、標(biāo)簽(Tag)

為消息設(shè)置的標(biāo)志,用于同一主題下區(qū)分不同類型的消息。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。

10、順序消息(Ordered Message)

順序消息分為兩種類型。
普通順序消息,消費(fèi)者通過同一個(gè)消息隊(duì)列( Topic 分區(qū),稱作 Message Queue) 收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無順序的。
嚴(yán)格順序消息,消費(fèi)者收到的所有消息均是有順序的。

3.2 架構(gòu)設(shè)計(jì)

RocketMQ 架構(gòu)上主要分為四部分:生產(chǎn)者、消費(fèi)者、Name Server、Broker Server

  • NameServer:NameServer 是一個(gè)非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動(dòng)態(tài)注冊與發(fā)現(xiàn)。主要包括兩個(gè)功能:Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測機(jī)制,檢查Broker是否還存活;路由信息管理,每個(gè)NameServer將保存關(guān)于Broker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。然后Producer和Conumser通過NameServer就可以知道整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。NameServer通常也是集群的方式部署,各實(shí)例間相互不進(jìn)行信息通訊。Broker是向每一臺NameServer注冊自己的路由信息,所以每一個(gè)NameServer實(shí)例上面都保存一份完整的路由信息。當(dāng)某個(gè)NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動(dòng)態(tài)感知Broker的路由的信息。

  • BrokerServer:Broker主要負(fù)責(zé)消息的存儲(chǔ)、投遞和查詢以及服務(wù)高可用保證,為了實(shí)現(xiàn)這些功能,Broker包含了以下幾個(gè)重要子模塊。
    – Remoting Module:整個(gè)Broker的實(shí)體,負(fù)責(zé)處理來自clients端的請求。
    – Client Manager:負(fù)責(zé)管理客戶端(Producer/Consumer)和維護(hù)Consumer的Topic訂閱信息
    – Store Service:提供方便簡單的API接口處理消息存儲(chǔ)到物理硬盤和查詢功能。
    – HA Service:高可用服務(wù),提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能。
    – Index Service:根據(jù)特定的Message key對投遞到Broker的消息進(jìn)行索引服務(wù),以提供消息的快速查詢。

3.3 部署架構(gòu)

  • NameServer是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無任何信息同步。
  • Broker分為Master與Slave,一個(gè)Master可以對應(yīng)多個(gè)Slave,但是一個(gè)Slave只能對應(yīng)一個(gè)Master,Master與Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個(gè)。每個(gè)Broker與NameServer集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)注冊Topic信息到所有NameServer。 注意:當(dāng)前RocketMQ版本在部署架構(gòu)上支持一Master多Slave,但只有BrokerId=1的從服務(wù)器才會(huì)參與消息的讀負(fù)載。
  • Producer與NameServer集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic 服務(wù)的Master建立長連接,且定時(shí)向Master發(fā)送心跳。Producer完全無狀態(tài),可集群部署。
  • Consumer與NameServer集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic服務(wù)的Master、Slave建立長連接,且定時(shí)向Master、Slave發(fā)送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費(fèi)者在向Master拉取消息時(shí),Master服務(wù)器會(huì)根據(jù)拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產(chǎn)生讀I/O),以及從服務(wù)器是否可讀等因素建議下一次是從Master還是Slave拉取。

3.4 集群工作流程

1、啟動(dòng)NameServer,NameServer起來后監(jiān)聽端口,等待Broker、Producer、Consumer連上來,相當(dāng)于一個(gè)路由控制中心。
2、Broker啟動(dòng),跟所有的NameServer保持長連接,定時(shí)發(fā)送心跳包。心跳包中包含當(dāng)前Broker信息(IP+端口等)以及存儲(chǔ)所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關(guān)系。
3、收發(fā)消息前,先創(chuàng)建Topic,創(chuàng)建Topic時(shí)需要指定該Topic要存儲(chǔ)在哪些Broker上,也可以在發(fā)送消息時(shí)自動(dòng)創(chuàng)建Topic。
4、Producer發(fā)送消息,啟動(dòng)時(shí)先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當(dāng)前發(fā)送的Topic存在哪些Broker上,輪詢從隊(duì)列列表中選擇一個(gè)隊(duì)列,然后與隊(duì)列所在的Broker建立長連接從而向Broker發(fā)消息。
5、Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當(dāng)前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費(fèi)消息。

四、RocketMQ 搭建

RocketMQ是阿里開源的分布式消息中間件,現(xiàn)在是Apache 的一個(gè)頂級項(xiàng)目,在阿里內(nèi)部使用非常廣泛,已經(jīng)經(jīng)過了“雙十一”萬億級場景下的消息流轉(zhuǎn)。
(待補(bǔ)充)

4.1 基礎(chǔ)環(huán)境搭建

準(zhǔn)備環(huán)境:Linux系統(tǒng) CentOS6 64位(虛擬機(jī)),IP:192.168.1.140,JDK8

1、下載并上傳 RocketMQ
打開官網(wǎng)下載頁面:Release Notes - Apache RocketMQ - Version 4.9.1 下載 Binary 版本 zip 包:

下載后上傳到 Linux /usr/local/src/ 目錄下:

2、解壓縮,并移動(dòng)到安裝目錄

unzip rocketmq-all-4.9.1-bin-release.zip mv rocketmq-all-4.9.1-bin-release /usr/local/rocketmq

3、啟動(dòng) RocketMQ
切換到 RocketMQ 安裝目錄,啟動(dòng) NameServer、BrokerServer,啟動(dòng)腳本在 bin 目錄下。& 代表后臺啟動(dòng)

nohup ./bin/mqnamesrv &

查看 rocketmq 啟動(dòng)日志,可以看到 The Name Server boot success 字眼,說明NameServer啟動(dòng)成功:

啟動(dòng) Broker 之前,需要修改幾項(xiàng)配置。

# 編輯 bin/runbroker.sh 和 bin/runserver.sh 文件,修改里面的堆大小,視情況而定。 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m

然后啟動(dòng) Broker

# -n 代表 NameServer 地址 nohup ./bin/mqbroker -n localhost:9876 &

查看 Broker 啟動(dòng)日志

4、測試RocketMQ
官方提供了兩個(gè)測試腳本用于驗(yàn)證 RocketMQ 的可用性。
開啟兩個(gè)終端,分別執(zhí)行以下命令:

Producer 發(fā)送消息:

export NAMESRV_ADDR=localhost:9876 ./bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

Consumer 接收消息:

export NAMESRV_ADDR=localhost:9876 ./bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

可以看到發(fā)送消息和接收消息都正常完成了:


5、關(guān)閉 RocketMQ

bin/mqshutdown broker bin/mqshutdown namesrv

4.2 控制臺安裝

1、下載
在 git 上下載工程

https://github.com/apache/rocketmq-externals/releases

2、修改配置文件
修改 rocketmq-console\src\main\resources\application.properties

server.port=7777 rocketmq.config.namesrvAddr=192.168.1.140:9876 # nameserver 地址,注意防火墻要開啟 9876 端口

3、打成 jar 包,并啟動(dòng)
進(jìn)入控制臺項(xiàng)目,將工程打成 jar 包

mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar

4、訪問控制臺
打開瀏覽器,輸入 http://localhost:7777 ,就可以看到如下界面:

五、Java 實(shí)現(xiàn) MQ 消息發(fā)送與接收

本節(jié)使用 main 方法實(shí)現(xiàn)簡單的 rocketmq 的消息發(fā)送和接收,在此之前,需要確認(rèn)好是否已經(jīng)完成前面的 RocketMQ 的環(huán)境部署,以及控制臺的安裝。

1、引入依賴
在需要使用 rocketmq 的項(xiàng)目中加入maven依賴

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version> </dependency>

2、編寫消息發(fā)送端

import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class RocketMQSendTest {public static void main(String[] args) throws Exception {// 1. 創(chuàng)建消息生產(chǎn)者,并設(shè)置生產(chǎn)組名DefaultMQProducer producer = new DefaultMQProducer("shop-order");// 2. 為生產(chǎn)者設(shè)置 NameServer 地址producer.setNamesrvAddr("192.168.1.140:9876");// 3. 啟動(dòng)生產(chǎn)者producer.start();// 4. 構(gòu)建消息對象,主要是設(shè)置消息的主題、標(biāo)簽、內(nèi)容Message message = new Message("topic-order", "morty", "Test RocketMQ Message".getBytes());// 5. 發(fā)送消息,第二個(gè)參數(shù)代表超時(shí)時(shí)間SendResult result = producer.send(message, 10000);System.out.println("發(fā)送結(jié)果:" + result);// 6. 關(guān)閉生產(chǎn)者producer.shutdown();} }

代碼可直接運(yùn)行,由于 MQ是一種解耦組件,所以,可以直接向MQ 中發(fā)送消息而不需要等待消費(fèi)者。

3、編寫消息接收端
消息接收者基于訂閱監(jiān)聽機(jī)制,需要注冊相應(yīng)的監(jiān)聽器完成消息的消費(fèi):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;public class RocketMQReceiveTest {public static void main(String[] args) throws Exception {// 1. 創(chuàng)建消息消費(fèi)者,指定消費(fèi)者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("shop-order-consumer");// 2. 指定 NameServerconsumer.setNamesrvAddr("192.168.1.140:9876");// 3. 指定消費(fèi)者訂閱的主題和標(biāo)簽consumer.subscribe("topic-order", "*");// 4. 設(shè)置回調(diào)函數(shù),并編寫消息消費(fèi)邏輯consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 消息消費(fèi)邏輯System.out.println("message ====> " + list);// 返回消費(fèi)成功狀態(tài)return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5. 啟動(dòng)消費(fèi)者consumer.start();System.out.println("消費(fèi)者啟動(dòng)成功!");} }

啟動(dòng)消費(fèi)者代碼,觀察日志:

同時(shí),也可以看到 RocketMQ 控制臺有相關(guān)主題信息展示:

六、在 Spring 中使用 RocketMQ

以 shop-order、shop-user 兩個(gè)微服務(wù)為基礎(chǔ),實(shí)現(xiàn)一個(gè)下單的消息通知功能。

下單消息通知功能要求,下單后向用戶發(fā)送下單消息,結(jié)構(gòu)如下圖所示:

6.1 消息生產(chǎn)端

1、在 shop-order 中添加 rocketmq 依賴

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version> </dependency> <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version> </dependency>

2、添加配置

rocketmq:name-server: 192.168.1.140:9876producer:group: shop-order

3、編寫發(fā)送消息代碼
在下單成功后,使用 rocketmq 的接口實(shí)現(xiàn)消息的發(fā)送。

@Slf4j @RestController @RequestMapping("/order") public class OrderController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/prod/{pid}")public Order order(@PathVariable("pid") Integer pid) {log.info("接收到{}號商品的下單請求,準(zhǔn)備調(diào)用商品微服務(wù)", pid);// 調(diào)用商品微服務(wù),查詢商品信息(略)// 下單(即創(chuàng)建訂單并保存)(略)// 訂單入庫orderService.createOrder(order);log.info("創(chuàng)建訂單成功,訂單信息為:{}", JSON.toJSONString(order));// 使用 rocketMQTemplate 發(fā)送下單成功消息rocketMQTemplate.convertAndSend("order-topic", order);return order;} }

4、測試
訪問下單接口,觀察 RocketMQ 控制臺。

6.2 消息消費(fèi)端

1、添加必要的依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-nacos-discovery</artifactId> </dependency> <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version> </dependency>

2、配置 rocketmq NameServer 地址

rocketmq:name-server: 192.168.1.140:9876

3、編寫 MQ 監(jiān)聽器

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener;@Slf4j @Service @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class OrderListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {log.info("接收到了下單成功的消息,{}", order);} }

4、測試消息接收
啟動(dòng) shop-user 模塊,觀察日志,可以看到應(yīng)用一啟動(dòng)成功,就收到了來自 MQ 的訂閱消息:

總結(jié)

以上是生活随笔為你收集整理的Spring Cloud —— 消息队列与 RocketMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。