日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息

發(fā)布時(shí)間:2025/3/20 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

作者 | 遼天
來(lái)源 | 阿里巴巴云原生公眾號(hào)

導(dǎo)讀:本文將 rocktmq-spring-boot 的設(shè)計(jì)實(shí)現(xiàn)做一個(gè)簡(jiǎn)單的介紹,讀者可以通過(guò)本文了解將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細(xì)節(jié),然后通過(guò)一個(gè)簡(jiǎn)單的示例來(lái)一步一步的講解如何使用這個(gè) spring-boot-starter 工具包來(lái)配置,發(fā)送和消費(fèi) RocketMQ 消息。

在 Spring 生態(tài)中玩轉(zhuǎn) RocketMQ 系列文章:

  • 《如何在 Spring 生態(tài)中玩轉(zhuǎn) RocketMQ?》
  • 《羅美琪和春波特的故事…》
  • 《RocketMQ-Spring 畢業(yè)兩周年,為什么能成為 Spring 生態(tài)中最受歡迎的 messaging 實(shí)現(xiàn)?》

本文配套可交互教程已登錄阿里云知行動(dòng)手實(shí)驗(yàn)室,PC 端登錄 start.aliyun.com 在瀏覽器中立即體驗(yàn)。

通過(guò)本文,您將了解到:

  • Spring 的消息框架介紹
  • rocketmq-spring-boot 具體實(shí)現(xiàn)
  • 使用示例

前言

上世紀(jì) 90 年代末,隨著 Java EE(Enterprise Edition) 的出現(xiàn),特別是 Enterprise Java Beans 的使用需要復(fù)雜的描述符配置和死板復(fù)雜的代碼實(shí)現(xiàn),增加了廣大開發(fā)者的學(xué)習(xí)曲線和開發(fā)成本,由此基于簡(jiǎn)單的 XML 配置和普通 Java 對(duì)象(Plain Old Java Objects)的 Spring 技術(shù)應(yīng)運(yùn)而生,依賴注入(Dependency Injection), 控制反轉(zhuǎn)(Inversion of Control)和面向切面編程(AOP)的技術(shù)更加敏捷地解決了傳統(tǒng) Java 企業(yè)及版本的不足。

隨著 Spring 的持續(xù)演進(jìn),基于注解(Annotation)的配置逐漸取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式發(fā)布,它基于“約定大于配置”(Convention over configuration)這一理念來(lái)快速地開發(fā)、測(cè)試、運(yùn)行和部署 Spring 應(yīng)用,并能通過(guò)簡(jiǎn)單地與各種啟動(dòng)器(如 spring-boot-web-starter)結(jié)合,讓應(yīng)用直接以命令行的方式運(yùn)行,不需再部署到獨(dú)立容器中。這種簡(jiǎn)便直接快速構(gòu)建和開發(fā)應(yīng)用的過(guò)程,可以使用約定的配置并且簡(jiǎn)化部署,受到越來(lái)越多的開發(fā)者的歡迎。

Apache RocketMQ 是業(yè)界知名的分布式消息和流處理中間件,簡(jiǎn)單地理解,它由 Broker 服務(wù)器和客戶端兩部分組成:

其中客戶端一個(gè)是消息發(fā)布者客戶端(Producer),它負(fù)責(zé)向 Broker 服務(wù)器發(fā)送消息;另外一個(gè)是消息的消費(fèi)者客戶端(Consumer),多個(gè)消費(fèi)者可以組成一個(gè)消費(fèi)組,來(lái)訂閱和拉取消費(fèi) Broker 服務(wù)器上存儲(chǔ)的消息。

為了利用 Spring Boot 的快速開發(fā)和讓用戶能夠更靈活地使用 RocketMQ 消息客戶端,Apache RocketMQ 社區(qū)推出了 spring-boot-starter 實(shí)現(xiàn)。隨著分布式事務(wù)消息功能在 RocketMQ 4.3.0 版本的發(fā)布,近期升級(jí)了相關(guān)的 spring-boot 代碼,通過(guò)注解方式支持分布式事務(wù)的回查和事務(wù)消息的發(fā)送。

本文將對(duì)當(dāng)前的設(shè)計(jì)實(shí)現(xiàn)做一個(gè)簡(jiǎn)單的介紹,讀者可以通過(guò)本文了解將 RocketMQ Client 端集成為 spring-boot-starter 框架的開發(fā)細(xì)節(jié),然后通過(guò)一個(gè)簡(jiǎn)單的示例來(lái)一步一步的講解如何使用這個(gè) spring-boot-starter 工具包來(lái)配置,發(fā)送和消費(fèi) RocketMQ 消息。

Spring 中的消息框架

順便在這里討論一下在 Spring 中關(guān)于消息的兩個(gè)主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它們都能夠與 Spring Boot 整合并提供了一些參考的實(shí)現(xiàn)。和所有的實(shí)現(xiàn)框架一樣,消息框架的目的是實(shí)現(xiàn)輕量級(jí)的消息驅(qū)動(dòng)的微服務(wù),可以有效地簡(jiǎn)化開發(fā)人員對(duì)消息中間件的使用復(fù)雜度,讓系統(tǒng)開發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理。

1. Spring Messaging

Spring Messaging 是 Spring Framework 4 中添加的模塊,是 Spring 與消息系統(tǒng)集成的一個(gè)擴(kuò)展性的支持。它實(shí)現(xiàn)了從基于 JmsTemplate 的簡(jiǎn)單的使用 JMS 接口到異步接收消息的一整套完整的基礎(chǔ)架構(gòu),Spring AMQP 提供了該協(xié)議所要求的類似的功能集。在與 Spring Boot 的集成后,它擁有了自動(dòng)配置能力,能夠在測(cè)試和運(yùn)行時(shí)與相應(yīng)的消息傳遞系統(tǒng)進(jìn)行集成。

單純對(duì)于客戶端而言,Spring Messaging 提供了一套抽象的 API 或者說(shuō)是約定的標(biāo)準(zhǔn),對(duì)消息發(fā)送端和消息接收端的模式進(jìn)行規(guī)定,不同的消息中間件提供商可以在這個(gè)模式下提供自己的 Spring 實(shí)現(xiàn):在消息發(fā)送端需要實(shí)現(xiàn)的是一個(gè) XXXTemplate 形式的 Java Bean,結(jié)合 Spring Boot 的自動(dòng)化配置選項(xiàng)提供多個(gè)不同的發(fā)送消息方法;在消息的消費(fèi)端是一個(gè) XXXMessageListener 接口(實(shí)現(xiàn)方式通常會(huì)使用一個(gè)注解來(lái)聲明一個(gè)消息驅(qū)動(dòng)的 POJO),提供回調(diào)方法來(lái)監(jiān)聽和消費(fèi)消息,這個(gè)接口同樣可以使用 Spring Boot 的自動(dòng)化選項(xiàng)和一些定制化的屬性。

如果有興趣深入的了解 Spring Messaging 及針對(duì)不同的消息產(chǎn)品的使用,推薦閱讀這個(gè)文件。參考 Spring Messaging 的既有實(shí)現(xiàn),RocketMQ 的 spring-boot-starter 中遵循了相關(guān)的設(shè)計(jì)模式并結(jié)合 RocketMQ 自身的功能特點(diǎn)提供了相應(yīng)的 API(如順序、異步和事務(wù)半消息等)。

2. Spring Cloud Stream

Spring Cloud Stream 結(jié)合了 Spring Integration 的注解和功能,它的應(yīng)用模型如下:


該圖片引自 spring cloud stream

Spring Cloud Stream 框架中提供一個(gè)獨(dú)立的應(yīng)用內(nèi)核,它通過(guò)輸入(@Input)和輸出(@Output)通道與外部世界進(jìn)行通信,消息源端(Source)通過(guò)輸入通道發(fā)送消息,消費(fèi)目標(biāo)端(Sink)通過(guò)監(jiān)聽輸出通道來(lái)獲取消費(fèi)的消息。這些通道通過(guò)專用的 Binder 實(shí)現(xiàn)與外部代理連接。開發(fā)人員的代碼只需要針對(duì)應(yīng)用內(nèi)核提供的固定的接口和注解方式進(jìn)行編程,而不需要關(guān)心運(yùn)行時(shí)具體的 Binder 綁定的消息中間件。在運(yùn)行時(shí),Spring Cloud Stream 能夠自動(dòng)探測(cè)并使用在 classpath 下找到的Binder。

這樣開發(fā)人員可以輕松地在相同的代碼中使用不同類型的中間件:僅僅需要在構(gòu)建時(shí)包含進(jìn)不同的 Binder。在更加復(fù)雜的使用場(chǎng)景中,也可以在應(yīng)用中打包多個(gè) Binder 并讓它自己選擇 Binder,甚至在運(yùn)行時(shí)為不同的通道使用不同的 Binder。

Binder 抽象使得 Spring Cloud Stream 應(yīng)用可以靈活的連接到中間件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的靈活配置配置能力,這樣的配置可以通過(guò)外部配置的屬性和 Spring Boot 支持的任何形式來(lái)提供(包括應(yīng)用啟動(dòng)參數(shù)、環(huán)境變量和 application.yml 或者 application.properties 文件),部署人員可以在運(yùn)行時(shí)動(dòng)態(tài)選擇通道連接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。

Binder SPI 的方式來(lái)讓消息中間件產(chǎn)品使用可擴(kuò)展的 API 來(lái)編寫相應(yīng)的 Binder,并集成到 Spring Cloud Steam 環(huán)境,目前 RocketMQ 還沒(méi)有提供相關(guān)的 Binder,我們計(jì)劃在下一步將完善這一功能,也希望社區(qū)里有這方面經(jīng)驗(yàn)的同學(xué)積極嘗試,貢獻(xiàn) PR 或建議。

spring-boot-starter的實(shí)現(xiàn)

在開始的時(shí)候我們已經(jīng)知道,spring boot starter 構(gòu)造的啟動(dòng)器對(duì)于使用者是非常方便的,使用者只要在 pom.xml引入starter 的依賴定義,相應(yīng)的編譯,運(yùn)行和部署功能就全部自動(dòng)引入。因此常用的開源組件都會(huì)為 Spring 的用戶提供一個(gè) spring-boot-starter 封裝給開發(fā)者,讓開發(fā)者非常方便集成和使用,這里我們?cè)敿?xì)的介紹一下 RocketMQ(客戶端)的 starter 實(shí)現(xiàn)過(guò)程。

1. spring-boot-starter 的實(shí)現(xiàn)步驟

對(duì)于一個(gè) spring-boot-starter 實(shí)現(xiàn)需要包含如下幾個(gè)部分:

1)在 pom.xml 的定義

  • 定義最終要生成的 starter 組件信息
<groupId>org.apache.rocketmq</groupId> <artifactId>spring-boot-starter-rocketmq</artifactId> <version>1.0.0-SNAPSHOT</version>
  • 定義依賴包

它分為兩個(gè)部分:Spring 自身的依賴包和 RocketMQ 的依賴包。

2)配置文件類

定義應(yīng)用屬性配置文件類 RocketMQProperties,這個(gè) Bean 定義一組默認(rèn)的屬性值。用戶在使用最終的 starter 時(shí),可以根據(jù)這個(gè)類定義的屬性來(lái)修改取值,當(dāng)然不是直接修改這個(gè)類的配置,而是 spring-boot 應(yīng)用中對(duì)應(yīng)的配置文件:src/main/resources/application.properties。

3)定義自動(dòng)加載類

定義 src/resources/META-INF/spring.factories 文件中的自動(dòng)加載類, 其目的是讓 spring boot 更具文中中所指定的自動(dòng)化配置類來(lái)自動(dòng)初始化相關(guān)的 Bean、Component 或 Service,它的內(nèi)容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration

在 RocketMQAutoConfiguration 類的具體實(shí)現(xiàn)中,定義開放給用戶直接使用的 Bean 對(duì)象包括:

  • RocketMQProperties 加載應(yīng)用屬性配置文件的處理類;
  • RocketMQTemplate 發(fā)送端用戶發(fā)送消息的發(fā)送模板類;
  • ListenerContainerConfiguration 容器 Bean 負(fù)責(zé)發(fā)現(xiàn)和注冊(cè)消費(fèi)端消費(fèi)實(shí)現(xiàn)接口類,這個(gè)類要求:由 @RocketMQMessageListener 注解標(biāo)注;實(shí)現(xiàn) RocketMQListener 泛化接口。

4)最后具體地進(jìn)行?RpcketMQ 相關(guān)的封裝

在發(fā)送端(producer)和消費(fèi)端(consumer)客戶端分別進(jìn)行封裝,在當(dāng)前的實(shí)現(xiàn)版本提供了對(duì) Spring Messaging 接口的兼容方式。

2. 消息發(fā)送端實(shí)現(xiàn)

1)普通發(fā)送端

發(fā)送端的代碼封裝在 RocketMQTemplate POJO 中,下圖是發(fā)送端的相關(guān)代碼的調(diào)用關(guān)系圖:

為了與 Spring Messaging 的發(fā)送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象類,來(lái)支持相關(guān)的消息轉(zhuǎn)換和發(fā)送方法,這些方法最終會(huì)代理給 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如異步,單向和順序等方法直接添加到 RoketMQTempalte 中,這些方法直接代理調(diào)用到 RocketMQ 的 Producer API 來(lái)進(jìn)行消息的發(fā)送。

2)事務(wù)消息發(fā)送端

對(duì)于事務(wù)消息的處理,在消息發(fā)送端進(jìn)行了部分的擴(kuò)展,參考上面的調(diào)用關(guān)系類圖。

RocketMQTemplate 里加入了一個(gè)發(fā)送事務(wù)消息的方法 sendMessageInTransaction(),并且最終這個(gè)方法會(huì)代理到 RocketMQ 的 TransactionProducer 進(jìn)行調(diào)用,在這個(gè) Producer 上會(huì)注冊(cè)其關(guān)聯(lián)的 TransactionListener 實(shí)現(xiàn)類,以便在發(fā)送消息后能夠?qū)?TransactionListener 里的方法實(shí)現(xiàn)進(jìn)行調(diào)用。

3. 消息消費(fèi)端實(shí)現(xiàn)

在消費(fèi)端 Spring-Boot 應(yīng)用啟動(dòng)后,會(huì)掃描所有包含 @RocketMQMessageListener 注解的類(這些類需要集成 RocketMQListener 接口,并實(shí)現(xiàn) onMessage()方法),這個(gè) Listener 會(huì)一對(duì)一的被放置到。

DefaultRocketMQListenerContainer 容器對(duì)象中,容器對(duì)象會(huì)根據(jù)消費(fèi)的方式(并發(fā)或順序),將 RocketMQListener 封裝到具體的 RocketMQ 內(nèi)部的并發(fā)或者順序接口實(shí)現(xiàn)。在容器中創(chuàng)建 RocketMQ Consumer 對(duì)象,啟動(dòng)并監(jiān)聽定制的 Topic 消息,如果有消費(fèi)消息,則回調(diào)到 Listener 的 onMessage() 方法。

使用示例

上面的一章介紹了 RocketMQ 在 spring-boot-starter 方式的實(shí)現(xiàn),這里通過(guò)一個(gè)最簡(jiǎn)單的消息發(fā)送和消費(fèi)的例子來(lái)介紹如何使這個(gè) rocketmq-spring-boot-starter。

1. RocketMQ 服務(wù)端的準(zhǔn)備

1)啟動(dòng) NameServer 和 Broker

要驗(yàn)證 RocketMQ 的 Spring-Boot 客戶端,首先要確保 RocketMQ 服務(wù)正確的下載并啟動(dòng)。可以參考 RocketMQ 主站的快速開始來(lái)進(jìn)行操作。確保啟動(dòng) NameServer 和 Broker 已經(jīng)正確啟動(dòng)。

2)創(chuàng)建實(shí)例中所需要的 Topics

在執(zhí)行啟動(dòng)命令的目錄下執(zhí)行下面的命令行操作:

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

2. 編譯 rocketmq-spring-boot-starter

目前的 spring-boot-starter 依賴還沒(méi)有提交的 Maven 的中心庫(kù),用戶使用前需要自行下載 git 源碼,然后執(zhí)行 mvn clean install 安裝到本地倉(cāng)庫(kù)。

git clone https://github.com/apache/rocketmq-externals.git cd rocketmq-spring-boot-starter mvn clean install

3. 編寫客戶端代碼

用戶如果使用它,需要在消息的發(fā)布和消費(fèi)客戶端的 maven 配置文件 pom.xml 中添加如下的依賴:

屬性 spring-boot-starter-rocketmq-version 的取值為:1.0.0-SNAPSHOT, 這與上一步驟中執(zhí)行安裝到本地倉(cāng)庫(kù)的版本一致。

1)消息發(fā)送端的代碼

發(fā)送端的配置文件 application.properties:

發(fā)送端的 Java 代碼:

2)消息消費(fèi)端代碼

消費(fèi)端的配置文件 application.properties:

消費(fèi)端的 Java 代碼:

這里只是簡(jiǎn)單的介紹了使用 spring-boot 來(lái)編寫最基本的消息發(fā)送和接收的代碼,如果需要了解更多的調(diào)用方式,如: 異步發(fā)送,對(duì)象消息體,指定 tag 標(biāo)簽以及指定事務(wù)消息,請(qǐng)參看 github 的說(shuō)明文檔和詳細(xì)的代碼。我們后續(xù)還會(huì)對(duì)這些高級(jí)功能進(jìn)行陸續(xù)的介紹。

作者簡(jiǎn)介

遼天,阿里巴巴技術(shù)專家,Apache RocketMQ 內(nèi)核控,擁有多年分布式系統(tǒng)研發(fā)經(jīng)驗(yàn),對(duì) Microservice、Messaging 和 Storage 等領(lǐng)域有深刻理解, 目前專注 RocketMQ 內(nèi)核優(yōu)化以及 Messaging 生態(tài)建設(shè)。

在 PC 端登錄 start.aliyun.com 知行動(dòng)手實(shí)驗(yàn)室,沉浸式體驗(yàn)在線交互教程。

總結(jié)

以上是生活随笔為你收集整理的使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。