日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MQ 之 RocketMQ

發(fā)布時間:2023/12/18 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MQ 之 RocketMQ 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前些天發(fā)現(xiàn)了一個巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家。點擊跳轉(zhuǎn)到教程。

RocketMQ 是出自 A 公司的開源產(chǎn)品,用 Java 語言實現(xiàn),在設(shè)計時參考了 Kafka,并做出了自己的一些改進(jìn),消息可靠性上比 Kafka 更好,目前,RocketMQ 的文檔仍然不夠豐富?1?2,社區(qū)仍然無法與 Kafka 比肩,但 A 公司已經(jīng)推出了基于 RocketMQ 的云產(chǎn)品?3,相信未來 RocketMQ 也會有不錯的發(fā)展。本文采用 RocketMQ 3.2.6 進(jìn)行實驗,由于 RocketMQ 與 Kafka 很相似,本文很多地方對兩者做出了比較。

基本概念

RocketMQ 由于借鑒了 Kafka 的設(shè)計,包括組件的命名也很多與 Kafka 相似,下面摘抄一段《RocketMQ 原理簡介》中的介紹,可以與 Kafka 的命名比對一下,

  • Producer,消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息。
  • Consumer,消息消費者,負(fù)責(zé)消費消息,一般是后臺系統(tǒng)負(fù)責(zé)異步消費。
  • Push Consumer,Consumer 的一種,應(yīng)用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立 刻回調(diào) Listener 接口方法。
  • Pull Consumer,Consumer 的一種,應(yīng)用通常主動調(diào)用 Consumer 的拉消息方法從 Broker 拉消息,主動權(quán)由應(yīng)用控制。
  • Producer Group,一類 Producer 的集合名稱,這類 Producer 通常發(fā)送一類消息,且發(fā)送邏輯一致。
  • Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費一類消息,且消費邏輯一致。
  • Broker,消息中轉(zhuǎn)角色,負(fù)責(zé)存儲消息,轉(zhuǎn)發(fā)消息,一般也稱為 Server。在 JMS 規(guī)范中稱為 Provider。

《RocketMQ 原理簡介》中還介紹了一些其他的概念,例如,廣播消費和集群消費,廣播消費是 Consumer Group 中對于同一條消息每個 Consumer 都消費,集群消費是 Consumer Group 中對于同一條消息只有一個 Consumer 消費。Kafka 采用的是集群消費,不支持廣播消費(好吧,是我沒有找到)。再例如,普通順序消息和嚴(yán)格順序消息,普通順序消息在 Broker 重啟情況下不會保證消息順序性;嚴(yán)格順序消息即使在異常情況下也會保證消息的順序性。個人理解,所謂普通順序消息,應(yīng)該就是 Kafka 中的 Partition 級別有序,嚴(yán)格順序消息,應(yīng)該是 Topic 級別有序,但文中也提到,這樣的有序級別是要付出代價的,Broker 集群中只要有一臺機(jī)器不可用,則整個集群都不可用,降低服務(wù)可用性。使用這種模式,需要依賴同步雙寫,主備自動切換,但自動切換功能目前還未實現(xiàn)(我猜,自動切換僅僅是沒開源吧)。說白了,嚴(yán)格順序消息不具備生產(chǎn)可用性,自己玩玩還行,其應(yīng)用場景主要是數(shù)據(jù)庫 binlog 同步。

關(guān)于 RocketMQ 和 Kafka 的對比,可以參考 RocketMQ Wiki 中的文章?4,看看就行,不必較真。

關(guān)于順序和分區(qū)

順序性的話題,剛才已經(jīng)提到了一些,RocketMQ 的實現(xiàn)應(yīng)該不弱于 Kafka。對于分區(qū),RocketMQ 似乎有意弱化了這個概念,只有在 Producer 中有一個參數(shù)?defaultTopicQueueNums,分區(qū)在 RocketMQ 中有時被稱為隊列。RocketMQ 的普通順序消息模式,應(yīng)該就是分區(qū)順序性,這點與 Kafka 一致。

關(guān)于高可用

RocketMQ 實現(xiàn)高可用的方式有多種,《RocketMQ 用戶指南》文檔中提到的有:多主模式、多主多從異步復(fù)制模式、多主多從同步復(fù)制模式。多主模式下,性能較好,但是在 Broker 宕機(jī)的時候,該 Broker 上未消費的交易不可消費;多主多從異步復(fù)制模式,與 Kafka 的副本模式比較類似,主 Broker 宕機(jī)后,會自動切換到從 Broker,消息的消費不會出現(xiàn)間斷;多主多從同步復(fù)制模式更進(jìn)一步,采用同步刷盤的方式,避免了主 Broker 宕機(jī)帶來的消息丟失,但是,目前不支持自動切換。

雖然 RocketMQ 提供了多種高可用方式,但是目前能生產(chǎn)使用的就只有多主多從異步復(fù)制模式,即使在這個模式上,其實現(xiàn)也比 Kafka 要差。因為 RocketMQ 的機(jī)制中,主從關(guān)系是人為指定的,主 Broker 上承擔(dān)所有的消息派發(fā),而 Kafka 的主從關(guān)系是通過選舉的方式選出來的,每個分區(qū)的主節(jié)點都是不一樣的,可以從不同的節(jié)點派發(fā)消息。Kafka 的模式是分散模式,有利于負(fù)載均衡,而且當(dāng)一個 Broker 宕機(jī)的時候,只影響部分 Topic,而 RocketMQ 一旦主 Broker 宕機(jī),會影響所有的 Topic。另外,Kafka 可以支持 Broker 間同步復(fù)制(通過設(shè)置 Broker 的?acks?參數(shù)),這樣比的話,RocketMQ 就差太多了。

關(guān)于 RocketMQ 的介紹,網(wǎng)上的文章不算太多,也比較雜,《分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐》5?6?7這篇原理介紹的不錯,推薦。

RocketMQ 的工具和編程接口

RocketMQ 的工具

相比較 Kafka 而言,RocketMQ 提供的工具要少一些,如下,

bin/mqadminbin/mqbrokerbin/mqbroker.numanode0bin/mqbroker.numanode1bin/mqbroker.numanode2bin/mqbroker.numanode3bin/mqfiltersrvbin/mqnamesrvbin/mqshutdown

除了進(jìn)程啟停之外,常用的運維命令都在?mqadmin?中,詳見《RocketMQ 運維指令》文檔。我實驗中常用的一些命令如下,

sh mqnamesrv &sh mqbroker -c async-broker-a.properties &sh mqbroker -c async-broker-a-s.properties &sh mqadmin topicList -n 192.168.232.23:9876sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin clusterList -n 192.168.232.23:9876sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4

RocketMQ 使用了自己的 name server 來做調(diào)度(Kafka 用了 Zookeeper),使用?sh mqnamesrv?來啟動,默認(rèn)監(jiān)聽端口9876,sh mqnamesrv -m?可以查看所有默認(rèn)參數(shù),使用?-c xxxx.properties?參數(shù)來指定自定義配置。sh mqbroker?是用于啟動 Broker 的命令,參數(shù)比較多,詳細(xì)可以通過?sh mqbroker -m?查看默認(rèn)參數(shù),配置項細(xì)節(jié)后文再說。sh mqadmin?是運維命令入口,topicList?是列出所有 Topic;topicRoute?是列出單個 Topic 的詳細(xì)信息;clusterList?是列出集群的信息;deleteTopic?是刪除 Topic。consumerProgress?是查看消費者消費進(jìn)度,deleteSubGroup?是刪除消費者的訂閱,consumerConnection?是查詢消費者訂閱的情況。

Broker 的配置是最多的,實驗中我修改到的部分如下,其他使用默認(rèn),

brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23brokerName=broker-abrokerId=0namesrvAddr=192.168.232.23:9876listenPort=10911deleteWhen=04fileReservedTime=120storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSH

配置文件中的多數(shù)配置看例子就可以知道意思,挑幾個說一下。brokerName?和?brokerId, 同名的 Broker,ID 是0的是主節(jié)點,其他是從節(jié)點;deleteWhen,刪除文件時間點,默認(rèn)凌晨4點;fileReservedTime,文件保留時間,設(shè)置為120小時;brokerRole,Broker 的角色,ASYNC_MASTER 是異步復(fù)制主節(jié)點,SYNC_MASTER 是同步雙寫主節(jié)點,SLAVE 是備節(jié)點。

其實,這些工具的寫法也基本一致,都是先做一些檢查,最后運行 Java 程序,JVM 系統(tǒng)上的應(yīng)用應(yīng)該差不多都這樣。

RocketMQ 的 Java API

RocketMQ 是用 Java 語言開發(fā)的,因此,其 Java API 相對是比較豐富的,當(dāng)然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,

  • 廣播消費,這個在之前已經(jīng)提到過;
  • 消息過濾,支持簡單的 Message Tag 過濾,也支持按 Message Header、body 過濾;
  • 順序消費和亂序消費,之前也提到過,這里的順序消費應(yīng)該指的是普通順序性,這一點與 Kafka 相同;
  • Pull 模式消費,這個是相對 Push 模式來說的,Kafka 就是 Pull 模式消費;
  • 事務(wù)消息,這個好像沒有開源,但是 example 代碼中有示例,總之,不推薦用;
  • Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用于表示消息類別,可以用來過濾,但是順序性還是以 Topic 來看;
  • 單看功能的話,即使不算事務(wù)消息,也不算 Tag,RocketMQ 也遠(yuǎn)超 Kafka,Kafka 應(yīng)該只實現(xiàn)了 Pull 模式消費 + 順序消費這2個功能。RocketMQ 的代碼示例在 rocketmq-example 中,注意,代碼是不能直接運行的,因為所有的代碼都少了設(shè)置 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");。

    先來看一下生產(chǎn)者的 API,比較簡單,只有一種,如下,

    import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("192.168.232.23:9876");producer.start();for (int i = 0; i < 10; i++)try {{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID188",// key("RocketMQ "+String.format("%05d", i)).getBytes());// bodySendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, i));System.out.println(String.format("%05d", i)+sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}}

    可以發(fā)現(xiàn),相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產(chǎn)者客戶端,有同步和異步兩種模式,但都是阻塞模式,send?方法返回發(fā)送狀態(tài)的?Future,可以通過?Future?的?get?方法阻塞獲得發(fā)送狀態(tài)。而 RocketMQ 采用的是同步非阻塞模式,發(fā)送之后立刻返回發(fā)送狀態(tài)(而不是?Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發(fā)生主備切換的時候,Kafka 的同步可以等待切換完成并重連,最后返回;而 RocketMQ 只能立刻報錯,由生產(chǎn)者選擇是否重發(fā)。所以,在生產(chǎn)者的 API 上,其實 Kafka 是要強(qiáng)一些的。

    另外,RocketMQ 可以通過指定?MessageQueueSelector?類的實現(xiàn)來指定將消息發(fā)送到哪個分區(qū)去,Kafka 是通過指定生產(chǎn)者的?partitioner.class?參數(shù)來實現(xiàn)的,靈活性上 RocketMQ 略勝一籌。

    再來看消費者的API,由于 RocketMQ 的功能比較多,我們先看 Pull 模式消費的API,如下,

    import java.util.HashMap;import java.util.Map;import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;import com.alibaba.rocketmq.client.consumer.PullResult;import com.alibaba.rocketmq.client.consumer.store.OffsetStore;import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import com.alibaba.rocketmq.common.message.MessageQueue;public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {long offset = consumer.fetchConsumeOffset(mq, true);PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);if (null != pullResult.getMsgFoundList()) {for (MessageExt messageExt : pullResult.getMsgFoundList()) {System.out.print(new String(messageExt.getBody()));System.out.print(pullResult);System.out.println(messageExt);}}putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}

    這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分區(qū),而 Kafka 可以自動管理(當(dāng)然也可以手動管理),并且不需要指定分區(qū)(分區(qū)是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用?OffsetStore?接口,提供了兩種管理方式,本地文件和遠(yuǎn)程 Broker。這部分感覺兩者差不多。

    下面再看看 Push 模式順序消費,代碼如下,

    import java.util.List;import java.util.concurrent.atomic.AtomicLong;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;}else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;}else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;}else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}

    雖然提供了 Push 模式,RocketMQ 內(nèi)部實際上還是 Pull 模式的 MQ,Push 模式的實現(xiàn)應(yīng)該采用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個注意的地方,

  • 接收消息的監(jiān)聽類要使用?MessageListenerOrderly;
  • ConsumeFromWhere?有幾個參數(shù),表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;
  • 可以控制 offset 的提交,應(yīng)該就是?context.setAutoCommit(false);?的作用;
  • 控制 offset 提交這個特性非常有用,某種程度上擴(kuò)展一下,就可以當(dāng)做事務(wù)來用了,看代碼?ConsumeMessageOrderlyService?的實現(xiàn),其實并沒有那么復(fù)雜,在不啟用 AutoCommit 的時候,只有返回?COMMIT?才 commit offset;啟用 AutoCommit 的時候,返回?COMMIT、ROLLBACK(這個比較扯)、SUCCESS?的時候,都 commit offset。

    后來發(fā)現(xiàn),commit offset 功能在 Kafka 里面也有提供,使用新的 API,調(diào)用?consumer.commitSync。

    再看一個 Push 模式亂序消費 + 消息過濾的例子,消費者的代碼如下,

    import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}


    接收消息的監(jiān)聽類使用的是?MessageListenerConcurrently;這個例子與之前順序消費不同的地方在于,

  • 回調(diào)方法中,使用的是自動 offset commit;
  • 訂閱的時候增加了消息過濾類?MessageFilterImpl;
  • 消息過濾類?MessageFilterImpl?的代碼如下,

    import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg) {String property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if ((id % 3) == 0 && (id > 10)) {return true;}}return false;}}

    RocketMQ 執(zhí)行過濾是在 Broker 端,Broker 所在的機(jī)器會啟動多個 FilterServer 過濾進(jìn)程;Consumer 啟動后,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉消息,FilterServer 將請求轉(zhuǎn)發(fā)給 Broker,FilterServer 從 Broker 收到消息后,按照 Consumer 上傳的 Java 過濾程序做過濾,過濾完成后返回給 Consumer。這種過濾方法可以節(jié)省網(wǎng)絡(luò)流量,但是增加了 Broker 的負(fù)擔(dān)。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結(jié)了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。

    還有一種廣播消費模式,比較簡單,可以去看代碼,不再列出。

    總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。

    RocketMQ 的主備模式

    按之前所說,只有 RocketMQ 的多主多從異步復(fù)制是可以生產(chǎn)使用的,因此只在這個場景下測試。另外,消息采用 Push 順序模式消費。

    假設(shè)集群采用2主2備的模式,需要啟動4個 Broker,配置文件如下,

    brokerName=broker-abrokerId=0listenPort=10911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERbrokerName=broker-abrokerId=1listenPort=10921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlogbrokerRole=SLAVEbrokerName=broker-bbrokerId=0listenPort=20911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlogbrokerRole=ASYNC_MASTERbrokerRole=ASYNC_MASTERbrokerName=broker-bbrokerId=1listenPort=20921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlogbrokerRole=SLAVE

    另外,每個機(jī)構(gòu)共通的配置項如下,

    brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23namesrvAddr=192.168.232.23:9876deleteWhen=04fileReservedTime=120flushDiskType=ASYNC_FLUSH

    其他設(shè)置均采用默認(rèn)。啟動 NameServer 和所有 Broker,并試運行一下 Producer,然后看一下 TestTopic1 當(dāng)前的情況,

    $ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}

    可見,TestTopic1 在2個 Broker 上,且每個 Broker 備機(jī)也在運行。下面開始主備切換的實驗,分別啟動 Consumer 和 Producer 進(jìn)程,消息采用 Pull 順序模式消費。在消息發(fā)送接收過程中,使用?kill -9?停掉?broker-a?的主進(jìn)程,模擬突然宕機(jī)。此時,TestTopic1 的狀態(tài)如下,

    $ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}

    此時,RocketMQ 已經(jīng)恢復(fù)。

    再來看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,

    ......00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]......01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]

    日志中顯示,在發(fā)送完00583條消息之后,開始發(fā)生異常?connect to <192.168.232.23:10911> failed,原因應(yīng)該是?broker-a?的主節(jié)點被 kill 掉。之后,從00596條消息開始,RocketMQ 又恢復(fù)正常,原因是?broker-b?已經(jīng)開始提供服務(wù),承擔(dān)了所有的工作。然后,又重新啟動了?broker-a?主節(jié)點,由于該節(jié)點的加入,從01392條消息開始,broker-a?又開始恢復(fù)工作。實驗中可以驗證,RocketMQ 所謂的多主多備模式,實際上,備機(jī)被弱化到無以復(fù)加,在主節(jié)點宕機(jī)的時候,備機(jī)無法接替主機(jī)的工作,而只是將尚未發(fā)送的數(shù)據(jù)發(fā)送出去,由剩下的主節(jié)點接替工作。也就是說,N 主 N 備的 RocketMQ 集群中,總共有 2N 臺機(jī)器,實際工作的只有 N 臺,如果有一臺掛了,就只有 N-1 臺工作了,機(jī)器的利用率太低了。

    再來看一下 Consumer 的日志,如下,

    RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]......RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

    可以看到,Consumer 在?broker-a?宕機(jī)時間的附近,也出現(xiàn)了異常,connect to <192.168.232.23:10911> failed。雖然還能保持分區(qū)上的順序性,但是已經(jīng)某種程度上出現(xiàn)了一些紊亂,例如,將我在實驗之前的數(shù)據(jù)給取了出來(Hello MetaQ的消息)。可是,我在實驗前,明明做過刪除這個 Topic 的動作,看來 RocketMQ 所謂的刪除,并未刪除 Topic 的數(shù)據(jù)。之后,broker-a?主機(jī)重啟之后,又恢復(fù)正常。

    RocketMQ Pull模式消費需要手動管理 offset 和指定分區(qū),這個在調(diào)用的時候不覺得,實際運行的時候才會發(fā)現(xiàn)每次總是消費一個分區(qū),消費完之后,才開始消費下一個分區(qū),而下一個分區(qū)可能已經(jīng)堆積了很多消息了,手動做消息分配又比較費事?;蛟S,Push 順序模式消費才是更好的選擇。

    另外還有幾個比較異常的情況,實驗中有幾次出現(xiàn)了?CODE: 17 DESC: topic[TopicTest1] not exist, apply first please!?這樣的錯誤,實際上,這時候我只是關(guān)掉了 Producer;還有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1?明明文檔中說可以用來新增 Topic,而實際上不行。

    補(bǔ)充一下:之后,我又使用 Push 順序模式消費重做了上述實驗,結(jié)論差不多。只是因為有多線程的原因,日志看起來偶爾有錯位,這個問題不大,可以解決。而且,在關(guān)閉重啟 Broker 的附近,往往伴隨著多次的消息重發(fā),不過,RocketMQ 也不保證消息只收到一次就是了。消息重復(fù)的問題,Kafka 要比 RocketMQ 顯得不那么嚴(yán)重一些。Push 順序模式消費不需要指定 offset,不需要指定分區(qū),第二次啟動可以自動從前一次的 offset 后開始消費。功能上這個與 Kafka 的 Consumer 更類似,雖然 RocketMQ 采用的是異步模式。

    RocketMQ 最佳實踐

    實際上,RocketMQ 自己就有一份《RocketMQ 最佳實踐》的文檔,里面提到了一些系統(tǒng)設(shè)計的問題,例如消費者要冪等,一個應(yīng)用對應(yīng)一個 Topic,如此等等。這些經(jīng)驗不僅僅是對 RocketMQ 有用,對 Kafka 也頗有借鑒意義。

    后記

    這里談?wù)勎覍x擇 RocketMQ 還是 Kafka 的個人建議。以上已經(jīng)做了多處 RocketMQ 和 Kafka 的對比,我個人覺得,Kafka 是一個不斷發(fā)展中的系統(tǒng),開源社區(qū)比 RocketMQ 要大,也要更活躍一些;另外,Kafka 最新版本已經(jīng)有了同步復(fù)制,消息可靠性更有保障;還有,Kafka 的分區(qū)機(jī)制,幾乎實現(xiàn)了自動負(fù)載均衡,這絕對是個殺手級特性;RocketMQ 雖然提供了很多易用的功能,遠(yuǎn)超出 Kafka,但這些功能并不一定都能用得上,而且多數(shù)可以繞過。相比之下,Kafka 的基本功能更加吸引我,再處理故障恢復(fù)的時候,細(xì)節(jié)上要勝過 RocketMQ。當(dāng)然,如果是 A 公司內(nèi)部,或者所在公司使用了 A 公司的云產(chǎn)品,那么 RocketMQ 的企業(yè)級特性更多一些,或許我會選擇 RocketMQ。

    ?

    轉(zhuǎn)自:http://valleylord.github.io/post/201607-mq-rocketmq/

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

    總結(jié)

    以上是生活随笔為你收集整理的MQ 之 RocketMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    在线天堂中文在线资源网 | 久久五月天婷婷 | 在线免费高清 | 天躁狠狠躁 | 日本久久免费电影 | 久久99国产精品久久 | 久草在线资源视频 | 精品九九九 | 国产在线观看黄 | 国内精品视频在线 | 日韩欧美视频免费在线观看 | 午夜精品一区二区三区在线 | 欧美日韩精| 九草在线视频 | 欧美日本在线观看视频 | 色婷婷中文 | 久久官网 | 欧美一级性生活视频 | 国产三级国产精品国产专区50 | 免费日韩视 | 国产99亚洲 | 成人久久18免费网站图片 | 色综合久久中文综合久久牛 | 操久| 一区二区三区日韩视频在线观看 | 日本精品一区二区三区在线观看 | 久久婷婷五月综合色丁香 | 亚洲经典视频 | 中文字幕在线观看视频一区二区三区 | 久久不见久久见免费影院 | 综合国产在线观看 | 成人一级片视频 | 国内精品99 | 国产精品久久久久一区二区三区共 | 操操综合网 | 99精品成人| 国产尤物在线观看 | 日韩av在线资源 | 国产精品成人免费精品自在线观看 | 国产乱对白刺激视频不卡 | 日韩免费一二三区 | 91最新网址| 在线中文字母电影观看 | 成人国产精品免费观看 | 91福利视频免费观看 | 国精产品999国精产品视频 | 五月激情久久久 | 午夜电影久久久 | a级片在线播放 | 日本一区二区高清不卡 | 国产一级二级av | 特黄免费av | 97电影院在线观看 | 五月天,com| 人人干免费 | 国外av在线 | 97超碰免费在线观看 | 欧美日韩国产欧美 | 成人福利在线 | 亚洲国产中文字幕在线观看 | 99久久国产免费,99久久国产免费大片 | 91在线公开视频 | 久久久久久久久久毛片 | 黄色小说在线免费观看 | 日日夜夜免费精品 | 久久久久免费精品视频 | 草久久影院 | 日韩免费精品 | 精久久久久 | 99精品视频免费 | 午夜影视一区 | 免费看的黄网站 | 天天射天天操天天干 | 国产视| 久久精品国产免费看久久精品 | 免费在线观看av电影 | 午夜精品三区 | 中文字幕一区二区在线观看 | 婷婷深爱网 | 欧美了一区在线观看 | 在线观看免费成人av | 久久精品视频在线免费观看 | 在线影院 国内精品 | 日韩成人邪恶影片 | 成年人网站免费观看 | 国产欧美最新羞羞视频在线观看 | 一区三区在线欧 | 欧美久久久久久久久中文字幕 | 激情网站免费观看 | 成年免费在线视频 | 中文字幕av专区 | 91av电影在线观看 | 狠狠操91 | 超碰激情在线 | 成人国产综合 | 在线视频 日韩 | 美女啪啪图片 | 国产精品视频免费在线观看 | 成人午夜电影免费在线观看 | 色偷偷网站视频 | 国内外成人在线视频 | 成人在线播放网站 | 婷婷久久一区 | 五月婷婷色丁香 | 国产精品久久久久久久久久久久久 | 在线观看成人毛片 | 免费视频你懂的 | 中文字幕免费观看全部电影 | 91传媒视频在线观看 | 激情网婷婷| 黄色激情网址 | 在线 视频 一区二区 | 免费手机黄色网址 | 中文在线字幕免费观 | 国产永久免费 | 免费视频网 | www.久久成人 | 免费十分钟| 在线观看亚洲精品 | 国产片免费在线观看视频 | 亚洲久草在线视频 | 99精品久久精品一区二区 | 永久免费的av电影 | 精品久久久久久亚洲综合网站 | 欧美国产亚洲精品久久久8v | 欧美日韩调教 | 中文字幕中文字幕在线中文字幕三区 | 成人毛片在线视频 | 午夜精品视频免费在线观看 | 午夜黄色 | 国产精品一区二区精品视频免费看 | 欧美资源 | 91色蜜桃| 日韩有码在线观看视频 | 久久精品网站视频 | 欧美精品在线观看 | 国产精品白浆视频 | 国产精品国产三级在线专区 | 久久久久久久久久久影院 | 黄色一级大片在线免费看产 | 青青草华人在线视频 | 天天插天天 | 免费看黄在线 | 美国av片在线观看 | 久久夜色精品国产欧美乱极品 | 九九爱免费视频 | 婷婷色伊人 | 中文字幕丝袜一区二区 | 麻豆视频免费在线观看 | av线上看| 色久天 | 欧美亚洲免费在线一区 | 在线观看的a站 | 日韩av免费一区二区 | 久久久久久久综合色一本 | 午夜精品久久久久久久久久久 | 五月天色婷婷丁香 | 四虎影视成人永久免费观看亚洲欧美 | 免费看三级 | 伊人色**天天综合婷婷 | 欧美日韩性视频 | 九九精品视频在线观看 | 国产大片黄色 | 成年人免费在线观看网站 | 久久久国产精华液 | 久久成人午夜 | 日韩欧美精品在线 | 免费看三级黄色片 | 狠狠干天天色 | 337p日本大胆噜噜噜噜 | 久操视频在线免费看 | 97在线免费 | 精品99999| 国产亚洲精品久久久久久无几年桃 | 国产一级久久 | 亚洲视屏在线播放 | 免费网站看v片在线a | 天天草天天草 | 91色偷偷 | 国产福利精品一区二区 | 五月综合网站 | 国产精品九九九 | 国产在线一区二区三区播放 | 亚洲精品99久久久久久 | 成人一区影院 | 国产a精品 | 丁五月婷婷 | 99热在线看 | 日韩欧在线 | 成人免费观看av | 亚洲成av人片在线观看 | 91在线www | 日本在线观看中文字幕无线观看 | 欧美久久久久久久 | 国产成人精品不卡 | 久久久亚洲精华液 | 五月婷婷国产 | 天天干夜夜操视频 | 91精品视频在线免费观看 | av播放在线 | 亚洲日日射 | 网站在线观看日韩 | 丁香五月亚洲综合在线 | 香蕉久久久久久av成人 | 成人va天堂| 亚洲欧美成人综合 | 91色国产在线 | 色婷婷在线观看视频 | 国产伦理一区二区三区 | 国产 欧美 日本 | 在线观看91久久久久久 | 精油按摩av| 国产成人综合图片 | 欧美激情视频一区 | 色天天中文 | 中文字幕网址 | 亚洲女人天堂成人av在线 | 在线观看岛国 | 在线免费观看视频你懂的 | 欧洲一区精品 | 国产黄色视 | 首页av在线| 国产精品日韩欧美一区二区 | www.香蕉| 国产一二三区在线观看 | 欧美成人xxx | 成人国产在线 | 97精品国产91久久久久久 | 久久国产欧美日韩 | 成人av电影在线观看 | 视频在线99re | 狠狠色伊人亚洲综合网站色 | 美女一级毛片视频 | 天天干天天操天天操 | 97色在线观看免费视频 | 天天天天色射综合 | 91视频免费观看 | 亚洲精品乱码久久久久久按摩 | 最新精品国产 | 一级黄色电影网站 | 天天综合导航 | 国产精品第一视频 | 91精品国产九九九久久久亚洲 | 福利一区二区在线 | 日韩av在线免费看 | 亚洲精品大全 | 激情五月看片 | 日韩精品视频在线免费观看 | 成人国产精品 | 中文字幕亚洲不卡 | 精品国产一区二区三区在线观看 | 色综合人人| 欧美性春潮 | 久久网站免费 | a视频在线看| 亚洲欧美偷拍另类 | 亚洲国产小视频在线观看 | 韩国一区二区三区在线观看 | 精品影院一区二区久久久 | 日韩在线视频线视频免费网站 | 黄色大片国产 | 国内精品久久久久久久久久久 | 伊人久久五月天 | 婷色在线 | 亚洲91精品| 日本公妇在线观看 | 天堂va欧美va亚洲va老司机 | 色婷婷狠狠操 | 国产成人在线网站 | av官网在线 | 91麻豆精品国产91久久久无需广告 | 久久精品99久久 | 日韩女同一区二区三区在线观看 | 亚洲精品美女久久17c | 色婷婷久久 | 97超碰人人澡 | 国产精品一区二区久久 | 国产色视频网站 | 丁香婷婷激情国产高清秒播 | 97日日碰人人模人人澡分享吧 | 色综合久久久久 | 91精品视频在线免费观看 | 成人毛片100免费观看 | 狠狠的操狠狠的干 | 伊香蕉大综综综合久久啪 | 免费观看午夜视频 | 久草精品视频 | 伊人va| 免费看毛片网站 | 人人干人人爽 | 国产精品久99 | 久久久免费毛片 | 国产精品av免费在线观看 | 92精品国产成人观看免费 | 91亚洲精品久久久蜜桃 | 国产婷婷vvvv激情久 | 天天草夜夜 | 久久这里有精品 | 国产一级片一区二区三区 | 三级黄色网络 | 少妇性色午夜淫片aaaze | 日韩激情av在线 | 中文字幕一区二区三区久久蜜桃 | 99久久久久久 | 米奇狠狠狠888 | 天天草综合网 | 国产免费资源 | 国内视频在线观看 | 国产伦理一区二区 | 九九免费在线看完整版 | av黄色免费网站 | 麻豆视频免费网站 | 国产精品一区在线观看 | 国产精品第7页 | 国产成人精品一区二区三区福利 | 久久婷婷久久 | www夜夜操 | 在线观看一区二区精品 | 黄色a在线| 91理论片午午伦夜理片久久 | 在线a亚洲视频播放在线观看 | 丁香婷五月 | 99精品一级欧美片免费播放 | 韩日色视频 | 天天综合区 | 女人18毛片a级毛片一区二区 | 欧美日本国产在线观看 | 亚洲在线看 | 亚洲在线日韩 | 欧美日韩高清国产 | 一区二区三区免费在线观看视频 | 国产一级a毛片视频爆浆 | 四虎国产精品永久在线国在线 | 亚洲va韩国va欧美va精四季 | 日韩激情av在线 | 草免费视频 | 国产又粗又猛又爽又黄的视频免费 | 黄色三级网站 | 精品国产免费观看 | 欧美午夜视频在线 | 搡bbbb搡bbb视频 | 日韩在线观看a | 国产一区自拍视频 | 国际av在线 | 91麻豆精品国产自产 | 久久a级片| 精品天堂av| 日韩无在线| 精品视频999| 91成人免费观看视频 | 美女黄濒 | 国内精品国产三级国产aⅴ久 | 在线观看深夜福利 | 中文国产在线观看 | 成片视频在线观看 | 成人av一区二区三区 | 久久新| 国产精品a成v人在线播放 | 天天天综合 | 最新中文字幕 | 亚洲精品动漫成人3d无尽在线 | 91精品久久久久久综合乱菊 | 成人黄色小视频 | 天天操天天干天天摸 | 色操插 | 亚洲无在线 | 精品美女久久 | 亚洲经典在线 | 91传媒91久久久 | 在线播放91 | 国产视频丨精品|在线观看 国产精品久久久久久久久久久久午夜 | 97国产在线视频 | 亚洲精品国产精品国产 | 国产精品久久一区二区无卡 | 精品少妇一区二区三区在线 | 亚洲欧美视频在线 | 久久精品一二区 | 免费在线观看视频a | 97在线观看免费视频 | 国产一区欧美一区 | 精品一区二区三区久久 | 亚洲乱码中文字幕综合 | 日韩精品一区二区电影 | 精品视频999| 有码一区二区三区 | 亚洲va欧美va人人爽春色影视 | 91精品对白一区国产伦 | 二区视频在线观看 | 天天av资源| 国产精品视频地址 | 最新av在线免费观看 | 欧美性天天 | 美女视频黄,久久 | 色综合天 | 日韩成人精品一区二区三区 | 免费精品在线观看 | 成人av在线观| 免费在线观看国产精品 | 99视频在线精品国自产拍免费观看 | 国产精品一区二区三区免费视频 | 国产香蕉视频在线播放 | 欧洲高潮三级做爰 | av黄色在线播放 | 色网站免费在线看 | 亚洲网站在线看 | 久久久网站 | 国产精品18久久久久久首页狼 | 欧美精品视 | 日本不卡一区二区 | 天天操天天干天天玩 | 久久综合狠狠 | 亚洲国产mv| 蜜臀aⅴ国产精品久久久国产 | wwwwww黄 | 欧美日韩啪啪 | 亚洲精品视频免费观看 | 91九色在线| 日韩久久精品一区二区 | 夜夜躁天天躁很躁波 | 天天射综合网视频 | 亚洲一级特黄 | 中文字幕一区二区三区视频 | 国产 日韩 中文字幕 | 精品视频亚洲 | 亚洲欧美日韩在线一区二区 | 深夜免费小视频 | 六月激情婷婷 | 久久看片网站 | 日日碰夜夜爽 | 黄色毛片在线看 | www.婷婷com| 国产高清永久免费 | 免费网站污 | 婷婷丁香激情五月 | 麻豆一二三精选视频 | 午夜男人影院 | 国产精品伦一区二区三区视频 | 国产日韩一区在线 | 国产精品 国产精品 | 国产精品网红直播 | 欧美va天堂在线电影 | 91资源在线播放 | 免费看片网址 | 天天插日日插 | 中文字幕在线免费97 | 久久在线一区 | 国产视频网站在线观看 | 成年人三级网站 | 日日干av | 亚洲欧美日韩精品一区二区 | 日本高清中文字幕有码在线 | 中文视频在线播放 | 美女一二三区 | 久久不卡日韩美女 | 看黄色91 | 久久av高清| 六月婷婷网| 国产无遮挡猛进猛出免费软件 | 亚洲全部视频 | av电影一区 | www.久久久久| 五月婷婷视频在线 | 狠狠狠干 | 成人中心免费视频 | 婷婷久久五月 | 欧美日韩一区二区免费在线观看 | 91精品国自产在线 | 五月在线视频 | 欧美日韩啪啪 | 啪啪av在线 | 午夜.dj高清免费观看视频 | 韩日成人av | 欧美久久久久久久久中文字幕 | 国产免费成人 | 国产精品手机在线播放 | 中文字幕在线观看视频一区二区三区 | av女优中文字幕在线观看 | 午夜18视频在线观看 | 欧美a级在线播放 | 国产精品 中文在线 | 日韩精品观看 | 国产精品美女免费视频 | 免费精品在线 | 精品国产免费av | 欧美人体xx | 国产老太婆免费交性大片 | 中文字幕在线精品 | 又黄又爽又色无遮挡免费 | 亚洲最大激情中文字幕 | 欧美久久久久久久久久久 | 国产成人免费观看久久久 | 97色婷婷 | 免费99精品国产自在在线 | 精产嫩模国品一二三区 | 国产乱对白刺激视频不卡 | 91麻豆精品国产91久久久无需广告 | 成人av资源在线 | 丁香婷五月 | 久久久久9999亚洲精品 | 99riav1国产精品视频 | 超碰公开在线观看 | 能在线观看的日韩av | 天天爱av导航 | 免费一级片视频 | 欧美一进一出抽搐大尺度视频 | 天天操狠狠操网站 | 狠狠狠狠狠干 | 日韩精品电影在线播放 | 亚洲情婷婷| 天堂av网址 | 亚洲 综合 国产 精品 | 豆豆色资源网xfplay | 国产精品精 | 日韩一区二区三区免费电影 | 一级一片免费视频 | 中文字幕 国产视频 | 丁香五月网久久综合 | 2019中文最近的2019中文在线 | 国产一区网址 | 精品高清视频 | 中文在线免费视频 | 91| 日韩激情第一页 | 亚洲男人天堂a | 国产综合片 | 亚洲涩涩网站 | 久久一级电影 | 亚洲一二三区精品 | 亚洲黄色免费电影 | 日韩视频在线不卡 | 人人超碰人人 | 欧美精品中文在线免费观看 | 国产成人333kkk | 久久九精品 | 中文字幕乱码亚洲精品一区 | 婷婷六月久久 | 色视频网站在线观看一=区 a视频免费在线观看 | 色视频在线看 | 久久人人爽人人爽人人片 | 久久婷婷五月综合色丁香 | 国产成人精品一区二区三区网站观看 | 毛片精品免费在线观看 | 久久99精品久久久久久三级 | 亚洲午夜精品在线观看 | 免费看的国产视频网站 | 九九视频免费 | 97在线资源| 丁香婷婷激情啪啪 | 激情网第四色 | 国产中文字幕网 | 99久视频| 视频在线观看一区 | 成人动漫一区二区 | 成人欧美一区二区三区在线观看 | 色综合五月 | 怡红院av| 欧美aaa一级 | 免费av视屏| 99热在| 九九在线视频免费观看 | 美女国内精品自产拍在线播放 | 99在线精品观看 | 91在线播放国产 | 日韩视频图片 | 超碰在线人人 | 日本bbbb摸bbbb| 久久成年人 | 日韩欧美精品在线观看 | 日韩精品免费一线在线观看 | 久久久黄色av | 五月天网页 | 亚洲一区二区三区91 | 久久成人综合视频 | 久草新在线 | 久久99亚洲热视 | 国产精品久久久久亚洲影视 | 日韩激情小视频 | 国产精品1000 | 久久私人影院 | 日韩av中文字幕在线 | 免费网站观看www在线观看 | 久久不卡免费视频 | 亚洲精品高清视频在线观看 | 国产成人av片| 成年人在线免费看 | 国产精品剧情 | 337p日本大胆噜噜噜噜 | 国产另类xxxxhd高清 | 国产一区在线免费 | 五月天电影免费在线观看一区 | 日韩在线免费高清视频 | 国产资源网 | 91片在线观看 | 亚洲黄色av网址 | 久久久久久国产精品免费 | 一区免费观看 | 久久久黄色av | 在线电影 一区 | 狠狠操操| 欧美性生交大片免网 | 91色在线观看 | 成人免费视频播放 | 亚洲欧美国内爽妇网 | 综合国产在线观看 | 久久99久久99精品中文字幕 | 亚洲精品美女在线 | 日日夜夜网站 | 91大神精品视频 | 岛国av在线 | 狠狠干免费 | 久热这里有精品 | 久久福利影视 | 天天色官网 | 在线播放 一区 | 黄色特级片 | 国内精品在线一区 | 国产一区二区久久久 | 国产成人久久77777精品 | 色播五月激情五月 | 国产精品免费观看网站 | 国产专区视频在线观看 | 丝袜足交在线 | 亚洲成人av一区 | 日韩在线观看三区 | 欧美精品一区在线 | 天天操夜夜叫 | 久久夜av | 欧美精品久久久久久久久久白贞 | 日韩和的一区二在线 | 日韩三级在线观看 | 中文字幕乱码电影 | 91视频这里只有精品 | 欧美一区二区视频97 | 三级av免费看 | 怡春院av| 亚洲1区 在线 | 天天射天天操天天色 | 黄色av网站在线观看免费 | 日本精a在线观看 | 成人毛片一区二区三区 | 丁香视频在线观看 | 91av网站在线观看 | 精品国模一区二区 | 婷婷成人亚洲综合国产xv88 | 天天干天天拍天天操天天拍 | 国产又黄又爽无遮挡 | 欧美a级在线 | 免费看一级黄色 | 免费网址在线播放 | 亚洲a在线观看 | 天天操天天拍 | 99热这里只有精品在线观看 | 日韩精品久久久 | 涩涩网站免费 | 中文字幕亚洲精品在线观看 | 91看片在线播放 | 免费在线播放黄色 | 亚洲欧美激情精品一区二区 | 成人午夜影视 | 在线亚洲午夜片av大片 | 欧美日韩在线观看一区二区 | 一区二区激情 | 国产在线国偷精品产拍免费yy | 欧美与欧洲交xxxx免费观看 | 91av在线免费观看 | 亚洲欧美婷婷六月色综合 | 国产一区av在线 | 久久久网址 | 在线免费试看 | 天天色天天综合网 | 国产视频欧美视频 | 国产九九在线 | 91在线精品秘密一区二区 | 日韩在线一区二区免费 | 欧美日韩国产色综合一二三四 | 国产 欧美 在线 | 蜜臀精品久久久久久蜜臀 | 超碰在线最新地址 | 少妇bbw揉bbb欧美 | 久久高清 | 久久久在线视频 | www.伊人色.com| 婷婷在线播放 | 久久久免费精品国产一区二区 | 三级视频片 | 色婷婷综合久久久中文字幕 | 亚洲成av人影院 | 国产午夜精品视频 | 亚洲激情综合网 | 99精品在线| 很黄很黄的网站免费的 | 日本精品一 | 91电影福利 | 啪啪凸凸| www视频在线免费观看 | 91热爆在线观看 | 亚洲三级国产 | 天堂av一区二区 | 日韩中文字幕a | www.五月天婷婷.com | 在线观看免费版高清版 | 国产视频91在线 | 日韩精品中文字幕一区二区 | www.日日操.com| 99精品视频在线观看播放 | 日韩一区二区三区高清免费看看 | 日本精品视频一区二区 | 久草热视频 | 国产 一区二区三区 在线 | 久草视频中文在线 | 成人午夜精品久久久久久久3d | 亚洲精品中文字幕在线观看 | 人人搞人人爽 | 深夜免费福利在线 | 精选久久| 国产精品入口麻豆www | 日本三级吹潮在线 | 最近中文字幕大全 | 国产精品 日韩 欧美 | 在线国产不卡 | 国产性xxxx| 欧美成人免费在线 | 四虎成人精品永久免费av | 国产剧情av在线播放 | free. 性欧美.com| 永久免费的啪啪网站免费观看浪潮 | 免费av看片 | 奇米影视8888 | 麻豆91精品| 国产一区二区在线免费播放 | av在线播放观看 | 又黄又刺激的视频 | 91中文在线视频 | 日韩精品久久久久久久电影竹菊 | 日本不卡一区二区三区在线观看 | 成人免费91 | 久久午夜羞羞影院 | 日韩精品久久久久 | 国产在线精品福利 | 欧美久久久久久久久久久 | 久久人人爽人人爽人人片av免费 | 99电影456麻豆 | 天天添夜夜操 | 日韩视频一区二区在线观看 | 精品色综合 | 激情欧美一区二区三区 | 免费看黄在线 | 天天爽夜夜爽人人爽曰av | 亚洲精品中文字幕视频 | 日韩电影一区二区在线观看 | 亚洲激色 | 视频 国产区| 色婷婷影视| 免费成人短视频 | 国内精品99| 最新日韩在线观看视频 | 日韩中文三级 | 久久久久久久久久久久久国产精品 | 国产成人亚洲精品自产在线 | 999在线精品 | 亚洲免费观看在线视频 | 91传媒视频在线观看 | 97超碰人人澡人人爱 | 国产韩国精品一区二区三区 | 国产在线综合视频 | 国产精品一区二区三区在线 | 在线观看午夜av | 一二区精品| 国产高清免费在线观看 | 日韩午夜av电影 | 久久精品草 | 国产人成看黄久久久久久久久 | 久久亚洲精品国产亚洲老地址 | 久久久久99精品国产片 | 久久情网 | 91热视频 | 九色91av| 国产一区二区高清 | 99在线免费视频 | 亚洲国产电影在线观看 | 爱色av.com | 成年人视频在线免费播放 | 久久久久亚洲精品男人的天堂 | 亚洲欧美日韩国产一区二区 | 91综合视频在线观看 | 欧美日韩不卡在线观看 | 国产精品久久二区 | 在线观看黄色免费视频 | 在线看v片| 国产在线观看,日本 | 久久夜色精品国产欧美乱极品 | 国产精品自拍在线 | 久久激情网站 | 九色最新网址 | 亚洲黄污| 91亚洲精品在线观看 | 久爱综合 | 激情综合五月婷婷 | 久久精品国产久精国产 | 一级黄色片在线 | 草久久久 | 中文字幕乱码电影 | 四虎8848免费高清在线观看 | 黄色在线观看污 | 国产精品18videosex性欧美 | 性色xxxxhd| 中文字幕中文 | 色资源网免费观看视频 | 成人免费中文字幕 | www.888.av| av一级二级 | 精品视频免费观看 | 黄色精品国产 | 欧美激情精品久久久久久变态 | 久久影视网 | 亚洲国产精品人久久电影 | 国产无遮挡又黄又爽馒头漫画 | 成人a免费视频 | 99久久精品国产毛片 | 丝袜+亚洲+另类+欧美+变态 | 91桃花视频 | 婷婷成人综合 | 日韩免费中文字幕 | 亚洲精品午夜视频 | 久久久999精品视频 国产美女免费观看 | 国产专区在线播放 | 欧美日韩综合在线 | 黄色大片国产 | 亚洲欧洲久久久 | 久久久久久久99精品免费观看 | 日韩中文三级 | 天天干天天做 | 手机在线永久免费观看av片 | 99国产精品久久久久久久久久 | 四虎成人精品在永久免费 | 国产精品久久久久久久久久久不卡 | 91大神在线观看视频 | 美女视频黄免费网站 | 成人av高清在线观看 | 99精品国产99久久久久久97 | 在线v片免费观看视频 | 国产黄色片免费观看 | 天堂av最新网址 | 免费在线一区二区 | 色吧久久 | 日韩久久久久久久久久 | 欧美日韩国产亚洲乱码字幕 | 日韩一区二区三区观看 | 天天鲁一鲁摸一摸爽一爽 | 视色网站| 在线观看日本高清mv视频 | 亚洲资源在线网 | 欧美日韩亚洲在线观看 | 在线观看亚洲精品 | 在线免费视频a | 丰满少妇麻豆av | 日本护士三级少妇三级999 | 99精品免费 | 国产一级电影在线 | 日本在线观看黄色 | 亚洲黄在线观看 | 午夜私人影院 | 91超国产| 日本黄色大片免费看 | 亚洲精品国产精品99久久 | 国产中文字幕国产 | 国产aa精品| 国产精品精品国产婷婷这里av | 99re久久精品国产 | 成年人黄色在线观看 | 免费a v网站| 国产人成看黄久久久久久久久 | 一二区电影 | 蜜臀av性久久久久av蜜臀妖精 | 日韩高清网站 | www.看片网站 | 国产精品美女免费看 | 亚洲少妇天堂 | 欧美一区二视频在线免费观看 | 亚洲免费av一区二区 | 91久久国产精品 | 日韩网 | 色99中文字幕 | 国产在线a视频 | 精品国内自产拍在线观看视频 | 精品无人国产偷自产在线 | 欧美性色综合网 | 97视频在线观看免费 | 综合精品久久 | 天天天干夜夜夜操 | 欧美国产一区在线 | 成年人毛片在线观看 | 碰超在线 | www色,com| 91视频下载| 欧产日产国产69 | 久久国产亚洲精品 | 日韩午夜精品 | 在线观看视频一区二区三区 | 黄色成年 | 亚洲一二区精品 | 国产一级久久 | 成人免费观看在线视频 | 香蕉在线视频观看 | 天天干,天天射,天天操,天天摸 | 91污污| 在线 欧美 日韩 | 99精品久久久久久久久久综合 | 成人av在线亚洲 | 中文字幕人成乱码在线观看 | 日韩色在线观看 | 视频一区在线免费观看 | 日本性久久 | 91精品在线视频观看 | 黄色激情网址 | 一区二区三区四区免费视频 | 久草视频在线免费 | 狠狠色丁香婷婷综合视频 | 黄色a大片 | 69国产精品视频 | 国产黄a三级三级三级三级三级 | 亚洲资源在线观看 | 五月综合网站 | 午夜av大片| 国产精品96久久久久久吹潮 | 97视频在线| 超碰人人草人人 | 五月天色婷婷丁香 | 黄视频网站大全 | 激情五月婷婷综合网 | 中文字幕一区二区三区四区在线视频 | 亚洲欧美在线观看视频 | 日韩欧美高清不卡 | 黄色成人在线 | 99久久久久久久久 | 国产成人精品久久久久蜜臀 | 亚洲精品国产精品国自 | 日本最新中文字幕 | 国产一级久久久 | 久久久久亚洲天堂 | 日韩在线免费视频 | 在线观看v片 | 亚洲一级片在线观看 | 亚洲成免费 | 成人黄色小视频 | 中文av影院 | 美州a亚洲一视本频v色道 | 亚欧洲精品视频在线观看 | 中文字幕一区在线 | 男女激情片在线观看 | 国产精品久久电影网 | 亚洲国产精品va在线看 | 日韩亚洲国产精品 | 亚洲少妇xxxx | 亚洲午夜电影网 | 国产在线一区二区 | 久久久三级视频 | 日韩一二三 | 波多野结衣在线观看一区 | 色噜噜在线观看 | 国产精品1024 | 国产在线超碰 | 久久久久久久久精 | 亚洲丝袜一区 | 五月婷婷视频在线观看 | 日韩网站在线看片你懂的 | 久久亚洲综合国产精品99麻豆的功能介绍 | 一区在线观看 | 四月婷婷在线观看 | 成人99免费视频 | 在线看的毛片 | 亚洲一区二区三区四区在线视频 | 国产探花 | 国产资源在线免费观看 | 在线观看久久 | 国产精品免费观看视频 | 天天干夜夜夜 | 亚洲精品一区二区在线观看 | 国产无套一区二区三区久久 | 精品一区二区三区久久久 | 国产黄色大片免费看 | 亚洲欧洲精品一区二区精品久久久 | 亚洲中字幕| 欧美成年人在线观看 | 亚洲在线观看av | 国产精品视频永久免费播放 | 国产精品网在线观看 | 国产精品黄色影片导航在线观看 | 人人射人人爱 | 国产精品免费在线视频 | 天天爽夜夜爽人人爽一区二区 | 91桃色国产在线播放 | 韩日成人av| 一本色道久久综合亚洲二区三区 | 欧美成人h版在线观看 | 久久久三级视频 | 中文字幕a∨在线乱码免费看 | 激情深爱五月 |