日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ:Consumer概述及启动流程与消息拉取源码分析

發(fā)布時間:2025/3/21 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ:Consumer概述及启动流程与消息拉取源码分析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

    • Consumer
      • 概述
      • 消費(fèi)者核心類
      • 消費(fèi)者啟動流程
      • 消息拉取
        • PullMessageService實(shí)現(xiàn)機(jī)制
        • ProcessQueue實(shí)現(xiàn)機(jī)制
        • 消息拉取基本流程
          • 客戶端發(fā)起消息拉取請求
          • 消息服務(wù)器Broker組裝消息
          • 消息拉取客戶端處理消息
          • 總結(jié)

Consumer

概述

  • 消費(fèi)者組與消費(fèi)模式

    消息消費(fèi)以組的模式展開,一個消費(fèi)組內(nèi)可包含多個消費(fèi)者,每個消費(fèi)組可以訂閱多個主題。消費(fèi)組之間有負(fù)載均衡和廣播兩種模式。負(fù)載均衡模式,主題下的同一條消息只允許被其中一個消費(fèi)者消費(fèi)。廣播模式,主題下的同一條消息,將被所有消費(fèi)者消費(fèi)一次。

  • 消息傳遞模式

    分為推送和拉取兩種模式。

  • 從何處開始消費(fèi)消息,可選參數(shù):

    CONSUME_FROM_LAST_OFFSET:上一次消費(fèi)偏移量

    CONSUME_FROM_FIRST_OFFSET:從頭開始

    CONSUME_FROM_TIMESTAMP:某個時間開始

消費(fèi)者核心類

基于消息推送模式

//發(fā)送消息-如果消息消費(fèi)失敗,將會發(fā)送回Broker,過一段時間(delayLevel)再進(jìn)行消費(fèi) void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName); //根據(jù)主題從消費(fèi)者緩存中獲取消息隊(duì)列 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic); //注冊并發(fā)消息事件監(jiān)聽器 void registerMessageListener(MessageListenerConcurrently messageListener); //注冊順序消息事件監(jiān)聽器 void registerMessageListener(final MessageListenerOrderly messageListener); //基于主題訂閱消息,消息過濾使用表達(dá)式 void subscribe(final String topic, final String subExpression); //基于主題訂閱消息,消息過濾使用類模式 void subscribe(final String topic, final String fullClassName,final String filterClassSource); //訂閱消息,并指定隊(duì)列選擇器 void subscribe(final String topic, final MessageSelector selector); void unsubscribe(final String topic):取消消息訂閱

DefaultMQPushConsumer

//消費(fèi)者組 private String consumerGroup; //消息消費(fèi)模式 private MessageModel messageModel = MessageModel.CLUSTERING; //指定消費(fèi)開始偏移量(最大偏移量、最小偏移量、啟動時間戳)開始消費(fèi) private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; //集群模式下的消息隊(duì)列負(fù)載策略 private AllocateMessageQueueStrategy allocateMessageQueueStrategy; //訂閱信息 private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); //消息業(yè)務(wù)監(jiān)聽器 private MessageListener messageListener; //消息消費(fèi)進(jìn)度存儲器 private OffsetStore offsetStore; //消費(fèi)者最小線程數(shù)量 private int consumeThreadMin = 20; //消費(fèi)者最大線程數(shù)量 private int consumeThreadMax = 20; //并發(fā)消息消費(fèi)時處理隊(duì)列最大跨度 private int consumeConcurrentlyMaxSpan = 2000; //每1000次流控后打印流控日志 private int pullThresholdForQueue = 1000; //推模式下任務(wù)間隔時間 private long pullInterval = 0; //推模式下任務(wù)拉取的條數(shù),默認(rèn)32條 private int pullBatchSize = 32; //每次傳入MessageListener#consumerMessage中消息的數(shù)量 private int consumeMessageBatchMaxSize = 1; //是否每次拉取消息都訂閱消息 private boolean postSubscriptionWhenPull = false; //消息重試次數(shù),-1代表16次 private int maxReconsumeTimes = -1; //消息消費(fèi)超時時間 private long consumeTimeout = 15;

DefaultMQPushConsumerImpl:消息消費(fèi)者默認(rèn)實(shí)現(xiàn)類,應(yīng)用程序中直接調(diào)用該類的實(shí)例完成消息的消費(fèi)。

RebalanceImpl:重新負(fù)載均衡實(shí)現(xiàn)類,實(shí)現(xiàn)消息消費(fèi)端與消息隊(duì)列之間的重新分布。

MQClientInstance:消息客戶端實(shí)例,負(fù)責(zé)與MQ服務(wù)器Broker和NameServer之間的的網(wǎng)絡(luò)交互。

PullAPIWrapper:RocketMQ中,實(shí)際上只有Message Pull模式,而Push模式只是將Pull模式進(jìn)行了封裝。

OffsetStore:消息消費(fèi)進(jìn)度存儲器。

消費(fèi)者啟動流程

DefaultMQPushConsumerImpl#start

switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//校驗(yàn)消息合法性this.checkConfig();//構(gòu)建主題訂閱信息this.copySubscription();/*** 如果消息消費(fèi)模式為集群模式,并且當(dāng)前的實(shí)例名為 DEFAULT,替換為當(dāng)前客戶端進(jìn)程的PID* if (this.instanceName.equals("DEFAULT")) {* this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();* }*/if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}//構(gòu)建MQClientInstance mQClientFactorythis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);//構(gòu)造重新負(fù)載均衡實(shí)現(xiàn)類this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);//拉取消息API封裝this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);//消息消費(fèi)進(jìn)度加載if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING://廣播模式下 將消息消費(fèi)進(jìn)度存儲到本地this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING://集群模式下 將消息消費(fèi)的進(jìn)度存儲到遠(yuǎn)端Broker中this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();//創(chuàng)建順序消息消費(fèi)服務(wù)if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {//順序this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());}//創(chuàng)建并發(fā)消息消費(fèi)服務(wù)else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {//并發(fā)this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}//啟動消息消費(fèi)服務(wù)this.consumeMessageService.start();//注冊到消費(fèi)者實(shí)例到客戶端boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//--------啟動客戶端---------mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());//進(jìn)入消息消費(fèi)狀態(tài)this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break; }//更新訂閱信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //檢測broker狀態(tài) this.mQClientFactory.checkClientInBroker(); //發(fā)送心跳包 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); //重新負(fù)載 this.mQClientFactory.rebalanceImmediately();

消息拉取

核心類:PutMessageService,是一個消息拉取的服務(wù)線程。

PullMessageService實(shí)現(xiàn)機(jī)制

PullMessageService#run

//線程狀態(tài)為運(yùn)行狀態(tài) while (!this.isStopped()) {try {//在拉取消息請求隊(duì)列中拉取消息請求PullRequest pullRequest = this.pullRequestQueue.take();//處理拉取消息請求this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);} }

PullRequest

public class PullRequest {//費(fèi)者組private String consumerGroup;//待拉取消息隊(duì)列private MessageQueue messageQueue;//消息處理隊(duì)列private ProcessQueue processQueue;//待拉取的MessageQueue偏移量private long nextOffset;//是否被鎖定private boolean previouslyLocked = false; }

PullMessageService#pullMessage

//找到消費(fèi)者 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) {//強(qiáng)轉(zhuǎn)為推送模式下的消費(fèi)者DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;//拉取消息impl.pullMessage(pullRequest); } else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); }

ProcessQueue實(shí)現(xiàn)機(jī)制

ProcessQueue是MessageQueue在消費(fèi)端的快照。PullMessageService從消息服務(wù)器默認(rèn)每次拉取32條消息,按照消息的隊(duì)列偏移量順序放在ProcessQueue中,PullMessageService再將消息提交到消費(fèi)者消費(fèi)線程池,消息消費(fèi)成功之后從ProcessQueue中移除——Queue consumption snapshot。

ProcessQueue

//消息容器 private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); //讀寫鎖 private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); //ProcessQueue總消息數(shù)量 private final AtomicLong msgCount = new AtomicLong(); //ProcessQueue隊(duì)列最大偏移量 private volatile long queueOffsetMax = 0L; //當(dāng)前ProcessQueue是否被丟棄 private volatile boolean dropped = false; //上一次拉取時間戳 private volatile long lastPullTimestamp = System.currentTimeMillis(); //上一次消費(fèi)時間戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis(); //移除消費(fèi)超時消息 public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) //添加消息 public boolean putMessage(final List<MessageExt> msgs) //獲取消息最大間隔 public long getMaxSpan() //移除消息 public long removeMessage(final List<MessageExt> msgs) //將consumingMsgOrderlyTreeMap中消息重新放在msgTreeMap,并清空consumingMsgOrderlyTreeMap public void rollback() //將consumingMsgOrderlyTreeMap消息清除,表示成功處理該批消息 public long commit() //重新處理該批消息 public void makeMessageToCosumeAgain(List<MessageExt> msgs) //從processQueue中取出batchSize條消息 public List<MessageExt> takeMessags(final int batchSize)

消息拉取基本流程

客戶端發(fā)起消息拉取請求

DefaultMessagePushConsumerImpl#pullMessage

//獲取消息處理隊(duì)列 final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) {log.info("the pull request[{}] is dropped.", pullRequest.toString());return; }//設(shè)置拉取時間戳 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {//判斷服務(wù)是否狀態(tài)正常this.makeSureStateOK(); } catch (MQClientException e) {log.warn("pullMessage exception, consumer state not ok", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);return; }//被掛起--->等待1s if (this.isPause()) {log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);return; }//-----------------------------------流量控制----------------------------------- //獲得最大待處理消息數(shù)量 long cachedMessageCount = processQueue.getMsgCount().get(); //獲得最大待處理消息大小 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);//從數(shù)量進(jìn)行流控->消息數(shù)量大于1000條 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {//延遲消息拉取50msthis.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return; }//從消息大小進(jìn)行流控->消息大小大于100Mb if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {//延遲消息拉取50msthis.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return; }....//獲得主題訂閱信息 final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); //如果主題訂閱信息為空--->延遲3s后繼續(xù)拉取 if (null == subscriptionData) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.warn("find the consumer's subscription failed, {}", pullRequest);return; }final long beginTimestamp = System.currentTimeMillis();//拉取消息回調(diào)函數(shù) PullCallback pullCallback = new PullCallback() {//這一部分實(shí)現(xiàn)在之后進(jìn)行講解 };....//獲取消息系統(tǒng)拉取標(biāo)記 int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, // commitOffsettrue, // suspendsubExpression != null, // subscriptionclassFilter // class filter );try {//與服務(wù)端進(jìn)行交互獲取消息this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,//-----------↓↓↓注意這個回調(diào)函數(shù)-------------pullCallback); } catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); }

接下來重點(diǎn)關(guān)注pullAPIWrapper.pullKernelImpl()的核心邏輯:

public PullResult pullKernelImpl(final MessageQueue mq, //消息消費(fèi)隊(duì)列final String subExpression, //消息訂閱子模式subscribe( topicName, "模式")final String expressionType, final long subVersion, //版本final long offset, //pullRequest.getNextOffset()final int maxNums, //defaultMQPushConsumer.getPullBatchSize()->32條final int sysFlag, //系統(tǒng)標(biāo)記final long commitOffset, //當(dāng)前消息隊(duì)列最新偏移量final long brokerSuspendMaxTimeMillis,//運(yùn)行Broker掛起最長時間->15sfinal long timeoutMillis, //超時時間->30sfinal CommunicationMode communicationMode,//Sync/Async/Onewayfinal PullCallback pullCallback //消息拉取后的回調(diào)函數(shù) ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException

PullAPIWrapper#pullKernelImpl

//獲取Broker信息 FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false); //如果Broker信息為空 if (null == findBrokerResult) {//從Nameserver更新主題路由表信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());//重新獲取Broker信息findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false); }if (findBrokerResult != null) {{//檢查版本if (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//封裝拉取消息請求PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}//根據(jù)brokerAddr、requestHeader等信息利用遠(yuǎn)程網(wǎng)絡(luò)調(diào)用實(shí)例在Broker中對消息進(jìn)行拉取//最后返回一個消息拉取結(jié)果PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult; }

根據(jù)拉取消息的模式是異步或同步來進(jìn)行不同操作。

MQClientAPIImpl#pullMessage

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC://異步拉取->發(fā)送拉取消息請求并立即返回結(jié)果this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC://同步拉取->發(fā)送消息拉取請求并等待結(jié)果返回return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break; } return null;
消息服務(wù)器Broker組裝消息

消息服務(wù)器接收到消費(fèi)者的消息拉取請求之后進(jìn)行消息組裝的時序圖:

消息請求處理器:PullMessageProcessor

public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);//Broker控制器private final BrokerController brokerController; //構(gòu)造函數(shù)存儲容器private List<ConsumeMessageHook> consumeMessageHookList; }

PullMessageProcessor#processRequest

//構(gòu)建消息過濾器 MessageFilter messageFilter; if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager()); } else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager()); }//調(diào)用MessageStore.getMessage查找消息 final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),//消費(fèi)組名稱requestHeader.getTopic(),//主題名稱requestHeader.getQueueId(),//隊(duì)列IDrequestHeader.getQueueOffset(),//待拉取偏移量requestHeader.getMaxMsgNums(), //最大拉取消息條數(shù)messageFilter //消息過濾器);

DefaultMessageStore#getMessage

GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; //查找下一次隊(duì)列偏移量 long nextBeginOffset = offset; //當(dāng)前消息隊(duì)列最小偏移量 long minOffset = 0; //當(dāng)前消息隊(duì)列最大偏移量 long maxOffset = 0;//懶加載-當(dāng)找到消息再進(jìn)行賦值 GetMessageResult getResult = null;//當(dāng)前commitLog最大偏移量 final long maxOffsetPy = this.commitLog.getMaxOffset();//根據(jù)主題名稱和隊(duì)列編號獲取消息消費(fèi)隊(duì)列 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) {minOffset = consumeQueue.getMinOffsetInQueue();maxOffset = consumeQueue.getMaxOffsetInQueue();//消息偏移量異常情況校對下一次拉取偏移量if (maxOffset == 0) {//表示當(dāng)前消息隊(duì)列中沒有消息status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);} else if (offset < minOffset) {//待拉取消息的偏移量小于隊(duì)列的實(shí)際偏移量status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else if (offset == maxOffset) {//待拉取偏移量為隊(duì)列最大偏移量status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);} else if (offset > maxOffset) {//偏移量越界status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}} else {//偏移量處于正常情況//-------------根據(jù)偏移量從CommitLog中拉取32條消息------------SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);}

PullMessageProcessor#processRequest

//根據(jù)拉取結(jié)果填充responseHeader response.setRemark(getMessageResult.getStatus().name()); responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset()); responseHeader.setMinOffset(getMessageResult.getMinOffset()); responseHeader.setMaxOffset(getMessageResult.getMaxOffset());if (getMessageResult.isSuggestPullingFromSlave()) {//如果當(dāng)前拉取消息是從Slave節(jié)點(diǎn)拉取并且拉取速度較慢responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else {//設(shè)置下一次拉取任務(wù)的ID為主節(jié)點(diǎn)responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break; } //GetMessageResult與Response的Code轉(zhuǎn)換 switch (getMessageResult.getStatus()) {//成功case FOUND:response.setCode(ResponseCode.SUCCESS);break;//消息不存在case MESSAGE_WAS_REMOVING://消息重試response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;//沒有找到匹配的主題隊(duì)列case NO_MATCHED_LOGIC_QUEUE://消息隊(duì)列中未包含消息case NO_MESSAGE_IN_QUEUE:if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;//未找到匹配的消息case NO_MATCHED_MESSAGE:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;//消息物理偏移量為空case OFFSET_FOUND_NULL:response.setCode(ResponseCode.PULL_NOT_FOUND);break;//offset越界case OFFSET_OVERFLOW_BADLY:response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;//offset過大case OFFSET_OVERFLOW_ONE:response.setCode(ResponseCode.PULL_NOT_FOUND);break;//offset過小case OFFSET_TOO_SMALL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break; } .... //如果CommitLog標(biāo)記可用,并且當(dāng)前Broker為主節(jié)點(diǎn),則更新消息消費(fèi)進(jìn)度 boolean storeOffsetEnable = brokerAllowSuspend; storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) {this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(),requestHeader.getCommitOffset()); } return response;
消息拉取客戶端處理消息

MQClientAPIImpl#processPullResponse

PullStatus pullStatus = PullStatus.NO_NEW_MSG; //判斷響應(yīng)結(jié)果 switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark(), addr); }//解碼響應(yīng)頭 PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);//封裝PullResultExt返回 return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());

PullResult

/*** 拉取結(jié)果狀態(tài):FOUND/NO_NEW_MSG/NO_MATCHED_MSG/OFFSET_ILLEGAL*/ private final PullStatus pullStatus; /*** 下次拉取偏移量*/ private final long nextBeginOffset; /*** 消息隊(duì)列最小偏移量*/ private final long minOffset; /*** 消息隊(duì)列最大偏移量*/ private final long maxOffset; /*** 拉取的消息隊(duì)列*/ private List<MessageExt> msgFoundList;

DefaultMQPushConsumerImpl#pullMessage

PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {switch (pullResult.getPullStatus()) {case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());//消息拉取成功-將消息放入processQueueboolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());//提交消息消費(fèi)請求-對消息進(jìn)行處理DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//如果pullInterval大于0,則等待pullInterval毫秒后將pullRequest對象放入到PullMessageService中的pullRequestQueue隊(duì)列中if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}....
總結(jié)

本文僅作為個人學(xué)習(xí)使用,如有不足或錯誤請指正!

《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的RocketMQ:Consumer概述及启动流程与消息拉取源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

亚洲一区日韩在线 | 五月亚洲 | 在线免费观看黄网站 | 久久在线视频在线 | 27xxoo无遮挡动态视频 | 一区二区三区在线视频观看58 | 国产女人18毛片水真多18精品 | 亚洲一区美女视频在线观看免费 | 亚洲经典在线 | 国产免费影院 | 久久免费视频99 | 中文在线√天堂 | 超碰在线最新 | 深爱婷婷久久综合 | 国产成人精品久久久久蜜臀 | 欧美日韩另类视频 | 久久电影中文字幕视频 | 日韩久久精品一区二区 | 激情婷婷综合 | 国产高清视频免费观看 | 亚洲免费在线观看视频 | 国产精品一区免费观看 | 丁香综合五月 | 久青草国产在线 | 国产一级久久久 | av福利在线免费观看 | 97在线视频免费观看 | 91在线免费公开视频 | 国产亚洲精品久久网站 | 在线香蕉视频 | 日本在线视频网址 | 日韩av成人在线观看 | 中文字幕电影一区 | 午夜av色| 久久精品久久久久电影 | 韩国av免费在线观看 | 婷婷色网址 | www欧美日韩 | 久久手机精品视频 | 福利网址在线观看 | 一二三久久久 | 国产成人精品日本亚洲999 | 色婷婷亚洲综合 | 狠狠色丁香久久婷婷综 | 五月婷婷狠狠 | 色多多在线观看 | wwwwww国产| 午夜久久成人 | 国产一级做a爱片久久毛片a | 国产蜜臀av | 久久精品视频在线看 | 久草av在线播放 | 国产精品久久久久av福利动漫 | 日韩久久影院 | 国产91精品久久久久久 | 国产精品国产三级国产aⅴ9色 | 日韩欧美高清在线 | 爱干视频 | 视频在线观看99 | 欧美成人影音 | 91精品秘密在线观看 | 操操碰 | 婷婷中文在线 | 天天干夜夜擦 | 国产区久久| 国产精品剧情 | 91av美女| 国产高清成人在线 | 欧美日韩国产成人 | 午夜精品一区二区三区免费 | aaa免费毛片 | 久久天堂影院 | 国产精品亚洲综合久久 | 天天干天天操天天爱 | 91av视频在线观看免费 | 91伊人| 国产日韩精品在线观看 | 婷婷丁香久久五月婷婷 | 激情综合五月天 | 国产精品日韩在线观看 | www欧美色| 亚洲一区二区三区精品在线观看 | 久久99国产精品自在自在app | 综合黄色网 | 在线 视频 亚洲 | 成人在线免费av | 在线高清 | 免费 在线 中文 日本 | 九九热精品视频在线播放 | 黄色h在线观看 | 在线观看视频一区二区三区 | 成人超碰在线 | 99久久精品久久久久久清纯 | 天天综合网久久 | 欧美另类高清 | 色多多在线观看 | 色av男人的天堂免费在线 | 日韩成人看片 | 久久亚洲成人网 | 中文在线字幕观看电影 | 日韩有码在线观看视频 | 日韩毛片在线免费观看 | 国产在线播放一区二区三区 | 91免费在线看片 | 国产精品乱码久久久 | 久久婷婷一区二区三区 | 午夜视频在线观看一区二区三区 | 国产精品亚洲综合久久 | www.日韩免费 | 在线观看91视频 | 97综合视频| 男女视频国产 | 日韩欧美xxx| 亚洲成人第一区 | 911久久香蕉国产线看观看 | 国产黄a三级三级三级三级三级 | 911av视频| 91精品一 | 色综合在 | 狠狠干夜夜爽 | 午夜精品一区二区三区在线播放 | 中文字幕 二区 | 久草视频在线免费播放 | 人人澡人人澡人人 | 五月天色婷婷丁香 | 久草视频一区 | 丁香视频全集免费观看 | 国产日产亚洲精华av | 中文字幕av专区 | 中文av网站| 亚洲人人射| 国产96视频| 精品久久久久国产 | 国产日韩中文在线 | 精品视频97| 色在线网 | 有码视频在线观看 | 中文字幕亚洲欧美日韩2019 | 久久99热精品这里久久精品 | 欧美孕妇视频 | 国产精品视频免费看 | 爱爱av在线 | 成人黄色小说网 | 午夜视频在线观看一区二区三区 | 精品一二三四在线 | 综合铜03| 国产精品 国产精品 | 成人黄色小视频 | 97在线播放视频 | 久久草视频 | 欧美激情视频三区 | 国内精品久久久久久久久久久 | 日本一区二区三区免费看 | 日本护士撒尿xxxx18 | 在线观看网站你懂的 | 久草视频2 | 视频一区在线播放 | 人人看黄色 | 天天射天天干天天爽 | 久草精品电影 | 香蕉手机在线 | 四虎国产精品免费 | 在线免费国产 | 免费看三级黄色片 | 在线观看完整版免费 | 中文字幕日韩高清 | 99精品国产99久久久久久97 | 婷婷五月在线视频 | 久久国产欧美日韩精品 | 久久撸在线视频 | 一区在线观看 | 超碰人人在线观看 | 精品uu | 91精品视频在线免费观看 | 亚洲成人av在线电影 | www.国产视频 | 国产黄色精品网站 | 69视频在线 | 伊人久操 | 国产视频2区 | 国产日韩在线观看一区 | 免费视频91| 在线观看国产亚洲 | 国产精品久久久久久久久婷婷 | 国产精品美女免费看 | 激情网在线观看 | 国产精品一区二区电影 | 国产精品 中文字幕 亚洲 欧美 | 中文字幕精品一区二区三区电影 | 国产91精品高清一区二区三区 | 91中文字幕在线观看 | 国产五月婷婷 | 中国一级片免费看 | 国产91精品高清一区二区三区 | 又长又大又黑又粗欧美 | 亚洲精品午夜一区人人爽 | 五月婷婷丁香激情 | 久久久久久久久久久免费视频 | 中文字幕资源在线观看 | 国产91成人在在线播放 | av三级在线播放 | 亚洲精品日韩一区二区电影 | 999久久| 久久久久久免费毛片精品 | 黄色高清视频在线观看 | 亚洲精品黄色在线观看 | 免费在线色视频 | 亚洲国产精品视频 | 中文字幕在线播放一区二区 | 麻豆一区二区三区视频 | 在线看欧美 | 日韩二区在线播放 | 99视频免费在线观看 | 亚洲一区二区麻豆 | 丁香婷婷激情国产高清秒播 | 狠狠狠色丁香婷婷综合激情 | 亚洲国产资源 | 久草影视在线观看 | 国产麻豆成人传媒免费观看 | 麻豆视传媒官网免费观看 | 亚洲天堂网在线视频观看 | 日本在线观看中文字幕 | 亚洲在线日韩 | 91成人精品一区在线播放 | 亚洲欧美成人综合 | www.天天干.com| 在线影视 一区 二区 三区 | 91在线91拍拍在线91 | 深爱激情av | 国产精品久久久久av福利动漫 | 最近中文字幕免费大全 | 国产精品入口传媒 | 99热这里只有精品国产首页 | 欧美性脚交 | 亚洲在线| 久久超级碰视频 | 国产欧美精品一区二区三区四区 | 久久视频国产精品免费视频在线 | 在线观看免费国产小视频 | 久久久久久免费毛片精品 | 久青草视频在线观看 | 4hu视频 | 久久国产精品视频 | 91传媒91久久久 | www.黄色 | 五月天综合激情网 | 日本久草电影 | 久久在线看 | 视频一区二区视频 | 国产精品日韩在线播放 | 日日夜夜网站 | 五月天开心 | 天堂在线视频免费观看 | 色偷偷88欧美精品久久久 | 日韩中文字幕国产 | 久久不卡av | 黄色小说免费在线观看 | 久久情爱| 在线观看的av网站 | 99精品国产一区二区三区不卡 | 五月婷婷影院 | 成人黄色视 | 精品国产一区二区三区不卡 | 色婷婷国产在线 | 日本中文一区二区 | 亚洲精品自拍视频在线观看 | 国产日韩欧美视频 | 91九色蝌蚪视频在线 | 区一区二区三在线观看 | 91豆麻精品91久久久久久 | 中文资源在线播放 | 中文国产字幕 | 美女久久 | 麻豆一精品传二传媒短视频 | 国语精品免费视频 | 欧美激情在线网站 | 色视频一区| 97视频播放 | 丁香五月亚洲综合在线 | www久久精品| 国产涩图| 日韩xxxx视频 | 久久视频在线 | av福利在线导航 | 免费a视频在线 | 手机看片| 国偷自产视频一区二区久 | 99视频精品 | 人人玩人人添人人澡97 | 亚洲欧美视频网站 | 欧美精品一区在线发布 | 精品一二三四五区 | 日韩欧美一区二区三区在线 | 日韩午夜剧场 | 在线看片91| 一区二区三区影院 | 97国产精品久久 | 亚洲视频每日更新 | 欧洲一区二区在线观看 | 色综合天天狠天天透天天伊人 | 国产精品一区二区在线观看 | 中文字幕中文中文字幕 | 91香蕉视频好色先生 | 精品久久中文 | 综合色综合 | 国产露脸91国语对白 | 久久人人97超碰精品888 | 亚洲欧美在线观看视频 | 99久久9 | 国产一区精品在线 | a黄色影院 | 99久久精品午夜一区二区小说 | 日韩二区三区在线 | 成人a在线观看 | 91av福利视频 | 日韩手机在线 | 毛片美女网站 | 97超碰人人澡 | 看国产黄色大片 | 永久黄网站色视频免费观看w | 福利视频一二区 | 亚洲人成精品久久久久 | 中文字幕在线看视频国产 | 国产精品一区二区三区免费看 | 香蕉网在线 | 狠狠狠综合 | 999成人 | 亚洲黄色三级 | 亚洲精品一区中文字幕乱码 | 中文字幕在 | 欧美伦理一区二区 | 久久精品视频网址 | 一区二区电影在线观看 | 黄色大片免费播放 | 日韩v欧美v日本v亚洲v国产v | 成人黄大片 | 成人黄色短片 | 91成人在线观看喷潮 | 成人资源网 | 91免费看黄色 | 国产成人精品一区二区三区 | 91精品视频导航 | 四虎永久免费在线观看 | 国产精品99久久久久久久久 | 日韩欧美成 | 9在线观看免费 | 婷婷在线综合 | 亚洲视频在线看 | 免费日韩一级片 | 精品视频免费在线 | 精品99免费 | 久久系列 | 在线免费中文字幕 | 日本mv大片欧洲mv大片 | av一级在线观看 | 日免费视频 | 久久www免费视频 | 97国产超碰在线 | 免费无遮挡动漫网站 | 九九热在线观看视频 | 天天操天天弄 | 久久国产精品视频 | 午夜精品久久久久99热app | 69视频在线播放 | 日韩电影中文 | av在观看 | 1000部国产精品成人观看 | 国产手机在线观看视频 | 粉嫩av一区二区三区四区五区 | 91色蜜桃 | 啪一啪在线 | 最新国产精品久久精品 | 国产精品va在线观看入 | 激情五月婷婷综合网 | 国产麻豆剧果冻传媒视频播放量 | 在线国产视频 | 久久黄色网页 | 九九热视频在线免费观看 | 豆豆色资源网xfplay | 国产99久 | 亚洲va欧洲va国产va不卡 | 天天综合色天天综合 | 亚洲春色综合另类校园电影 | 黄av在线 | 在线免费91 | 久久高清国产视频 | 国产一区二区三区黄 | 国产二级视频 | 97超碰站 | 69夜色精品国产69乱 | 狠狠地操 | 玖玖玖精品 | 日本性生活一级片 | 亚洲精品午夜久久久 | 正在播放五月婷婷狠狠干 | 国产精品亚洲片在线播放 | av在线播放快速免费阴 | 精品国偷自产国产一区 | 成人av免费电影 | 天天做日日做天天爽视频免费 | 91av国产视频| 亚洲综合欧美精品电影 | 狠狠gao | 91手机视频在线 | 狠狠干五月天 | 国产99久久久久久免费看 | 国产午夜三级 | 色国产精品一区在线观看 | 精品视频在线播放 | 91人人澡 | 国产一级精品在线观看 | 成年人免费观看在线视频 | 99久久久久免费精品国产 | 国产成人亚洲精品自产在线 | 亚洲精品黄网站 | 国产成年免费视频 | 国产xxxx做受性欧美88 | 99热都是精品 | 日韩久久精品一区 | 激情五月av | 国产视频精品免费 | 精品视频www| 久久不射电影院 | 麻豆国产网站 | 国内精品中文字幕 | 久久 一区 | 亚洲特级毛片 | 亚洲精品乱码白浆高清久久久久久 | 在线成人看片 | 国产中文字幕网 | 黄色成年 | 丁香婷婷电影 | 日韩精品五月天 | www.综合网.com | 五月婷婷在线视频观看 | 成人亚洲免费 | 天堂av最新网址 | 91最新网址在线观看 | 日本 在线 视频 中文 有码 | 欧美精品在线观看 | 又黄又刺激又爽的视频 | 六月久久婷婷 | 色搞搞| 精品国产乱码一区二 | 成人在线免费小视频 | 精品麻豆| 一区 在线观看 | 国产成人91| 天天弄天天干 | 久久噜噜少妇网站 | 久久久999 | 成人午夜网址 | 日韩欧美一区二区三区黑寡妇 | 日韩av成人在线观看 | 久久久久久久久影视 | 久久久视频在线 | 久久狠狠亚洲综合 | 深夜国产在线 | 另类五月激情 | 超碰人人射 | 国产美女精品视频 | 97免费视频在线播放 | 国产高清不卡 | aaa毛片视频| 国产精品免费久久 | 中文字幕在线视频第一页 | 免费看的黄网站软件 | 久久国际影院 | av导航福利 | 亚洲黄色免费在线 | 福利av影院| 精品福利在线视频 | 在线观看一区二区精品 | 91九色国产蝌蚪 | 日本中文在线 | 99视频国产精品免费观看 | 欧美色伊人 | 精品视频在线看 | 日韩成人黄色av | 国产视频1 | 一区二区伦理电影 | 91片在线观看 | 视频在线播放国产 | 国产一级免费在线观看 | 欧美色888 | 国产欧美最新羞羞视频在线观看 | 久久开心激情 | a天堂一码二码专区 | 国产男女免费完整视频 | 国产精品免费大片视频 | 狠狠操狠狠| 午夜精品久久久久久久久久久久久久 | 婷婷在线五月 | 久久精品中文视频 | 久久国产一区二区 | 狠狠操在线 | 亚洲无吗天堂 | 婷婷深爱五月 | 在线导航av | 99精品网站 | 欧美成人xxx| 狠狠操狠狠干2017 | 天天射射天天 | 国产高清不卡av | 中文网丁香综合网 | 97热视频| 久久美女高清视频 | 久久97精品 | 久久99亚洲网美利坚合众国 | 国产 欧美 日产久久 | 午夜精品av在线 | 国产黄色播放 | 麻豆激情电影 | 久久久久日本精品一区二区三区 | 日韩aa视频 | 四虎影视久久久 | 黄色片网站大全 | 91成人在线视频 | 波多野结衣视频一区二区 | 91精品国| 欧美另类交人妖 | 天天摸日日摸人人看 | 99精品热视频只有精品10 | 天天干天天操天天爱 | 久久久国产精品电影 | 国产一卡二卡在线 | 中文字幕在线视频一区二区三区 | 免费成人黄色片 | 久操中文字幕在线观看 | 精品a视频 | 91免费版在线观看 | 中文字幕高清免费日韩视频在线 | 欧美日韩性生活 | mm1313亚洲精品国产 | 欧美在线18 | 亚洲免费资源 | 精品人人人人 | 日韩一二三在线 | www.亚洲激情.com | 欧美va在线观看 | 国产一区在线播放 | 亚洲久草视频 | 日本久久片 | 麻豆国产露脸在线观看 | 久久久国产一区二区三区四区小说 | 欧美久久久久久久久中文字幕 | 99精品视频在线观看 | 国产在线精品播放 | 国产免费一区二区三区最新 | 91精品国产福利在线观看 | 激情网站五月天 | 亚洲91av| 天堂黄色片 | 手机在线黄色网址 | 亚洲精品网站在线 | 精品嫩模福利一区二区蜜臀 | 2022中文字幕在线观看 | 亚洲综合成人av | 日本不卡一区二区三区在线观看 | 久久天堂影院 | 五月天激情开心 | 日韩网站在线观看 | 成人资源在线播放 | 国产在线 一区二区三区 | 中文字幕在线观看av | 国产污视频在线观看 | 国内精品久久天天躁人人爽 | 日本女人在线观看 | 色狠狠综合 | 久久国产剧场电影 | 国产精品成久久久久三级 | 久久久久女教师免费一区 | 激情欧美一区二区三区免费看 | 国产精品久久久久永久免费观看 | 久久久五月婷婷 | 日日夜夜骑 | 免费看搞黄视频网站 | 国产字幕在线看 | 韩国精品福利一区二区三区 | 日本不卡一区二区 | 毛片网在线观看 | 奇米影视在线99精品 | 国产精品每日更新 | 国产精品久久久久av福利动漫 | 国内精品久久久久久久影视麻豆 | a级国产乱理论片在线观看 特级毛片在线观看 | 亚洲资源在线 | 丰满少妇在线观看资源站 | 亚洲aⅴ一区二区三区 | 婷婷色站 | 丁香花在线视频观看免费 | 99精品视频免费在线观看 | 久久9999久久免费精品国产 | 中文字幕一区在线 | 天天综合网在线观看 | 在线看不卡av | 国产婷婷色| 99精品小视频 | 美女免费黄网站 | 国产一区在线免费 | 狠狠色伊人亚洲综合成人 | 亚洲影院天堂 | 国产精品久久久999 国产91九色视频 | 亚洲精品乱码久久久久久写真 | 日韩字幕| 天天操天天摸天天射 | 2021久久| 成人在线视频你懂的 | 色婷婷电影网 | www.久久99| 狠狠色噜噜狠狠狠狠 | 国产亚洲成人网 | 久草在线综合网 | 中文字幕在线不卡国产视频 | 麻豆国产在线视频 | 在线观看91视频 | 一区二区视频在线播放 | 成年人app网址 | 四虎亚洲精品 | 免费看的黄色 | 日韩美精品视频 | 自拍超碰在线 | 成人黄色在线看 | 国产伦精品一区二区三区照片91 | 99热精品国产一区二区在线观看 | av免费网站 | 黄色国产在线 | 视频一区在线播放 | 久久久久日本精品一区二区三区 | 国产91大片 | 久久久精品网 | 人人舔人人舔 | 91九色蝌蚪国产 | 亚洲自拍偷拍色图 | 成人免费观看视频大全 | 99福利影院| 国产精品丝袜久久久久久久不卡 | 国产一区免费观看 | 久久久精品小视频 | 色福利网站| 最近中文字幕视频完整版 | 日韩精品三区四区 | 成人毛片在线观看 | 免费污片 | 亚a在线| 国产精品99爱 | 成 人 黄 色 视频播放1 | 在线观看久久 | 91av视屏| 五月婷久 | 福利视频在线看 | 中文成人字幕 | 久久综合婷婷综合 | 美女精品在线观看 | 久久尤物电影视频在线观看 | 九九九九精品九九九九 | 久久视| 亚洲国产精品成人va在线观看 | 免费欧美高清视频 | 欧美日韩在线电影 | 国产一区二区高清不卡 | 日韩免费精品 | 六月丁香伊人 | 日本在线h | 免费av免费观看 | 精品美女视频 | 欧美日韩精品二区第二页 | 欧美一级黄色网 | 国产精品1区2区3区在线观看 | 狠狠躁夜夜躁人人爽超碰91 | 一区二区三区四区精品 | 天天综合视频在线观看 | 成片人卡1卡2卡3手机免费看 | 欧美91精品久久久久国产性生爱 | 成 人 免费 黄 色 视频 | 深夜免费福利在线 | 操少妇视频 | 9i看片成人免费看片 | 又粗又长又大又爽又黄少妇毛片 | 激情欧美在线观看 | 日韩高清毛片 | 婷婷国产一区二区三区 | 女人久久久久 | 国产va在线| 97色在线观看免费视频 | 免费人成网 | 亚洲伊人色 | 亚洲天天做 | 国产在线97| 免费观看一级特黄欧美大片 | 久久久受www免费人成 | 一区二区电影网 | 蜜臀av性久久久久av蜜臀妖精 | 国产999精品久久久久久麻豆 | 中文字幕丝袜 | 国产亚洲精品久久久久久久久久久久 | 亚洲精品中文字幕视频 | 国产亚洲视频在线免费观看 | 91视频久久久 | 国产美女视频免费观看的网站 | 激情欧美国产 | 亚洲精品在线观看免费 | 天天草夜夜 | 国产成人精品综合久久久 | 日韩精品一区二区在线视频 | 在线精品视频免费观看 | 日本中文字幕免费观看 | 国产小视频91 | 国产精品免费在线视频 | 国产xxxx做受性欧美88 | 亚洲专区一二三 | 日韩一区正在播放 | 99在线视频网站 | ,久久福利影视 | 91综合久久一区二区 | 国产中文字幕免费 | 激情五月婷婷综合 | 在线看成人片 | 91麻豆福利 | 在线高清| 色97在线| 国产视频在线播放 | 中文字幕在线观看视频网站 | 五月综合婷 | 国产日韩欧美在线 | 亚洲国产97在线精品一区 | 欧美日本国产在线观看 | av在线精品 | 午夜狠狠操 | 欧美日本中文字幕 | 久久香蕉国产 | 热久久视久久精品18亚洲精品 | 国内小视频在线观看 | 欧美地下肉体性派对 | 国产中文字幕大全 | 中文字幕av全部资源www中文字幕在线观看 | 欧美性色黄 | 91九色国产 | 国产精品免费看久久久8精臀av | 日韩激情网 | 91最新在线观看 | 亚洲精品国产精品国自产 | 久久99日韩 | 黄色a大片 | 国内精品免费 | 日日干天天爽 | 日本精品视频在线 | 久久伊人综合 | 五月婷婷播播 | 99久久久国产精品免费观看 | 香蕉视频在线免费 | v片在线看 | 久久久美女| 97av视频| 久久在线精品 | 正在播放 久久 | 一区在线观看 | 国产区在线视频 | 五月婷婷激情六月 | 干 操 插 | 午夜三级影院 | 天天超碰 | 免费a级大片 | 黄色小说在线观看视频 | 91av在| 国产高清一级 | 久久无码av一区二区三区电影网 | 国产一区二区三区午夜 | 久久久久久久综合色一本 | 激情丁香在线 | 国产999精品久久久影片官网 | 久久久久国产精品午夜一区 | 欧美日韩视频在线观看一区二区 | 午夜精选视频 | 国产精品18p | 色综合中文综合网 | 99久久日韩精品免费热麻豆美女 | 午夜精品视频免费在线观看 | 国产精品99久久久 | 欧美精品天堂 | 日本久久高清视频 | 制服丝袜天堂 | 国产精品视频全国免费观看 | 99福利影院 | 亚洲在线视频观看 | 亚洲欧美综合精品久久成人 | 亚洲成人影音 | 日韩高清毛片 | 久久麻豆视频 | 天天曰天天 | 亚洲人毛片 | 久久国产视频网站 | 欧美精品一区在线 | 欧美成人影音 | 国产精品久久久久久久久久三级 | 岛国av在线不卡 | 中文字幕第一页在线 | 激情中文字幕 | 日韩精品免费在线观看视频 | 超碰av在线 | 精品免费| 久久乐九色婷婷综合色狠狠182 | av色一区 | 精品美女久久久久久免费 | 免费av大全 | 精品在线观看一区二区三区 | 99在线精品视频在线观看 | 久久综合狠狠综合久久狠狠色综合 | 一区二区三区精品久久久 | 欧美成人tv | 懂色av一区二区在线播放 | 园产精品久久久久久久7电影 | 亚洲欧美视频在线观看 | 天天操天天操天天操天天操天天操 | 久久99国产一区二区三区 | 亚洲免费激情 | 五月激情片 | 日本中文字幕在线免费观看 | 国产成人综合在线观看 | 又大又硬又黄又爽视频在线观看 | 亚洲爽爽网| av在线播放中文字幕 | 最近高清中文在线字幕在线观看 | 亚洲成人999| 国产成人精品午夜在线播放 | 国产免费观看久久 | 在线视频手机国产 | 中文字幕在线专区 | 高清美女视频 | 黄色大片日本 | 精品国产一区二区三区久久 | 国产视频精品免费播放 | 成人免费观看大片 | 青青草国产精品 | 免费a视频| www.99在线观看 | 97超碰中文字幕 | 国产明星视频三级a三级点| 欧美在线视频免费 | 一区二区精品视频 | 久久精品一区二区国产 | 正在播放 国产精品 | 国产精品九九久久99视频 | 最近最新中文字幕 | 国产精品久久99 | 日韩成人欧美 | 制服丝袜在线91 | 中文字幕在线观看的网站 | 99久久婷婷国产精品综合 | 国产精品久久久久久五月尺 | 久久视频二区 | 日日夜夜草 | 丁香婷婷综合激情 | 在线观看日本韩国电影 | 欧美在线1区 | 久久er99热精品一区二区三区 | 日韩av一区二区三区在线观看 | 国产精品免费在线视频 | 911av视频| 一级黄色片在线免费看 | 操操操日日 | 久久免费视频在线观看6 | 亚洲亚洲精品在线观看 | 亚洲人成人天堂h久久 | 日韩激情网 | 日本大片免费观看在线 | 欧美在线观看视频免费 | 99久久这里只有精品 | 午夜视频久久久 | 色操插 | 午夜12点| 欧美精品三级在线观看 | 在线看中文字幕 | 欧美一区二区精美视频 | 国产福利网站 | 在线观看你懂的网址 | 亚洲免费在线看 | 看v片 | 国内久久精品视频 | 天堂av免费观看 | 久久理论片 | 色综合久久久久 | 日本公妇在线观看高清 | 天天操天天操天天操天天操天天操天天操 | 99国产视频在线 | 日本久久久影视 | 中文字幕在线日 | 成人精品在线 | 亚洲好视频 | 亚洲高清在线观看视频 | 又爽又黄又刺激的视频 | 亚洲在线高清 | 激情综合网五月激情 | 91九色在线播放 | 黄色的视频| 亚洲永久精品国产 | 五月婷婷在线播放 | 亚洲一级电影 | 成人黄色小说在线观看 | 日本夜夜草视频网站 | 亚洲天堂毛片 | 久久久久久久久久久影视 | 激情综合网五月 | 国产一区欧美在线 | 日韩理论电影网 | 国产精品久久久电影 | 久久伦理| 激情五月网站 | 中文字幕视频三区 | 中文字幕高清免费日韩视频在线 | 久久免费视频精品 | 在线日韩av | 国产精品国产精品 | 国产91在| 91精品视频导航 | 国产精品剧情在线亚洲 | www.夜夜干.com| 超碰97免费在线 | 免费观看视频的网站 | 九9热这里真品2 | 欧美性生活一级片 | 国产精品入口久久 | 欧美成人在线网站 | 伊人春色电影网 | 日韩福利在线观看 | 国产五十路毛片 | 亚洲综合色站 | 一级免费片 | 99色在线观看视频 | 美女黄久久 | 欧美色图一区 | 国产精品毛片完整版 | 亚洲一级电影在线观看 | 精品国产片 | 免费黄色av | 狠狠网亚洲精品 | av一区二区三区在线播放 | 国产精品视频99 | av在线网站大全 | 在线观看中文字幕网站 | 亚洲成人av电影在线 | 久久免费在线观看 | 国精产品永久999 | 在线播放你懂 | 欧美黄色特级片 | 热久久电影 | 丁香九月婷婷 | 日韩高清 一区 | 少妇激情久久 | 综合精品在线 | 国产码电影 | 成年人在线免费看视频 | 亚洲综合精品在线 | 911国产| 特级黄色片免费看 | 日韩中文字幕免费电影 | 在线影视 一区 二区 三区 | 免费日韩 精品中文字幕视频在线 | 婷婷精品国产一区二区三区日韩 | 不卡精品 | 国产精品va在线观看入 | 日韩欧三级 | 超碰在线人人艹 | 日韩视频在线观看视频 | 国产精品乱码一区二区视频 | 日韩精品专区 | 色网站在线 | 久久五月情影视 | 久久久国产精品视频 | 最近日本中文字幕a | 国产亚洲精品免费 | 国产免费小视频 | 91成人短视频在线观看 | 国产精品成人一区二区三区 | 看av在线 | 亚洲视频 中文字幕 | 97超级碰 | 亚洲资源在线 | 亚洲情感电影大片 | 少妇视频一区 | 日韩专区在线 | 久久久久久久久久网站 | av中文在线 | 日韩在线观看中文字幕 | 天天干天天玩天天操 | 黄色日本片 | 日韩电影一区二区在线 | 天天干天天天天 | 色吧av色av | 国产视频亚洲 | 中中文字幕av| 亚洲欧美日本一区二区三区 | 欧美男女爱爱视频 | 欧美日韩久久不卡 | 成人网页在线免费观看 | 欧美日韩国产高清视频 | 天天射综合网站 | 精品麻豆入口免费 |