日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Stream 体系及原理介绍

發布時間:2025/4/5 javascript 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Stream 体系及原理介绍 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg

Spring Cloud Stream?在 Spring Cloud 體系內用于構建高度可擴展的基于事件驅動的微服務,其目的是為了簡化消息在 Spring Cloud 應用程序中的開發。

?

Spring Cloud Stream?(后面以 SCS 代替 Spring Cloud Stream)?本身內容很多,而且它還有很多外部的依賴,想要熟悉 SCS,必須要先了解 Spring Messaging 和 Spring Integration 這兩個項目,接下來,文章將從圍繞以下三點進行展開:

?

  • 什么是 Spring Messaging;

  • 什么是 Spring Integration;

  • 什么是 SCS 體系及其原理;

?

Spring Messaging


Spring Messaging 是 Spring Framework 中的一個模塊,其作用就是統一消息的編程模型。

  • 比如消息 Messaging 對應的模型就包括一個消息體 Payload 和消息頭 Header:

?

package?org.springframework.messaging;
public?interface?Message<T>?{
? ? T?getPayload();
? ? MessageHeaders?getHeaders();
}

  • 消息通道 MessageChannel 用于接收消息,調用 send 方法可以將消息發送至該消息通道中 :

?

@FunctionalInterface
public?interface?MessageChannel?{
? ? long?INDEFINITE_TIMEOUT?=?-1;
? ? default?boolean?send(Message<?>?message) {

? ? ? ? ?return?send(message,?INDEFINITE_TIMEOUT);

? ? ?}
? ? ?boolean?send(Message<?>?message,?long?timeout);
}

消息通道里的消息如何被消費呢?
  • 由消息通道的子接口可訂閱的消息通道 SubscribableChannel 實現,被 MessageHandler 消息處理器所訂閱:

public?interface?SubscribableChannel?extends?MessageChannel?{
? ? boolean?subscribe(MessageHandler?handler);
? ? boolean?unsubscribe(MessageHandler?handler);
}

  • 由MessageHandler 真正地消費/處理消息:

@FunctionalInterface
public?interface?MessageHandler?{
? ? void?handleMessage(Message<?>?message)?throws?MessagingException;
}

Spring Messaging 內部在消息模型的基礎上衍生出了其它的一些功能,如:

1. 消息接收參數及返回值處理:消息接收參數處理器 HandlerMethodArgumentResolver 配合 @Header, @Payload 等注解使用;消息接收后的返回值處理器 HandlerMethodReturnValueHandler 配合 @SendTo 注解使用;

2. 消息體內容轉換器 MessageConverter;

3. 統一抽象的消息發送模板 AbstractMessageSendingTemplate;

4. 消息通道攔截器 ChannelInterceptor;

?

Spring Integration


Spring Integration 提供了 Spring 編程模型的擴展用來支持企業集成模式(Enterprise Integration Patterns),是對 Spring Messaging 的擴展。

它提出了不少新的概念,包括消息路由 MessageRoute、消息分發 MessageDispatcher、消息過濾 Filter、消息轉換 Transformer、消息聚合 Aggregator、消息分割 Splitter 等等。同時還提供了 MessageChannel 和MessageHandler 的實現,分別包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel 和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等內容。

這里為大家介紹幾種消息的處理方式:
  • 消息的分割:

?

  • 消息的聚合:

?

?

  • 消息的過濾:

?

  • 消息的分發:

?

?

接下來,我們以一個最簡單的例子來嘗試一下 Spring Integration:

這段代碼解釋為:

?

SubscribableChannel messageChannel =new DirectChannel(); // 1

messageChannel.subscribe(msg-> { // 2
?System.out.println("receive: " +msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3

?

1. 構造一個可訂閱的消息通道 messageChannel;

2. 使用 MessageHandler 去消費這個消息通道里的消息;

3. 發送一條消息到這個消息通道,消息最終被消息通道里的 MessageHandler 所消費。

最后控制臺打印出: receive: msg from alibaba;

DirectChannel 內部有個 UnicastingDispatcher 類型的消息分發器,會分發到對應的消息通道 MessageChannel 中,從名字也可以看出來,UnicastingDispatcher 是個單播的分發器,只能選擇一個消息通道。那么如何選擇呢? 內部提供了 LoadBalancingStrategy 負載均衡策略,默認只有輪詢的實現,可以進行擴展。

我們對上段代碼做一點修改,使用多個 MessageHandler 去處理消息:

SubscribableChannel?messageChannel?=?new?DirectChannel();

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive1: "?+?msg.getPayload());
});

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive2: "?+?msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

由于 DirectChannel 內部的消息分發器是 UnicastingDispatcher 單播的方式,并且采用輪詢的負載均衡策略,所以這里兩次的消費分別對應這兩個 MessageHandler。控制臺打印出:

receive1: msg from alibaba
receive2: msg from alibaba

既然存在單播的消息分發器 UnicastingDispatcher,必然也會存在廣播的消息分發器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 這個消息通道所使用。廣播消息分發器會把消息分發給所有的 MessageHandler:

SubscribableChannel?messageChannel?=?new?PublishSubscribeChannel();

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive1: "?+?msg.getPayload());
});

messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive2: "?+?msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

發送兩個消息,都被所有的 MessageHandler 所消費。控制臺打印:

receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba

?

Spring Cloud Stream


SCS與各模塊之間的關系是:

  • SCS 在 Spring Integration 的基礎上進行了封裝,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;

  • SCS 與 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;

  • SCS 與 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置類;

  • SCS 增強了消息發送失敗的和消費失敗情況下的處理邏輯等功能。

  • SCS 是 Spring Integration 的加強,同時與 Spring Boot 體系進行了融合,也是 Spring Cloud Bus 的基礎。它屏蔽了底層消息中間件的實現細節,希望以統一的一套 API 來進行消息的發送/消費,底層消息中間件的實現細節由各消息中間件的 Binder 完成。

Binder 是提供與外部消息中間件集成的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用于構造生產者和消費者。目前官方的實現有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現了 RocketMQ Binder。

?

從圖中可以看出,Binding 是連接應用程序跟消息中間件的橋梁,用于消息的消費和生產。我們來看一個最簡單的使用 RocketMQ Binder 的例子,然后分析一下它的底層處理原理:

  • 啟動類及消息的發送:

@SpringBootApplication
@EnableBinding({?Source.class,?Sink.class?})?// 1
public?class?SendAndReceiveApplication?{
?
? ? public?static?void?main(String[]?args) {
? ? ? ? SpringApplication.run(SendAndReceiveApplication.class,?args);
? ? }
?
? ? ? ?@Bean?// 2
? ? public?CustomRunner?customRunner() {
? ? ? ? return?new?CustomRunner();
? ? }

? ? public?static?class?CustomRunner?implements?CommandLineRunner?{

? ? ? ? @Autowired
? ? ? ? private?Source?source;

? ? ? ? @Override
? ? ? ? public?void?run(String...?args)?throws?Exception?{
? ? ? ? ? ? int?count?=?5;
? ? ? ? ? ? for?(int?index?=?1;?index?<=?count;?index++) {
? ? ? ? ? ? ? ? source.output().send(MessageBuilder.withPayload("msg-"?+?index).build());?// 3
? ? ? ? ? ? }
? ? ? ? }
? ? }
}

  • 消息的接收:

@Service
public?class?StreamListenerReceiveService?{

? ? @StreamListener(Sink.INPUT)?// 4
? ? public?void?receiveByStreamListener1(String?receiveMsg) {
? ? ? ? System.out.println("receiveByStreamListener: "?+?receiveMsg);
? ? }

}

這段代碼很簡單,沒有涉及到 RocketMQ 相關的代碼,消息的發送和接收都是基于 SCS 體系完成的。如果想切換成 RabbitMQ 或 Kafka,只需修改配置文件即可,代碼無需修改。

我們來分析下這段代碼的原理:

?

1.?@EnableBinding?對應的兩個接口屬性?Source?和?Sink?是 SCS 內部提供的。SCS 內部會基于?Source?和?Sink?構造?BindableProxyFactory,且對應的 output 和 input 方法返回的 MessageChannel 是?DirectChannel。output 和 input 方法修飾的注解對應的 value 是配置文件中 binding 的 name。

public?interface?Source?{
? ? String?OUTPUT?=?"output";
? ? @Output(Source.OUTPUT)
? ? MessageChannel?output();
}
public?interface?Sink?{
? ? String?INPUT?=?"input";
? ? @Input(Sink.INPUT)
? ? SubscribableChannel?input();
}

配置文件里 bindings 的 name 為 output 和 input,對應 Source 和 Sink 接口的方法上的注解里的 value:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1

2. 構造 CommandLineRunner,程序啟動的時候會執行 CustomRunner 的 run 方法。

3. 調用 Source 接口里的 output 方法獲取 DirectChannel,并發送消息到這個消息通道中。這里跟之前 Spring Integration 章節里的代碼一致。

  • Source 里的 output 發送消息到 DirectChannel 消息通道之后會被 AbstractMessageChannelBinder#SendingHandler 這個 MessageHandler 處理,然后它會委托給 AbstractMessageChannelBinder#createProducerMessageHandler 創建的 MessageHandler 處理(該方法由不同的消息中間件實現);

  • 不同的消息中間件對應的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 內部會把 Spring Message 轉換成對應中間件的 Message 模型并發送到對應中間件的 broker;

4. 使用 @StreamListener 進行消息的訂閱。請注意,注解里的 Sink.input 對應的值是 "input",會根據配置文件里 binding 對應的 name 為 input 的值進行配置:

  • 不同的消息中間件對應的 AbstractMessageChannelBinder#createConsumerEndpoint 方法會使用 Consumer 訂閱消息,訂閱到消息后內部會把中間件對應的 Message 模型轉換成 Spring Message;

  • 消息轉換之后會把 Spring Message 發送至 name 為 input 的消息通道中;

  • @StreamListener 對應的 StreamListenerMessageHandler 訂閱了 name 為 input 的消息通道,進行了消息的消費;

這個過程文字描述有點啰嗦,用一張圖總結一下(黃色部分涉及到各消息中間件的 Binder 實現以及 MQ 基本的訂閱發布功能):

?

SCS 章節的最后,我們來看一段 SCS 關于消息的處理方式的一段代碼:

@StreamListener(value?=?Sink.INPUT,?condition?=?"headers['index']=='1'")
public?void?receiveByHeader(Message?msg) {
? ? ?System.out.println("receive by headers['index']=='1': "?+?msg);
}

@StreamListener(value?=?Sink.INPUT,?condition?=?"headers['index']=='9999'")
public?void?receivePerson(@Payload?Person?person) {
? ? ?System.out.println("receive Person: "?+?person);
}

@StreamListener(value?=?Sink.INPUT)
public?void?receiveAllMsg(String?msg) {
? ? ?System.out.println("receive allMsg by StreamListener. content: "?+?msg);
}

@StreamListener(value?=?Sink.INPUT)
public?void?receiveHeaderAndMsg(@Header("index")?String?index,?Message?msg) {
? ? ?System.out.println("receive by HeaderAndMsg by StreamListener. content: "?+?msg);
}

有沒有發現這段代碼跟 Spring MVC Controller 中接收請求的代碼很像? 實際上他們的架構都是類似的,Spring MVC 對于 Controller 中參數和返回值的處理類分別是org.springframework.web.method.support.HandlerMethodArgumentResolver、 org.springframework.web.method.support.HandlerMethodReturnValueHandler。

Spring Messaging 中對于參數和返回值的處理類之前也提到過,分別是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler。

它們的類名一模一樣,甚至內部的方法名也一樣。

?

總結


?

上圖是 SCS 體系相關類說明的總結,關于 SCS 以及 RocketMQ Binder 更多相關的示例,可以參考 RocketMQ Binder Demos(Demos 地址:點擊“閱讀原文”),包含了消息的聚合、分割、過濾;消息異常處理;消息標簽、SQL過濾;同步、異步消費等等。

下一篇文章,我們將分析消息總線(Spring Cloud Bus) 在 Spring Cloud 體系中的作用,并逐步展開,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何實現 Spring Cloud Stream 標準的。

轉載于:https://www.cnblogs.com/davidwang456/articles/10653269.html

總結

以上是生活随笔為你收集整理的Spring Cloud Stream 体系及原理介绍的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 免费在线观看成年人视频 | 91精品一区二区三区综合在线爱 | 亚洲欧美一区二区精品久久久 | 极品女神无套呻吟啪啪 | 毛片在线免费 | 一道本久久 | 91人人澡人人爽人人精品 | 无码毛片aaa在线 | 久热色 | av网站网址 | 亚洲乱码一区二区 | 在线成人小视频 | 91精品免费视频 | 朝鲜一级黄色片 | 国产后入又长又硬 | 午夜国产一区二区三区 | 欧美不卡视频 | 国产一线二线三线女 | 日日夜夜操av | 91精品久久久久久久99蜜桃 | 黄色日b片 | 影音先锋中文在线 | 国产精品黄色在线观看 | 在线看片成人 | 住在隔壁的她动漫免费观看全集下载 | 鲁一鲁一鲁一鲁一av | 午夜精品久久久久久久99 | 三级av在线播放 | 男女爱爱福利视频 | www.亚洲人| 日本不卡视频在线观看 | 香蕉国产| 欧美三级视频在线 | 色乱码一区二区三区网站 | 一级片啪啪 | 大胸美女无遮挡 | 免费看黄色一级片 | 麻豆成人免费视频 | 国产一区在线观看免费 | 国产在线成人精品午夜 | 福利国产视频 | 成人一级网站 | 人妻久久一区二区三区 | 久久嗨| aa一级片 | 僵尸叔叔在线观看国语高清免费观看 | 亚洲超碰在线观看 | 久久精品这里 | 日韩成人在线免费观看 | 日产欧产va高清 | 男男做爰猛烈啪啪高 | 日韩一级视频在线观看 | 精品久久久无码中文字幕边打电话 | 成人h网站| 中文精品一区二区三区 | 波多野一区二区三区 | 亚洲爽爆 | 国产精品亚洲视频 | 又黄又免费的网站 | 小柔好湿好紧太爽了国产网址 | 在线观看网页视频 | 午夜精品久久久久久久久久 | 午夜免费在线观看 | 亚洲a中文字幕 | 成年人在线播放视频 | 一本色道久久加勒比精品 | 久草操 | 久久精品国产99国产精品 | 三级av免费 | 久久成人在线 | 少妇精品无码一区二区三区 | 大地资源二中文在线影视免费观看 | 久久午夜网 | 在线观看免费看片 | 日韩大片av | 欧美人喂奶吃大乳 | 日本国产一级片 | 日韩欧美一区二区三区免费观看 | 亚洲精品日韩综合观看成人91 | 免费毛片一区二区三区 | 亚洲精品aaaaa | 国产精品自偷自拍 | 中文字幕在线免费看线人 | 韩国美女被c | 国产高清亚洲 | 久久网免费视频 | 国产日日操 | 九九热精品视频在线观看 | 国产视频手机在线 | 在线观看日韩视频 | 久久久久久久久久久久久久久久久久久 | 欧美一级爱爱视频 | 精品人妻一区二区三区四区在线 | 少妇高潮一区二区三区四区 | 中文字字幕在线中文乱码 | 黄色大片视频 | www.亚洲一区 | 天天干天天插天天射 | 欧美日韩黄色一级片 |