RocketMQ学习(一):简介和QuickStart
RocketMQ是什么?
引用官方描述:
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
支持嚴格的消息順序
支持Topic與Queue兩種模式
億級消息堆積能力
比較友好的分布式特性
同時支持Push與Pull方式消費消息
歷經多次天貓雙十一海量消息考驗
RocketMQ是純java編寫,基于通信框架Netty。
代碼地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。
下載完代碼后,將各個模塊導入eclipse,本地嘗試啟動看看。
1.啟動nameServer,運行rocketmq-namesrv的NamesrvStartup,運行之前需設置環境變量ROCKETMQ_HOME為RocketMQ項目的根目錄,這樣有一個作用是,指向logback的配置文件路徑,保證在nameServer啟動時,logback的正常初始化。我本機設置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。
The Name Server boot success. 表示啟動成功。
2.啟動brokerServer,運行rocketmq-broker的BrokerStartup,同樣,運行之前需設置環境變量ROCKETMQ_HOME,然后啟動參數需要帶上【-n “192.168.0.109:9876″】,我本機的ip是192.168.0.109。如果不帶-n的參數,那么broker會去訪問http://jmenv.tbsite.net:8080/rocketmq/nsaddr獲取nameServer的地址,這個地址不是我們自己的nameServer。
The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。
3.這個非必選項,不運行也可以。還可以啟動rocketmq-srvutil的FiltersrvStartup,這是Consumer使用Java代碼,在服務器做消息過濾。啟動方式和broker一樣,具體的過濾原理以后再詳細的說。
到此就可以運行demo了。
pom.xml依賴:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | <dependencies> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies> |
如果依賴包下載不下來,再給個倉庫地址,開源中國的:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | <repositories> <repository> <id>nexus</id> <name>Nexus</name> <url>http://maven.oschina.net/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> |
貼代碼:
Producer
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package com.zoo.quickstart; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * Producer,發送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.0.109:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); Thread.sleep(3000); } } producer.shutdown(); } } |
Consumer
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | package com.zoo.quickstart; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.0.109:9876"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); System.out.println(" Receive Message Size: " + msgs.size()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } } |
因為demo代碼來自于rocketmq-example,所以沒有上傳Github。
ps:以前rocketmq在Github開源的時候沒有學習,后來突然有一天發現Github上404了,心里后悔莫急,這次rocketmq重新開源出來,一定不能錯過了。
總結
以上是生活随笔為你收集整理的RocketMQ学习(一):简介和QuickStart的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 十七UML核心视图动态视图之时序图
- 下一篇: 交互设计新人的核心竞争力