RocketMQ简介
RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。
RocketMQ優點
1 RocketMQ去除對zk的依賴
2 RocketMQ支持異步和同步兩種方式刷磁盤
3 RocketMQ單機支持的隊列或者topic數量是5w
4 RocketMQ支持消息重試
5 RocketMQ支持嚴格按照一定的順序發送消息
6 RocketMQ支持定時發送消息
7 RocketMQ支持根據消息ID來進行查詢
8 RocketMQ支持根據某個時間點進行消息的回溯
9 RocketMQ支持對消息服務端的過濾
10 RocketMQ消費并行度:順序消費 取決于queue數量,亂序消費 取決于consumer數量
RocketMQ架構原理
RocketMQ專業名詞
Producer 生產者角色—投遞消息給mq。
Producer Group 生產者組 —
Consumer 消費者 采用拉取/mq推送方式 獲取消息offset
Consumer Group 消費者組----在同一個組中,是不允許多個不同的消費者消費同一條消息。
多個消費者消費同一條消息呢? 兩個分組 多個不同的分組中可以允許有不同分組中消費者消費同一條消息的。----
以組的名義關聯該組消費的offset位置—
Topic
業務隊列存放消息
異步發送短信
異步發送郵件
郵件Topic
短信Topic
業務區分不同的隊列消息
Queue
會將一個topic主題中的消息存放在多個不同的Queue 與kafka分區模型是一樣
中。
Message
生產者投遞消息會自動對給消息生成一個全局消息id,后期的可以根據該消息全局id實現業務的防止重復執行------冪等性概念。
Tag
–區分 過濾
Broker
Mq服務器端
Name Server
與zk相同思想,作為rocketmq注冊中心 存放生產者消費者 topic主題信息。
Producer
消息生產者,位于用戶的進程內,Producer通過NameServer獲取所有Broker的路由信息,根據負載均衡策略選擇將消息發到哪個Broker,然后調用Broker接口提交消息。
Producer Group
生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。
Consumer
消息消費者,位于用戶進程內。Consumer通過NameServer獲取所有broker的路由信息后,向Broker發送Pull請求來獲取消息數據。Consumer可以以兩種模式啟動,廣播(Broadcast)和集群(Cluster),廣播模式下,一條消息會發送給所有Consumer,集群模式下消息只會發送給一個Consumer。
Consumer Group
消費者組,和生產者類似,消費同一類消息的多個 Consumer 實例組成一個消費者組。
Topic
Topic用于將消息按主題做劃分,Producer將消息發往指定的Topic,Consumer訂閱該Topic就可以收到這條消息。Topic跟發送方和消費方都沒有強關聯關系,發送方可以同時往多個Topic投放消息,消費方也可以訂閱多個Topic的消息。在RocketMQ中,Topic是一個上邏輯概念。消息存儲不會按Topic分開。
Message
代表一條消息,使用MessageId唯一識別,用戶在發送時可以設置messageKey,便于之后查詢和跟蹤。一個 Message 必須指定 Topic,相當于寄信的地址。Message 還有一個可選的 Tag 設置,以便消費端可以基于 Tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 Broker 上的消息,方便在開發過程中診斷問題。
Tag
標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。
Broker
Broker是RocketMQ的核心模塊,負責接收并存儲消息,同時提供Push/Pull接口來將消息發送給Consumer。Consumer可選擇從Master或者Slave讀取數據。多個主/從組成Broker集群,集群內的Master節點之間不做數據交互。Broker同時提供消息查詢的功能,可以通過MessageID和MessageKey來查詢消息。Borker會將自己的Topic配置信息實時同步到NameServer。
Queue
Topic和Queue是1對多的關系,一個Topic下可以包含多個Queue,主要用于負載均衡。發送消息時,用戶只指定Topic,Producer會根據Topic的路由信息選擇具體發到哪個Queue上。Consumer訂閱消息時,會根據負載均衡策略決定訂閱哪些Queue的消息。
Offset
RocketMQ在存儲消息時會為每個Topic下的每個Queue生成一個消息的索引文件,每個Queue都對應一個Offset記錄當前Queue中消息條數。
NameServer
NameServer可以看作是RocketMQ的注冊中心,它管理兩部分數據:集群的Topic-Queue的路由配置;Broker的實時配置信息。其它模塊通過Nameserv提供的接口獲取最新的Topic配置和路由信息。
Producer/Consumer :通過查詢接口獲取Topic對應的Broker的地址信息
Broker : 注冊配置信息到NameServer, 實時更新Topic信息到NameServe
RocketMQ環境搭建
注意:一定要配置rocketmq 環境變量 不然啟動 mqnamesrv.cmd
報錯: Please set the ROCKETMQ_HOME variable in your environment!
啟動mqnamesrv
系統環境變量配置
變量名:ROCKETMQ_HOME
變量值:MQ解壓路徑\MQ文件夾名
eg、ROCKETMQ_HOME=D:\rocketmq-all-4.3.0-bin-release
4.啟動 mqnamesrv.cmd
啟動mqBroker
Cmd命令框執行進入至‘MQ文件夾\bin’下,然后執行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動BROKER。成功后會彈出提示框,此框勿關閉。
啟動Rocketmq-console
1.下載rocketmq-externals-master
,進入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夾,打開‘application.properties’進行配置。
2. 新增:rocketmq.config.namesrvAddr=127.0.0.1:9876
3. 執行
用CMD進入‘\rocketmq-externals\rocketmq-console’文件夾,執行‘mvn clean package -Dmaven.test.skip=true’,編譯生成。
編譯成功之后,Cmd進入‘target’文件夾,執行‘java -jar rocketmq-console-ng-2.0.0.jar’,啟動‘rocketmq-console-ng-1.0.0.jar’。
4.瀏覽器中輸入‘127.0.0.1:配置端口’,成功后即可查看。
eg:http://127.0.0.1:8088
RocketMQ集群環境
1.集群支持:
RocketMQ天生對集群的支持非常友好
2.單Master:
優點:除了配置簡單沒什么優點
缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用
3.多Master:
優點:配置簡單,性能最高
缺點:可能會有少量消息丟失(配置相關),單臺機器重啟或宕機期間,該機器下未被消費的消息在機器恢復前不可訂閱,影響消息實時性
4.多Master多Slave異步模式:
每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短消息延遲,毫秒級
優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預
缺點:Master宕機或磁盤損壞時會有少量消息丟失
Rocketmq偽集群搭建
集群:Master節點、Slave節點
Master節點:讀寫 生產者可以投遞消息到Master節點、消費者讀取該Master節點消費消息。
Slave節點:只能讀 不能夠寫 消費者只能夠讀,生產者不能夠將消息投遞到Slave節點中。
broker-a.conf 配置
brokerClusterName=DefaultCluster
#從節點的brokerName必須和主節點一樣
brokerName=broker-a
#0表示是一個主節點, >0表示Slave
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號分割
namesrvAddr=127.0.0.1:9876
#允許自動創建主題
autoCreateTopicEnable=true
#注意需改端口,并且要和默認的10911相差5以上
listenPort=10911
#存儲路徑
storePathRootDir=D:\rocketmq\store_master
#commitLog存儲路徑
storePathCommitLog=D:\rocketmq\store_master\commitLog
#消費隊列存儲路徑
storePathConsumerQueue=D:\rocketmq\store_master\consumerqueue
#消息索引存儲路徑
storePathIndex=D:\rocketmq\store_master\index
#checkpoint 文件存儲路徑
storeCheckpoint=D:\rocketmq\store_master\checkpoint
#abort 文件存儲路徑
abortFile=D:\rocketmq\store_master\abort
broker-b.conf配置
brokerClusterName=DefaultCluster
#從節點的brokerName必須和主節點一樣
brokerName=broker-a
#0表示是一個主節點, >0表示Slave
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號分割
namesrvAddr=127.0.0.1:9876
#允許自動創建主題
autoCreateTopicEnable=true
#注意需改端口,并且要和默認的10911相差5以上
listenPort=10921
#存儲路徑
storePathRootDir=D:\rocketmq\store_slave
#commitLog存儲路徑
storePathCommitLog=D:\rocketmq\store_slave\commitLog
#消費隊列存儲路徑
storePathConsumerQueue=D:\rocketmq\store_slave\consumerqueue
#消息索引存儲路徑
storePathIndex=D:\rocketmq\store_slave\index
#checkpoint 文件存儲路徑
storeCheckpoint=D:\rocketmq\store_slave\checkpoint
#abort 文件存儲路徑
abortFile=D:\rocketmq\store_slave\abort
啟動broker集群
啟動broker-a—
F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-a.conf
The broker[broker-a, 192.168.31.1:10911] boot success. serializeType=JSON and na
me server is 127.0.0.1:9876
啟動broker-b—
F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-b.conf
The broker[broker-a, 192.168.31.1:10921] boot success. serializeType=JSON and na
me server is 127.0.0.1:9876
展示界面
Rocketmq集群同步方案
在RocketMQ中 Broker需要實現集群保證高可用(HA)
生產者投遞消息都是存放在主的Broker中
從Broker每次定時同步主Brokercommitlog日志文件。
A.如果實現一主一從讀寫分離消費模型----
消費者訂閱從節點消費消息,可能會存在延遲問題。(網絡數據傳輸過程中,延遲必然)
B. 消費者直接訂閱我們主Broker消費消息 延遲概率比較低
默認的情況下,消費者:訂閱我們主的Broker消費消息,如果主的Broker節點物理內存占用達到40%,開始采用訂閱從節點實現消費,可以提高讀寫性能。----讀寫分離消費模型架構物理內存占用達到40%----消息堆積—
如果主Broker宕機的情況下,生產者是無法投遞消息,而我們消費者可以
訂閱我們從節點實現數據的消費。
消費者 消費進度同步問題
在每個Broker服務器中, 在C:\Users\Administrator\store\config
consumerOffset.json 記錄每個分組消費主題消息隊列 消費進度。
“topic_2020_mayikt@mayikt-group23”:{0:2,1:1,2:3,3:0
topic_2020_mayikt 主題名稱
mayikt-group23消費者分組名稱
0:2,1:1,
0:隊列id 消費進度 offset為2的位置
1:隊列id 消費者進度 offset為1的位置
1.從節點定時的形式同步主節點消費記錄信息,不管消費者訂閱主節點還是從節點
最終都會優先的將消費記錄結果給主節點,如果節點真的宕機的情況下,先記錄在
從節點。
2.如果主節點現在突然存活的情況下,從哪個位置開始消費呢?
如果我們消費者內存中有緩存消費進度的情況下,連接到主節點修改最新消費者進度記錄。
如果消費者內存沒有緩存消費進度的情況下,可能會發生重復消費的問題。
在RocketMQ中 Broker需要實現集群保證高可用(HA)
----Slave會同步Master節點中所有commitlog文件數據。
如果當Master節點宕機了,這時候Master節點就無法寫入數據(生產者無法投遞數據),
但是消費者可以根據Slave(備份)節點消費數據,注意的是 從Slave(備份)節點無法
寫入數據。
設置該參數:slaveReadEnable 從服務器不允許讀;
RocketMQ讀寫分離機制:
讀寫分離消費模型,提高IO讀寫的性能。
RocketMQ消費者進度如何同步:
Springboot整合方式
注意springboot整合rocketmq server端 版本一定要與rocketmq 不然可能啟動報錯
Maven依賴
org.springframework.boot
spring-boot-starter-parent
2.2.4.RELEASE
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.1
生產者
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
- 普通消息投遞 單向發送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return “投遞消息 => " + msg.toString() + " => 成功”;
}
消費者
/**
-
@ClassName RocketMQConsumer
-
@Author 螞蟻課堂余勝軍 QQ644064779 www.mayikt.com
-
@Version V1.0
log.info("消費者監聽到消息:<msg:{}>", msgEntity);
**/
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = “mayikt-group5”, topic = “topic_meite”)
public class RocketMQConsumer implements RocketMQListener {
@Override
public void onMessage(MsgEntity msgEntity) {}
}
配置文件
spring:
application:
name: mayikt-rocketmq
server:
port: 8000
rocketmq:
rocketmq地址
name-server: 127.0.0.1:9876
producer:
# 必須填寫 group
group: mayikt-group
Rocketmq配置文件詳解
所屬集群名字
brokerClusterName=rocketmq-cluster
此處需手動更改
broker名字,注意此處不同的配置文件填寫的不一樣
附加:按配置文件文件名來匹配
brokerName=broker-a
0 表示Master, > 0 表示slave
brokerId=0
此處許手動更改
(此處nameserver跟host配置相匹配,9876為默認rk服務默認端口)nameServer 地址,分號分割
附加:broker啟動時會跟nameserver建一個長連接,broker通過長連接才會向nameserver發新建的topic主題,然后java的客戶端才能跟nameserver端發起長連接,向nameserver索取topic,找到topic主題之后,判斷其所屬的broker,建立長連接進行通訊,這是一個至關重要的路由的概念,重點,也是區別于其它版本的一個重要特性
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
是否允許Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
Broker 對外服務的監聽端口
listenPort=10911
刪除文件時間點,默認是凌晨4點
deleteWhen=04
文件保留時間,默認48小時
fileReservedTime=120
commitLog每個文件的大小默認1G
附加:消息實際存儲位置,和ConsumeQueue是mq的核心存儲概念,之前搭建2m環境的時候創建在store下面,用于數據存儲,consumequeue是一個邏輯的概念,消息過來之后,consumequeue并不是把消息所有保存起來,而是記錄一個數據的位置,記錄好之后再把消息存到commitlog文件里
mapedFileSizeCommitLog=1073741824
ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
存儲路徑
storePathRootDir=/usr/local/rocketmq/store
commitLog存儲路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消費隊列存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存儲路徑
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
? ASYNC_MASTER 異步復制Master
? SYNC_MASTER 同步雙寫Master
? SLAVE
brokerRote=ASYNC_MASTER
刷盤方式
? ASYNC_FLUSH 異步刷盤
? SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
發消息線程池數量
sendMessageTreadPoolNums=128
拉消息線程池數量
pullMessageTreadPoolNums=128
Rocketmq隊列分區模型
Rocketmq 底層存儲結構中 將一個主題分成n多個不同的
隊列來實現存放消息。
創建了一個主題:需要指定隊列個數 默認是4和16
寫隊列數量: 對我們生產者寫投遞隊列數量 16
讀隊列數量: 對我們消費者獲取消息隊列數量 16
在rocketmq中,如果一個topic只有一個隊列的情況下支持的并行能力比較弱,所以會將一個topic分成分成n多個不同的隊列queue來實現存放, 類似于Kafka的分區模型概念。
writeQueueNums:寫隊列數,表示producer發送到的MessageQueue的隊列個數
readQueueNums:讀隊列數,表示Consumer讀取消息的MessageQueue隊列個數
注:這兩個值需要相等,在集群模式下如果不相等,writeQueueNums=6,readQueueNums=3, 那么每個broker上會有3個queue的消息是無法消費的。
perm 隊列的權限:2表示w、4表示r 6表示rw
架構模擬圖
注意我們rocketmq與kafka設計思想原理相同,消息存儲在mq服務器端中,消費者不管消費失敗還是成功,最終消息還是會存在mq服務器端,可以通過日志策略清理消息。
RocketMQ解決方案
如果現在消費者獲取消息、處理器都是同一個線程的情況下,
有可能會影響到我們消費者速率—
消費者獲取消息10毫秒—處理消息5s 5.001s
Rocketmq中 消費者消費的消息,采用多線程的形式,需要注意消息順序一致性的問題。
與kafka思想是一樣的,消費者不管消費成功還是失敗,最終消息不會立即被刪除。
后期都是通過日志刪除策略,定時刪除消息。
NameServer原理
NameServer注冊中心 類似于zookeeper、nacos 存儲 服務節點信息。
Rocketmq 不使用Zookeeper而使用Nameserver實現注冊中心呢?
Cp(每個節點副本必須要保證數據一致性) ap(每個節點副本數據不一致
,保證可用性。)
Kafka:副本機制 副本選舉依賴 控制器 選舉依賴Zookeeper—選舉過程
在Rocketmq中 人為在配置文件中指定那個Broker 為主節點 ,而在kafka中依賴
與zookeeper實現選舉的。
NameServer 每個節點相互之間沒有數據過程、都是獨立的 (ap模式)
保證可用性、不保證每個NameServer 數據的一致性問題。
NameServer 與Broker 、生產者、消費者關系。
NameServer 中存放那些信息呢?
NameServer 與Broker 關系
nameServer如何知道Broker 宕機呢?
發送一個心跳續約包給Broker 告訴我還在存活狀態
與nacos、eureka 實現服務注冊中心原理是一樣的。
生產者:
發送請求
Broker 與 nameserver心跳續約間隔30s
如果生產者獲取到一個故障的Broker地址,實現發送消息如果失敗的情況下
如何處理
答案:
實現調用 存儲消息。
NameServer類似于zookeeper實現服務注冊中心
為什么RocketMQ不使用zookeeper?而使用NameServer作為注冊中心呢?
Zookeeper實現注冊中心 模式CP模式 保證數據的一致性
NameServer 保證AP模式 可用性
特點:
NameServer集群之間不需要數據同步
注冊分析:
Broker啟動的時候,會讀取Broker.config 配置文件,獲取到多個不同的nameServer集群的地址列表,會將Broker信息注冊給所有的NameServer存儲與所有的NameServer保持長連接。
注冊信息:
Broker 的IP和端口號 主題信息、集群信息 過濾器等。
NameSever如何知道我們Broker宕機呢?
續約機制:
Broker:
Rocketmq底層基于netty實現的,每個Broker在默認每隔30
S時間給nameServer發送一個心跳續約包,告訴我Broker還在存活。
Name 每隔10s時間檢測,故障Broker節點,則剔除。
Broker關系
每個NameServer相互之間都是獨立,不會做任何數據同步,采用ap模式思想
Broker信息注冊:Broker啟動的時候,會根據配置文件讀取到多個NameServer地址,輪詢的將Broker信息注冊到每個NameServer上,這樣每個NameServer都有該Broker信息。
剔除:Broker需要每隔30s時間給每個NameServer發送一個心跳續約,如果沒有發送心跳續約的話,NameServer 會有一個定時器每隔每10s中掃描一次,檢測故障的Broker,則會剔除。
最終每個Broker與NameSserver保持長連接。
生產者關系
如果是每隔30s發送續約,也就是意味著30s后 才能夠剔除該Broker
真好在這時候 生產者發送到該故障節點如何處理?
使用ack模式 確認刷盤成功 才屬于生產者消息投遞成功。
相關疑問
生產者發送消息三種模式
Producer發送消息有三種方式:同步、異步和單向
三種發送方式
單向 生產者投遞消息到mq中,不需要返回結果。
優點:延遲概率比較低
缺點:丟失消息數據
投遞消息過程比較耗時時間5毫秒
異步 生產者投遞消息到mq中,使用回調形式返回。
投遞消息過程比較耗時時間5毫秒
補償----
同步
生產者投遞消息到mq中,采用同步的形式獲取到返回消息是否有
投遞成功的結果,導致接口延遲概率比較大。
投遞消息過程比較耗時時間10毫秒
發送請求 基于請求與響應
1.同步發送:發送請求模式屬于同步的,發送該條消息不需等待該條消息發送成功之后,才可以繼續發送下一條。
2.異步發送:采用異步的發送模式,不需要同步阻塞等待,通過回調的形式監聽生產者消息投遞結果
3單向發送:只負責發送消息給mq,不管是否有發送成功。
同步發送
/**
- 同步發送
- @throws Exception
*/
@GetMapping("/sync")
public void sync() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
SendResult sendResult = rocketMQTemplate.syncSend(RocketMQConfig.TOPIC_NAME, msg);
log.info(“同步發送字符串{}, 發送結果{}”, msg.toString(), sendResult);
}
異步發送
/**
-
異步發送
-
@throws Exception
@Overridepublic void onException(Throwable var1) {log.info("異步發送失敗{}", var1);}
*/
@GetMapping(“async”)
public void async() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
log.info(">msg:<<" + msg);
rocketMQTemplate.asyncSend(RocketMQConfig.TOPIC_NAME, msg.toString(), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
log.info(“異步發送成功{}”, var1);
}});
}
單向發送
/**
- 普通消息投遞 單向發送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return “投遞消息 => " + msg.toString() + " => 成功”;
}
順序消息
Rocketmq中,消費者處理消息業務邏輯的時候 是采用多線程。
如何解決消息順序一致性的問題?
實際上做業務邏輯開發中,很少有需要保證消息順序一致性問題。
1.在rocketmq 消費者默認是多線程異步消費的,開發者需要設定指定保證消息順序一致性問,也就是同一個隊列消息最終被同一個線程實現消費。
相關代碼
生產者
String uuid = UUID.randomUUID().toString();
SendResult result1 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “insert”, uuid);
log.info(“insert:” + result1.toString());
SendResult result2 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “update”, uuid);
log.info(“update:” + result2.toString());
SendResult result3 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “delete”, uuid);
log.info(“delete:” + result3.toString());
消費者
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = “mayikt-group20”, topic = “topic_seq”, consumeMode = ConsumeMode.ORDERLY
)
public class RocketMQConsumer01 implements RocketMQListener {
@Override
public void onMessage(String msg) {
try {
Random r = new Random(100);
int i = r.nextInt(500);
Thread.sleep(i);
} catch (Exception e) {
}
結果
消息存儲結構
在win的安裝rocketmq,消息物理存放在
C:\Users\Administrator\store
commitlog:消息的存儲目錄
config:運行期間一些配置信息
consumequeue:消息消費隊列存儲目錄
index:消息索引文件存儲目錄
abort:如果存在abort文件說明Broker非正常關閉,該文件默認啟動時創建,正常退出時刪除
checkpoint:文件檢測點。存儲commitlog文件最后一次刷盤時間戳、consumequeue最后一次刷盤時間、index索引文件最后一次刷盤時間戳。
學習kafka 的時候,topic主題消息分成n多個不同的partition 分區存放,
而在我們的rocketmq中,將一個topic主題的消息分成多個不同的Queue存放。
前提條件:commitlog日志文件沒有滿的情況下:
在rocketmq中所有topic主題對應的隊列的消息都會存放在同一個commitlog日志文件中,
消費者在消費消息的時候 不會直接與commitlog日志文件打交道。
Rocketmq 提供消費隊列 (邏輯概念)
Commitlog日志文件 存放消息內容主體----Commitlogoffset
ConsumeQueue 不存放消息主體,只存放消息的Commitlogoffset、msgsize、msgtag
Queue offset與Commitlogoffset 之間區別?
Queue offset—消息存放在ConsumeQueue 消費偏移量的位置
Commitlogoffset ----消息物理存放位置
[queueId=2, storeSize=242, queueOffset=25, sysFlag=0, bornTimestamp=1611665447721, bornHost=/192.168.31.1:55706, storeTimestamp=1611665447722, storeHost=/192.168.31.1:10911, msgId=C0A81F0100002A9F0000000000039330, commitLogOffset=234288, bodyCRC=336380854, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_ide_mayikt’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26,
生產者如何投遞消息:
消費者如何消費:
消息的commitlogoffset 如何存放在不同的consumequeue中。
Consumequeue==16
投遞消息 消息key
消息key%16=1
Consumequeue 中 Consumeoffset 對應 一條消息(沒有對應消息主體)—commitlogoffset
消費者消費我們消息 在(kafka、rocketmq)中 消費成功或者失敗都不會立即將該消息刪除,日志清理策略刪除。
Rocketmq消息存儲結構
1.(commitlog日志文件沒有滿的情況下)在rocketmq中所有的消息日志,都存放在同一個commitlog日志文件中,(默認是1GB來存儲 好處1零拷貝映射 好處2 分段 非常好方式管理清理日志文件 commitlog命名就是上一個commitlog日志文件中最后一個commitlog-offset值)
,ConsumeQueue類似于在kafka中的 partition 分區模型, 而ConsumeQueue存放的是commitlog開始的commitLogoffset、msgsize、tag。
2.每次消費者讀取消息的時候,先讀取ConsumeQueue中獲取到commitLogoffset,在根據該
commitLogoffset查找commitLog日志文件獲取到消息體返回給消費者客戶端。
3.與kafka的設計不同
根據阿里巴巴消息中間件團隊的測試,如果每個topic中的partition 分區存儲的消息過多
,可能會影響到磁盤io的讀寫性能,所以采用ConsumeQueue存放少量的數據,消息讀取還是通過commitlog文件中查找。
Rocketmq讀寫使用:MappedByteBuffer 零拷貝
因為rocketmq 中日志文件存儲采用文件映射機制+mmap 減少用戶態與內核態來回拷貝的次數,從而可以提高性能
MappedByteBuffer 文件存儲映射,只能映射用戶態1.5GB-2GB 所以RocketMQ日志文件存儲commitlog默認為1G大小。
commitLog文件
commitlog文件的存儲地址:KaTeX parse error: Undefined control sequence: \store at position 5: HOME\?s?t?o?r?e?\commitlog{fileName},每個文件的大小默認1G =102410241024,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲的時候會順序寫入文件,當文件滿了,寫入下一個文件。
冪等問題
整合方式
消費者冪等問題
當消費者消費消息失敗的時候,可以返回繼續重復消費,這時候rocketmq會給
該消費者不斷的重試,重試的過程中需要注意冪等性問題。
在rocketmq中,根據返回ack 狀態:
ConsumeConcurrentlyStatus
CONSUME_SUCCESS --消費成功
RECONSUME_LATER— 消費失敗繼續重試消費。
Messageid
對我們每條消息生成一個messageid------根據該id 實現消費者冪等性問題
Messageid 生成規則:關聯存放在那個mq服務器端、commitlog-offset值。
Messageid --C0A812E0495C18B4AAC276054FC20005 轉碼
獲取到commitlog-offset —查找物理對應消息位置。
Rocketmq中的 messageid 長度一共占用16個字節,其中包含消息存儲的ip和端口號,
消息commitLogOffset,客戶端發送請求傳遞messageid 給mq服務器端,mq服務器端
根據該commitlogoffset值查找對應的消息返回給客戶端。
消息過濾
1.Tag就是相當于給消息打一個標簽,比如我的文章 屬于那些標簽下,用戶如果訂閱了該標簽就可能刷到該文章。
2. 一個消息可能打上了多個tag標簽,消費者訂閱該主題和tag標簽,標簽如果一致的情況下就可以獲取到該消息。
原理:在我們ConsumeQueue中,存儲結構為 commitoffset、size、messagetag 會根據消費者傳遞的tag hash值與隊列中每條消息的tag hash 做比較,如果相等的情況則獲取該消息,如果不相等的情況下,則不會獲取該消息。
消費者組
與kafka基本原理相同,同一個消費者中,只能有一個消費者消費消息,
多個不同的消費者組可以消費同一條消息。
消費成功之后,提交offset。
分布式事務
消息的可用性
刷盤:將數據從pagecache中刷盤到硬盤中存儲
,避免數據的丟失。
刷盤模式:
1.同步刷盤:
生產者必須等待該消息 從pagecache刷盤到硬盤中,在返回
Ack給生產者。
優點:可以保證消息不丟失
缺點:影響到整體接口吞吐量
應用場景:金融支付類 msg
2.異步刷盤
生產者不需要等待pagecache刷盤到硬盤中結果,完全采用異步的形式
優點:
提高整體接口吞吐量
缺點:
有可能消息會丟失 (概率不是很大)
應用場景:行為分析---- 100 -12 98%
刷盤策略
RocketMQ,默認會將消息持久化存放在硬盤,首先會
寫入到系統PageCahe中,讓后再刷盤到硬盤,這樣就可以保證
硬盤與PageCache中數據完全一致性。
同步刷盤:
消息真正刷盤到磁盤中,才會返回給生產者,只要磁盤沒有壞,這樣做
可以保證消息不丟失,但是可能會影響整體的吞吐量。
異步刷盤:
讀寫充分利用pagecache,消息寫入到pagecache成功之后,采用異步的形式
刷盤到硬盤中,可以提高系統的吞吐量,但是可能消息會丟失。
RocketMQ解決分布式事務問題
RocketMQ解決分布式事務思想與原理
RocketMQ在其消息定義的基礎上,對事務消息擴展了兩個相關的概念:
1.Half(Prepare) Message——半消息(預處理消息)
半消息是一種特殊的消息類型,該狀態的消息暫時不能被Consumer消費。當一條事務消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發出的二次確認時,該事務消息就處于"暫時不可被消費"狀態,該狀態的事務消息被稱為半消息。
2.Message Status Check——消息狀態回查
由于網絡抖動、Producer重啟等原因,可能導致Producer向Broker發送的二次確認消息沒有成功送達。如果Broker檢測到某條事務消息長時間處于半消息狀態,則會主動向Producer端發起回查操作,查詢該事務消息在Producer端的事務狀態(Commit 或 Rollback)??梢钥闯?#xff0c;Message Status Check主要用來解決分布式事務中的超時問題。
RocketMQ的事務消息是基于兩階段提交實現的,也就是說消息有兩個狀態,prepared和commited。當消息執行完send方法后,進入的prepared狀態,進入prepared狀態以后,就要執行executeLocalTransaction方法,這個方法的返回值有3個,也決定著這個消息的命運,
COMMIT_MESSAGE:提交消息,這個消息由prepared狀態進入到commited狀態,消費者可以消費這個消息;
ROLLBACK_MESSAGE:回滾,這個消息將被刪除,消費者不能消費這個消息;
UNKNOW:未知,這個狀態有點意思,如果返回這個狀態,這個消息既不提交,也不回滾,還是保持prepared狀態,而最終決定這個消息命運的,是checkLocalTransaction這個方法。
RocketMQ實現分布式事務的原理:
消費。
服務器端,RocketMQ服務器端將該消息,推送給消費者消費。
核心思想:確保生產者一定將消息投遞到mq服務器端,生產者必須先一定執行完成,在執行消費者。
RocketMQLocalTransactionState.ROLLBACK;—回滾
RocketMQLocalTransactionState.COMMIT—提交
RocketMQLocalTransactionState.UNKNOWN— 不是提交也不是回滾。
存在的一些問題:
發送ack給rocketmq 回滾該消息即可,不會被消費者消費。
Rocketmq服務器端 在默認的情況下 每隔60s 檢查本地事務是否已經執行過,如果執行過的情況下,則提交該提交,如果沒有執行該事務的情況下,則回滾。
常見錯誤
Lock failed,MQ already started
將 broker 的 master 和 slave 節點放在同一臺機器上,配置的storePath相同導致的,修改配置文件,改為不同的路徑即可解決。
總結
以上是生活随笔為你收集整理的RocketMQ简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式服务追踪与调用链系统
- 下一篇: properties 转 yml