RocketMQ:Consumer概述及启动流程与消息拉取源码分析
文章目錄
- Consumer
- 概述
- 消費(fèi)者核心類
- 消費(fèi)者啟動流程
- 消息拉取
- PullMessageService實(shí)現(xiàn)機(jī)制
- ProcessQueue實(shí)現(xiàn)機(jī)制
- 消息拉取基本流程
- 客戶端發(fā)起消息拉取請求
- 消息服務(wù)器Broker組裝消息
- 消息拉取客戶端處理消息
- 總結(jié)
Consumer
概述
-
消費(fèi)者組與消費(fèi)模式
消息消費(fèi)以組的模式展開,一個消費(fèi)組內(nèi)可包含多個消費(fèi)者,每個消費(fèi)組可以訂閱多個主題。消費(fèi)組之間有負(fù)載均衡和廣播兩種模式。負(fù)載均衡模式,主題下的同一條消息只允許被其中一個消費(fèi)者消費(fèi)。廣播模式,主題下的同一條消息,將被所有消費(fèi)者消費(fèi)一次。
-
消息傳遞模式
分為推送和拉取兩種模式。
-
從何處開始消費(fèi)消息,可選參數(shù):
CONSUME_FROM_LAST_OFFSET:上一次消費(fèi)偏移量
CONSUME_FROM_FIRST_OFFSET:從頭開始
CONSUME_FROM_TIMESTAMP:某個時間開始
消費(fèi)者核心類
基于消息推送模式
//發(fā)送消息-如果消息消費(fèi)失敗,將會發(fā)送回Broker,過一段時間(delayLevel)再進(jìn)行消費(fèi) void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName); //根據(jù)主題從消費(fèi)者緩存中獲取消息隊(duì)列 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic); //注冊并發(fā)消息事件監(jiān)聽器 void registerMessageListener(MessageListenerConcurrently messageListener); //注冊順序消息事件監(jiān)聽器 void registerMessageListener(final MessageListenerOrderly messageListener); //基于主題訂閱消息,消息過濾使用表達(dá)式 void subscribe(final String topic, final String subExpression); //基于主題訂閱消息,消息過濾使用類模式 void subscribe(final String topic, final String fullClassName,final String filterClassSource); //訂閱消息,并指定隊(duì)列選擇器 void subscribe(final String topic, final MessageSelector selector); void unsubscribe(final String topic):取消消息訂閱DefaultMQPushConsumer
//消費(fèi)者組 private String consumerGroup; //消息消費(fèi)模式 private MessageModel messageModel = MessageModel.CLUSTERING; //指定消費(fèi)開始偏移量(最大偏移量、最小偏移量、啟動時間戳)開始消費(fèi) private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; //集群模式下的消息隊(duì)列負(fù)載策略 private AllocateMessageQueueStrategy allocateMessageQueueStrategy; //訂閱信息 private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); //消息業(yè)務(wù)監(jiān)聽器 private MessageListener messageListener; //消息消費(fèi)進(jìn)度存儲器 private OffsetStore offsetStore; //消費(fèi)者最小線程數(shù)量 private int consumeThreadMin = 20; //消費(fèi)者最大線程數(shù)量 private int consumeThreadMax = 20; //并發(fā)消息消費(fèi)時處理隊(duì)列最大跨度 private int consumeConcurrentlyMaxSpan = 2000; //每1000次流控后打印流控日志 private int pullThresholdForQueue = 1000; //推模式下任務(wù)間隔時間 private long pullInterval = 0; //推模式下任務(wù)拉取的條數(shù),默認(rèn)32條 private int pullBatchSize = 32; //每次傳入MessageListener#consumerMessage中消息的數(shù)量 private int consumeMessageBatchMaxSize = 1; //是否每次拉取消息都訂閱消息 private boolean postSubscriptionWhenPull = false; //消息重試次數(shù),-1代表16次 private int maxReconsumeTimes = -1; //消息消費(fèi)超時時間 private long consumeTimeout = 15;DefaultMQPushConsumerImpl:消息消費(fèi)者默認(rèn)實(shí)現(xiàn)類,應(yīng)用程序中直接調(diào)用該類的實(shí)例完成消息的消費(fèi)。
RebalanceImpl:重新負(fù)載均衡實(shí)現(xiàn)類,實(shí)現(xiàn)消息消費(fèi)端與消息隊(duì)列之間的重新分布。
MQClientInstance:消息客戶端實(shí)例,負(fù)責(zé)與MQ服務(wù)器Broker和NameServer之間的的網(wǎng)絡(luò)交互。
PullAPIWrapper:RocketMQ中,實(shí)際上只有Message Pull模式,而Push模式只是將Pull模式進(jìn)行了封裝。
OffsetStore:消息消費(fèi)進(jìn)度存儲器。
消費(fèi)者啟動流程
DefaultMQPushConsumerImpl#start
switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//校驗(yàn)消息合法性this.checkConfig();//構(gòu)建主題訂閱信息this.copySubscription();/*** 如果消息消費(fèi)模式為集群模式,并且當(dāng)前的實(shí)例名為 DEFAULT,替換為當(dāng)前客戶端進(jìn)程的PID* if (this.instanceName.equals("DEFAULT")) {* this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();* }*/if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}//構(gòu)建MQClientInstance mQClientFactorythis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);//構(gòu)造重新負(fù)載均衡實(shí)現(xiàn)類this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);//拉取消息API封裝this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);//消息消費(fèi)進(jìn)度加載if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://廣播模式下 將消息消費(fèi)進(jìn)度存儲到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://集群模式下 將消息消費(fèi)的進(jìn)度存儲到遠(yuǎn)端Broker中this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();//創(chuàng)建順序消息消費(fèi)服務(wù)if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {//順序this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());}//創(chuàng)建并發(fā)消息消費(fèi)服務(wù)else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {//并發(fā)this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//啟動消息消費(fèi)服務(wù)this.consumeMessageService.start();//注冊到消費(fèi)者實(shí)例到客戶端boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//--------啟動客戶端---------mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());//進(jìn)入消息消費(fèi)狀態(tài)this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break; }//更新訂閱信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //檢測broker狀態(tài) this.mQClientFactory.checkClientInBroker(); //發(fā)送心跳包 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); //重新負(fù)載 this.mQClientFactory.rebalanceImmediately();消息拉取
核心類:PutMessageService,是一個消息拉取的服務(wù)線程。
PullMessageService實(shí)現(xiàn)機(jī)制
PullMessageService#run
//線程狀態(tài)為運(yùn)行狀態(tài) while (!this.isStopped()) {try {//在拉取消息請求隊(duì)列中拉取消息請求PullRequest pullRequest = this.pullRequestQueue.take();//處理拉取消息請求this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);} }PullRequest
public class PullRequest {//費(fèi)者組private String consumerGroup;//待拉取消息隊(duì)列private MessageQueue messageQueue;//消息處理隊(duì)列private ProcessQueue processQueue;//待拉取的MessageQueue偏移量private long nextOffset;//是否被鎖定private boolean previouslyLocked = false; }PullMessageService#pullMessage
//找到消費(fèi)者 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) {//強(qiáng)轉(zhuǎn)為推送模式下的消費(fèi)者DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//拉取消息impl.pullMessage(pullRequest); } else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); }ProcessQueue實(shí)現(xiàn)機(jī)制
ProcessQueue是MessageQueue在消費(fèi)端的快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按照消息的隊(duì)列偏移量順序放在ProcessQueue中,PullMessageService再將消息提交到消費(fèi)者消費(fèi)線程池,消息消費(fèi)成功之后從ProcessQueue中移除——Queue consumption snapshot。
ProcessQueue
//消息容器 private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); //讀寫鎖 private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); //ProcessQueue總消息數(shù)量 private final AtomicLong msgCount = new AtomicLong(); //ProcessQueue隊(duì)列最大偏移量 private volatile long queueOffsetMax = 0L; //當(dāng)前ProcessQueue是否被丟棄 private volatile boolean dropped = false; //上一次拉取時間戳 private volatile long lastPullTimestamp = System.currentTimeMillis(); //上一次消費(fèi)時間戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); //移除消費(fèi)超時消息 public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) //添加消息 public boolean putMessage(final List<MessageExt> msgs) //獲取消息最大間隔 public long getMaxSpan() //移除消息 public long removeMessage(final List<MessageExt> msgs) //將consumingMsgOrderlyTreeMap中消息重新放在msgTreeMap,并清空consumingMsgOrderlyTreeMap public void rollback() //將consumingMsgOrderlyTreeMap消息清除,表示成功處理該批消息 public long commit() //重新處理該批消息 public void makeMessageToCosumeAgain(List<MessageExt> msgs) //從processQueue中取出batchSize條消息 public List<MessageExt> takeMessags(final int batchSize)消息拉取基本流程
客戶端發(fā)起消息拉取請求
DefaultMessagePushConsumerImpl#pullMessage
//獲取消息處理隊(duì)列 final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return; }//設(shè)置拉取時間戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {//判斷服務(wù)是否狀態(tài)正常this.makeSureStateOK(); } catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return; }//被掛起--->等待1s if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return; }//-----------------------------------流量控制----------------------------------- //獲得最大待處理消息數(shù)量 long cachedMessageCount = processQueue.getMsgCount().get(); //獲得最大待處理消息大小 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);//從數(shù)量進(jìn)行流控->消息數(shù)量大于1000條 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {//延遲消息拉取50msthis.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return; }//從消息大小進(jìn)行流控->消息大小大于100Mb if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {//延遲消息拉取50msthis.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return; }....//獲得主題訂閱信息 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); //如果主題訂閱信息為空--->延遲3s后繼續(xù)拉取 if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return; }final long beginTimestamp = System.currentTimeMillis();//拉取消息回調(diào)函數(shù) PullCallback pullCallback = new PullCallback() {//這一部分實(shí)現(xiàn)在之后進(jìn)行講解 };....//獲取消息系統(tǒng)拉取標(biāo)記 int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter );try {//與服務(wù)端進(jìn)行交互獲取消息this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,//-----------↓↓↓注意這個回調(diào)函數(shù)-------------pullCallback); } catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); }接下來重點(diǎn)關(guān)注pullAPIWrapper.pullKernelImpl()的核心邏輯:
public PullResult pullKernelImpl(final MessageQueue mq, //消息消費(fèi)隊(duì)列final String subExpression, //消息訂閱子模式subscribe( topicName, "模式")final String expressionType, final long subVersion, //版本final long offset, //pullRequest.getNextOffset()final int maxNums, //defaultMQPushConsumer.getPullBatchSize()->32條final int sysFlag, //系統(tǒng)標(biāo)記final long commitOffset, //當(dāng)前消息隊(duì)列最新偏移量final long brokerSuspendMaxTimeMillis,//運(yùn)行Broker掛起最長時間->15sfinal long timeoutMillis, //超時時間->30sfinal CommunicationMode communicationMode,//Sync/Async/Onewayfinal PullCallback pullCallback //消息拉取后的回調(diào)函數(shù) ) throws MQClientException, RemotingException, MQBrokerException, InterruptedExceptionPullAPIWrapper#pullKernelImpl
//獲取Broker信息 FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false); //如果Broker信息為空 if (null == findBrokerResult) {//從Nameserver更新主題路由表信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新獲取Broker信息findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false); }if (findBrokerResult != null) {{//檢查版本if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//封裝拉取消息請求PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}//根據(jù)brokerAddr、requestHeader等信息利用遠(yuǎn)程網(wǎng)絡(luò)調(diào)用實(shí)例在Broker中對消息進(jìn)行拉取//最后返回一個消息拉取結(jié)果PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult; }根據(jù)拉取消息的模式是異步或同步來進(jìn)行不同操作。
MQClientAPIImpl#pullMessage
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC://異步拉取->發(fā)送拉取消息請求并立即返回結(jié)果this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC://同步拉取->發(fā)送消息拉取請求并等待結(jié)果返回return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break; } return null;消息服務(wù)器Broker組裝消息
消息服務(wù)器接收到消費(fèi)者的消息拉取請求之后進(jìn)行消息組裝的時序圖:
消息請求處理器:PullMessageProcessor
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);//Broker控制器private final BrokerController brokerController; //構(gòu)造函數(shù)存儲容器private List<ConsumeMessageHook> consumeMessageHookList; }PullMessageProcessor#processRequest
//構(gòu)建消息過濾器 MessageFilter messageFilter; if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager()); } else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager()); }//調(diào)用MessageStore.getMessage查找消息 final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),//消費(fèi)組名稱requestHeader.getTopic(),//主題名稱requestHeader.getQueueId(),//隊(duì)列IDrequestHeader.getQueueOffset(),//待拉取偏移量requestHeader.getMaxMsgNums(), //最大拉取消息條數(shù)messageFilter //消息過濾器);DefaultMessageStore#getMessage
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; //查找下一次隊(duì)列偏移量 long nextBeginOffset = offset; //當(dāng)前消息隊(duì)列最小偏移量 long minOffset = 0; //當(dāng)前消息隊(duì)列最大偏移量 long maxOffset = 0;//懶加載-當(dāng)找到消息再進(jìn)行賦值 GetMessageResult getResult = null;//當(dāng)前commitLog最大偏移量 final long maxOffsetPy = this.commitLog.getMaxOffset();//根據(jù)主題名稱和隊(duì)列編號獲取消息消費(fèi)隊(duì)列 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) {minOffset = consumeQueue.getMinOffsetInQueue();maxOffset = consumeQueue.getMaxOffsetInQueue();//消息偏移量異常情況校對下一次拉取偏移量if (maxOffset == 0) {//表示當(dāng)前消息隊(duì)列中沒有消息status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);} else if (offset < minOffset) {//待拉取消息的偏移量小于隊(duì)列的實(shí)際偏移量status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else if (offset == maxOffset) {//待拉取偏移量為隊(duì)列最大偏移量status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);} else if (offset > maxOffset) {//偏移量越界status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}} else {//偏移量處于正常情況//-------------根據(jù)偏移量從CommitLog中拉取32條消息------------SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);}PullMessageProcessor#processRequest
//根據(jù)拉取結(jié)果填充responseHeader response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset());if (getMessageResult.isSuggestPullingFromSlave()) {//如果當(dāng)前拉取消息是從Slave節(jié)點(diǎn)拉取并且拉取速度較慢responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else {//設(shè)置下一次拉取任務(wù)的ID為主節(jié)點(diǎn)responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break; } //GetMessageResult與Response的Code轉(zhuǎn)換 switch (getMessageResult.getStatus()) {//成功case FOUND:response.setCode(ResponseCode.SUCCESS);break;//消息不存在case MESSAGE_WAS_REMOVING://消息重試response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;//沒有找到匹配的主題隊(duì)列case NO_MATCHED_LOGIC_QUEUE://消息隊(duì)列中未包含消息case NO_MESSAGE_IN_QUEUE:if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;//未找到匹配的消息case NO_MATCHED_MESSAGE:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;//消息物理偏移量為空case OFFSET_FOUND_NULL:response.setCode(ResponseCode.PULL_NOT_FOUND);break;//offset越界case OFFSET_OVERFLOW_BADLY:response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;//offset過大case OFFSET_OVERFLOW_ONE:response.setCode(ResponseCode.PULL_NOT_FOUND);break;//offset過小case OFFSET_TOO_SMALL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break; } .... //如果CommitLog標(biāo)記可用,并且當(dāng)前Broker為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度 boolean storeOffsetEnable = brokerAllowSuspend; storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(),requestHeader.getCommitOffset()); } return response;消息拉取客戶端處理消息
MQClientAPIImpl#processPullResponse
PullStatus pullStatus = PullStatus.NO_NEW_MSG; //判斷響應(yīng)結(jié)果 switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark(), addr); }//解碼響應(yīng)頭 PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);//封裝PullResultExt返回 return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());PullResult
/*** 拉取結(jié)果狀態(tài):FOUND/NO_NEW_MSG/NO_MATCHED_MSG/OFFSET_ILLEGAL*/ private final PullStatus pullStatus; /*** 下次拉取偏移量*/ private final long nextBeginOffset; /*** 消息隊(duì)列最小偏移量*/ private final long minOffset; /*** 消息隊(duì)列最大偏移量*/ private final long maxOffset; /*** 拉取的消息隊(duì)列*/ private List<MessageExt> msgFoundList;DefaultMQPushConsumerImpl#pullMessage
PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {switch (pullResult.getPullStatus()) {case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());//消息拉取成功-將消息放入processQueueboolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());//提交消息消費(fèi)請求-對消息進(jìn)行處理DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//如果pullInterval大于0,則等待pullInterval毫秒后將pullRequest對象放入到PullMessageService中的pullRequestQueue隊(duì)列中if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}....總結(jié)
本文僅作為個人學(xué)習(xí)使用,如有不足或錯誤請指正!
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的RocketMQ:Consumer概述及启动流程与消息拉取源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ:消息消费队列与索引文件
- 下一篇: RocketMQ:消费端的消息消息队列负