RocketMQ学习(一):简介和QuickStart
RocketMQ是什么?
引用官方描述:
RocketMQ是一款分布式、隊(duì)列模型的消息中間件,具有以下特點(diǎn):
支持嚴(yán)格的消息順序
支持Topic與Queue兩種模式
億級(jí)消息堆積能力
比較友好的分布式特性
同時(shí)支持Push與Pull方式消費(fèi)消息
歷經(jīng)多次天貓雙十一海量消息考驗(yàn)
RocketMQ是純java編寫,基于通信框架Netty。
代碼地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。
下載完代碼后,將各個(gè)模塊導(dǎo)入eclipse,本地嘗試啟動(dòng)看看。
1.啟動(dòng)nameServer,運(yùn)行rocketmq-namesrv的NamesrvStartup,運(yùn)行之前需設(shè)置環(huán)境變量ROCKETMQ_HOME為RocketMQ項(xiàng)目的根目錄,這樣有一個(gè)作用是,指向logback的配置文件路徑,保證在nameServer啟動(dòng)時(shí),logback的正常初始化。我本機(jī)設(shè)置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。
The Name Server boot success. 表示啟動(dòng)成功。
2.啟動(dòng)brokerServer,運(yùn)行rocketmq-broker的BrokerStartup,同樣,運(yùn)行之前需設(shè)置環(huán)境變量ROCKETMQ_HOME,然后啟動(dòng)參數(shù)需要帶上【-n “192.168.0.109:9876″】,我本機(jī)的ip是192.168.0.109。如果不帶-n的參數(shù),那么broker會(huì)去訪問http://jmenv.tbsite.net:8080/rocketmq/nsaddr獲取nameServer的地址,這個(gè)地址不是我們自己的nameServer。
The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。
3.這個(gè)非必選項(xiàng),不運(yùn)行也可以。還可以啟動(dòng)rocketmq-srvutil的FiltersrvStartup,這是Consumer使用Java代碼,在服務(wù)器做消息過濾。啟動(dòng)方式和broker一樣,具體的過濾原理以后再詳細(xì)的說。
到此就可以運(yùn)行demo了。
pom.xml依賴:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | <dependencies> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies> |
如果依賴包下載不下來,再給個(gè)倉庫地址,開源中國的:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | <repositories> <repository> <id>nexus</id> <name>Nexus</name> <url>http://maven.oschina.net/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> |
貼代碼:
Producer
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package com.zoo.quickstart; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * Producer,發(fā)送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.0.109:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); Thread.sleep(3000); } } producer.shutdown(); } } |
Consumer
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | package com.zoo.quickstart; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.0.109:9876"); /** * 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)<br> * 如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi) */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); System.out.println(" Receive Message Size: " + msgs.size()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } } |
因?yàn)閐emo代碼來自于rocketmq-example,所以沒有上傳Github。
ps:以前rocketmq在Github開源的時(shí)候沒有學(xué)習(xí),后來突然有一天發(fā)現(xiàn)Github上404了,心里后悔莫急,這次rocketmq重新開源出來,一定不能錯(cuò)過了。
總結(jié)
以上是生活随笔為你收集整理的RocketMQ学习(一):简介和QuickStart的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 十七UML核心视图动态视图之时序图
- 下一篇: 交互设计新人的核心竞争力