日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java rocketmq消费_rocketmq消费负载均衡--push消费详解

發(fā)布時(shí)間:2025/3/20 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java rocketmq消费_rocketmq消费负载均衡--push消费详解 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言

本文介紹了DefaultMQPushConsumerImpl消費(fèi)者,客戶端負(fù)載均衡相關(guān)知識(shí)點(diǎn)。本文從DefaultMQPushConsumerImpl啟動(dòng)過(guò)程到實(shí)現(xiàn)負(fù)載均衡,從源代碼一步一步分析,共分為6個(gè)部分進(jìn)行介紹,其中第6個(gè)部分 rebalanceByTopic 為負(fù)載均衡的核心邏輯模塊,具體過(guò)程運(yùn)用了圖文進(jìn)行闡述。

介紹之前首先拋出幾個(gè)問(wèn)題:

1. 要做負(fù)載均衡,首先要解決的一個(gè)問(wèn)題是什么?

2. 負(fù)載均衡是Client端處理還是Broker端處理?

個(gè)人理解:

1. 要做負(fù)載均衡,首先要做的就是信號(hào)收集。

所謂信號(hào)收集,就是得知道每一個(gè)consumerGroup有哪些consumer,對(duì)應(yīng)的topic是誰(shuí)。信號(hào)收集分為Client端信號(hào)收集與Broker端信號(hào)收集兩個(gè)部分。

2. 負(fù)載均衡放在Client端處理。

具體做法是:消費(fèi)者客戶端在啟動(dòng)時(shí)完善rebalanceImpl實(shí)例,同時(shí)拷貝訂閱信息存放rebalanceImpl實(shí)例對(duì)象中,另外也是很重要的一個(gè)步驟 -- 通過(guò)心跳消息,不停的上報(bào)自己到所有Broker,注冊(cè)RegisterConsumer,等待上述過(guò)程準(zhǔn)備好之后在Client端不斷執(zhí)行的負(fù)載均衡服務(wù)線程從Broker端獲取一份全局信息(該consumerGroup下所有的消費(fèi)Client),然后分配這些全局信息,獲取當(dāng)前客戶端分配到的消費(fèi)隊(duì)列。

本文具體的內(nèi)容:

I. copySubscription

Client端信號(hào)收集,拷貝訂閱信息。

在DefaultMQPushConsumerImpl.start()時(shí),會(huì)將消費(fèi)者的topic訂閱關(guān)系設(shè)置到rebalanceImpl的SubscriptionInner的map中用于負(fù)載:

private void copySubscription() throws MQClientException {

try {

//注:一個(gè)consumer對(duì)象可以訂閱多個(gè)topic

Map sub = this.defaultMQPushConsumer.getSubscription();

if (sub != null) {

for (final Map.Entry entry : sub.entrySet()) {

final String topic = entry.getKey();

final String subString = entry.getValue();

SubscriptionData subscriptionData =

FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//

topic, subString);

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

}

}

if (null == this.messageListenerInner) {

this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();

}

switch (this.defaultMQPushConsumer.getMessageModel()) {

case BROADCASTING:

break;

case CLUSTERING:

final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());

SubscriptionData subscriptionData =

FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//

retryTopic, SubscriptionData.SUB_ALL);

this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);

break;

default:

break;

}

}

catch (Exception e) {

throw new MQClientException("subscription exception", e);

}

}

FilterAPI.buildSubscriptionData接口將訂閱關(guān)系轉(zhuǎn)換為SubscriptionData 數(shù)據(jù),其中subString包含訂閱tag等信息。另外,如果該消費(fèi)者的消費(fèi)模式為集群消費(fèi),則會(huì)將retry的topic一并放到。

II. 完善rebalanceImpl實(shí)例

Client繼續(xù)收集信息:

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer

.getAllocateMessageQueueStrategy());

this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

本文以DefaultMQPushConsumerImpl為例,因此this對(duì)象類型為DefaultMQPushConsumerImp。

III. this.rebalanceService.start()

開(kāi)啟負(fù)載均衡服務(wù)。this.rebalanceService是一個(gè)RebalanceService實(shí)例對(duì)象,它繼承與ServiceThread,是一個(gè)線程類。 this.rebalanceService.start()執(zhí)行時(shí),也即執(zhí)行RebalanceService線程體:

@Override

public void run() {

log.info(this.getServiceName() + " service started");

while (!this.isStoped()) {

this.waitForRunning(WaitInterval);

this.mqClientFactory.doRebalance();

}

log.info(this.getServiceName() + " service end");

}

IV. this.mqClientFactory.doRebalance

客戶端遍歷消費(fèi)組table,對(duì)該客戶端上所有消費(fèi)者獨(dú)立進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)隊(duì)列:

public void doRebalance() {

for (String group : this.consumerTable.keySet()) {

MQConsumerInner impl = this.consumerTable.get(group);

if (impl != null) {

try {

impl.doRebalance();

} catch (Exception e) {

log.error("doRebalance exception", e);

}

}

}

}

V. MQConsumerInner.doRebalance

由于本文以DefaultMQPushConsumerImpl消費(fèi)過(guò)程為例,即DefaultMQPushConsumerImpl.doRebalance:

@Override

public void doRebalance() {

if (this.rebalanceImpl != null) {

this.rebalanceImpl.doRebalance();

}

}

步驟II 中完善了rebalanceImpl實(shí)例,為調(diào)用rebalanceImpl.doRebalance()提供了初始數(shù)據(jù)。

rebalanceImpl.doRebalance()過(guò)程如下:

public void doRebalance() {

// 前文copySubscription中初始化了SubscriptionInner

Map subTable = this.getSubscriptionInner();

if (subTable != null) {

for (final Map.Entry entry : subTable.entrySet()) {

final String topic = entry.getKey();

try {

this.rebalanceByTopic(topic);

} catch (Exception e) {

if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

log.warn("rebalanceByTopic Exception", e);

}

}

}

}

this.truncateMessageQueueNotMyTopic();

}

VI. rebalanceByTopic -- 核心步驟之一

rebalanceByTopic方法中根據(jù)消費(fèi)者的消費(fèi)類型為BROADCASTING或CLUSTERING做不同的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集群消費(fèi)負(fù)載均衡的邏輯。

集群消費(fèi)負(fù)載均衡邏輯主要代碼如下(省略了log等代碼):

//1.從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列

Set mqSet = this.topicSubscribeInfoTable.get(topic);

//2. 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶端clientId

List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

f (null == mqSet) { ... }

if (null == cidAll) { ... }

if (mqSet != null && cidAll != null) {

List mqAll = new ArrayList();

mqAll.addAll(mqSet);

Collections.sort(mqAll);

Collections.sort(cidAll);

// 3.創(chuàng)建DefaultMQPushConsumer對(duì)象時(shí)默認(rèn)設(shè)置為AllocateMessageQueueAveragely

AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List allocateResult = null;

try {

// 4.調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列

allocateResult = strategy.allocate(

this.consumerGroup,

this.mQClientFactory.getClientId(),

mqAll,

cidAll);

} catch (Throwable e) {

return;

}

// 5. 將分配得到的allocateResult 中的隊(duì)列放入allocateResultSet 集合

Set allocateResultSet = new HashSet();

if (allocateResult != null) {

allocateResultSet.addAll(allocateResult);

}

//6. 更新updateProcessQueue

boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);

if (changed) {

this.messageQueueChanged(topic, mqSet, allocateResultSet);

}

}

注:BROADCASTING邏輯只包含上述的1、6。

集群消費(fèi)負(fù)載均衡邏輯中的1、2、4這三個(gè)點(diǎn)相關(guān)知識(shí)為其核心過(guò)程,各個(gè)點(diǎn)相關(guān)知識(shí)如下:

第1點(diǎn):從topicSubscribeInfoTable列表中獲取與該topic相關(guān)的所有消息隊(duì)列

第2點(diǎn): 從broker端獲取消費(fèi)該消費(fèi)組的所有客戶端clientId

首先,消費(fèi)者對(duì)象不斷地向所有broker發(fā)送心跳包,上報(bào)自己,注冊(cè)并更新訂閱關(guān)系以及客戶端ChannelInfoTable;之后,客戶端在做消費(fèi)負(fù)載均衡時(shí)獲取那些消費(fèi)客戶端,對(duì)這些客戶端進(jìn)行負(fù)載均衡,分發(fā)消費(fèi)的隊(duì)列。具體過(guò)程如下圖所示:

第4點(diǎn):調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當(dāng)前client分配消費(fèi)隊(duì)列

注:上圖中cId1、cId2、...、cIdN通過(guò) getConsumerIdListByGroup 獲取,它們?cè)谶@個(gè)ConsumerGroup下所有在線客戶端列表中。

當(dāng)前消費(fèi)對(duì)進(jìn)行負(fù)載均衡策略后獲取對(duì)應(yīng)的消息消費(fèi)隊(duì)列。具體的算法很簡(jiǎn)單,可以看源碼。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

總結(jié)

以上是生活随笔為你收集整理的java rocketmq消费_rocketmq消费负载均衡--push消费详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。