日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring Cloud Bus 消息总线介绍

發(fā)布時(shí)間:2025/3/20 javascript 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Cloud Bus 消息总线介绍 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

作者 | 洛夜
來源 | 阿里巴巴云原生公眾號(hào)

在 Spring 生態(tài)中玩轉(zhuǎn) RocketMQ 系列文章:

  • 《如何在 Spring 生態(tài)中玩轉(zhuǎn) RocketMQ?》
  • 《羅美琪和春波特的故事…》
  • 《RocketMQ-Spring 畢業(yè)兩周年,為什么能成為 Spring 生態(tài)中最受歡迎的 messaging 實(shí)現(xiàn)?》
  • 《使用 rocketmq-spring-boot-starter 來配置、發(fā)送和消費(fèi) RocketMQ 消息》
  • 《Spring Cloud Stream 體系及原理介紹》

本文配套可交互教程已登錄阿里云知行動(dòng)手實(shí)驗(yàn)室,PC 端登錄 start.aliyun.com 在瀏覽器中立即體驗(yàn)

Spring Cloud Bus 對(duì)自己的定位是 Spring Cloud 體系內(nèi)的消息總線,使用 message broker 來連接分布式系統(tǒng)的所有節(jié)點(diǎn)。Bus 官方的 Reference 文檔?比較簡(jiǎn)單,簡(jiǎn)單到連一張圖都沒有。

這是最新版的 Spring Cloud Bus 代碼結(jié)構(gòu)(代碼量比較少):

Bus 實(shí)例演示

在分析 Bus 的實(shí)現(xiàn)之前,我們先來看兩個(gè)使用 Spring Cloud Bus 的簡(jiǎn)單例子。

1. 所有節(jié)點(diǎn)的配置新增

Bus 的例子比較簡(jiǎn)單,因?yàn)?Bus 的 AutoConfiguration 層都有了默認(rèn)的配置,只需要引入消息中間件對(duì)應(yīng)的 Spring Cloud Stream 以及 Spring Cloud Bus 依賴即可,之后所有啟動(dòng)的應(yīng)用都會(huì)使用同一個(gè) Topic 進(jìn)行消息的接收和發(fā)送。

Bus 對(duì)應(yīng)的 Demo 已經(jīng)放到了 github 上,?該 Demo 會(huì)模擬啟動(dòng) 5 個(gè)節(jié)點(diǎn),只需要對(duì)其中任意的一個(gè)實(shí)例新增配置項(xiàng),所有節(jié)點(diǎn)都會(huì)新增該配置項(xiàng)。

Demo 地址:https://github.com/fangjian0423/rocketmq-binder-demo/tree/master/rocketmq-bus-demo

訪問任意節(jié)點(diǎn)提供的 Controller 提供的獲取配置的地址(key 為hangzhou):

curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

所有節(jié)點(diǎn)返回的結(jié)果都是 unknown,因?yàn)樗泄?jié)點(diǎn)的配置中沒有hangzhou這個(gè) key。

Bus 內(nèi)部提供了EnvironmentBusEndpoint這個(gè) Endpoint 通過 message broker 用來新增/更新配置。

訪問任意節(jié)點(diǎn)該 Endpoint?對(duì)應(yīng)的?url:?/actuator/bus-env?name=hangzhou&value=alibaba 進(jìn)行配置項(xiàng)的新增(比如訪問 node1 的url):

curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

然后再次訪問所有節(jié)點(diǎn)/bus/env獲取配置:

$ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' unknown% ~ ? $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' unknown% ~ ? $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' unknown% ~ ? $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' unknown% ~ ? $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' unknown% ~ ? $ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json' ~ ? $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' alibaba%

可以看到,所有節(jié)點(diǎn)都新增了一個(gè) key 為hangzhou的配置,且對(duì)應(yīng)的 value 是alibaba。這個(gè)配置項(xiàng)是通過 Bus 提供的?EnvironmentBusEndpoint 完成的。

這里引用 程序猿DD 畫的一張圖片,Spring Cloud Config 配合 Bus 完成所有節(jié)點(diǎn)配置的刷新來描述之前的實(shí)例(本文實(shí)例不是刷新,而是新增配置,但是流程是一樣的):

2. 部分節(jié)點(diǎn)的配置修改

比如在 node1 上指定 destination 為 rocketmq-bus-node2 ( node2 配置了 spring.cloud.bus.id 為rocketmq-bus-node2:10002,可以匹配上) 進(jìn)行配置的修改:

curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

訪問/bus/env 獲取配置(由于在 node1 上發(fā)送消息,Bus 也會(huì)對(duì)發(fā)送方的節(jié)點(diǎn) node1 進(jìn)行配置修改):

~ ? $ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json' ~ ? $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou' alibaba% ~ ? $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou' xihu% ~ ? $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou' xihu%

可以看到,只有 node1 和 node2 修改了配置,其余的 3 個(gè)節(jié)點(diǎn)配置未改變。

Bus 的實(shí)現(xiàn)

1. Bus 概念介紹

1)事件

Bus 中定義了遠(yuǎn)程事件RemoteApplicationEvent,該事件繼承了 Spring 的事件ApplicationEvent,而且它目前有 4 個(gè)具體的實(shí)現(xiàn):

  • EnvironmentChangeRemoteApplicationEvent:遠(yuǎn)程環(huán)境變更事件。主要用于接收一個(gè) Map<String,String> 類型的數(shù)據(jù)并更新到 Spring 上下文中?Environment 中的事件。文中的實(shí)例就是使用這個(gè)事件并配合?EnvironmentBusEndpoint 和?EnvironmentChangeListener 完成的。
  • AckRemoteApplicationEvent:遠(yuǎn)程確認(rèn)事件。Bus 內(nèi)部成功接收到遠(yuǎn)程事件后會(huì)發(fā)送回AckRemoteApplicationEvent確認(rèn)事件進(jìn)行確認(rèn)。
  • RefreshRemoteApplicationEvent: 遠(yuǎn)程配置刷新事件。配合?@RefreshScope 以及所有的?@ConfigurationProperties注解修飾的配置類的動(dòng)態(tài)刷新。
  • UnknownRemoteApplicationEvent:遠(yuǎn)程未知事件。Bus 內(nèi)部消息體進(jìn)行轉(zhuǎn)換遠(yuǎn)程事件的時(shí)候如果發(fā)生異常會(huì)統(tǒng)一包裝成該事件。

Bus 內(nèi)部還存在一個(gè)非RemoteApplicationEvent事件 -SentApplicationEvent消息發(fā)送事件,配合 Trace 進(jìn)行遠(yuǎn)程消息發(fā)送的記錄。

這些事件會(huì)配合ApplicationListener進(jìn)行操作,比如EnvironmentChangeRemoteApplicationEvent配了EnvironmentChangeListener進(jìn)行配置的新增/修改:

public class EnvironmentChangeListenerimplements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);@Autowiredprivate EnvironmentManager env;@Overridepublic void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {Map<String, String> values = event.getValues();log.info("Received remote environment change request. Keys/values to update "+ values);for (Map.Entry<String, String> entry : values.entrySet()) {env.setProperty(entry.getKey(), entry.getValue());}} }

收到其它節(jié)點(diǎn)發(fā)送來EnvironmentChangeRemoteApplicationEven事件之后調(diào)用EnvironmentManager#setProperty進(jìn)行配置的設(shè)置,該方法內(nèi)部針對(duì)每一個(gè)配置項(xiàng)都會(huì)發(fā)送一個(gè)EnvironmentChangeEvent事件,然后被ConfigurationPropertiesRebinder所監(jiān)聽,進(jìn)行 rebind 操作新增/更新配置。

2)Actuator Endpoint

Bus 內(nèi)部暴露了 2 個(gè) Endpoint,分別是EnvironmentBusEndpoint和RefreshBusEndpoint,進(jìn)行配置的新增/修改以及全局配置刷新。它們對(duì)應(yīng)的 Endpoint id 即 url 是?bus-env和bus-refresh。

3)配置

Bus 對(duì)于消息的發(fā)送必定涉及到 Topic、Group 之類的信息,這些內(nèi)容都被封裝到了BusProperties中,其默認(rèn)的配置前綴為spring.cloud.bus,比如:

  • spring.cloud.bus.refresh.enabled用于開啟/關(guān)閉全局刷新的 Listener。
  • spring.cloud.bus.env.enabled 用于開啟/關(guān)閉配置新增/修改的 Endpoint。
  • spring.cloud.bus.ack.enabled 用于開啟開啟/關(guān)閉AckRemoteApplicationEvent事件的發(fā)送。
  • spring.cloud.bus.trace.enabled 用于開啟/關(guān)閉息記錄 Trace 的 Listener。

消息發(fā)送涉及到的 Topic 默認(rèn)用的是springCloudBus,可以配置進(jìn)行修改,Group 可以設(shè)置成廣播模式或使用 UUID 配合 offset 為 lastest 的模式。

每個(gè) Bus 應(yīng)用都有一個(gè)對(duì)應(yīng)的 Bus id,官方取值方式較復(fù)雜:

KaTeX parse error: Expected '}', got 'EOF' at end of input: …plication.name:{spring.application.name:application}}:KaTeX parse error: Expected '}', got 'EOF' at end of input: …instance_index:{spring.application.index:KaTeX parse error: Expected '}', got 'EOF' at end of input: …al.server.port:{server.port:0}}}}:KaTeX parse error: Expected '}', got 'EOF' at end of input: …on.instance_id:{random.value}}

建議手動(dòng)配置 Bus id,因?yàn)?Bus 遠(yuǎn)程事件中的 destination 會(huì)根據(jù) Bus id 進(jìn)行匹配:

spring.cloud.bus.id=${spring.application.name}-${server.port}

2. Bus 底層分析

Bus 的底層分析無非牽扯到這幾個(gè)方面:

  • 消息是如何發(fā)送的
  • 消息是如何接收的
  • destination 是如何匹配的
  • 遠(yuǎn)程事件收到后如何觸發(fā)下一個(gè) action

BusAutoConfiguration自動(dòng)化配置類被@EnableBinding(SpringCloudBusClient.class)所修飾。

@EnableBinding的用法在文章《Spring Cloud Stream 體系及原理介紹》中已經(jīng)說明,且它的 value 為SpringCloudBusClient.class,會(huì)在SpringCloudBusClient中基于代理創(chuàng)建出 input 和 output 的DirectChannel:

public interface SpringCloudBusClient {String INPUT = "springCloudBusInput";String OUTPUT = "springCloudBusOutput";@Output(SpringCloudBusClient.OUTPUT)MessageChannel springCloudBusOutput();@Input(SpringCloudBusClient.INPUT)SubscribableChannel springCloudBusInput(); }

springCloudBusInput 和 springCloudBusOutput 這兩個(gè) Binding 的屬性可以通過配置文件進(jìn)行修改(比如修改 topic):

spring.cloud.stream.bindings:springCloudBusInput:destination: my-bus-topicspringCloudBusOutput:destination: my-bus-topic

消息的接收和發(fā)送:

// BusAutoConfiguration @EventListener(classes = RemoteApplicationEvent.class) // 1 public void acceptLocal(RemoteApplicationEvent event) {if (this.serviceMatcher.isFromSelf(event)&& !(event instanceof AckRemoteApplicationEvent)) { // 2this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3} } @StreamListener(SpringCloudBusClient.INPUT) // 4 public void acceptRemote(RemoteApplicationEvent event) {if (event instanceof AckRemoteApplicationEvent) {if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)&& this.applicationEventPublisher != null) { // 5this.applicationEventPublisher.publishEvent(event);}// If it's an ACK we are finished processing at this pointreturn;}if (this.serviceMatcher.isForSelf(event)&& this.applicationEventPublisher != null) { // 6if (!this.serviceMatcher.isFromSelf(event)) { // 7this.applicationEventPublisher.publishEvent(event);}if (this.bus.getAck().isEnabled()) { // 8AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,this.serviceMatcher.getServiceId(),this.bus.getAck().getDestinationService(),event.getDestinationService(), event.getId(), event.getClass());this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(ack).build());this.applicationEventPublisher.publishEvent(ack);}}if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9// We are set to register sent events so publish it for local consumption,// irrespective of the originthis.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,event.getOriginService(), event.getDestinationService(),event.getId(), event.getClass()));} }
  • 利用 Spring 事件的監(jiān)聽機(jī)制監(jiān)聽本地所有的RemoteApplicationEvent遠(yuǎn)程事件(比如bus-env會(huì)在本地發(fā)送EnvironmentChangeRemoteApplicationEvent事件,bus-refresh會(huì)在本地發(fā)送RefreshRemoteApplicationEvent事件,這些事件在這里都會(huì)被監(jiān)聽到)。

  • 判斷本地接收到的事件不是AckRemoteApplicationEvent遠(yuǎn)程確認(rèn)事件(不然會(huì)死循環(huán),一直接收消息,發(fā)送消息…)以及該事件是應(yīng)用自身發(fā)送出去的(事件發(fā)送方是應(yīng)用自身),如果都滿足執(zhí)行步驟 3。

  • 構(gòu)造 Message 并將該遠(yuǎn)程事件作為 payload,然后使用 Spring Cloud Stream 構(gòu)造的 Binding name 為 springCloudBusOutput 的 MessageChannel 將消息發(fā)送到 broker。

  • 4.@StreamListener注解消費(fèi) Spring Cloud Stream 構(gòu)造的 Binding name 為 springCloudBusInput 的 MessageChannel,接收的消息為遠(yuǎn)程消息。

  • 如果該遠(yuǎn)程事件是AckRemoteApplicationEvent遠(yuǎn)程確認(rèn)事件并且應(yīng)用開啟了消息追蹤 trace 開關(guān),同時(shí)該遠(yuǎn)程事件不是應(yīng)用自身發(fā)送的(事件發(fā)送方不是應(yīng)用自身,表示事件是其它應(yīng)用發(fā)送過來的),那么本地發(fā)送AckRemoteApplicationEvent遠(yuǎn)程確認(rèn)事件表示應(yīng)用確認(rèn)收到了其它應(yīng)用發(fā)送過來的遠(yuǎn)程事件,流程結(jié)束。

  • 如果該遠(yuǎn)程事件是其它應(yīng)用發(fā)送給應(yīng)用自身的(事件的接收方是應(yīng)用自身),那么進(jìn)行步驟 7 和 8,否則執(zhí)行步驟 9。

  • 該遠(yuǎn)程事件不是應(yīng)用自身發(fā)送(事件發(fā)送方不是應(yīng)用自身)的話,將該事件以本地的方式發(fā)送出去。應(yīng)用自身一開始已經(jīng)在本地被對(duì)應(yīng)的消息接收方處理了,無需再次發(fā)送。

  • 如果開啟了AckRemoteApplicationEvent遠(yuǎn)程確認(rèn)事件的開關(guān),構(gòu)造AckRemoteApplicationEvent事件并在遠(yuǎn)程和本地都發(fā)送該事件(本地發(fā)送是因?yàn)椴襟E 5 沒有進(jìn)行本地AckRemoteApplicationEvent事件的發(fā)送,也就是自身應(yīng)用對(duì)自身應(yīng)用確認(rèn); 遠(yuǎn)程發(fā)送是為了告訴其它應(yīng)用,自身應(yīng)用收到了消息)。

  • 如果開啟了消息記錄 Trace 的開關(guān),本地構(gòu)造并發(fā)送SentApplicationEvent事件。

  • bus-env觸發(fā)后所有節(jié)點(diǎn)的EnvironmentChangeListener監(jiān)聽到了配置的變化,控制臺(tái)都會(huì)打印出以下信息:

    o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

    如果在本地監(jiān)聽遠(yuǎn)程確認(rèn)事件 AckRemoteApplicationEvent,都會(huì)收到所有節(jié)點(diǎn)的信息,比如 node5 節(jié)點(diǎn)的控制臺(tái)監(jiān)聽到的 AckRemoteApplicationEvent事件如下:

    ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670484,"originService":"rocketmq-bus-node5:10005","destinationService":"**","id":"375f0426-c24e-4904-bce1-5e09371fc9bc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670184,"originService":"rocketmq-bus-node1:10001","destinationService":"**","id":"91f06cf1-4bd9-4dd8-9526-9299a35bb7cc","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670402,"originService":"rocketmq-bus-node2:10002","destinationService":"**","id":"7df3963c-7c3e-4549-9a22-a23fa90a6b85","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670406,"originService":"rocketmq-bus-node3:10003","destinationService":"**","id":"728b45ee-5e26-46c2-af1a-e8d1571e5d3a","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"} ServiceId [rocketmq-bus-node5:10005] listeners on {"type":"AckRemoteApplicationEvent","timestamp":1554124670427,"originService":"rocketmq-bus-node4:10004","destinationService":"**","id":"1812fd6d-6f98-4e5b-a38a-4b11aee08aeb","ackId":"750d033f-356a-4aad-8cf0-3481ace8698c","ackDestinationService":"**","event":"org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}

    那么回到本章節(jié)開頭提到的 4 個(gè)問題,我們分別做一下解答:

    • 消息是如何發(fā)送的: 在BusAutoConfiguration#acceptLocal方法中通過 Spring Cloud Stream 發(fā)送事件到springCloudBustopic 中。
    • 消息是如何接收的: 在BusAutoConfiguration#acceptRemote方法中通過 Spring Cloud Stream 接收springCloudBustopic 的消息。
    • destination 是如何匹配的:?在BusAutoConfiguration#acceptRemote方法中接收遠(yuǎn)程事件方法里對(duì) destination 進(jìn)行匹配。
    • 遠(yuǎn)程事件收到后如何觸發(fā)下一個(gè) action: Bus 內(nèi)部通過 Spring 的事件機(jī)制接收本地的RemoteApplicationEvent具體的實(shí)現(xiàn)事件再做下一步的動(dòng)作(比如EnvironmentChangeListener接收了EnvironmentChangeRemoteApplicationEvent事件,RefreshListener接收了RefreshRemoteApplicationEvent事件)。

    總結(jié)

    Spring Cloud Bus 自身內(nèi)容還是比較少的,不過還是需要提前了解 Spring Cloud Stream 體系以及 Spring 自身的事件機(jī)制,在此基礎(chǔ)上,才能更好地理解 Spring Cloud Bus 對(duì)本地事件和遠(yuǎn)程事件的處理邏輯。

    目前 Bus 內(nèi)置的遠(yuǎn)程事件較少,大多數(shù)為配置相關(guān)的事件,我們可以繼承RemoteApplicationEvent并配合@RemoteApplicationEventScan注解構(gòu)建自身的微服務(wù)消息體系。

    作者簡(jiǎn)介

    方劍(花名:洛夜),GitHub ID @fangjian0423,開源愛好者,阿里巴巴高級(jí)開發(fā)工程師,阿里云產(chǎn)品 EDAS 開發(fā),Spring Cloud Alibaba 開源項(xiàng)目負(fù)責(zé)人之一。

    總結(jié)

    以上是生活随笔為你收集整理的Spring Cloud Bus 消息总线介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。