日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq 初探(一)

發布時間:2023/12/10 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq 初探(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

大家好,我是烤鴨:

????今天看下rocketmq。這篇主要是簡單介紹下 rocketmq以及idea 本地調試 rocketmq。

項目架構

感興趣的可以下載源碼看下。

https://github.com/apache/rocketmq

項目結構圖。

rocketmq-acl: acl 秘鑰方式的鑒權,用在broker端。
rocketmq-broker:整個mq的核心,他能夠接受producer和consumer的請求,并調用store層服務對消息進行處理。HA服務的基本單元,支持同步雙寫,異步雙寫等模式。
rocketmq-client:mq客戶端實現,目前官方僅僅開源了java版本的mq客戶端,c++,go客戶端有社區開源貢獻。
rocketmq-common:一些模塊間通用的功能類,比如一些配置文件、常量。
rocketmq-distribution:腳本、配置模塊。
rocketmq-example:官方提供的例子。
rocketmq-filtersrv:消息過濾服務,相當于在broker和consumer中間加入了一個filter代理。
rocketmq-logappender:日志
rocketmq-logging:日志
rocketmq-namesrv:NameServer,類似服務注冊中心,broker在這里注冊,consumer和producer在這里找到broker地址
rocketmq-openmessaging:RocketMQ支持openmessaging,詳見:https://rocketmq.apache.org/docs/openmessaging-example/
rocketmq-remoting:基于netty的底層通信實現,所有服務間的交互都基于此模塊。
rocketmq-srvut:解析命令行的工具類。
rocketmq-store:存儲層實現,同時包括了索引服務,高可用HA服務實現。
rocketmq-tools:命令行工具,提供了消息查詢等功能。

下面重點說一下幾個模塊:

注冊中心 namesrv、broker、client 和 store,先看一下關系。

看這個圖是不是有點相似,沒錯,跟 dubbo 很像,除了多了 broker。

nameserver 是注冊中心,用來記錄broker信息、broker和topic關系。

producer 從nameserver 獲取broker信息,進行消息發送。

consumer 從nameserver 獲取broker信息,進行消息消費。

idea 導入源碼,本地調試

設置 rocketmq _home 目錄,后邊的namesrv和broker會用到。新建conf目錄,并將 rocket-distribution 的conf里的broker.conf、logback_broker.xml、

logback_namesrv.xml、logback_tools.xml 復制到新建的conf目錄中。我這里設置的目錄是 E:\my\rocketmq

我這里修改了日志目錄,方便查看日志。

啟動 NamesrvStartup

Connected to the target VM, address: '127.0.0.1:58819', transport: 'socket' Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation Disconnected from the target VM, address: '127.0.0.1:58819', transport: 'socket'

啟動參數配置 rocketmq_home 的環境變量,ROCKETMQ_HOME=E:\my\rocketmq

啟動成功:

Connected to the target VM, address: '127.0.0.1:50261', transport: 'socket' The Name Server boot success. serializeType=JSON

會發現 rocketmq_home 目錄下生成了 logs/rocketmqlogs 目錄,存放的是日志文件。

啟動broker

設置啟動參數和 rocketmq_home 的環境變量 :

autoCreateTopicEnable=true 是為了測試的時候可以發送時創建topic,默認是 false(不建議開啟,避免并發發送時,topic重復問題)

-c E:\my\rocketmq\conf\broker.conf -n localhost:9876 autoCreateTopicEnable=true ROCKETMQ_HOME=E:\my\rocketmq

會發現 rocketmq_home 目錄下生成了 store 目錄,存放的是broker維護的信息,像消費者的偏移量、延遲隊列的偏移量、topic。

啟動consumer

rocketmq-example 項目下,example\src\main\java\org\apache\rocketmq\example\quickstart\Consumer.java

指定broker地址:

consumer.setNamesrvAddr("localhost:9876");

啟動producer并發送消息

rocketmq-example 項目下,example\src\main\java\org\apache\rocketmq\example\quickstart\Producer.java

指定broker地址,修改循環次數為2次:

producer.setNamesrvAddr("localhost:9876");

發送成功:

SendResult [sendStatus=SEND_OK, msgId=7F000001395C18B4AAC22C7A99940000, offsetMsgId=0AA80D1200002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=7F000001395C18B4AAC22C7A99D80001, offsetMsgId=0AA80D1200002A9F00000000000000C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

消費端接收成功:

ConsumeMessageThread_1 Receive_1 New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1625815032213, bornHost=/10.168.13.18:57729, storeTimestamp=1625815032241, storeHost=/10.168.13.18:10911, msgId=0AA80D1200002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1625815055025, UNIQ_KEY=7F000001395C18B4AAC22C7A99940000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] ConsumeMessageThread_2 Receive_1 New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1625815032280, bornHost=/10.168.13.18:57729, storeTimestamp=1625815032282, storeHost=/10.168.13.18:10911, msgId=0AA80D1200002A9F00000000000000C9, commitLogOffset=201, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1625815056025, UNIQ_KEY=7F000001395C18B4AAC22C7A99D80001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]

當發送的時候 store目錄下會生成 commitLog 目錄(消息內容)和consumequeue目錄(存的是topic和queueId)

commitLog目錄 默認上來生成兩個文件,2個G。

consumequeue目錄,一級子目錄是topic,二級子目錄是queueId

mq 控制臺

rocketmq-console 在另一個倉庫,地址:

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

啟動成功,畫面還是比較清新的:

模擬發送100條,可以看到每個broker的數量:

console功能還是有很多可以再開發的地方,官方基本不維護了,需要的可能得二次開發了。

像我們就開發了類似報表、報警、監控等一些功能,還是比較方便的。

總結

以上是生活随笔為你收集整理的rocketmq 初探(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。