MQ 之 RocketMQ
前些天發(fā)現(xiàn)了一個(gè)巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家。點(diǎn)擊跳轉(zhuǎn)到教程。
RocketMQ 是出自 A 公司的開源產(chǎn)品,用 Java 語言實(shí)現(xiàn),在設(shè)計(jì)時(shí)參考了 Kafka,并做出了自己的一些改進(jìn),消息可靠性上比 Kafka 更好,目前,RocketMQ 的文檔仍然不夠豐富?1?2,社區(qū)仍然無法與 Kafka 比肩,但 A 公司已經(jīng)推出了基于 RocketMQ 的云產(chǎn)品?3,相信未來 RocketMQ 也會(huì)有不錯(cuò)的發(fā)展。本文采用 RocketMQ 3.2.6 進(jìn)行實(shí)驗(yàn),由于 RocketMQ 與 Kafka 很相似,本文很多地方對(duì)兩者做出了比較。
基本概念
RocketMQ 由于借鑒了 Kafka 的設(shè)計(jì),包括組件的命名也很多與 Kafka 相似,下面摘抄一段《RocketMQ 原理簡(jiǎn)介》中的介紹,可以與 Kafka 的命名比對(duì)一下,
- Producer,消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息。
- Consumer,消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。
- Push Consumer,Consumer 的一種,應(yīng)用通常向 Consumer 對(duì)象注冊(cè)一個(gè) Listener 接口,一旦收到消息,Consumer 對(duì)象立 刻回調(diào) Listener 接口方法。
- Pull Consumer,Consumer 的一種,應(yīng)用通常主動(dòng)調(diào)用 Consumer 的拉消息方法從 Broker 拉消息,主動(dòng)權(quán)由應(yīng)用控制。
- Producer Group,一類 Producer 的集合名稱,這類 Producer 通常發(fā)送一類消息,且發(fā)送邏輯一致。
- Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費(fèi)一類消息,且消費(fèi)邏輯一致。
- Broker,消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息,一般也稱為 Server。在 JMS 規(guī)范中稱為 Provider。
《RocketMQ 原理簡(jiǎn)介》中還介紹了一些其他的概念,例如,廣播消費(fèi)和集群消費(fèi),廣播消費(fèi)是 Consumer Group 中對(duì)于同一條消息每個(gè) Consumer 都消費(fèi),集群消費(fèi)是 Consumer Group 中對(duì)于同一條消息只有一個(gè) Consumer 消費(fèi)。Kafka 采用的是集群消費(fèi),不支持廣播消費(fèi)(好吧,是我沒有找到)。再例如,普通順序消息和嚴(yán)格順序消息,普通順序消息在 Broker 重啟情況下不會(huì)保證消息順序性;嚴(yán)格順序消息即使在異常情況下也會(huì)保證消息的順序性。個(gè)人理解,所謂普通順序消息,應(yīng)該就是 Kafka 中的 Partition 級(jí)別有序,嚴(yán)格順序消息,應(yīng)該是 Topic 級(jí)別有序,但文中也提到,這樣的有序級(jí)別是要付出代價(jià)的,Broker 集群中只要有一臺(tái)機(jī)器不可用,則整個(gè)集群都不可用,降低服務(wù)可用性。使用這種模式,需要依賴同步雙寫,主備自動(dòng)切換,但自動(dòng)切換功能目前還未實(shí)現(xiàn)(我猜,自動(dòng)切換僅僅是沒開源吧)。說白了,嚴(yán)格順序消息不具備生產(chǎn)可用性,自己玩玩還行,其應(yīng)用場(chǎng)景主要是數(shù)據(jù)庫 binlog 同步。
關(guān)于 RocketMQ 和 Kafka 的對(duì)比,可以參考 RocketMQ Wiki 中的文章?4,看看就行,不必較真。
關(guān)于順序和分區(qū)
順序性的話題,剛才已經(jīng)提到了一些,RocketMQ 的實(shí)現(xiàn)應(yīng)該不弱于 Kafka。對(duì)于分區(qū),RocketMQ 似乎有意弱化了這個(gè)概念,只有在 Producer 中有一個(gè)參數(shù)?defaultTopicQueueNums,分區(qū)在 RocketMQ 中有時(shí)被稱為隊(duì)列。RocketMQ 的普通順序消息模式,應(yīng)該就是分區(qū)順序性,這點(diǎn)與 Kafka 一致。
關(guān)于高可用
RocketMQ 實(shí)現(xiàn)高可用的方式有多種,《RocketMQ 用戶指南》文檔中提到的有:多主模式、多主多從異步復(fù)制模式、多主多從同步復(fù)制模式。多主模式下,性能較好,但是在 Broker 宕機(jī)的時(shí)候,該 Broker 上未消費(fèi)的交易不可消費(fèi);多主多從異步復(fù)制模式,與 Kafka 的副本模式比較類似,主 Broker 宕機(jī)后,會(huì)自動(dòng)切換到從 Broker,消息的消費(fèi)不會(huì)出現(xiàn)間斷;多主多從同步復(fù)制模式更進(jìn)一步,采用同步刷盤的方式,避免了主 Broker 宕機(jī)帶來的消息丟失,但是,目前不支持自動(dòng)切換。
雖然 RocketMQ 提供了多種高可用方式,但是目前能生產(chǎn)使用的就只有多主多從異步復(fù)制模式,即使在這個(gè)模式上,其實(shí)現(xiàn)也比 Kafka 要差。因?yàn)?RocketMQ 的機(jī)制中,主從關(guān)系是人為指定的,主 Broker 上承擔(dān)所有的消息派發(fā),而 Kafka 的主從關(guān)系是通過選舉的方式選出來的,每個(gè)分區(qū)的主節(jié)點(diǎn)都是不一樣的,可以從不同的節(jié)點(diǎn)派發(fā)消息。Kafka 的模式是分散模式,有利于負(fù)載均衡,而且當(dāng)一個(gè) Broker 宕機(jī)的時(shí)候,只影響部分 Topic,而 RocketMQ 一旦主 Broker 宕機(jī),會(huì)影響所有的 Topic。另外,Kafka 可以支持 Broker 間同步復(fù)制(通過設(shè)置 Broker 的?acks?參數(shù)),這樣比的話,RocketMQ 就差太多了。
關(guān)于 RocketMQ 的介紹,網(wǎng)上的文章不算太多,也比較雜,《分布式開放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐》5?6?7這篇原理介紹的不錯(cuò),推薦。
RocketMQ 的工具和編程接口
RocketMQ 的工具
相比較 Kafka 而言,RocketMQ 提供的工具要少一些,如下,
bin/mqadminbin/mqbrokerbin/mqbroker.numanode0bin/mqbroker.numanode1bin/mqbroker.numanode2bin/mqbroker.numanode3bin/mqfiltersrvbin/mqnamesrvbin/mqshutdown除了進(jìn)程啟停之外,常用的運(yùn)維命令都在?mqadmin?中,詳見《RocketMQ 運(yùn)維指令》文檔。我實(shí)驗(yàn)中常用的一些命令如下,
sh mqnamesrv &sh mqbroker -c async-broker-a.properties &sh mqbroker -c async-broker-a-s.properties &sh mqadmin topicList -n 192.168.232.23:9876sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin clusterList -n 192.168.232.23:9876sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4RocketMQ 使用了自己的 name server 來做調(diào)度(Kafka 用了 Zookeeper),使用?sh mqnamesrv?來啟動(dòng),默認(rèn)監(jiān)聽端口9876,sh mqnamesrv -m?可以查看所有默認(rèn)參數(shù),使用?-c xxxx.properties?參數(shù)來指定自定義配置。sh mqbroker?是用于啟動(dòng) Broker 的命令,參數(shù)比較多,詳細(xì)可以通過?sh mqbroker -m?查看默認(rèn)參數(shù),配置項(xiàng)細(xì)節(jié)后文再說。sh mqadmin?是運(yùn)維命令入口,topicList?是列出所有 Topic;topicRoute?是列出單個(gè) Topic 的詳細(xì)信息;clusterList?是列出集群的信息;deleteTopic?是刪除 Topic。consumerProgress?是查看消費(fèi)者消費(fèi)進(jìn)度,deleteSubGroup?是刪除消費(fèi)者的訂閱,consumerConnection?是查詢消費(fèi)者訂閱的情況。
Broker 的配置是最多的,實(shí)驗(yàn)中我修改到的部分如下,其他使用默認(rèn),
brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23brokerName=broker-abrokerId=0namesrvAddr=192.168.232.23:9876listenPort=10911deleteWhen=04fileReservedTime=120storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSH配置文件中的多數(shù)配置看例子就可以知道意思,挑幾個(gè)說一下。brokerName?和?brokerId, 同名的 Broker,ID 是0的是主節(jié)點(diǎn),其他是從節(jié)點(diǎn);deleteWhen,刪除文件時(shí)間點(diǎn),默認(rèn)凌晨4點(diǎn);fileReservedTime,文件保留時(shí)間,設(shè)置為120小時(shí);brokerRole,Broker 的角色,ASYNC_MASTER 是異步復(fù)制主節(jié)點(diǎn),SYNC_MASTER 是同步雙寫主節(jié)點(diǎn),SLAVE 是備節(jié)點(diǎn)。
其實(shí),這些工具的寫法也基本一致,都是先做一些檢查,最后運(yùn)行 Java 程序,JVM 系統(tǒng)上的應(yīng)用應(yīng)該差不多都這樣。
RocketMQ 的 Java API
RocketMQ 是用 Java 語言開發(fā)的,因此,其 Java API 相對(duì)是比較豐富的,當(dāng)然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,
單看功能的話,即使不算事務(wù)消息,也不算 Tag,RocketMQ 也遠(yuǎn)超 Kafka,Kafka 應(yīng)該只實(shí)現(xiàn)了 Pull 模式消費(fèi) + 順序消費(fèi)這2個(gè)功能。RocketMQ 的代碼示例在 rocketmq-example 中,注意,代碼是不能直接運(yùn)行的,因?yàn)樗械拇a都少了設(shè)置 name server 的部分,需要自己手動(dòng)加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");。
先來看一下生產(chǎn)者的 API,比較簡(jiǎn)單,只有一種,如下,
import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("192.168.232.23:9876");producer.start();for (int i = 0; i < 10; i++)try {{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID188",// key("RocketMQ "+String.format("%05d", i)).getBytes());// bodySendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, i));System.out.println(String.format("%05d", i)+sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}}可以發(fā)現(xiàn),相比 Kafka 的 API,只多了 Tag,但實(shí)際上行為有很大不同。Kafka 的生產(chǎn)者客戶端,有同步和異步兩種模式,但都是阻塞模式,send?方法返回發(fā)送狀態(tài)的?Future,可以通過?Future?的?get?方法阻塞獲得發(fā)送狀態(tài)。而 RocketMQ 采用的是同步非阻塞模式,發(fā)送之后立刻返回發(fā)送狀態(tài)(而不是?Future)。正常情況下,兩者使用上差別不大,但是在高可用場(chǎng)景中發(fā)生主備切換的時(shí)候,Kafka 的同步可以等待切換完成并重連,最后返回;而 RocketMQ 只能立刻報(bào)錯(cuò),由生產(chǎn)者選擇是否重發(fā)。所以,在生產(chǎn)者的 API 上,其實(shí) Kafka 是要強(qiáng)一些的。
另外,RocketMQ 可以通過指定?MessageQueueSelector?類的實(shí)現(xiàn)來指定將消息發(fā)送到哪個(gè)分區(qū)去,Kafka 是通過指定生產(chǎn)者的?partitioner.class?參數(shù)來實(shí)現(xiàn)的,靈活性上 RocketMQ 略勝一籌。
再來看消費(fèi)者的API,由于 RocketMQ 的功能比較多,我們先看 Pull 模式消費(fèi)的API,如下,
import java.util.HashMap;import java.util.Map;import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;import com.alibaba.rocketmq.client.consumer.PullResult;import com.alibaba.rocketmq.client.consumer.store.OffsetStore;import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import com.alibaba.rocketmq.common.message.MessageQueue;public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {long offset = consumer.fetchConsumeOffset(mq, true);PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);if (null != pullResult.getMsgFoundList()) {for (MessageExt messageExt : pullResult.getMsgFoundList()) {System.out.print(new String(messageExt.getBody()));System.out.print(pullResult);System.out.println(messageExt);}}putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}這部分的 API 其實(shí)是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分區(qū),而 Kafka 可以自動(dòng)管理(當(dāng)然也可以手動(dòng)管理),并且不需要指定分區(qū)(分區(qū)是在 Kafka 訂閱的時(shí)候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用?OffsetStore?接口,提供了兩種管理方式,本地文件和遠(yuǎn)程 Broker。這部分感覺兩者差不多。
下面再看看 Push 模式順序消費(fèi),代碼如下,
import java.util.List;import java.util.concurrent.atomic.AtomicLong;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;}else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;}else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;}else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}雖然提供了 Push 模式,RocketMQ 內(nèi)部實(shí)際上還是 Pull 模式的 MQ,Push 模式的實(shí)現(xiàn)應(yīng)該采用的是長(zhǎng)輪詢,這點(diǎn)與 Kafka 一樣。使用該方式有幾個(gè)注意的地方,
控制 offset 提交這個(gè)特性非常有用,某種程度上擴(kuò)展一下,就可以當(dāng)做事務(wù)來用了,看代碼?ConsumeMessageOrderlyService?的實(shí)現(xiàn),其實(shí)并沒有那么復(fù)雜,在不啟用 AutoCommit 的時(shí)候,只有返回?COMMIT?才 commit offset;啟用 AutoCommit 的時(shí)候,返回?COMMIT、ROLLBACK(這個(gè)比較扯)、SUCCESS?的時(shí)候,都 commit offset。
后來發(fā)現(xiàn),commit offset 功能在 Kafka 里面也有提供,使用新的 API,調(diào)用?consumer.commitSync。
再看一個(gè) Push 模式亂序消費(fèi) + 消息過濾的例子,消費(fèi)者的代碼如下,
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}
接收消息的監(jiān)聽類使用的是?MessageListenerConcurrently;這個(gè)例子與之前順序消費(fèi)不同的地方在于,
消息過濾類?MessageFilterImpl?的代碼如下,
import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg) {String property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if ((id % 3) == 0 && (id > 10)) {return true;}}return false;}}RocketMQ 執(zhí)行過濾是在 Broker 端,Broker 所在的機(jī)器會(huì)啟動(dòng)多個(gè) FilterServer 過濾進(jìn)程;Consumer 啟動(dòng)后,會(huì)向 FilterServer 上傳一個(gè)過濾的 Java 類;Consumer 從 FilterServer 拉消息,FilterServer 將請(qǐng)求轉(zhuǎn)發(fā)給 Broker,FilterServer 從 Broker 收到消息后,按照 Consumer 上傳的 Java 過濾程序做過濾,過濾完成后返回給 Consumer。這種過濾方法可以節(jié)省網(wǎng)絡(luò)流量,但是增加了 Broker 的負(fù)擔(dān)。可惜我沒有實(shí)驗(yàn)出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結(jié)了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個(gè)很方便的功能。
還有一種廣播消費(fèi)模式,比較簡(jiǎn)單,可以去看代碼,不再列出。
總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。
RocketMQ 的主備模式
按之前所說,只有 RocketMQ 的多主多從異步復(fù)制是可以生產(chǎn)使用的,因此只在這個(gè)場(chǎng)景下測(cè)試。另外,消息采用 Push 順序模式消費(fèi)。
假設(shè)集群采用2主2備的模式,需要啟動(dòng)4個(gè) Broker,配置文件如下,
brokerName=broker-abrokerId=0listenPort=10911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERbrokerName=broker-abrokerId=1listenPort=10921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlogbrokerRole=SLAVEbrokerName=broker-bbrokerId=0listenPort=20911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlogbrokerRole=ASYNC_MASTERbrokerRole=ASYNC_MASTERbrokerName=broker-bbrokerId=1listenPort=20921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlogbrokerRole=SLAVE另外,每個(gè)機(jī)構(gòu)共通的配置項(xiàng)如下,
brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23namesrvAddr=192.168.232.23:9876deleteWhen=04fileReservedTime=120flushDiskType=ASYNC_FLUSH其他設(shè)置均采用默認(rèn)。啟動(dòng) NameServer 和所有 Broker,并試運(yùn)行一下 Producer,然后看一下 TestTopic1 當(dāng)前的情況,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}可見,TestTopic1 在2個(gè) Broker 上,且每個(gè) Broker 備機(jī)也在運(yùn)行。下面開始主備切換的實(shí)驗(yàn),分別啟動(dòng) Consumer 和 Producer 進(jìn)程,消息采用 Pull 順序模式消費(fèi)。在消息發(fā)送接收過程中,使用?kill -9?停掉?broker-a?的主進(jìn)程,模擬突然宕機(jī)。此時(shí),TestTopic1 的狀態(tài)如下,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}此時(shí),RocketMQ 已經(jīng)恢復(fù)。
再來看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,
......00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]......01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]日志中顯示,在發(fā)送完00583條消息之后,開始發(fā)生異常?connect to <192.168.232.23:10911> failed,原因應(yīng)該是?broker-a?的主節(jié)點(diǎn)被 kill 掉。之后,從00596條消息開始,RocketMQ 又恢復(fù)正常,原因是?broker-b?已經(jīng)開始提供服務(wù),承擔(dān)了所有的工作。然后,又重新啟動(dòng)了?broker-a?主節(jié)點(diǎn),由于該節(jié)點(diǎn)的加入,從01392條消息開始,broker-a?又開始恢復(fù)工作。實(shí)驗(yàn)中可以驗(yàn)證,RocketMQ 所謂的多主多備模式,實(shí)際上,備機(jī)被弱化到無以復(fù)加,在主節(jié)點(diǎn)宕機(jī)的時(shí)候,備機(jī)無法接替主機(jī)的工作,而只是將尚未發(fā)送的數(shù)據(jù)發(fā)送出去,由剩下的主節(jié)點(diǎn)接替工作。也就是說,N 主 N 備的 RocketMQ 集群中,總共有 2N 臺(tái)機(jī)器,實(shí)際工作的只有 N 臺(tái),如果有一臺(tái)掛了,就只有 N-1 臺(tái)工作了,機(jī)器的利用率太低了。
再來看一下 Consumer 的日志,如下,
RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]......RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]可以看到,Consumer 在?broker-a?宕機(jī)時(shí)間的附近,也出現(xiàn)了異常,connect to <192.168.232.23:10911> failed。雖然還能保持分區(qū)上的順序性,但是已經(jīng)某種程度上出現(xiàn)了一些紊亂,例如,將我在實(shí)驗(yàn)之前的數(shù)據(jù)給取了出來(Hello MetaQ的消息)。可是,我在實(shí)驗(yàn)前,明明做過刪除這個(gè) Topic 的動(dòng)作,看來 RocketMQ 所謂的刪除,并未刪除 Topic 的數(shù)據(jù)。之后,broker-a?主機(jī)重啟之后,又恢復(fù)正常。
RocketMQ Pull模式消費(fèi)需要手動(dòng)管理 offset 和指定分區(qū),這個(gè)在調(diào)用的時(shí)候不覺得,實(shí)際運(yùn)行的時(shí)候才會(huì)發(fā)現(xiàn)每次總是消費(fèi)一個(gè)分區(qū),消費(fèi)完之后,才開始消費(fèi)下一個(gè)分區(qū),而下一個(gè)分區(qū)可能已經(jīng)堆積了很多消息了,手動(dòng)做消息分配又比較費(fèi)事。或許,Push 順序模式消費(fèi)才是更好的選擇。
另外還有幾個(gè)比較異常的情況,實(shí)驗(yàn)中有幾次出現(xiàn)了?CODE: 17 DESC: topic[TopicTest1] not exist, apply first please!?這樣的錯(cuò)誤,實(shí)際上,這時(shí)候我只是關(guān)掉了 Producer;還有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1?明明文檔中說可以用來新增 Topic,而實(shí)際上不行。
補(bǔ)充一下:之后,我又使用 Push 順序模式消費(fèi)重做了上述實(shí)驗(yàn),結(jié)論差不多。只是因?yàn)橛卸嗑€程的原因,日志看起來偶爾有錯(cuò)位,這個(gè)問題不大,可以解決。而且,在關(guān)閉重啟 Broker 的附近,往往伴隨著多次的消息重發(fā),不過,RocketMQ 也不保證消息只收到一次就是了。消息重復(fù)的問題,Kafka 要比 RocketMQ 顯得不那么嚴(yán)重一些。Push 順序模式消費(fèi)不需要指定 offset,不需要指定分區(qū),第二次啟動(dòng)可以自動(dòng)從前一次的 offset 后開始消費(fèi)。功能上這個(gè)與 Kafka 的 Consumer 更類似,雖然 RocketMQ 采用的是異步模式。
RocketMQ 最佳實(shí)踐
實(shí)際上,RocketMQ 自己就有一份《RocketMQ 最佳實(shí)踐》的文檔,里面提到了一些系統(tǒng)設(shè)計(jì)的問題,例如消費(fèi)者要冪等,一個(gè)應(yīng)用對(duì)應(yīng)一個(gè) Topic,如此等等。這些經(jīng)驗(yàn)不僅僅是對(duì) RocketMQ 有用,對(duì) Kafka 也頗有借鑒意義。
后記
這里談?wù)勎覍?duì)選擇 RocketMQ 還是 Kafka 的個(gè)人建議。以上已經(jīng)做了多處 RocketMQ 和 Kafka 的對(duì)比,我個(gè)人覺得,Kafka 是一個(gè)不斷發(fā)展中的系統(tǒng),開源社區(qū)比 RocketMQ 要大,也要更活躍一些;另外,Kafka 最新版本已經(jīng)有了同步復(fù)制,消息可靠性更有保障;還有,Kafka 的分區(qū)機(jī)制,幾乎實(shí)現(xiàn)了自動(dòng)負(fù)載均衡,這絕對(duì)是個(gè)殺手級(jí)特性;RocketMQ 雖然提供了很多易用的功能,遠(yuǎn)超出 Kafka,但這些功能并不一定都能用得上,而且多數(shù)可以繞過。相比之下,Kafka 的基本功能更加吸引我,再處理故障恢復(fù)的時(shí)候,細(xì)節(jié)上要?jiǎng)龠^ RocketMQ。當(dāng)然,如果是 A 公司內(nèi)部,或者所在公司使用了 A 公司的云產(chǎn)品,那么 RocketMQ 的企業(yè)級(jí)特性更多一些,或許我會(huì)選擇 RocketMQ。
?
轉(zhuǎn)自:http://valleylord.github.io/post/201607-mq-rocketmq/
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的MQ 之 RocketMQ的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 自动化设备的软件框架
- 下一篇: 查看、关闭当前服务器上启动服务 / 进程