日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ学习(一):简介和QuickStart

發布時間:2025/5/22 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ学习(一):简介和QuickStart 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ是什么?

引用官方描述:
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:

支持嚴格的消息順序
支持Topic與Queue兩種模式
億級消息堆積能力
比較友好的分布式特性
同時支持Push與Pull方式消費消息
歷經多次天貓雙十一海量消息考驗

RocketMQ是純java編寫,基于通信框架Netty。

代碼地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。

下載完代碼后,將各個模塊導入eclipse,本地嘗試啟動看看。

1.啟動nameServer,運行rocketmq-namesrv的NamesrvStartup,運行之前需設置環境變量ROCKETMQ_HOME為RocketMQ項目的根目錄,這樣有一個作用是,指向logback的配置文件路徑,保證在nameServer啟動時,logback的正常初始化。我本機設置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。
The Name Server boot success. 表示啟動成功。

2.啟動brokerServer,運行rocketmq-broker的BrokerStartup,同樣,運行之前需設置環境變量ROCKETMQ_HOME,然后啟動參數需要帶上【-n “192.168.0.109:9876″】,我本機的ip是192.168.0.109。如果不帶-n的參數,那么broker會去訪問http://jmenv.tbsite.net:8080/rocketmq/nsaddr獲取nameServer的地址,這個地址不是我們自己的nameServer。
The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。

3.這個非必選項,不運行也可以。還可以啟動rocketmq-srvutil的FiltersrvStartup,這是Consumer使用Java代碼,在服務器做消息過濾。啟動方式和broker一樣,具體的過濾原理以后再詳細的說。

到此就可以運行demo了。

pom.xml依賴:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

<dependencies>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.0.13</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>3.2.2</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

如果依賴包下載不下來,再給個倉庫地址,開源中國的:

1

2

3

4

5

6

7

8

9

10

11

12

13

<repositories>

<repository>

<id>nexus</id>

<name>Nexus</name>

<url>http://maven.oschina.net/content/groups/public/</url>

<releases>

<enabled>true</enabled>

</releases>

<snapshots>

<enabled>true</enabled>

</snapshots>

</repository>

</repositories>

貼代碼:
Producer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

package com.zoo.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

/**

* Producer,發送消息

*

*/

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.109:9876");

producer.start();

for (int i = 0; i < 1000; i++) {

try {

Message msg = new Message("TopicTest",// topic

"TagA",// tag

("Hello RocketMQ " + i).getBytes()// body

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

Thread.sleep(3000);

}

catch (Exception e) {

e.printStackTrace();

Thread.sleep(3000);

}

}

producer.shutdown();

}

}

Consumer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

package com.zoo.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* Consumer,訂閱消息

*/

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("192.168.0.109:9876");

/**

* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>

* 如果非第一次啟動,那么按照上次消費的位置繼續消費

*/

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyContext context) {

System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

System.out.println(" Receive Message Size: " + msgs.size());

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

因為demo代碼來自于rocketmq-example,所以沒有上傳Github。

ps:以前rocketmq在Github開源的時候沒有學習,后來突然有一天發現Github上404了,心里后悔莫急,這次rocketmq重新開源出來,一定不能錯過了。


總結

以上是生活随笔為你收集整理的RocketMQ学习(一):简介和QuickStart的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。