日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Alibaba RocketMQ 快速入门

發布時間:2023/12/14 javascript 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Alibaba RocketMQ 快速入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spring Cloud Alibaba 消息隊列 RocketMQ 入門_weixin_42073629的博客-CSDN博客_spring cloud alibaba rocketmq1. 概述本文我們來學習Spring Cloud Alibaba提供的Spring Cloud Stream RocketMQ組件,基于Spring Cloud Stream的編程模型,接入 RocketMQ 作為消息中間件,實現消息驅動的微服務。RocketMQ是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用...https://blog.csdn.net/weixin_42073629/article/details/106535675

?

1. 概述

本文我們來學習?Spring Cloud Alibaba?提供的?Spring Cloud Stream RocketMQ?組件,基于?Spring Cloud Stream?的編程模型,接入 RocketMQ 作為消息中間件,實現消息驅動的微服務。

RocketMQ?是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手游、視頻、物聯網、車聯網等。

具有以下特點:

  • 能夠保證嚴格的消息順序
  • 提供豐富的消息拉取模式
  • 高效的訂閱者水平擴展能力
  • 實時的消息訂閱機制
  • 億級消息堆積能力

在開始本文之前,胖友需要對 RocketMQ 進行簡單的學習。可以閱讀《RocketMQ 極簡入門》文章,將第一二小節看完,在本機搭建一個 RocketMQ 服務。

2. Spring Cloud Stream 介紹

Spring Cloud Stream?是一個用于構建基于消息的微服務應用框架,使用?Spring Integration?與 Broker 進行連接。

友情提示:可能有胖友對 Broker 不太了解,我們來簡單解釋下。

一般來說,消息隊列中間件都有一個?Broker Server(代理服務器),消息中轉角色,負責存儲消息、轉發消息。

例如說在 RocketMQ 中,Broker 負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。另外,Broker 也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

Spring Cloud Stream 提供了消息中間件的統一抽象,推出了 publish-subscribe、consumer groups、partition 這些統一的概念。

Spring Cloud Stream 內部有兩個概念:Binder?和?Binding

①?Binder,跟消息中間件集成的組件,用來創建對應的 Binding。各消息中間件都有自己的 Binder 具體實現。

public interface Binder<T, C extends ConsumerProperties, // 消費者配置P extends ProducerProperties> { // 生產者配置// 創建消費者的 BindingBinding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);// 創建生產者的 BindingBinding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);}
  • Kafka 實現了?KafkaMessageChannelBinder
  • RabbitMQ 實現了?RabbitMessageChannelBinder
  • RocketMQ 實現了?RocketMQMessageChannelBinder

②?Binding,包括 Input Binding 和 Output Binding。Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據即可,屏蔽了開發者與底層消息中間件的接觸。

最終整體交互如下圖所示:

可能看完之后,胖友對 Spring Cloud Stream 還是有點懵逼,并且覺得概念怎么這么多呢?不要慌,我們先來快速入個門,會有更加具象的感受。

3. 快速入門

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-demo

本小節,我們一起來快速入門下,會創建 2 個項目,分別作為生產者和消費者。最終項目如下圖所示:

3.1 搭建生產者

創建?labx-06-sca-stream-rocketmq-producer-demo?項目,作為生產者。

3.1.1 引入依賴

創建?pom.xml?文件中,引入 Spring Cloud Alibaba RocketMQ 相關依賴。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>labx-06</artifactId><groupId>cn.iocoder.springboot.labs</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>labx-06-sca-stream-rocketmq-producer-demo</artifactId><properties><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source>1.8</maven.compiler.source><spring.boot.version>2.2.4.RELEASE</spring.boot.version><spring.cloud.version>Hoxton.SR1</spring.cloud.version><spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version></properties><!--引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,進行依賴版本的管理,防止不兼容。在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 開發團隊推薦了三者的依賴關系--><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>${spring.boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring.cloud.alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- 引入 SpringMVC 相關依賴,并實現對其的自動配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相關依賴,將 RocketMQ 作為消息隊列,并實現對其的自動配置 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency></dependencies></project>

通過引入?spring-cloud-starter-stream-rocketmq?依賴,引入并實現 RocketMQ 的自動配置。在該依賴中,已經幫我們自動引入 RocketMQ 的大量依賴,非常方便,如下圖所示:

3.1.2 配置文件

創建?application.yaml?配置文件,添加 Spring Cloud Alibaba RocketMQ 相關配置。

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080

①?spring.cloud.stream?為 Spring Cloud Stream 配置項,對應?BindingServiceProperties?類。配置的層級有點深,我們一層一層來看看。

②?spring.cloud.stream.bindings?為 Binding 配置項,對應?BindingProperties?Map。其中,key?為 Binding 的名字。要注意,雖然說 Binding 分成 Input 和 Output 兩種類型,但是在配置項中并不會體現出來,而是要在稍后搭配?@Input?還是?@Output?注解,才會有具體的區分。

這里,我們配置了一個名字為?demo01-output?的 Binding。從命名上,我們的意圖是想作為 Output Binding,用于生產者發送消息。

  • destination:目的地。在 RocketMQ 中,使用 Topic 作為目的地。這里我們設置為?DEMO-TOPIC-01。

    主題(Topic):表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是 RocketMQ 進行消息訂閱的基本單位。

  • content-type:內容格式。這里使用 JSON 格式,因為稍后我們將發送消息的類型為 POJO,使用 JSON 進行序列化。

③?spring.cloud.stream.rocketmq?為 Spring Cloud Stream RocketMQ 配置項。

④?spring.cloud.stream.rocketmq.binder?為 RocketMQ Binder 配置項,對應?RocketMQBinderConfigurationProperties?類。

  • name-server:RocketMQ Namesrv 地址。

    名字服務(Name Server):名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的 Broker IP 列表。多個 Namesrv 實例組成集群,但相互獨立,沒有信息交換。

⑤?spring.cloud.stream.rocketmq.bindings?為 RocketMQ?自定義?Binding 配置項,用于對通用的?spring.cloud.stream.bindings?配置項的增強,實現 RocketMQ Binding 獨特的配置。該配置項對應?RocketMQBindingProperties?Map,其中?key?為 Binding 的名字,需要對應上噢。

這里,我們對名字為?demo01-output?的 Binding 進行增強,進行 Producer 的配置。其中,producer?為 RocketMQ Producer 配置項,對應?RocketMQProducerProperties?類。

  • group:生產者分組。

    生產者組(Producer Group):同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則 Broker 服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。

  • sync:是否同步發送消息,默認為?false?異步。一般業務場景下,使用同步發送消息較多,所以這里我們設置為?true?同步消息。

    使用 RocketMQ 發送三種類型的消息:同步消息(sync)、異步消息(async)和單向消息(oneway)。其中前兩種消息是可靠的,因為會有發送是否成功的應答。

3.1.3 MySource

創建?MySource?接口,聲明名字為 Output Binding。代碼如下:

public interface MySource {@Output("demo01-output")MessageChannel demo01Output();}

這里,我們通過?@Output?注解,聲明了一個名字為?demo01-output?的 Output Binding。注意,這個名字要和我們配置文件中的?spring.cloud.stream.bindings?配置項對應上。

同時,@Output?注解的方法的返回結果為?MessageChannel?類型,可以使用它發送消息。MessageChannel 提供的發送消息的方法如下:

@FunctionalInterface public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}boolean send(Message<?> message, long timeout);}

那么,我們是否要實現 MySource 接口呢?答案是不需要,全部交給 Spring Cloud Stream 的?BindableProxyFactory?來解決。BindableProxyFactory 會通過動態代理,自動實現 MySource 接口。 而?@Output?注解的方法的返回值,BindableProxyFactory 會掃描帶有?@Output?注解的方法,自動進行創建。

例如說,#demo01Output()?方法被自動創建返回結果為?DirectWithAttributesChannel,它是 MessageChannel 的子類。

友情提示:感興趣的胖友,可以在 BindableProxyFactory 的?#afterPropertiesSet()?和?#invoke(MethodInvocation invocation)?方法上,都打上一個斷點,然后進行愉快的調試。

3.1.4 Demo01Message

創建?Demo01Message?類,示例 Message 消息。代碼如下:

public class Demo01Message {/*** 編號*/private Integer id;// ... 省略 setter/getter/toString 方法}

3.1.5 Demo01Controller

創建?Demo01Controller?類,提供發送消息的 HTTP 接口。代碼如下:

@RestController @RequestMapping("/demo01") public class Demo01Controller {@Autowiredprivate MySource mySource; // <X>@GetMapping("/send")public boolean send() {// <1> 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// <2> 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();// <3> 發送消息return mySource.demo01Output().send(springMessage);}}
  • <X>?處,使用?@Autowired?注解,注入 MySource Bean。
  • <1>?處,創建 Demo01Message 對象。
  • <2>?處,使用?MessageBuilder?創建 Spring?Message?對象,并設置消息內容為 Demo01Message 對象。
  • <3>?處,通過 MySource 獲得 MessageChannel 對象,然后發送消息。

3.1.6 ProducerApplication

創建?ProducerApplication?類,啟動應用。代碼如下:

@SpringBootApplication @EnableBinding(MySource.class) public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}}

使用?@EnableBinding?注解,聲明指定接口開啟 Binding 功能,掃描其?@Input?和?@Output?注解。這里,我們設置為 MySource 接口。

3.2 搭建消費者

創建?labx-06-sca-stream-rocketmq-consumer-demo?項目,作為消費者。

3.2.1 引入依賴

創建?pom.xml?文件中,引入 Spring Cloud Alibaba RocketMQ 相關依賴。

友情提示:和「3.1.1 引入依賴」基本一樣,點擊?鏈接?查看。

3.2.2 配置文件

創建?application.yaml?配置文件,添加 Spring Cloud Alibaba RocketMQ 相關配置。

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

總體來說,和「3.1.2 配置文件」是比較接近的,所以我們只說差異點噢。

①?spring.cloud.stream.bindings?為 Binding 配置項。

這里,我們配置了一個名字為?demo01-input?的 Binding。從命名上,我們的意圖是想作為 Input Binding,用于消費者消費消息。

  • group:消費者分組。

    消費者組(Consumer Group):同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的 Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

②?spring.cloud.stream.rocketmq.bindings?為 RocketMQ?自定義?Binding 配置項。

這里,我們對名字為?demo01-input?的 Binding 進行增強,進行 Consumer 的配置。其中,consumer?為 RocketMQ Producer 配置項,對應?RocketMQConsumerProperties?類。

  • enabled:是否開啟消費,默認為?true。在日常開發時,如果在本地環境不想消費,可以通過設置?enabled?為?false?進行關閉。

  • broadcasting: 是否使用廣播消費,默認為?false?使用集群消費。

    • 集群消費(Clustering):集群消費模式下,相同 Consumer Group 的每個 Consumer 實例平均分攤消息。
    • 廣播消費(Broadcasting):廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。

這里一點要注意!!!艿艿加了三個感嘆號,一定要理解集群消費和廣播消費的差異。我們來舉個例子,以有兩個消費者分組 A 和 B 的場景舉例子:

  • 假設每個消費者分組各啟動一個實例,此時我們發送一條消息,該消息會被兩個消費者分組?"consumer_group_01"?和?"consumer_group_02"?都各自消費一次。
  • 假設每個消費者分組各啟動一個實例,此時我們發送一條消息,該消息會被分組 A 的某個實例消費一次,被分組 B 的某個實例也消費一次

通過集群消費的機制,我們可以實現針對相同 Topic ,不同消費者分組實現各自的業務邏輯。例如說:用戶注冊成功時,發送一條 Topic 為?"USER_REGISTER"?的消息。然后,不同模塊使用不同的消費者分組,訂閱該 Topic ,實現各自的拓展邏輯:

  • 積分模塊:判斷如果是手機注冊,給用戶增加 20 積分。
  • 優惠劵模塊:因為是新用戶,所以發放新用戶專享優惠劵。
  • 站內信模塊:因為是新用戶,所以發送新用戶的歡迎語的站內信。
  • ... 等等

這樣,我們就可以將注冊成功后的業務拓展邏輯,實現業務上的解耦,未來也更加容易拓展。同時,也提高了注冊接口的性能,避免用戶需要等待業務拓展邏輯執行完成后,才響應注冊成功。

同時,相同消費者分組的多個實例,可以實現高可用,保證在一個實例意外掛掉的情況下,其它實例能夠頂上。并且,多個實例都進行消費,能夠提升消費速度

友情提示:如果還不理解的話,沒有關系,我們下面會演示下我們上面舉的例子。

3.2.3 MySink

創建?MySink?接口,聲明名字為 Input Binding。代碼如下:

public interface MySink {String DEMO01_INPUT = "demo01-input";@Input(DEMO01_INPUT)SubscribableChannel demo01Input();}

這里,我們通過?@Input?注解,聲明了一個名字為?demo01-input?的 Input Binding。注意,這個名字要和我們配置文件中的?spring.cloud.stream.bindings?配置項對應上。

同時,@Input?注解的方法的返回結果為?SubscribableChannel?類型,可以使用它訂閱消息來消費。MessageChannel 提供的訂閱消息的方法如下:

public interface SubscribableChannel extends MessageChannel {boolean subscribe(MessageHandler handler); // 訂閱boolean unsubscribe(MessageHandler handler); // 取消訂閱}

那么,我們是否要實現 MySink 接口呢?答案也是不需要,還是全部交給 Spring Cloud Stream 的?BindableProxyFactory?大兄弟來解決。BindableProxyFactory 會通過動態代理,自動實現 MySink 接口。 而?@Input?注解的方法的返回值,BindableProxyFactory 會掃描帶有?@Input?注解的方法,自動進行創建。

例如說,#demo01Input()?方法被自動創建返回結果為?DirectWithAttributesChannel,它也是 SubscribableChannel 的子類。

友情提示:感興趣的胖友,可以在 BindableProxyFactory 的?#afterPropertiesSet()?和?#invoke(MethodInvocation invocation)?方法上,都打上一個斷點,然后進行愉快的調試。

3.2.4 Demo01Message

創建?Demo01Message?類,示例 Message 消息。

友情提示:和「3.1.4 Demo01Message」基本一樣,點擊?鏈接?查看。

3.2.5 Demo01Consumer

創建?Demo01Consumer?類,消費消息。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}

在方法上,添加?@StreamListener?注解,聲明對應的?Input?Binding。這里,我們使用?MySink.DEMO01_INPUT。

又因為我們消費的消息是 POJO 類型,所以我們需要添加?@Payload?注解,聲明需要進行反序列化成 POJO 對象。

3.2.6 ConsumerApplication

創建?ConsumerApplication?類,啟動應用。代碼如下:

@SpringBootApplication @EnableBinding(MySink.class) public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}

使用?@EnableBinding?注解,聲明指定接口開啟 Binding 功能,掃描其?@Input?和?@Output?注解。這里,我們設置為 MySink 接口。

3.3 測試單集群多實例的場景

本小節,我們會在一個消費者集群啟動兩個實例,測試在集群消費的情況下的表現。

① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。此時在 IDEA 控制臺看到 RocketMQ 相關的日志如下:

2020-02-22 09:32:54.462 INFO 50472 --- [ main] s.b.r.c.RocketMQListenerBindingContainer : running container: RocketMQListenerBindingContainer{consumerGroup='demo01-consumer-group-DEMO-TOPIC-01', nameServer='[127.0.0.1:9876]', topic='DEMO-TOPIC-01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='null', messageModel=CLUSTERING} 2020-02-22 09:32:54.462 INFO 50472 --- [ main] .c.s.b.r.i.RocketMQInboundChannelAdapter : started com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter@1cd3b138

友情提示:因為 IDEA 默認同一個程序只允許啟動 1 次,所以我們需要配置 DemoProviderApplication 為?Allow parallel run。如下圖所示:

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:

// ConsumerApplication 控制臺 01 2020-02-22 09:39:29.073 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-1682643477}] 2020-02-22 09:41:32.754 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=1890257867}]// ConsumerApplication 控制臺 02 2020-02-22 09:41:32.264 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:80 消息內容:Demo01Message{id=1401668556}]

符合預期。從日志可以看出,每條消息僅被消費一次。

3.4 測試多集群多實例的場景

本小節,我們會在二個消費者集群啟動兩個實例,測試在集群消費的情況下的表現。

① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。

② 修改?labx-06-sca-stream-rocketmq-consumer-demo?項目的配置文件,修改?spring.cloud.stream.bindings.demo01-input.group?配置項,將消費者分組改成?X-demo01-consumer-group-DEMO-TOPIC-01。

然后,執行?ConsumerApplication 兩次,再啟動兩個消費者的實例,從而實現在消費者分組?X-demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。

③ 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:

// 消費者分組 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 01 2020-02-22 10:17:07.886 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-276398167}] 2020-02-22 10:17:08.237 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-250975158}]// 消費者分組 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 02 2020-02-22 10:17:08.710 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:80 消息內容:Demo01Message{id=412281482}]// 消費者分組 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 01 2020-02-22 10:17:07.887 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:51 消息內容:Demo01Message{id=-276398167}] 2020-02-22 10:17:08.238 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:51 消息內容:Demo01Message{id=-250975158}]// 消費者分組 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制臺 02 2020-02-22 10:17:08.787 INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:77 消息內容:Demo01Message{id=412281482}]

符合預期。從日志可以看出,每條消息被每個消費者集群都進行了消費,且僅被消費一次。

3.5 小結

至此,我們已經完成了 Stream RocketMQ 的快速入門,是不是還是蠻簡答的噢。現在胖友可以在回過頭看看 Binder 和 Binding 的概念,是不是就清晰一些了。

4. 定時消息

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-demo

在 RocketMQ 中,提供定時消息的功能。

定時消息,是指消息發到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點或者等待特定的時間后才能被消費。

不過,RocketMQ 暫時不支持任意的時間精度的延遲,而是固化了 18 個延遲級別。如下表格:

延遲級別時間延遲級別時間延遲級別時間
11s73m139m
25s84m1410m
310s95m1520m
430s106m1630m
51m117m171h
62m128m182h

如果胖友想要任一時刻的定時消息,可以考慮借助 MySQL + Job 來實現。又或者考慮使用?DDMQ(滴滴打車基于 RocketMQ 和 Kafka 改造的開源消息隊列)。

下面,我們來搭建一個 RocketMQ 定時消息的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,修改?labx-06-sca-stream-rocketmq-producer-demo?發送定時消息,繼續使用?labx-06-sca-stream-rocketmq-consumer-demo?消費消息。

4.1 Demo01Controller

修改?Demo01Controller?類,增發送定時消息的 HTTP 接口。代碼如下:

private Logger logger = LoggerFactory.getLogger(getClass());@GetMapping("/send_delay") public boolean sendDelay() {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // <X> 設置延遲級別為 3,10 秒后消費。.build();// 發送消息boolean sendResult = mySource.demo01Output().send(springMessage);logger.info("[sendDelay][發送消息完成, 結果 = {}]", sendResult);return sendResult; }

在?<X>?處,通過添加頭?MessageConst.PROPERTY_DELAY_TIME_LEVEL,設置消息的延遲級別,從而發送定時消息。

4.2 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_delay?接口,發送延遲 10 秒的定時消息。IDEA 控制臺輸出日志如下:

// Producer 的控制臺 2020-02-22 16:32:35.836 INFO 57143 --- [io-18080-exec-5] c.i.s.l.r.p.controller.Demo01Controller : [sendDelay][發送消息完成, 結果 = true]// Consumer 的控制臺 2020-02-22 16:32:45.841 INFO 57133 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:61 消息內容:Demo01Message{id=618574636}]

符合預期。在 Producer 發送的消息之后,Consumer 確實 10 秒后才消費消息。

5. 消費重試

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-retry

RocketMQ 提供消費重試的機制。在消息消費失敗的時候,RocketMQ 會通過消費重試機制,重新投遞該消息給 Consumer ,讓 Consumer 有機會重新消費消息,實現消費成功。

當然,RocketMQ 并不會無限重新投遞消息給 Consumer 重新消費,而是在默認情況下,達到 16 次重試次數時,Consumer 還是消費失敗時,該消息就會進入到死信隊列

死信隊列用于處理無法被正常消費的消息。當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。

RocketMQ 將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。在 RocketMQ 中,可以通過使用 console 控制臺對死信隊列中的消息進行重發來使得消費者實例再次進行消費。

每條消息的失敗重試,是有一定的間隔時間。實際上,消費重試是基于「5. 定時消息」?來實現,第一次重試消費按照延遲級別為?3?開始。😈 所以,默認為 16 次重試消費,也非常好理解,畢竟延遲級別最高為 18 呀。

不過要注意,只有集群消費模式下,才有消息重試。

下面,我們來搭建一個 RocketMQ 消息重試的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-retry?來模擬消費失敗后的重試

5.1 復制項目

將「2. 快速入門」小節的?labx-06-sca-stream-rocketmq-consumer-demo,復制出?labx-06-sca-stream-rocketmq-consumer-retry。

5.2 配置文件

修改?application.yml?配置文件,增加消費重試相關的兩個配置項?delay-level-when-next-consume?和?max-attempts。最終配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Consumer 配置項,對應 ConsumerProperties 類consumer:max-attempts: 1# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費delay-level-when-next-consume: 0 # 異步消費消息模式下消費失敗重試策略,默認為 0server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

① 對于?delay-level-when-next-consume?配置項,一共有三種選擇:

  • -1:不重復,直接放入死信隊列
  • 0:RocketMQ Broker 控制重試策略
  • 0:RocketMQ Consumer 控制重試策略

可能胖友對 Broker 和 Consumer 控制重試策略有點懵逼!?每天消息首次消費失敗時,Consumer 會發回給 Broker,并告訴 Broker 按照什么延遲級別開始,不斷重新投遞給 Consumer 直到消費成功或者到達最大延遲級別。

舉個例子,如果這里我們設置了?delay-level-when-next-consume?配置項為 18,則 2 小時后 Broker 會投遞該消息給 Consumer 進行重新消費。

一般情況下,我們設置?delay-level-when-next-consume?配置項為 0 即可,使用 Broker 控制重試策略即可。默認配置下,Broker 會使用延遲級別從 3 開始,10 秒后 Broker 會投遞該消息給 Consumer 進行重新消費。

② 對于?max-attempts?配置項,每次拉取到消息到本地時,如果消費重試,本地重試的最大總次數(包括第一次)。這個是 Spring Cloud Stream 提供的通用消費重試功能,是?Consumer?級別的,而 RocketMQ 提供的獨有消費重試功能,是?Broker?級別的。

因為 Spring Cloud Stream 提供的重試間隔,是通過 sleep 實現,會占掉當前線程,影響 Consumer 的消費速度,所以這里并不推薦使用,因此設置?max-attempts?配置項為 1,禁用 Spring Cloud Stream 提供的重試功能,使用 RocketMQ 提供的重試功能

友情提示:如果胖友無法保證消費重試不會帶來副作用,也就是說無法保證消費的冪等性,建議關閉消費重試功能,即設置?delay-level-when-next-consume?配置項為 -1,max-attempts?配置項為 1。

5.3 Demo01Consumer

修改?Demo01Consumer?類,在消費消息時拋出異常,從而模擬消費錯誤。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// <X> 注意,此處拋出一個 RuntimeException 異常,模擬消費失敗throw new RuntimeException("我就是故意拋出一個異常");}}

5.4 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送一條消息。IDEA 控制臺輸出日志如下:

// Demo01Consumer 第一次消費失敗,拋出 RuntimeException 異常 2020-02-22 19:18:52.241 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:18:52.245 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// Demo01Consumer 第一次重試消費失敗,拋出 RuntimeException 異常。間隔了 10 秒,對應延遲級別 3 。 2020-02-22 19:19:02.259 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:19:02.259 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// Demo01Consumer 第二次重試消費失敗,拋出 RuntimeException 異常。間隔了 30 秒,對應延遲級別 4 。 2020-02-22 19:19:32.266 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:69 消息內容:Demo01Message{id=-604160799}] 2020-02-22 19:19:32.266 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略// ... 省略,后續還有重試

符合預期。從日志中,我們可以看到,消息因為消費失敗后,又重試消費了多次。

6. 消費異常處理機制

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-error-handler

在 Spring Cloud Stream 中,提供了通用的消費異常處理機制,可以攔截到消費者消費消息時發生的異常,進行自定義的處理邏輯。

下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「5. 消費重試」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-retry?復制出?labx-06-sca-stream-rocketmq-consumer-error-handler?來演示消費異常處理機制

6.1 復制項目

將「5. 消費重試」小節的?labx-06-sca-stream-rocketmq-consumer-retry,復制出?labx-06-sca-stream-rocketmq-consumer-error-handler。

6.2 Demo01Consumer

修改?Demo01Consumer?類,增加消費異常處理方法。完整代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT) // 對應 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);// <X> 注意,此處拋出一個 RuntimeException 異常,模擬消費失敗throw new RuntimeException("我就是故意拋出一個異常");}@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")public void handleError(ErrorMessage errorMessage) {logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());logger.error("[handleError][headers:{}]", errorMessage.getHeaders());}@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannelpublic void globalHandleError(ErrorMessage errorMessage) {logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());}}

① 在 Spring Integration 的設定中,若?#onMessage(@Payload Demo01Message message)?方法消費消息發生異常時,會發送錯誤消息(ErrorMessage)到對應的錯誤 Channel(<destination>.<group>.errors)中。同時,所有錯誤 Channel 都橋接到了 Spring Integration 定義的全局錯誤 Channel(errorChannel)

友情提示:先暫時記住 Spring Integration 這樣的設定,艿艿也沒去深究 T T,也是一臉懵逼。

因此,我們有兩種方式來實現異常處理:

  • 局部的異常處理:通過訂閱指定錯誤 Channel
  • 全局的異常處理:通過訂閱全局錯誤 Channel

② 在?#handleError(ErrorMessage errorMessage)?方法上,我們聲明了?@ServiceActivator?注解,訂閱指定錯誤 Channel的錯誤消息,實現?#onMessage(@Payload Demo01Message message)?方法的局部異常處理。如下圖所示:

③ 在?#globalHandleError(ErrorMessage errorMessage)?方法上,我們聲明了?@StreamListener?注解,訂閱全局錯誤 Channel的錯誤消息,實現全局異常處理。

④ 在全局局部異常處理都定義的情況下,錯誤消息僅會被符合條件局部錯誤異常處理。如果沒有符合條件的,錯誤消息才會被全局異常處理。

6.3 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送一條消息。IDEA 控制臺輸出日志如下:

// onMessage 方法 2020-02-20 00:47:34.487 INFO 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:60 消息內容:Demo01Message{id=-317670393}]// handleError 方法 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][payload:RuntimeException: 我就是故意拋出一個異常] 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=3, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=0A258102FE8918B4AAC2620411310017, rocketmq_SYS_FLAG=0, id=dc6dafb1-b303-7931-5977-45f319b935d9, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, rocketmq_BORN_TIMESTAMP=1582130833713, timestamp=1582130854444}]] 2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][headers:{id=cdf37b5d-878c-3d85-1f40-7711a3642a16, timestamp=1582130854489}]

不過要注意,如果異常處理方法成功,沒有重新拋出異常,會認定為該消息被消費成功,所以就不會進行消費重試。

7. 廣播消費

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-broadcasting

在上述的示例中,我們看到的都是使用集群消費,也是最常用的消費模式。而在一些場景下,我們需要使用廣播消費

廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。

例如說,在應用中,緩存了數據字典等配置表在內存中,可以通過 RocketMQ 廣播消費,實現每個應用節點都消費消息,刷新本地內存的緩存。

又例如說,我們基于 WebSocket 實現了 IM 聊天,在我們給用戶主動發送消息時,因為我們不知道用戶連接的是哪個提供 WebSocket 的應用,所以可以通過 RocketMQ 廣播消費,每個應用判斷當前用戶是否是和自己提供的 WebSocket 服務連接,如果是,則推送消息給用戶。

下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,使用?labx-06-sca-stream-rocketmq-producer-demo?發送消息,從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示廣播消費

7.1 復制項目

將「2. 快速入門」小節的?labx-06-sca-stream-rocketmq-consumer-demo,復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting。

7.2 配置文件

修改?application.yml?配置文件,設置?broadcasting?配置項為?true,開啟廣播消費的模式。完整配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01-X # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: true # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

7.3 簡單測試

① 執行?ConsumerApplication 兩次,啟動兩個消費者的實例,從而實現在消費者分組?demo01-consumer-group-DEMO-TOPIC-01?下有兩個消費者實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口三次,發送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:

// ConsumerApplication 控制臺 01 2020-02-20 01:20:06.886 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-335590634}] 2020-02-20 01:20:18.368 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=283364059}] 2020-02-20 01:20:24.422 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:78 消息內容:Demo01Message{id=-1253930234}]// ConsumerApplication 控制臺 02 2020-02-20 01:20:06.884 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=-335590634}] 2020-02-20 01:20:18.368 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=283364059}] 2020-02-20 01:20:24.422 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:75 消息內容:Demo01Message{id=-1253930234}]

符合預期。從日志可以看出,每條消息僅被每個消費者消費了一次。

8. 順序消息

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-orderly
  • 消費者:labx-06-sca-stream-rocketmq-consumer-orderly

RocketMQ 提供了兩種順序級別:

  • 普通順序消息:Producer 將相關聯的消息發送到相同的消息隊列。
  • 完全嚴格順序:在【普通順序消息】的基礎上,Consumer 嚴格順序消費。

官方文檔是這么描述的:

消息有序,指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。RocketMQ 可以嚴格的保證消息有序。

順序消息分為全局順序消息與分區順序消息,全局順序是指某個 Topic 下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。

  • 全局順序:對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
  • 分區順序:對于指定的一個 Topic,所有消息根據 Sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。適用場景:性能要求高,以 Sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。

注意,分區順序就是普通順序消息,全局順序就是完全嚴格順序。

下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-orderly?來演示發送順序消息
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示順序消費消息

8.1 搭建生產者

從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-orderly?來演示發送順序消息

8.1.1 配置文件

修改?application.yml?配置文件,添加?partition-key-expression?配置項,設置 Producer 發送順序消息的 Sharding key。完整配置如下:

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Producer 配置項,對應 ProducerProperties 類producer:partition-key-expression: payload['id'] # 分區 key 表達式。該表達式基于 Spring EL,從消息中獲得分區 key。# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080

①?partition-key-expression?配置項,該表達式基于?Spring EL,從消息中獲得 Sharding key。

這里,我們設置該配置項為?payload['id'],表示從 Spring?Message?的 payload 的?id。稍后我們發送的消息的 payload 為 Demo01Message,那么?id?就是?Demo01Message.id。

如果我們想從消息的 headers 中獲得 Sharding key,可以設置為?headers['partitionKey']。

② Spring Cloud Stream 使用?PartitionHandler?進行 Sharding key 的獲得與計算,最終 Sharding key 的結果為?key.hashCode() % partitionCount。

感興趣的胖友,可以閱讀 PartitionHandler 的?#determinePartition(Message<?> message)?方法。

在獲取到 Sharding key 之后,Spring Cloud Alibaba Stream RocketMQ 提供的?PartitionMessageQueueSelector?選擇消息發送的隊列。

我們以發送一條?id?為 1 的 Demo01Message 消息為示例,最終會發送到對應 RocketMQ Topic 的隊列為 1。計算過程如下:

// 第一步,PartitionHandler 使用 `partition-key-expression` 表達式,從 Message 中獲得 Sharding key key => 1// 第二步,PartitionHandler 計算最終的 Sharding key // 默認情況下,每個 RocketMQ Topic 的隊列總數是 4。 key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1// 第三步,PartitionMessageQueueSelector 獲得對應 RocketMQ Topic 的隊列 隊列 => queues.get(key) = queues.get(1)

這樣,我們就能保證相同 Sharding Key?的消息,發送到相同的對應 RocketMQ Topic 的隊列中。當前,前提是該 Topic 的隊列總數不能變噢,不然計算的 Sharding Key 會發生變化。

8.1.2 Demo01Controller

修改?Demo01Controller?類,增加發送 3 條順序消息的 HTTP 接口。代碼如下:

@GetMapping("/send_orderly") public boolean sendOrderly() {// 發送 3 條相同 id 的消息int id = new Random().nextInt();for (int i = 0; i < 3; i++) {// 創建 MessageDemo01Message message = new Demo01Message().setId(id);// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();// 發送消息mySource.demo01Output().send(springMessage);}return true; }

每次發送的 3 條消息使用相同的?id,配合上我們使用它作為 Sharding key,就可以發送對應 Topic 的相同隊列中。

另外,整列發送的雖然是順序消息,但是和發送普通消息的代碼是一模一樣的。

8.2 搭建消費者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-broadcasting?來演示順序消費消息

8.2.1 配置文件

修改?application.yml?配置文件,添加?orderly?配置項,設置 Consumer 順序消費消息。完整配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費orderly: true # 是否順序消費,默認為 false 并發消費。server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

8.2.2 Demo01Consumer

修改?Demo01Consumer?類,在消費消息時,打印出消息所在隊列編號線程編號,這樣我們通過隊列編號可以判斷消息是否順序發送,通過線程編號可以判斷消息是否順序消費。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(Message<?> message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}

8.3 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_orderly?接口,發送順序消息。IDEA 控制臺輸出日志如下:

2020-02-20 21:26:52.044 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212037, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD850006, rocketmq_SYS_FLAG=0, id=945725a1-abfb-218a-d480-b220adff9549, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212044}]] 2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212039, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD870007, rocketmq_SYS_FLAG=0, id=86a0e912-3cba-8b5b-3928-a7ef0ad80036, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]] 2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212041, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD890008, rocketmq_SYS_FLAG=0, id=b04416a3-60c2-bf42-a5a4-fe3c5079cc55, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]]

id?為 58569988 的消息被發送到 RocketMQ 消息隊列編號為 0,并且在線程編號為 76 的線程中消費。😈 胖友可以自己在多調用幾次接口,繼續嘗試。

9. 消息過濾

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-demo
  • 消費者:labx-06-sca-stream-rocketmq-consumer-filter

RocketMQ 提供了兩種方式給 Consumer 進行消息的過濾:

  • 基于 Tag 過濾

    標簽(Tag):為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化 RocketMQ 提供的查詢系統。消費者可以根據 Tag 實現對不同子主題的不同消費邏輯,實現更好的擴展性。

  • 基于?SQL92?過濾

消息過濾目前是在?Broker?端實現的,優點是減少了 Broker 和 Consumer 之間的無用消息的網絡傳輸,缺點是增加了 Broker 的負擔、而且實現相對復雜。

一般情況下,我們使用 Tag 過濾較多,我們來搭建一個 RocketMQ 使用 Tag 進行消息過濾的示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 修改?labx-06-sca-stream-rocketmq-producer-demo?發送帶有 Tag 的消息
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter?來使用 Tag 過濾消息來消費。

先搭建消費者。

9.1 Demo01Controller

修改?Demo01Controller?類,增加發送 3 條帶 Tag 的消息的 HTTP 接口。代碼如下:

@GetMapping("/send_tag") public boolean sendTag() {for (String tag : new String[]{"yunai", "yutou", "tudou"}) {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, tag) // <X> 設置 Tag.build();// 發送消息mySource.demo01Output().send(springMessage);}return true; }

在?<X>?處,通過添加頭?MessageConst.PROPERTY_TAGS,設置發送消息的?Tag

再搭建消費者。

9.2 復制項目

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter?來使用 Tag 過濾消息來消費。

9.3 配置文件

修改?application.yml?配置文件,設置?tags?配置項為?yunai || yutou,只消費帶有 Tag 為?yunai?或?yutou?的消息。完整配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSONgroup: demo01-consumer-group-DEMO-TOPIC-01 # 消費者分組# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費tags: yunai || yutou # 基于 Tag 訂閱,多個 Tag 使用 || 分隔,默認為空sql: # 基于 SQL 訂閱,默認為空server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

如果胖友想要基于 SQL92 過濾消息,可以通過設置?sql?配置項。

9.4 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_tag?接口,發送帶有 Tag 的消息。IDEA 控制臺輸出日志如下:

2020-02-20 22:41:57.639 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:Demo01Message{id=687868446}] 2020-02-20 22:41:57.641 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:76 消息內容:Demo01Message{id=1088622557}]

只消費了兩條消息,目測 Tag 為?tudou?的消息已經被過濾了。要注意,被過濾掉的消息,后續是無法被消費掉了,效果和消費成功是一樣的。

9.5 Demo01Consumer

咳咳咳:不知道如何取這標題,暫時用這個噶。

上面我們看到的是 RocketMQ?獨有的?Broker級別的消息過濾機制,而 Spring Cloud Stream 提供了通用的?Consumer?級別的效率過濾器機制。我們只需要使用?@StreamListener?注解的?condition?屬性,設置消息滿足指定 Spring EL 表達式的情況下,才進行消費。

> /** > * A condition that must be met by all items that are dispatched to this method. > * @return a SpEL expression that must evaluate to a {@code boolean} value. > */ > String condition() default ""; >

修改?Demo01Consumer?類,使用?@StreamListener?注解的?condition?屬性來過濾消息。代碼如下:

@Component public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());// @StreamListener(MySink.DEMO01_INPUT) // public void onMessage(@Payload Demo01Message message) { // logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message); // }@StreamListener(value = MySink.DEMO01_INPUT, condition = "headers['rocketmq_TAGS'] == 'yunai'")public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);}}

這里我們設置消息的 Header 帶有的?rocketmq_TAGS?值為?yunai?時,才進行消費。

9.6 再次測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_tag?接口,發送帶有 Tag 的消息。IDEA 控制臺輸出日志如下:

/ Tag 為 `yunai` 的消息被消費 2020-02-20 22:59:11.597 INFO 81438 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:64 消息內容:Demo01Message{id=124549390}]// Tag 為 `yutou` 的消息被過濾 2020-02-20 22:59:11.599 WARN 81438 --- [MessageThread_1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: 5edff575-b9a7-e011-154a-532077994685

只消費了一條消息,目測 Tag 為?tudou?的消息被 Broker 過濾,Tag 為?yutou?的消息被 Consumer 過濾。要注意,被過濾掉的消息,后續是無法被消費掉了,效果和消費成功是一樣的。

10. 事務消息

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-transaction
  • 消費者:labx-06-sca-stream-rocketmq-consumer-demo

在分布式消息隊列中,目前唯一提供完整的事務消息的,只有 RocketMQ 。關于這一點,還是可以鼓吹下的。

可能會有胖友怒噴艿艿,RabbitMQ 和 Kafka 也有事務消息啊,也支持發送事務消息的發送,以及后續的事務消息的 commit提交或 rollbackc 回滾。但是要考慮一個極端的情況,在本地數據庫事務已經提交的時時候,如果因為網絡原因,又或者崩潰等等意外,導致事務消息沒有被 commit ,最終導致這條事務消息丟失,分布式事務出現問題。

相比來說,RocketMQ 提供事務回查機制,如果應用超過一定時長未 commit 或 rollback 這條事務消息,RocketMQ 會主動回查應用,詢問這條事務消息是 commit 還是 rollback ,從而實現事務消息的狀態最終能夠被 commit 或是 rollback ,達到最終事務的一致性。

這也是為什么艿艿在上面專門加粗“完整的”三個字的原因。可能上述的描述,對于絕大多數沒有了解過分布式事務的胖友,會比較陌生,所以推薦閱讀如下兩篇文章:

  • 《阿里云消息隊列 MQ —— 事務消息》
  • 《芋道 RocketMQ 源碼解析 —— 事務消息》

熱心的艿艿:雖然說 RabbitMQ、Kafka 并未提供完整的事務消息,但是社區里,已經基于它們之上拓展,提供了事務回查的功能。例如說:Myth?,采用消息隊列解決分布式事務的開源框架, 基于 Java 語言來開發(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架進行分布式事務。

下面,我們來搭建一個 RocketMQ 定時消息的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目,修改?labx-06-sca-stream-rocketmq-producer-transaction?發送事務消息,繼續使用?labx-06-sca-stream-rocketmq-consumer-demo?消費消息。

10.1 復制項目

從?labx-06-sca-stream-rocketmq-producer-demo?復制出?labx-06-sca-stream-rocketmq-producer-transaction?來發送事務消息

10.2 配置文件

修改?application.yml?配置文件,添加?transactional?配置項為?true,設置 Producer 發送事務消息。完整配置如下:

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: DEMO-TOPIC-01 # 目的地。這里使用 RocketMQ Topiccontent-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: test # 生產者分組sync: true # 是否同步發送消息,默認為 false 異步。transactional: true # 是否發送事務消息,默認為 false。server:port: 18080

10.3 Demo01Controller

修改?Demo01Controller?類,增加發送事務消息的 HTTP 接口。代碼如下:

@GetMapping("/send_transaction") public boolean sendTransaction() {// 創建 MessageDemo01Message message = new Demo01Message().setId(new Random().nextInt());// 創建 Spring Message 對象Args args = new Args().setArgs1(1).setArgs2("2");Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader("args", JSON.toJSONString(args)) // <X>.build();// 發送消息return mySource.demo01Output().send(springMessage); }public static class Args { // 這里作為示例,所以直接這么寫了private Integer args1;private String args2;// ... 省略 setter、getter、toString 方法 }

因為 Spring Cloud Stream 在設計時,并沒有考慮事務消息,所以我們只好在?<X>?處,通過 Header 傳遞參數。

又因為 Header 后續會被轉換成 String 類型,導致我們無法獲得正確的真實的原始參數,所以這里我們先使用 JSON 將?args?參數序列化成字符串,這樣后續我們可以使用 JSON 反序列化回來。

10.4 TransactionListenerImpl

創建?TransactionListenerImpl?類,實現 MQ 事務的監聽。代碼如下:

@RocketMQTransactionListener(txProducerGroup = "test") public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private Logger logger = LoggerFactory.getLogger(getClass());@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 從消息 Header 中解析到 args 參數,并使用 JSON 反序列化Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),Demo01Controller.Args.class);// ... local transaction process, return rollback, commit or unknownlogger.info("[executeLocalTransaction][執行本地事務,消息:{} args:{}]", msg, args);return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// ... check transaction status and return rollback, commit or unknownlogger.info("[checkLocalTransaction][回查消息:{}]", msg);return RocketMQLocalTransactionState.COMMIT;}}

① 在類上,添加?@RocketMQTransactionListener?注解,聲明監聽器的是生產者分組是?"test"?的 Producer 發送的事務消息。因為 RocketMQ 是回查(請求)指定指定生產分組下的 Producer,從而獲得事務消息的狀態,所以一定要正確設置。

② 實現?RocketMQLocalTransactionListener?接口,實現執行本地事務和檢查本地事務的方法。

③ 實現?#executeLocalTransaction(...)?方法,實現執行本地事務。

  • 注意,這是一個模板方法。在調用這個方法之前,Spring Cloud Alibaba Stream RocketMQ 已經使用 Producer 發送了一條事務消息。然后根據該方法執行的返回的?RocketMQLocalTransactionState?結果,提交還是回滾該事務消息。

    友情提示:感興趣的胖友,可以看看 DefaultMQProducerImpl 的?#sendMessageInTransaction(...)?的源碼,整個模板方法是怎么執行的。

  • 😈 這里,我們為了模擬 RocketMQ 回查 Producer 來獲得事務消息的狀態,所以返回了?RocketMQLocalTransactionState.UNKNOWN?未知狀態。

④ 實現?#checkLocalTransaction(...)?方法,檢查本地事務。

  • 在事務消息長事件未被提交或回滾時,RocketMQ 會回查事務消息對應的生產者分組下的 Producer ,獲得事務消息的狀態。此時,該方法就會被調用。
  • 😈 這里,我們直接返回?RocketMQLocalTransactionState.COMMIT?提交狀態。

一般來說,有兩種方式實現本地事務回查時,返回事務消息的狀態。

第一種,通過?msg?消息,獲得某個業務上的標識或者編號,然后去數據庫中查詢業務記錄,從而判斷該事務消息的狀態是提交還是回滾。

第二種,記錄?msg?的事務編號,與事務狀態到數據庫中。

  • 第一步,在?#executeLocalTransaction(...)?方法中,先存儲一條?id?為?msg?的事務編號,狀態為?RocketMQLocalTransactionState.UNKNOWN?的記錄。
  • 第二步,調用帶有事務的業務 Service 的方法。在該 Service 方法中,在邏輯都執行成功的情況下,更新?id?為?msg?的事務編號,狀態變更為?RocketMQLocalTransactionState.COMMIT?。這樣,我們就可以伴隨這個事務的提交,更新?id?為?msg?的事務編號的記錄的狀為?RocketMQLocalTransactionState.COMMIT?,美滋滋。。
  • 第三步,要以?try-catch?的方式,調用業務 Service 的方法。如此,如果發生異常,回滾事務的時候,可以在?catch?中,更新?id?為?msg?的事務編號的記錄的狀態為?RocketMQLocalTransactionState.ROLLBACK?。😭 極端情況下,可能更新失敗,則打印 error 日志,告警知道,人工介入。
  • 如此三步之后,我們在?#executeLocalTransaction(...)?方法中,就可以通過查找數據庫,id?為?msg?的事務編號的記錄的狀態,然后返回。

相比來說,艿艿傾向第二種,實現更加簡單通用,對于業務開發者,更加友好。和有幾個朋友溝通了下,他們也是采用第二種。

10.5 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send_transaction?接口,發送事務消息。IDEA 控制臺輸出日志如下:

// ProduerApplication 控制臺 // ### TransactionListenerImpl 執行 executeLocalTransaction 方法,先執行本地事務的邏輯 2020-02-21 00:14:08.773 INFO 83052 --- [io-18080-exec-1] c.i.s.l.r.p.l.TransactionListenerImpl : [executeLocalTransaction][執行本地事務,消息:GenericMessage [payload=byte[17], headers={args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, id=d8604733-9083-5d19-15b4-bda0c549e9d1, contentType=application/json, timestamp=1582215248772}] args:Args{args1=1, args2='2'}] // ### Producer 發送事務消息成功,但是因為 executeLocalTransaction 方法返回的是 UNKOWN 狀態,所以事務消息并未提交或者回滾 // ### RocketMQ Broker 在發送事務消息 30 秒后,發現事務消息還未提交或是回滾,所以回查 Producer 。此時,checkLocalTransaction 方法返回 COMMIT ,所以該事務消息被提交 2020-02-21 00:14:48.685 INFO 83052 --- [pool-1-thread-1] c.i.s.l.r.p.l.TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1582215248763, args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=0A25810200002A9F000000000002868F, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, rocketmq_SYS_FLAG=0, id=62383992-5015-f957-41e7-75ec5ace4496, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582215288685}]]// ConsumerApplication 控制臺 // ### 事務消息被提交,所以該消息被 Consumer 消費 2020-02-21 00:14:48.756 INFO 83058 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:79 消息內容:Demo01Message{id=1950986029}]

整個的執行過程,看看艿艿在日志上添加的說明。

11. 監控端點

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-actuator
  • 消費者:labx-06-sca-stream-rocketmq-consumer-actuator

Spring Cloud Stream 的?endpoint?模塊,基于 Spring Boot Actuator,提供了自定義監控端點?bindings?和?channels,用于獲取 Spring Cloud Stream 的 Binding 和 Channel 信息。

同時,Spring Cloud Alibaba Stream RocketMQ 拓展了 Spring Boot Actuator 內置的?health?端點,通過自定義的?RocketMQBinderHealthIndicator,獲取 RocketMQ 客戶端的健康狀態。

友情提示:對 Spring Boot Actuator 不了解的胖友,可以后續閱讀《芋道 Spring Boot 監控端點 Actuator 入門》文章。

我們來搭建一個 Stream RocketMQ 監控端點的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-actuator,查看生產者的監控端點結果。
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter,查看消費者的監控端點結果。

11.1 搭建生產者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-actuator,查看生產者的監控端點結果。

11.1.1 引入依賴

在?pom.xml?文件中,額外引入 Spring Boot Actuator 相關依賴。代碼如下:

<!-- 實現對 Actuator 的自動化配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>

11.1.2 配置文件

修改?application.yaml?配置文件,額外增加 Spring Boot Actuator 配置項。配置如下:

management:endpoints:web:exposure:include: '*' # 需要開放的端點。默認值只打開 health 和 info 兩個端點。通過設置 * ,可以開放所有端點。endpoint:# Health 端點配置項,對應 HealthProperties 配置類health:enabled: true # 是否開啟。默認為 true 開啟。show-details: ALWAYS # 何時顯示完整的健康信息。默認為 NEVER 都不展示。可選 WHEN_AUTHORIZED 當經過授權的用戶;可選 ALWAYS 總是展示。

每個配置項的作用,胖友看下艿艿添加的注釋。如果還不理解的話,后續看下《芋道 Spring Boot 監控端點 Actuator 入門》文章。

11.1.3 簡單測試

① 使用 ProducerApplication 啟動生產者。

② 訪問應用的?bindings?監控端點?http://127.0.0.1:18080/actuator/bindings,返回結果如下圖:

③ 訪問應用的?channels?監控端點?http://127.0.0.1:18080/actuator/channels,返回結果如下圖:

④ 訪問應用的?health?監控端點?http://127.0.0.1:18080/actuator/health,返回結果如下圖:

11.2 搭建消費者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-filter,查看消費者的監控端點結果。

11.2.1 引入依賴

在?pom.xml?文件中,額外引入 Spring Boot Actuator 相關依賴。代碼如下:

<!-- 實現對 Actuator 的自動化配置 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>

11.2.2 配置文件

修改?application.yaml?配置文件,額外增加 Spring Boot Actuator 配置項。配置如下:

management:endpoints:web:exposure:include: '*' # 需要開放的端點。默認值只打開 health 和 info 兩個端點。通過設置 * ,可以開放所有端點。endpoint:# Health 端點配置項,對應 HealthProperties 配置類health:enabled: true # 是否開啟。默認為 true 開啟。show-details: ALWAYS # 何時顯示完整的健康信息。默認為 NEVER 都不展示。可選 WHEN_AUTHORIZED 當經過授權的用戶;可選 ALWAYS 總是展示。

每個配置項的作用,胖友看下艿艿添加的注釋。如果還不理解的話,后續看下《芋道 Spring Boot 監控端點 Actuator 入門》文章。

112.3 簡單測試

① 使用 ConsumerApplication 啟動消費者,隨機端口為 19541。

② 訪問應用的?bindings?監控端點?http://127.0.0.1:19541/actuator/bindings,返回結果如下圖:

③ 訪問應用的?channels?監控端點?http://127.0.0.1:19541/actuator/channels,返回結果如下圖:

④ 訪問應用的?health?監控端點?http://127.0.0.1:19541/actuator/health,返回結果如下圖:

12. 更多的配置項信息

Spring Cloud Alibaba Stream RocketMQ 提供的配置項挺多的,我們參考文檔將配置項一起梳理下。

RocketMQ Binder Properties

以?spring.cloud.stream.rocketmq.binder?為前綴。

配置項說明默認值
name-serverRocketMQ NameServer 地址127.0.0.1:9876
access-key阿里云賬號 AccessKey
secret-key阿里云賬號 SecretKey
enable-msg-trace是否為 Producer 和 Consumer 開啟消息軌跡功能true
customized-trace-topic消息軌跡開啟后存儲的 Topic 名稱RMQ_SYS_TRACE_TOPIC

RocketMQ Consumer Properties

以?spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.?為前綴。

配置項說明默認值
enable是否啟用 Consumertrue
tagsConsumer 基于 TAGS 訂閱,多個 tag 以?||?分割
sqlConsumer 基于 SQL 訂閱
broadcasting是Consumer 是否是廣播消費模式。如果想讓所有的訂閱者都能接收到消息,可以使用廣播模式false
orderlyConsumer 是否同步消費消息模式false
delayLevelWhenNextConsume異步消費消息模式下消費失敗重試策略:-1, 不重復,直接放入死信隊列;0, Broker 控制重試策略;>0, Client 控制重試策略0
suspendCurrentQueueTimeMillis同步消費消息模式下消費失敗后再次消費的時間間隔1000

RocketMQ Provider Properties

配置項說明默認值
enable是否啟用 Producertrue
groupProducer 分組
maxMessageSize消息發送的最大字節數8249344
transactional是否發送事務消息false
sync是否使用同步得方式發送消息false
vipChannelEnabled是否在 Vip Channel 上發送消息true
sendMessageTimeout發送消息的超時時間(毫秒)3000
compressMessageBodyThreshold消息體壓縮閥值(當消息體超過 4k 的時候會被壓縮)4096
retryTimesWhenSendFailed在同步發送消息的模式下,消息發送失敗的重試次數2
retryTimesWhenSendAsyncFailed在異步發送消息的模式下,消息發送失敗的重試次數2
retryNextServer消息發送失敗的情況下是否重試其它的 Brokerfalse

13.接入阿里云的消息隊列 RocketMQ

示例代碼對應倉庫:

  • 生產者:labx-06-sca-stream-rocketmq-producer-aliyun
  • 消費者:labx-06-sca-stream-rocketmq-consumer-aliyun

在阿里云上,提供消息隊列?RocketMQ?服務。那么,我們是否能夠使用 Spring Cloud Alibaba Stream RocketMQ 實現阿里云 RocketMQ 的消息的發送與消費呢?

答案是可以。在?《阿里云 —— 消息隊列 MQ —— 開源 Java SDK 接入說明》?中,提到目前開源的 Java SDK 可以接入阿里云 RocketMQ 服務。

如果您已使用開源 Java SDK 進行生產,只需參考方法,重新配置參數,即可實現無縫上云。

前提條件

  • 已在阿里云 MQ 控制臺創建資源,包括 Topic、Group ID(GID)、接入點(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
  • 已下載開源 RocketMQ 4.5.1 或以上版本,以支持連接阿里云 MQ。

我們來搭建一個 Stream RocketMQ 監控端點的使用示例。考慮方便,我們直接復用「2. 快速入門」小節的項目:

  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為生產者
  • 從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為消費者

13.1 搭建生產者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為生產者

修改?application.yaml?配置文件,添加?access-key、secret-key?配置項,設置訪問阿里云 RocketMQ 的賬號。完全配置如下:

spring:application:name: demo-producer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-output:destination: TOPIC_YUNAI_TEST # 目的地。這里使用 RocketMQ Topic <ALIYUN>content-type: application/json # 內容格式。這里使用 JSON# Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>access-key: ${ALIYUN_ACCESS_KEY} # 阿里云賬號 AccessKeysecret-key: ${ALIYUN_SECRET_KEY} # 阿里云賬號 SecretKey# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-output:# RocketMQ Producer 配置項,對應 RocketMQProducerProperties 類producer:group: GID_PRODUCER_GROUP_YUNAI_TEST # 生產者分組 <ALIYUN>sync: true # 是否同步發送消息,默認為 false 異步。server:port: 18080

注意,<ALIYUN>?處的三個配置項,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Producer Group。

13.2 搭建消費者

從?labx-06-sca-stream-rocketmq-consumer-demo?復制出?labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作為消費者

修改?application.yaml?配置文件,添加?access-key、secret-key?配置項,設置訪問阿里云 RocketMQ 的賬號。完全配置如下:

spring:application:name: demo-consumer-applicationcloud:# Spring Cloud Stream 配置項,對應 BindingServiceProperties 類stream:# Binding 配置項,對應 BindingProperties Mapbindings:demo01-input:destination: TOPIC_YUNAI_TEST # 目的地。這里使用 RocketMQ Topic <ALIYUN>content-type: application/json # 內容格式。這里使用 JSONgroup: GID_PRODUCER_GROUP_YUNAI_TEST # 消費者分組 <ALIYUN># Spring Cloud Stream RocketMQ 配置項rocketmq:# RocketMQ Binder 配置項,對應 RocketMQBinderConfigurationProperties 類binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>access-key: ${ALIYUN_ACCESS_KEY} # 阿里云賬號 AccessKeysecret-key: ${ALIYUN_SECRET_KEY} # 阿里云賬號 SecretKey# RocketMQ 自定義 Binding 配置項,對應 RocketMQBindingProperties Mapbindings:demo01-input:# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類consumer:enabled: true # 是否開啟消費,默認為 truebroadcasting: false # 是否使用廣播消費,默認為 false 使用集群消費server:port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者

注意,<ALIYUN>?處的三個配置項,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Consumer Group。

13.3 簡單測試

① 執行?ConsumerApplication,啟動消費者的實例。

② 執行?ProducerApplication,啟動生產者的實例。

之后,請求?http://127.0.0.1:18080/demo01/send?接口,發送消息。IDEA 控制臺輸出日志如下:

// ConsumerApplication 控制臺 2020-02-21 01:45:16.982 INFO 85901 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][線程編號:89 消息內容:Demo01Message{id=-724066118}]

總結

以上是生活随笔為你收集整理的Spring Cloud Alibaba RocketMQ 快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

激情网站 | 69久久夜色精品国产69 | 97视频免费播放 | 国产高清日韩 | 国产精品久久久久永久免费看 | www日韩欧美| 成年人免费观看在线视频 | 久草精品视频在线观看 | 国产手机视频精品 | 激情综合啪啪 | 在线免费视频 你懂得 | www视频在线播放 | 日韩av电影中文字幕 | 精品国产综合区久久久久久 | 国产成人精品一区二区三区在线观看 | 99久热在线精品 | 国产亚洲精品v | 国产成人精品一区二三区 | 欧美日韩视频在线观看一区二区 | 日日夜夜综合 | 亚洲黄色在线免费观看 | 成人小视频在线观看免费 | 亚洲国产午夜精品 | 国产精品精 | 国产成人福利在线观看 | 国产精品一区二区白浆 | 欧美日韩中文字幕在线视频 | 91av免费观看 | 三级黄色网络 | 婷婷久久国产 | 国产又黄又爽无遮挡 | 在线免费看黄网站 | 丁香综合av | 伊人天堂久久 | 日本精品视频在线观看 | 国产精品久久电影观看 | 国产一区成人在线 | 精品国产一区二区三区久久影院 | 色婷婷激情五月 | 天天操人人要 | 国产 亚洲 欧美 在线 | 国产日韩欧美自拍 | www.av小说 | 狠狠久久综合 | 国产精品h在线观看 | 高清免费在线视频 | 亚洲天堂免费视频 | 久久久久久久久黄色 | 欧美成天堂网地址 | 国产在线传媒 | 黄色的网站在线 | 国产v亚洲v | 久久久久亚洲最大xxxx | 成片免费观看视频999 | 欧美性极品xxxx娇小 | 中文字幕一区二区三区乱码在线 | 欧美极品久久 | 亚洲免费在线播放视频 | 欧美国产日韩在线观看 | 亚洲视频国产 | 97天堂网 | 国产精品v欧美精品v日韩 | 99久久精品免费看国产免费软件 | 丁香 久久 综合 | 六月色丁香 | 国产在线观看免费 | 色妞色视频一区二区三区四区 | 97免费在线视频 | 国产成人高清av | 亚洲另类人人澡 | 欧美精品久久久久久久免费 | 2018亚洲男人天堂 | 又黄又刺激又爽的视频 | 一区二区精品在线观看 | 成人久久18免费网站图片 | 国产中文字幕大全 | 久久不射网站 | 亚洲精品视频在线播放 | 色夜影院 | 国产精品一二三 | 国产人成在线观看 | 99精品黄色片免费大全 | 成人丁香花 | 国产精品网站 | 亚洲a色| 欧美日韩啪啪 | 999在线视频 | 激情欧美xxxx| 久草视频国产 | 久久只精品99品免费久23小说 | 国产精品丝袜 | 97精品国自产拍在线观看 | 日韩av中文在线观看 | 日韩理论片在线 | 91亚色在线观看 | 在线 视频 一区二区 | 久久久久国产a免费观看rela | 日本三级不卡视频 | 欧美视屏一区二区 | 亚洲激情国产精品 | aaa黄色毛片| 911久久 | 亚洲国产高清在线观看视频 | 四虎4hu永久免费 | 免费在线观看黄网站 | 中文字幕日韩有码 | 国产一区视频免费在线观看 | 亚洲精品18日本一区app | 国产成人精品一二三区 | 亚洲成人在线免费 | 中文字幕欲求不满 | 久久99国产精品视频 | 婷婷色伊人 | 超碰97久久 | 成人影音在线 | 欧美视频国产视频 | 欧美精品做受xxx性少妇 | av电影中文| 91久久国产综合精品女同国语 | 国产123区在线观看 国产精品麻豆91 | www.91av在线| 国产高清视频免费最新在线 | 麻豆精品在线 | 狠狠色丁香婷婷综合最新地址 | 国产成人精品在线 | 成人免费观看视频大全 | 国产精品一区欧美 | 成人不用播放器 | 在线精品在线 | 三级毛片视频 | 欧美成人黄色 | 日本黄色免费在线观看 | 久久久午夜电影 | 亚洲欧美视频网站 | 免费视频一区 | 激情网五月婷婷 | 亚洲免费激情 | 欧美一级片免费观看 | 欧美大jb | 色婷婷国产精品 | 久草综合在线观看 | 成 人 黄 色视频免费播放 | 国产 一区二区三区 在线 | 国产福利一区二区三区视频 | 欧美日韩精品久久久 | 91精品国产麻豆国产自产影视 | 天天综合日日夜夜 | 国产精品久久久久久久久久尿 | 国产午夜麻豆影院在线观看 | 国产成人精品电影久久久 | 色综合中文字幕 | 综合网伊人 | 蜜臀久久99精品久久久无需会员 | 亚洲精品在线一区二区 | 五月天视频网 | 国产99精品在线观看 | 久色伊人| 国产精品久久久久久麻豆一区 | 中文字幕电影一区 | 9ⅰ精品久久久久久久久中文字幕 | 久久激情五月丁香伊人 | 国产成人精品在线播放 | 久久久影视 | 色综合天天视频在线观看 | 色婷婷久久 | 精品国产乱码久久久久久1区二区 | 亚洲成人软件 | 色综合咪咪久久网 | 黄色精品网站 | 六月丁香激情综合 | 国产永久免费观看 | 伊人www22综合色 | 国产免费中文字幕 | 一区二区三区免费在线播放 | 欧美一区成人 | 精品免费久久久久久 | 免费国产在线观看 | 成人欧美一区二区三区在线观看 | 99精品在线看 | 国产主播大尺度精品福利免费 | 国产成人黄色网址 | 欧美大片第1页 | 欧洲一区二区在线观看 | 99久久网站 | 精品国产一区二区三区在线观看 | 狠狠狠狠狠狠狠狠 | 精品国产伦一区二区三区观看方式 | 亚洲欧洲日韩 | 在线观看日本高清mv视频 | 麻豆视频免费观看 | 亚洲黄色网络 | 亚洲作爱 | 色伊人网 | 在线观看免费版高清版 | 九九九在线观看视频 | 狠狠干在线| 久久综合久久综合久久 | 久久精品一二区 | 日韩电影一区二区三区在线观看 | 日韩 在线| 久草av在线播放 | 国产精品99久久免费黑人 | 亚洲一区二区三区毛片 | av黄色国产 | 欧美精品在线一区二区 | 五月婷婷电影网 | 国产手机视频在线 | 日本久久片 | 国产福利一区在线观看 | 在线观看亚洲专区 | 国产视频久久久久 | 香蕉91视频 | 国产精品一区二区av影院萌芽 | 天天综合网久久综合网 | 国产色在线,com | 黄色的片子| 国产精品久久久久久久久久久免费 | 开心色激情网 | 久久天天躁狠狠躁夜夜不卡公司 | 人人干在线| 成人一区二区三区在线观看 | 五月综合激情婷婷 | 天天色综合三 | 午夜色影院 | 毛片.com| 国产精品手机播放 | 免费观看mv大片高清 | 天天爱天天操天天干 | 日韩欧美一区二区在线观看 | 国产精品视频免费 | 精品国偷自产在线 | 欧美成人精品三级在线观看播放 | 久久国产精品成人免费浪潮 | 天天爱天天 | 色婷婷电影 | 亚洲第一中文字幕 | 国产激情免费 | a级国产片 | 午夜精品99久久免费 | 免费av网站观看 | 福利视频一区二区 | 欧美激情视频一区二区三区 | 99热99re6国产在线播放 | 日韩最新中文字幕 | 美女视频黄,久久 | 91专区在线观看 | 免费一级特黄毛大片 | 天天艹天天 | 国产vs久久| 在线国产视频观看 | 国产视频久久久久 | 亚洲免费国产视频 | 成人黄色小说网 | 国产不卡免费 | 久草在线一免费新视频 | 99久久99久久 | 91视频高清完整版 | 99精品欧美一区二区三区 | 日韩中文在线观看 | 亚洲欧洲av在线 | 91爱爱电影 | 一区二区久久久久 | 日本中文字幕视频 | 亚洲成人av在线电影 | 国产成人三级 | 99热超碰| 精品少妇一区二区三区在线 | 亚洲日本国产精品 | 中文字幕亚洲高清 | 色久av| 国产91成人 | 久久美女免费视频 | 日韩中文在线视频 | 国产又粗又猛又爽又黄的视频免费 | 丁香婷婷色 | 国产精品黄网站在线观看 | av一级片在线观看 | 深夜精品福利 | 黄色avwww | 欧美亚洲精品在线观看 | 伊人成人精品 | 国产精品欧美在线 | 日韩三级在线 | 国产精品系列在线观看 | 中文字幕亚洲五码 | 伊甸园av在线 | 成人在线观看影院 | 色天天综合久久久久综合片 | 国产一级黄色免费看 | 99久久久国产精品免费99 | 亚洲精品国产精品久久99 | 天天干,天天操 | 黄色影院在线观看 | 黄色国产高清 | 国产一级视频 | 欧美与欧洲交xxxx免费观看 | 色婷婷激情电影 | 久热久草 | 99视频免费在线观看 | 69国产盗摄一区二区三区五区 | 成人免费观看网站 | 国产无遮挡猛进猛出免费软件 | 久草视频在线看 | 国产精品高清一区二区三区 | 久久久久免费精品视频 | 三三级黄色片之日韩 | 久久不见久久见免费影院 | 天天操天天干天天操天天干 | www99精品| 99热99 | 免费看短 | 狠狠色丁香婷婷综合橹88 | 中文字幕一区在线 | 国产伦理久久精品久久久久_ | 成人黄色毛片视频 | 超碰97在线资源 | 国产精品精品久久久 | 久久五月网 | 天天干天天操 | 亚洲综合色婷婷 | 久久久亚洲成人 | 网站免费黄色 | 国产日韩精品一区二区在线观看播放 | av中文字幕在线看 | 有码一区二区三区 | 成人免费在线播放视频 | 欧美日本不卡视频 | www.夜夜操.com | 国产成人精品亚洲精品 | 美女视频永久黄网站免费观看国产 | 国产精品不卡一区 | 夜夜操网| 亚洲人成人99网站 | 精品成人久久 | 日日天天干 | 亚洲黄色av| 精品国产区 | 国产又粗又长又硬免费视频 | 久久综合射 | 一本色道久久综合亚洲二区三区 | 日韩在线电影一区二区 | 国产精品手机看片 | 又湿又紧又大又爽a视频国产 | 精品免费视频123区 午夜久久成人 | 婷婷色综合色 | 亚洲免费观看在线视频 | 亚洲精品在线视频观看 | 久久草 | 亚洲精品视频免费 | 欧美精品做受xxx性少妇 | 日本在线观看中文字幕无线观看 | 又黄又爽又湿又无遮挡的在线视频 | 五月天丁香亚洲 | 日韩激情视频在线 | 激情综合五月 | a级片网站| 免费在线观看亚洲视频 | 亚洲一区美女视频在线观看免费 | 日韩精品久久久久久久电影竹菊 | 国产亚洲人成网站在线观看 | 国产在线理论片 | 国产精品99久久久 | 精品久久久久久一区二区里番 | 久久国产经典 | 97热视频| 久久一本综合 | 91麻豆精品国产91久久久久久 | 最近更新的中文字幕 | av免费福利 | 国产无套精品久久久久久 | 国产精品一区二区白浆 | 最近中文字幕 | 丁香激情婷婷 | 久久五月激情 | 久久久久久久久久久免费视频 | 日本精品久久久久久 | 国产破处在线视频 | 国产91精品一区二区绿帽 | 国产一区二三区好的 | 国内精品久久久 | 日韩一级电影在线观看 | 久久一区二区免费视频 | 中文字幕在线观看完整版 | 国产不卡一二三区 | 日韩理论在线观看 | 狠狠操91 | 天天操天天干天天玩 | 在线观看视频一区二区三区 | 国产高清在线永久 | 久久综合免费视频 | 超碰在线日韩 | 久久伊人八月婷婷综合激情 | 最新在线你懂的 | www狠狠操 | 国产精品第二十页 | 伊人婷婷激情 | 国产高清一区二区 | 中文字幕999| 波多野结衣电影久久 | 久久国产欧美日韩精品 | 欧美美女视频在线观看 | 69久久99精品久久久久婷婷 | 欧美日韩不卡一区二区三区 | 人操人 | 国产黄色精品网站 | 色鬼综合网 | 视频91在线 | 日韩免费高清 | 免费黄色av电影 | 天堂在线成人 | 涩涩网站在线看 | 99精品视频播放 | 日韩一二三在线 | 2022中文字幕在线观看 | 青草视频在线 | 日本中文字幕网 | 毛片永久免费 | 99日精品 | 成x99人av在线www | 91av在线精品 | 丁香婷婷色综合亚洲电影 | www.天天色.com | 日韩免费一区二区在线观看 | 免费高清在线观看电视网站 | 国内精品久久久久久久久久久久 | 亚洲视频免费视频 | 日韩免费电影网站 | 夜夜操网站 | 日韩黄色中文字幕 | 久久久久久久久久电影 | 成人av影视在线 | 狠狠操天天射 | 精品一区二区三区在线播放 | 国产精品成人在线 | 又爽又黄又无遮挡网站动态图 | 国产精品久久久久久久久免费看 | 日日夜夜噜 | 国产精品一区二区三区在线免费观看 | 懂色av一区二区三区蜜臀 | 91伊人久久大香线蕉蜜芽人口 | 国产黄免费 | 亚洲综合小说电影qvod | 久久婷亚洲五月一区天天躁 | 国产一区二区网址 | 黄色aaaaa| 亚洲精品影院在线观看 | 国产在线 一区二区三区 | 操夜夜操| 91视频88av| 久久影视网 | 午夜黄色大片 | 国模视频一区二区三区 | 成人国产一区二区 | 在线观看一级片 | 日韩在线电影 | 亚洲夜夜网 | 亚洲禁18久人片 | 成人三级网站在线观看 | 毛片一级免费一级 | 久久国产一区二区三区 | 国产高清视频免费在线观看 | 亚洲欧美日韩一二三区 | 日日噜噜噜噜夜夜爽亚洲精品 | 五月天色站 | 中文字幕在线不卡国产视频 | 色吧av色av | 天天曰视频 | 99在线观看视频 | 亚洲午夜久久久久久久久久久 | 激情综合亚洲精品 | 国产成人精品亚洲精品 | 免费高清在线一区 | av导航福利 | 深爱激情婷婷网 | 日日射天天射 | 九九热视频在线 | 成人毛片100免费观看 | 精品亚洲欧美一区 | 精品一区 在线 | 国产精品门事件 | 免费久久久久久 | 亚洲精品在线一区二区 | 在线电影91 | 婷婷色伊人 | 久久久久久久影视 | 色a综合 | 色视频网站免费观看 | 波多野结衣在线中文字幕 | 9999毛片| www.玖玖玖 | 色99导航 | 久久激情综合 | 日韩精品视频一二三 | av中文字幕在线播放 | 久久综合精品国产一区二区三区 | 干亚洲少妇 | 日本女人的性生活视频 | 日日夜夜人人精品 | 亚洲欧美日韩一级 | 一区二区三区四区精品 | 久久久久亚洲精品 | 一区二区三区在线观看 | 精品亚洲视频在线观看 | 国产精品丝袜在线 | 激情www | 国产夫妻自拍av | 99爱国产精品 | 久久玖| 久久曰视频 | 超碰97免费 | 九色视频网 | 天天操夜夜干 | 97色免费视频 | 九九日九九操 | 中文字幕色在线视频 | 亚洲精品国精品久久99热 | 在线有码中文字幕 | 免费在线观看成年人视频 | 日韩av一区二区三区四区 | 日日射av | 992tv又爽又黄的免费视频 | 国产精品大片在线观看 | 天天激情站 | 久草爱视频 | 国产精品资源 | 超碰在线人人 | 欧美一级性生活视频 | 国产中文在线观看 | 国产成人精品久久久 | 国产视频网站在线观看 | 日韩毛片在线免费观看 | 国产精品久久久久一区二区三区共 | 日韩成人欧美 | 亚洲一级电影在线观看 | 97色se| 色婷婷亚洲婷婷 | 五月天狠狠操 | 夜夜爽夜夜操 | 国产精品原创视频 | 色香com.| 日日综合网 | av网站在线观看免费 | 四虎在线视频 | 亚洲特级毛片 | 亚洲免费小视频 | 久草免费看 | 五月天网页| 99在线免费观看视频 | 亚洲精品美女久久 | 丁香 婷婷 激情 | 日韩专区中文字幕 | 久久午夜国产 | 极品国产91在线网站 | 国产精品美女www爽爽爽视频 | 成人h电影在线观看 | 免费情趣视频 | 操处女逼| 2023av在线 | 91精品国产综合久久福利不卡 | 在线一区av | 色视频在线免费 | 国产精品日韩精品 | 日韩高清精品免费观看 | 国产精品18久久久久vr手机版特色 | 国产视频二区三区 | 精品一区二区在线观看 | a黄色影院 | 欧美aa在线 | 国产精品伦一区二区三区视频 | 91精品国产99久久久久久久 | 国产精品午夜在线 | av资源中文字幕 | 国产黄色特级片 | 综合在线观看色 | 正在播放日韩 | 日韩黄色在线电影 | 日韩中文字幕免费视频 | 精品九九久久 | 成人性生交视频 | 国产一区二区三区四区大秀 | 国产精品高潮久久av | 中文字幕 91 | 日韩高清观看 | 午夜精品一区二区三区可下载 | 国产91影院| 国产精品99久久久久人中文网介绍 | 日韩三区在线 | 国产裸体永久免费视频网站 | 黄色免费观看网址 | 国产精品99久久久久久久久久久久 | 亚洲老妇xxxxxx| 日韩在线国产 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 国产色拍拍拍拍在线精品 | 91传媒在线观看 | 91精品网站 | 久久久高清 | 亚洲午夜精品久久久 | 亚洲最大免费成人网 | 中文字幕亚洲高清 | 嫩草av影院| 最新91在线视频 | 亚洲视频第一页 | 经典三级一区 | 国产精品18久久久久久久久 | 国产日韩欧美视频在线观看 | 一区 在线 影院 | 色综合天天综合在线视频 | 国产少妇在线观看 | 偷拍福利视频一区二区三区 | 亚洲免费高清视频 | 97av超碰| 一级做a爱片性色毛片www | 丁香六月天 | 日韩欧美一区二区三区视频 | 一级黄色电影网站 | 综合精品久久久 | 国产高清av免费在线观看 | av不卡在线看 | 国产日韩精品在线 | 最近日本mv字幕免费观看 | 久久99精品久久久久蜜臀 | 色窝资源 | 亚洲精品videossex少妇 | 日韩在线观看第一页 | 亚洲精品一区二区三区新线路 | 精品国产乱码久久 | 91亚色视频在线观看 | 精品国产欧美一区二区三区不卡 | 成人在线播放视频 | 超碰人人舔 | 国产视频手机在线 | 五月开心婷婷网 | 99视频+国产日韩欧美 | 性色大片在线观看 | av在线免费网 | 热久久这里只有精品 | 国产精品久久av | 高清av免费看 | 免费看黄的| av导航福利 | 亚洲2019精品 | 天堂av观看 | 国产视频久久久 | 中文字幕视频播放 | 国产成人精品久 | 国产精品 中文在线 | 久久草草热国产精品直播 | 一区二区三区免费在线观看视频 | 成人香蕉视频 | 天海翼一区二区三区免费 | 视频一区久久 | 国产亚洲欧洲 | 91成人精品一区在线播放 | 久久97精品 | 91中文字幕永久在线 | 免费看成年人 | 成人一区二区三区中文字幕 | 成人免费在线播放 | 久久九九久久精品 | 五月黄色 | 97色在线观看 | 天天操天天操 | 亚洲 欧美 国产 va在线影院 | 久久久久免费精品视频 | 91看片麻豆 | 亚洲精品国久久99热 | 国产精品99久久久精品免费观看 | 久久免费视频这里只有精品 | 久久综合久久综合久久综合 | 正在播放国产精品 | 久久综合久久综合这里只有精品 | 色片网站在线观看 | 国产精品美女免费看 | 天堂视频中文在线 | 国产高清小视频 | 国产精品理论片在线播放 | 国产精品国产亚洲精品看不卡 | 国产精品丝袜久久久久久久不卡 | 国产手机精品视频 | 国产高清免费在线播放 | 夜夜夜| 成年人黄色免费视频 | 久久情网| 国产99久久精品一区二区永久免费 | 最新av网址在线观看 | 波多野结衣视频一区 | 99精品国产在热久久下载 | 丁香花在线视频观看免费 | 久久久久久久久久免费视频 | 精品91在线 | 亚洲特级毛片 | 中国一 片免费观看 | 亚洲成人免费 | www狠狠| 亚洲国产一区在线观看 | 国产精品va视频 | 摸阴视频| 在线电影中文字幕 | 国产成人精品女人久久久 | 四虎国产 | 7777精品伊人久久久大香线蕉 | 久草免费电影 | 天天草视频| 国产精品日韩在线播放 | 婷婷激情五月综合 | 欧美激情操| 欧美日韩在线观看不卡 | 高清av免费观看 | 亚洲 av网站| 九九热免费在线观看 | 99久久精品国产观看 | 国产福利在线免费观看 | 日韩精品免费在线观看 | 亚洲免费一级电影 | 久草在线播放视频 | 1024在线看片 | 中文字幕在线看视频国产 | 中文字幕视频三区 | 91丨九色丨勾搭 | 中文字幕乱在线伦视频中文字幕乱码在线 | 超碰com | 成人午夜剧场在线观看 | 伊色综合久久之综合久久 | 久久精品xxx | 国产资源在线视频 | 婷婷六月网 | 激情婷婷丁香 | 精品在线一区二区三区 | av线上看 | 亚洲精品88欧美一区二区 | 成年人免费在线观看 | 97色在线观看免费视频 | 久草免费在线视频观看 | 欧美经典久久 | 午夜久久 | 六月婷操 | 日韩另类在线 | 一级黄色网址 | 欧美色噜噜 | 四虎永久免费在线观看 | 国产精品一区在线 | 伊人天天综合 | 免费黄色网址大全 | 成人黄色小说视频 | 97人人超碰在线 | 黄色软件在线看 | 五月天中文字幕 | 五月天高清欧美mv | 久久一二三四 | 美女视频网 | 国产在线第三页 | 在线观看国产亚洲 | 免费在线色电影 | 亚洲欧美激情精品一区二区 | 嫩模bbw搡bbbb搡bbbb | 天天综合视频在线观看 | 久久免费视频6 | av中文字幕在线观看网站 | 日韩欧美第二页 | 中文在线8资源库 | 国产欧美日韩视频 | 国产女做a爱免费视频 | 96视频免费在线观看 | av久久久久久 | 精品中文字幕在线观看 | 四虎8848免费高清在线观看 | 免费观看十分钟 | 天天干天天射天天插 | 日韩在线观看一区二区三区 | 国产精品免费一区二区三区 | 天天综合操 | 在线观看成人小视频 | 久久精品视频免费观看 | 精品一区二区三区香蕉蜜桃 | 亚洲年轻女教师毛茸茸 | 精品国产一区二区久久 | av在线网站大全 | 久久黄色影视 | 最近日本韩国中文字幕 | 天天综合成人网 | 国产成人在线看 | 欧美 日韩精品 | 亚洲少妇自拍 | 中文在线天堂资源 | 成人av教育 | 成人黄色片免费看 | 国产精品毛片一区视频播 | 亚洲国产美女久久久久 | 欧美资源| 久久免费在线观看视频 | 亚洲视频电影在线 | 最近能播放的中文字幕 | 91网在线看 | 久久久久久中文字幕 | 在线国产日本 | 一级一片免费视频 | 日韩免费在线观看网站 | 黄色成人小视频 | 国产日产精品一区二区三区四区的观看方式 | 日日爱999| 在线观看v片 | 久久乐九色婷婷综合色狠狠182 | 丁香婷婷综合五月 | 亚洲第一香蕉视频 | 91精品国产成人www | 欧美韩日视频 | 怡红院av久久久久久久 | 国产综合91 | 91日韩精品 | 五月婷婷天堂 | 韩日视频在线 | 99久国产| 国产精品18久久久久久久久 | 国产精品高 | 91九色蝌蚪视频网站 | 国产又粗又猛又黄又爽的视频 | 国产青青青 | 国产精品18久久久久白浆 | 国产精品高清免费在线观看 | 91香蕉视频好色先生 | 美女黄频在线观看 | 国产高清久久 | 国产精品美女久久久久久2018 | 国产韩国日本高清视频 | 亚洲精品在线国产 | 国产一区二区在线影院 | 又大又硬又黄又爽视频在线观看 | 中文在线免费看视频 | 日日碰狠狠躁久久躁综合网 | 国产99久久久国产精品免费二区 | 91视频中文字幕 | 人人射av | 最近中文字幕国语免费高清6 | 亚洲精品欧美成人 | 国产短视频在线播放 | 美女视频国产 | av一本久道久久波多野结衣 | 一区二区三区在线免费 | 久久99久久99免费视频 | 一区精品在线 | 欧美大片aaa| 国产精品视频永久免费播放 | 麻豆系列在线观看 | 久久99偷拍视频 | 黄色免费视频在线观看 | 亚洲国产精品久久久久 | 中文字幕在线观看免费高清完整版 | 久久久 精品 | 波多野结衣视频一区二区三区 | 超碰在线免费福利 | 精品福利国产 | 久草在线视频看看 | 在线观看成人网 | 黄色网址国产 | 激情视频免费观看 | 91完整版在线观看 | 97免费在线观看视频 | 亚洲精品美女久久 | 国产精品久久久久久久久软件 | 色网站在线免费 | 最新av电影网站 | 欧美日韩国产三级 | av一区二区三区在线观看 | 午夜视频在线观看一区二区 | 欧美 另类 交 | 亚洲少妇自拍 | 精品国产伦一区二区三区观看方式 | 91精品少妇偷拍99 | 在线免费国产视频 | 免费精品视频在线观看 | 91久久影院| 亚洲天天在线日亚洲洲精 | 精品一区欧美 | 国内精品久久久久影院日本资源 | 天天色综合三 | 中文字幕大全 | 欧美a性 | 亚洲第一久久久 | 最近免费中文字幕mv在线视频3 | 国产99亚洲 | 天天操天天干天天 | 婷婷深爱 | 国产高清不卡一区二区三区 | 日日爽 | 九色91av | 日韩午夜精品 | 成人av电影免费在线播放 | 色黄www小说| 天天色综合1 | 丁香婷婷久久久综合精品国产 | av黄色在线播放 | 日本最新一区二区三区 | 欧美激情综合五月色丁香小说 | 欧美一级免费黄色片 | 国产精品99久久久久久宅男 | 国产小视频91 | 成人h动漫在线看 | 欧美精品亚洲二区 | 久久久午夜视频 | 黄色av网站在线观看 | 天天操夜夜操夜夜操 | 中文字幕美女免费在线 | 亚洲国产黄色片 | 国产一卡二卡四卡国 | 亚洲伦理中文字幕 | 久久久国产在线视频 | 99草视频在线观看 | 日韩区欧美久久久无人区 | 久久香蕉国产 | 少妇搡bbb | 香蕉视频在线免费 | 日韩精品中文字幕有码 | 一区二区影院 | 91精品国产99久久久久 | 麻豆成人小视频 | 91九色视频导航 | 黄色在线看网站 | 久草视频在线免费看 | 亚洲欧洲久久久 | 久久夜色精品国产欧美乱 | 婷婷国产一区二区三区 | 午夜精品久久久久久久99婷婷 | 欧美人人爱 | 国产黄色大片 | 日韩av网站在线播放 | 超碰在线98 | 中文字幕韩在线第一页 | 在线成人看片 | www.五月天婷婷.com | 国产一级性生活视频 | 久草网视频 | 欧美性免费 | 日本精品免费看 | 天天射天天操天天干 | 操少妇视频 | 国产又粗又猛又爽又黄的视频免费 | 最近日本韩国中文字幕 | 国产精品网站 | 成人夜晚看av | 波多野结衣视频一区二区三区 | 精品在线观看免费 | 国产精品综合久久 | 国产亚洲精品v | 狠狠干干| 午夜少妇一区二区三区 | 欧美色图另类 | 免费黄色av片| 久久精品美女视频 | 久产久精国产品 | 91精品国产91p65 | 99精品视频免费观看视频 | 色偷偷88欧美精品久久久 | 国产综合精品一区二区三区 | 午夜在线观看一区 | 欧美一级电影在线观看 | 免费合欢视频成人app | 亚洲精品视频免费 | 国产成人久久77777精品 | 久久久免费在线观看 | 国产区 在线 | 九九涩涩av台湾日本热热 | 青青色影院 | 色91在线视频 | 激情婷婷网| 91人人揉日日捏人人看 | 日韩资源在线观看 | 久久精品之 | 狠狠综合网 | av高清不卡 | 五月天久久久 | 久久精品资源 | 精品国产一区二区三区久久久久久 | 国产在线污 | 免费精品久久久 | 少妇做爰k8经典 | 国产精品福利在线观看 | 91三级在线观看 | 日韩欧美aaa | av在线超碰 | 91视频这里只有精品 | 日日爽天天 | 国产精品久久久久久久久免费看 | 97免费在线观看 | 狠狠色狠狠色 | 国产最新网站 | 伊人va | 免费视频你懂得 | 97操操操 | 天天干天天射天天爽 | 欧美老人xxxx18 | 亚洲综合视频在线 | h视频日本 | 欧洲精品码一区二区三区免费看 | 欧洲视频一区 | 精品国产成人av | 亚洲精品欧美精品 | 人人射人人插 |