日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq消费负载均衡--push消费为例

發(fā)布時間:2025/6/15 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq消费负载均衡--push消费为例 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
本文介紹了DefaultMQPushConsumerImpl消費者,客戶端負(fù)載均衡相關(guān)知識點。本文從DefaultMQPushConsumerImpl啟動過程到實現(xiàn)負(fù)載均衡,從源代碼一步一步分析,共分為6個部分進(jìn)行介紹,其中第6個部分?rebalanceByTopic 為負(fù)載均衡的核心邏輯模塊,具體過程運用了圖文進(jìn)行闡述。 介紹之前首先拋出幾個問題: 1. 要做負(fù)載均衡,首先要解決的一個問題是什么? 2. 負(fù)載均衡是Client端處理還是Broker端處理? 個人理解: 1. 要做負(fù)載均衡,首先要做的就是信號收集。 所謂信號收集,就是得知道每一個consumerGroup有哪些consumer,對應(yīng)的topic是誰。信號收集分為Client端信號收集與Broker端信號收集兩個部分。 2. 負(fù)載均衡放在Client端處理。 具體做法是:消費者客戶端在啟動時完善rebalanceImpl實例,同時拷貝訂閱信息存放rebalanceImpl實例對象中,另外也是很重要的一個步驟 -- 通過心跳消息,不停的上報自己到所有Broker,注冊RegisterConsumer,等待上述過程準(zhǔn)備好之后在Client端不斷執(zhí)行的負(fù)載均衡服務(wù)線程從Broker端獲取一份全局信息(該consumerGroup下所有的消費Client),然后分配這些全局信息,獲取當(dāng)前客戶端分配到的消費隊列。 本文具體的內(nèi)容: I. copySubscription Client端信號收集,拷貝訂閱信息。 在DefaultMQPushConsumerImpl.start()時,會將消費者的topic訂閱關(guān)系設(shè)置到rebalanceImpl的SubscriptionInner的map中用于負(fù)載: private void copySubscription() throws MQClientException {try {//注:一個consumer對象可以訂閱多個topicMap<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),// topic, subString);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:break;case CLUSTERING:final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),// retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}}catch (Exception e) {throw new MQClientException("subscription exception", e);}} FilterAPI.buildSubscriptionData接口將訂閱關(guān)系轉(zhuǎn)換為SubscriptionData 數(shù)據(jù),其中subString包含訂閱tag等信息。另外,如果該消費者的消費模式為集群消費,則會將retry的topic一并放到。 II. 完善rebalanceImpl實例 Client繼續(xù)收集信息: this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); 本文以DefaultMQPushConsumerImpl為例,因此this對象類型為DefaultMQPushConsumerImp。 III. this.rebalanceService.start() 開啟負(fù)載均衡服務(wù)。this.rebalanceService是一個RebalanceService實例對象,它繼承與ServiceThread,是一個線程類。 this.rebalanceService.start()執(zhí)行時,也即執(zhí)行RebalanceService線程體: @Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStoped()) {this.waitForRunning(WaitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}

?

IV. this.mqClientFactory.doRebalance 客戶端遍歷消費組table,對該客戶端上所有消費者獨立進(jìn)行負(fù)載均衡,分發(fā)消費隊列: public void doRebalance() {for (String group : this.consumerTable.keySet()) {MQConsumerInner impl = this.consumerTable.get(group);if (impl != null) {try {impl.doRebalance();} catch (Exception e) {log.error("doRebalance exception", e);}}}}

?

V. MQConsumerInner.doRebalance 由于本文以DefaultMQPushConsumerImpl消費過程為例,即DefaultMQPushConsumerImpl.doRebalance: @Overridepublic void doRebalance() {if (this.rebalanceImpl != null) {this.rebalanceImpl.doRebalance();}} 步驟II 中完善了rebalanceImpl實例,為調(diào)用rebalanceImpl.doRebalance()提供了初始數(shù)據(jù)。 rebalanceImpl.doRebalance()過程如下: public void doRebalance() {// 前文copySubscription中初始化了SubscriptionInnerMap<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic);} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}

?

VI. rebalanceByTopic -- 核心步驟之一
rebalanceByTopic方法中根據(jù)消費者的消費類型為BROADCASTING或CLUSTERING做不同的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集群消費負(fù)載均衡的邏輯。
集群消費負(fù)載均衡邏輯主要代碼如下(省略了log等代碼):

//1.從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //2. 從broker端獲取消費該消費組的所有客戶端clientId List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);f (null == mqSet) { ... } if (null == cidAll) { ... } if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);// 3.創(chuàng)建DefaultMQPushConsumer對象時默認(rèn)設(shè)置為AllocateMessageQueueAveragelyAllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 4.調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費隊列allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,cidAll);} catch (Throwable e) {return;}// 5. 將分配得到的allocateResult 中的隊列放入allocateResultSet 集合Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);} 、//6. 更新updateProcessQueueboolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);if (changed) {this.messageQueueChanged(topic, mqSet, allocateResultSet);} } 注:BROADCASTING邏輯只包含上述的1、6。 集群消費負(fù)載均衡邏輯中的1、2、4這三個點相關(guān)知識為其核心過程,各個點相關(guān)知識如下: 第1點:從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊列

?

第2點: 從broker端獲取消費該消費組的所有客戶端clientId 首先,消費者對象不斷地向所有broker發(fā)送心跳包,上報自己,注冊并更新訂閱關(guān)系以及客戶端ChannelInfoTable;之后,客戶端在做消費負(fù)載均衡時獲取那些消費客戶端,對這些客戶端進(jìn)行負(fù)載均衡,分發(fā)消費的隊列。具體過程如下圖所示:

第4點:調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費隊列

?

注:上圖中cId1、cId2、...、cIdN通過?getConsumerIdListByGroup 獲取,它們在這個ConsumerGroup下所有在線客戶端列表中。 當(dāng)前消費對進(jìn)行負(fù)載均衡策略后獲取對應(yīng)的消息消費隊列。具體的算法很簡單,可以看源碼。

?

轉(zhuǎn)載于:https://www.cnblogs.com/chenjunjie12321/p/7913323.html

總結(jié)

以上是生活随笔為你收集整理的rocketmq消费负载均衡--push消费为例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。