RocketMQ简介
RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時消息、消息回溯等。
RocketMQ優(yōu)點
1 RocketMQ去除對zk的依賴
2 RocketMQ支持異步和同步兩種方式刷磁盤
3 RocketMQ單機支持的隊列或者topic數(shù)量是5w
4 RocketMQ支持消息重試
5 RocketMQ支持嚴格按照一定的順序發(fā)送消息
6 RocketMQ支持定時發(fā)送消息
7 RocketMQ支持根據(jù)消息ID來進行查詢
8 RocketMQ支持根據(jù)某個時間點進行消息的回溯
9 RocketMQ支持對消息服務(wù)端的過濾
10 RocketMQ消費并行度:順序消費 取決于queue數(shù)量,亂序消費 取決于consumer數(shù)量
RocketMQ架構(gòu)原理
RocketMQ專業(yè)名詞
Producer 生產(chǎn)者角色—投遞消息給mq。
Producer Group 生產(chǎn)者組 —
Consumer 消費者 采用拉取/mq推送方式 獲取消息offset
Consumer Group 消費者組----在同一個組中,是不允許多個不同的消費者消費同一條消息。
多個消費者消費同一條消息呢? 兩個分組 多個不同的分組中可以允許有不同分組中消費者消費同一條消息的。----
以組的名義關(guān)聯(lián)該組消費的offset位置—
Topic
業(yè)務(wù)隊列存放消息
異步發(fā)送短信
異步發(fā)送郵件
郵件Topic
短信Topic
業(yè)務(wù)區(qū)分不同的隊列消息
Queue
會將一個topic主題中的消息存放在多個不同的Queue 與kafka分區(qū)模型是一樣
中。
Message
生產(chǎn)者投遞消息會自動對給消息生成一個全局消息id,后期的可以根據(jù)該消息全局id實現(xiàn)業(yè)務(wù)的防止重復(fù)執(zhí)行------冪等性概念。
Tag
–區(qū)分 過濾
Broker
Mq服務(wù)器端
Name Server
與zk相同思想,作為rocketmq注冊中心 存放生產(chǎn)者消費者 topic主題信息。
Producer
消息生產(chǎn)者,位于用戶的進程內(nèi),Producer通過NameServer獲取所有Broker的路由信息,根據(jù)負載均衡策略選擇將消息發(fā)到哪個Broker,然后調(diào)用Broker接口提交消息。
Producer Group
生產(chǎn)者組,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組。
Consumer
消息消費者,位于用戶進程內(nèi)。Consumer通過NameServer獲取所有broker的路由信息后,向Broker發(fā)送Pull請求來獲取消息數(shù)據(jù)。Consumer可以以兩種模式啟動,廣播(Broadcast)和集群(Cluster),廣播模式下,一條消息會發(fā)送給所有Consumer,集群模式下消息只會發(fā)送給一個Consumer。
Consumer Group
消費者組,和生產(chǎn)者類似,消費同一類消息的多個 Consumer 實例組成一個消費者組。
Topic
Topic用于將消息按主題做劃分,Producer將消息發(fā)往指定的Topic,Consumer訂閱該Topic就可以收到這條消息。Topic跟發(fā)送方和消費方都沒有強關(guān)聯(lián)關(guān)系,發(fā)送方可以同時往多個Topic投放消息,消費方也可以訂閱多個Topic的消息。在RocketMQ中,Topic是一個上邏輯概念。消息存儲不會按Topic分開。
Message
代表一條消息,使用MessageId唯一識別,用戶在發(fā)送時可以設(shè)置messageKey,便于之后查詢和跟蹤。一個 Message 必須指定 Topic,相當(dāng)于寄信的地址。Message 還有一個可選的 Tag 設(shè)置,以便消費端可以基于 Tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業(yè)務(wù) key 來查找 Broker 上的消息,方便在開發(fā)過程中診斷問題。
Tag
標(biāo)簽可以被認為是對 Topic 進一步細化。一般在相同業(yè)務(wù)模塊中通過引入標(biāo)簽來標(biāo)記不同用途的消息。
Broker
Broker是RocketMQ的核心模塊,負責(zé)接收并存儲消息,同時提供Push/Pull接口來將消息發(fā)送給Consumer。Consumer可選擇從Master或者Slave讀取數(shù)據(jù)。多個主/從組成Broker集群,集群內(nèi)的Master節(jié)點之間不做數(shù)據(jù)交互。Broker同時提供消息查詢的功能,可以通過MessageID和MessageKey來查詢消息。Borker會將自己的Topic配置信息實時同步到NameServer。
Queue
Topic和Queue是1對多的關(guān)系,一個Topic下可以包含多個Queue,主要用于負載均衡。發(fā)送消息時,用戶只指定Topic,Producer會根據(jù)Topic的路由信息選擇具體發(fā)到哪個Queue上。Consumer訂閱消息時,會根據(jù)負載均衡策略決定訂閱哪些Queue的消息。
Offset
RocketMQ在存儲消息時會為每個Topic下的每個Queue生成一個消息的索引文件,每個Queue都對應(yīng)一個Offset記錄當(dāng)前Queue中消息條數(shù)。
NameServer
NameServer可以看作是RocketMQ的注冊中心,它管理兩部分數(shù)據(jù):集群的Topic-Queue的路由配置;Broker的實時配置信息。其它模塊通過Nameserv提供的接口獲取最新的Topic配置和路由信息。
Producer/Consumer :通過查詢接口獲取Topic對應(yīng)的Broker的地址信息
Broker : 注冊配置信息到NameServer, 實時更新Topic信息到NameServe
RocketMQ環(huán)境搭建
注意:一定要配置rocketmq 環(huán)境變量 不然啟動 mqnamesrv.cmd
報錯: Please set the ROCKETMQ_HOME variable in your environment!
啟動mqnamesrv
系統(tǒng)環(huán)境變量配置
變量名:ROCKETMQ_HOME
變量值:MQ解壓路徑\MQ文件夾名
eg、ROCKETMQ_HOME=D:\rocketmq-all-4.3.0-bin-release
4.啟動 mqnamesrv.cmd
啟動mqBroker
Cmd命令框執(zhí)行進入至‘MQ文件夾\bin’下,然后執(zhí)行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動BROKER。成功后會彈出提示框,此框勿關(guān)閉。
啟動Rocketmq-console
1.下載rocketmq-externals-master
,進入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夾,打開‘a(chǎn)pplication.properties’進行配置。
2. 新增:rocketmq.config.namesrvAddr=127.0.0.1:9876
3. 執(zhí)行
用CMD進入‘\rocketmq-externals\rocketmq-console’文件夾,執(zhí)行‘mvn clean package -Dmaven.test.skip=true’,編譯生成。
編譯成功之后,Cmd進入‘target’文件夾,執(zhí)行‘java -jar rocketmq-console-ng-2.0.0.jar’,啟動‘rocketmq-console-ng-1.0.0.jar’。
4.瀏覽器中輸入‘127.0.0.1:配置端口’,成功后即可查看。
eg:http://127.0.0.1:8088
RocketMQ集群環(huán)境
1.集群支持:
RocketMQ天生對集群的支持非常友好
2.單Master:
優(yōu)點:除了配置簡單沒什么優(yōu)點
缺點:不可靠,該機器重啟或宕機,將導(dǎo)致整個服務(wù)不可用
3.多Master:
優(yōu)點:配置簡單,性能最高
缺點:可能會有少量消息丟失(配置相關(guān)),單臺機器重啟或宕機期間,該機器下未被消費的消息在機器恢復(fù)前不可訂閱,影響消息實時性
4.多Master多Slave異步模式:
每個Master配一個Slave,有多對Master-Slave,集群采用異步復(fù)制方式,主備有短消息延遲,毫秒級
優(yōu)點:性能同多Master幾乎一樣,實時性高,主備間切換對應(yīng)用透明,不需人工干預(yù)
缺點:Master宕機或磁盤損壞時會有少量消息丟失
Rocketmq偽集群搭建
集群:Master節(jié)點、Slave節(jié)點
Master節(jié)點:讀寫 生產(chǎn)者可以投遞消息到Master節(jié)點、消費者讀取該Master節(jié)點消費消息。
Slave節(jié)點:只能讀 不能夠?qū)?消費者只能夠讀,生產(chǎn)者不能夠?qū)⑾⑼哆f到Slave節(jié)點中。
broker-a.conf 配置
brokerClusterName=DefaultCluster
#從節(jié)點的brokerName必須和主節(jié)點一樣
brokerName=broker-a
#0表示是一個主節(jié)點, >0表示Slave
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號分割
namesrvAddr=127.0.0.1:9876
#允許自動創(chuàng)建主題
autoCreateTopicEnable=true
#注意需改端口,并且要和默認的10911相差5以上
listenPort=10911
#存儲路徑
storePathRootDir=D:\rocketmq\store_master
#commitLog存儲路徑
storePathCommitLog=D:\rocketmq\store_master\commitLog
#消費隊列存儲路徑
storePathConsumerQueue=D:\rocketmq\store_master\consumerqueue
#消息索引存儲路徑
storePathIndex=D:\rocketmq\store_master\index
#checkpoint 文件存儲路徑
storeCheckpoint=D:\rocketmq\store_master\checkpoint
#abort 文件存儲路徑
abortFile=D:\rocketmq\store_master\abort
broker-b.conf配置
brokerClusterName=DefaultCluster
#從節(jié)點的brokerName必須和主節(jié)點一樣
brokerName=broker-a
#0表示是一個主節(jié)點, >0表示Slave
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
#nameServer地址,分號分割
namesrvAddr=127.0.0.1:9876
#允許自動創(chuàng)建主題
autoCreateTopicEnable=true
#注意需改端口,并且要和默認的10911相差5以上
listenPort=10921
#存儲路徑
storePathRootDir=D:\rocketmq\store_slave
#commitLog存儲路徑
storePathCommitLog=D:\rocketmq\store_slave\commitLog
#消費隊列存儲路徑
storePathConsumerQueue=D:\rocketmq\store_slave\consumerqueue
#消息索引存儲路徑
storePathIndex=D:\rocketmq\store_slave\index
#checkpoint 文件存儲路徑
storeCheckpoint=D:\rocketmq\store_slave\checkpoint
#abort 文件存儲路徑
abortFile=D:\rocketmq\store_slave\abort
啟動broker集群
啟動broker-a—
F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-a.conf
The broker[broker-a, 192.168.31.1:10911] boot success. serializeType=JSON and na
me server is 127.0.0.1:9876
啟動broker-b—
F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-b.conf
The broker[broker-a, 192.168.31.1:10921] boot success. serializeType=JSON and na
me server is 127.0.0.1:9876
展示界面
Rocketmq集群同步方案
在RocketMQ中 Broker需要實現(xiàn)集群保證高可用(HA)
生產(chǎn)者投遞消息都是存放在主的Broker中
從Broker每次定時同步主Brokercommitlog日志文件。
A.如果實現(xiàn)一主一從讀寫分離消費模型----
消費者訂閱從節(jié)點消費消息,可能會存在延遲問題。(網(wǎng)絡(luò)數(shù)據(jù)傳輸過程中,延遲必然)
B. 消費者直接訂閱我們主Broker消費消息 延遲概率比較低
默認的情況下,消費者:訂閱我們主的Broker消費消息,如果主的Broker節(jié)點物理內(nèi)存占用達到40%,開始采用訂閱從節(jié)點實現(xiàn)消費,可以提高讀寫性能。----讀寫分離消費模型架構(gòu)物理內(nèi)存占用達到40%----消息堆積—
如果主Broker宕機的情況下,生產(chǎn)者是無法投遞消息,而我們消費者可以
訂閱我們從節(jié)點實現(xiàn)數(shù)據(jù)的消費。
消費者 消費進度同步問題
在每個Broker服務(wù)器中, 在C:\Users\Administrator\store\config
consumerOffset.json 記錄每個分組消費主題消息隊列 消費進度。
“topic_2020_mayikt@mayikt-group23”:{0:2,1:1,2:3,3:0
topic_2020_mayikt 主題名稱
mayikt-group23消費者分組名稱
0:2,1:1,
0:隊列id 消費進度 offset為2的位置
1:隊列id 消費者進度 offset為1的位置
1.從節(jié)點定時的形式同步主節(jié)點消費記錄信息,不管消費者訂閱主節(jié)點還是從節(jié)點
最終都會優(yōu)先的將消費記錄結(jié)果給主節(jié)點,如果節(jié)點真的宕機的情況下,先記錄在
從節(jié)點。
2.如果主節(jié)點現(xiàn)在突然存活的情況下,從哪個位置開始消費呢?
如果我們消費者內(nèi)存中有緩存消費進度的情況下,連接到主節(jié)點修改最新消費者進度記錄。
如果消費者內(nèi)存沒有緩存消費進度的情況下,可能會發(fā)生重復(fù)消費的問題。
在RocketMQ中 Broker需要實現(xiàn)集群保證高可用(HA)
----Slave會同步Master節(jié)點中所有commitlog文件數(shù)據(jù)。
如果當(dāng)Master節(jié)點宕機了,這時候Master節(jié)點就無法寫入數(shù)據(jù)(生產(chǎn)者無法投遞數(shù)據(jù)),
但是消費者可以根據(jù)Slave(備份)節(jié)點消費數(shù)據(jù),注意的是 從Slave(備份)節(jié)點無法
寫入數(shù)據(jù)。
設(shè)置該參數(shù):slaveReadEnable 從服務(wù)器不允許讀;
RocketMQ讀寫分離機制:
讀寫分離消費模型,提高IO讀寫的性能。
RocketMQ消費者進度如何同步:
Springboot整合方式
注意springboot整合rocketmq server端 版本一定要與rocketmq 不然可能啟動報錯
Maven依賴
org.springframework.boot
spring-boot-starter-parent
2.2.4.RELEASE
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.1
生產(chǎn)者
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
- 普通消息投遞 單向發(fā)送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return “投遞消息 => " + msg.toString() + " => 成功”;
}
消費者
/**
-
@ClassName RocketMQConsumer
-
@Author 螞蟻課堂余勝軍 QQ644064779 www.mayikt.com
-
@Version V1.0
log.info("消費者監(jiān)聽到消息:<msg:{}>", msgEntity);
**/
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = “mayikt-group5”, topic = “topic_meite”)
public class RocketMQConsumer implements RocketMQListener {
@Override
public void onMessage(MsgEntity msgEntity) {}
}
配置文件
spring:
application:
name: mayikt-rocketmq
server:
port: 8000
rocketmq:
rocketmq地址
name-server: 127.0.0.1:9876
producer:
# 必須填寫 group
group: mayikt-group
Rocketmq配置文件詳解
所屬集群名字
brokerClusterName=rocketmq-cluster
此處需手動更改
broker名字,注意此處不同的配置文件填寫的不一樣
附加:按配置文件文件名來匹配
brokerName=broker-a
0 表示Master, > 0 表示slave
brokerId=0
此處許手動更改
(此處nameserver跟host配置相匹配,9876為默認rk服務(wù)默認端口)nameServer 地址,分號分割
附加:broker啟動時會跟nameserver建一個長連接,broker通過長連接才會向nameserver發(fā)新建的topic主題,然后java的客戶端才能跟nameserver端發(fā)起長連接,向nameserver索取topic,找到topic主題之后,判斷其所屬的broker,建立長連接進行通訊,這是一個至關(guān)重要的路由的概念,重點,也是區(qū)別于其它版本的一個重要特性
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
在發(fā)送消息時,自動創(chuàng)建服務(wù)器不存在的Topic,默認創(chuàng)建的隊列數(shù)
defaultTopicQueueNums=4
是否允許Broker 自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉
autoCreateTopicEnable=true
是否允許Broker自動創(chuàng)建訂閱組,建議線下開啟,線上關(guān)閉
autoCreateSubscriptionGroup=true
Broker 對外服務(wù)的監(jiān)聽端口
listenPort=10911
刪除文件時間點,默認是凌晨4點
deleteWhen=04
文件保留時間,默認48小時
fileReservedTime=120
commitLog每個文件的大小默認1G
附加:消息實際存儲位置,和ConsumeQueue是mq的核心存儲概念,之前搭建2m環(huán)境的時候創(chuàng)建在store下面,用于數(shù)據(jù)存儲,consumequeue是一個邏輯的概念,消息過來之后,consumequeue并不是把消息所有保存起來,而是記錄一個數(shù)據(jù)的位置,記錄好之后再把消息存到commitlog文件里
mapedFileSizeCommitLog=1073741824
ConsumeQueue每個文件默認存30W條,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
存儲路徑
storePathRootDir=/usr/local/rocketmq/store
commitLog存儲路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
消費隊列存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/store/index
checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
abort 文件存儲路徑
abortFile=/usr/local/rocketmq/store/abort
限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
Broker 的角色
? ASYNC_MASTER 異步復(fù)制Master
? SYNC_MASTER 同步雙寫Master
? SLAVE
brokerRote=ASYNC_MASTER
刷盤方式
? ASYNC_FLUSH 異步刷盤
? SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
發(fā)消息線程池數(shù)量
sendMessageTreadPoolNums=128
拉消息線程池數(shù)量
pullMessageTreadPoolNums=128
Rocketmq隊列分區(qū)模型
Rocketmq 底層存儲結(jié)構(gòu)中 將一個主題分成n多個不同的
隊列來實現(xiàn)存放消息。
創(chuàng)建了一個主題:需要指定隊列個數(shù) 默認是4和16
寫隊列數(shù)量: 對我們生產(chǎn)者寫投遞隊列數(shù)量 16
讀隊列數(shù)量: 對我們消費者獲取消息隊列數(shù)量 16
在rocketmq中,如果一個topic只有一個隊列的情況下支持的并行能力比較弱,所以會將一個topic分成分成n多個不同的隊列queue來實現(xiàn)存放, 類似于Kafka的分區(qū)模型概念。
writeQueueNums:寫隊列數(shù),表示producer發(fā)送到的MessageQueue的隊列個數(shù)
readQueueNums:讀隊列數(shù),表示Consumer讀取消息的MessageQueue隊列個數(shù)
注:這兩個值需要相等,在集群模式下如果不相等,writeQueueNums=6,readQueueNums=3, 那么每個broker上會有3個queue的消息是無法消費的。
perm 隊列的權(quán)限:2表示w、4表示r 6表示rw
架構(gòu)模擬圖
注意我們rocketmq與kafka設(shè)計思想原理相同,消息存儲在mq服務(wù)器端中,消費者不管消費失敗還是成功,最終消息還是會存在mq服務(wù)器端,可以通過日志策略清理消息。
RocketMQ解決方案
如果現(xiàn)在消費者獲取消息、處理器都是同一個線程的情況下,
有可能會影響到我們消費者速率—
消費者獲取消息10毫秒—處理消息5s 5.001s
Rocketmq中 消費者消費的消息,采用多線程的形式,需要注意消息順序一致性的問題。
與kafka思想是一樣的,消費者不管消費成功還是失敗,最終消息不會立即被刪除。
后期都是通過日志刪除策略,定時刪除消息。
NameServer原理
NameServer注冊中心 類似于zookeeper、nacos 存儲 服務(wù)節(jié)點信息。
Rocketmq 不使用Zookeeper而使用Nameserver實現(xiàn)注冊中心呢?
Cp(每個節(jié)點副本必須要保證數(shù)據(jù)一致性) ap(每個節(jié)點副本數(shù)據(jù)不一致
,保證可用性。)
Kafka:副本機制 副本選舉依賴 控制器 選舉依賴Zookeeper—選舉過程
在Rocketmq中 人為在配置文件中指定那個Broker 為主節(jié)點 ,而在kafka中依賴
與zookeeper實現(xiàn)選舉的。
NameServer 每個節(jié)點相互之間沒有數(shù)據(jù)過程、都是獨立的 (ap模式)
保證可用性、不保證每個NameServer 數(shù)據(jù)的一致性問題。
NameServer 與Broker 、生產(chǎn)者、消費者關(guān)系。
NameServer 中存放那些信息呢?
NameServer 與Broker 關(guān)系
nameServer如何知道Broker 宕機呢?
發(fā)送一個心跳續(xù)約包給Broker 告訴我還在存活狀態(tài)
與nacos、eureka 實現(xiàn)服務(wù)注冊中心原理是一樣的。
生產(chǎn)者:
發(fā)送請求
Broker 與 nameserver心跳續(xù)約間隔30s
如果生產(chǎn)者獲取到一個故障的Broker地址,實現(xiàn)發(fā)送消息如果失敗的情況下
如何處理
答案:
實現(xiàn)調(diào)用 存儲消息。
NameServer類似于zookeeper實現(xiàn)服務(wù)注冊中心
為什么RocketMQ不使用zookeeper?而使用NameServer作為注冊中心呢?
Zookeeper實現(xiàn)注冊中心 模式CP模式 保證數(shù)據(jù)的一致性
NameServer 保證AP模式 可用性
特點:
NameServer集群之間不需要數(shù)據(jù)同步
注冊分析:
Broker啟動的時候,會讀取Broker.config 配置文件,獲取到多個不同的nameServer集群的地址列表,會將Broker信息注冊給所有的NameServer存儲與所有的NameServer保持長連接。
注冊信息:
Broker 的IP和端口號 主題信息、集群信息 過濾器等。
NameSever如何知道我們Broker宕機呢?
續(xù)約機制:
Broker:
Rocketmq底層基于netty實現(xiàn)的,每個Broker在默認每隔30
S時間給nameServer發(fā)送一個心跳續(xù)約包,告訴我Broker還在存活。
Name 每隔10s時間檢測,故障Broker節(jié)點,則剔除。
Broker關(guān)系
每個NameServer相互之間都是獨立,不會做任何數(shù)據(jù)同步,采用ap模式思想
Broker信息注冊:Broker啟動的時候,會根據(jù)配置文件讀取到多個NameServer地址,輪詢的將Broker信息注冊到每個NameServer上,這樣每個NameServer都有該Broker信息。
剔除:Broker需要每隔30s時間給每個NameServer發(fā)送一個心跳續(xù)約,如果沒有發(fā)送心跳續(xù)約的話,NameServer 會有一個定時器每隔每10s中掃描一次,檢測故障的Broker,則會剔除。
最終每個Broker與NameSserver保持長連接。
生產(chǎn)者關(guān)系
如果是每隔30s發(fā)送續(xù)約,也就是意味著30s后 才能夠剔除該Broker
真好在這時候 生產(chǎn)者發(fā)送到該故障節(jié)點如何處理?
使用ack模式 確認刷盤成功 才屬于生產(chǎn)者消息投遞成功。
相關(guān)疑問
生產(chǎn)者發(fā)送消息三種模式
Producer發(fā)送消息有三種方式:同步、異步和單向
三種發(fā)送方式
單向 生產(chǎn)者投遞消息到mq中,不需要返回結(jié)果。
優(yōu)點:延遲概率比較低
缺點:丟失消息數(shù)據(jù)
投遞消息過程比較耗時時間5毫秒
異步 生產(chǎn)者投遞消息到mq中,使用回調(diào)形式返回。
投遞消息過程比較耗時時間5毫秒
補償----
同步
生產(chǎn)者投遞消息到mq中,采用同步的形式獲取到返回消息是否有
投遞成功的結(jié)果,導(dǎo)致接口延遲概率比較大。
投遞消息過程比較耗時時間10毫秒
發(fā)送請求 基于請求與響應(yīng)
1.同步發(fā)送:發(fā)送請求模式屬于同步的,發(fā)送該條消息不需等待該條消息發(fā)送成功之后,才可以繼續(xù)發(fā)送下一條。
2.異步發(fā)送:采用異步的發(fā)送模式,不需要同步阻塞等待,通過回調(diào)的形式監(jiān)聽生產(chǎn)者消息投遞結(jié)果
3單向發(fā)送:只負責(zé)發(fā)送消息給mq,不管是否有發(fā)送成功。
同步發(fā)送
/**
- 同步發(fā)送
- @throws Exception
*/
@GetMapping("/sync")
public void sync() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
SendResult sendResult = rocketMQTemplate.syncSend(RocketMQConfig.TOPIC_NAME, msg);
log.info(“同步發(fā)送字符串{}, 發(fā)送結(jié)果{}”, msg.toString(), sendResult);
}
異步發(fā)送
/**
-
異步發(fā)送
-
@throws Exception
@Overridepublic void onException(Throwable var1) {log.info("異步發(fā)送失敗{}", var1);}
*/
@GetMapping(“async”)
public void async() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
log.info(">msg:<<" + msg);
rocketMQTemplate.asyncSend(RocketMQConfig.TOPIC_NAME, msg.toString(), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
log.info(“異步發(fā)送成功{}”, var1);
}});
}
單向發(fā)送
/**
- 普通消息投遞 單向發(fā)送
*/
@GetMapping("/sendMsg")
public String sendMsg() {
MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
return “投遞消息 => " + msg.toString() + " => 成功”;
}
順序消息
Rocketmq中,消費者處理消息業(yè)務(wù)邏輯的時候 是采用多線程。
如何解決消息順序一致性的問題?
實際上做業(yè)務(wù)邏輯開發(fā)中,很少有需要保證消息順序一致性問題。
1.在rocketmq 消費者默認是多線程異步消費的,開發(fā)者需要設(shè)定指定保證消息順序一致性問,也就是同一個隊列消息最終被同一個線程實現(xiàn)消費。
相關(guān)代碼
生產(chǎn)者
String uuid = UUID.randomUUID().toString();
SendResult result1 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “insert”, uuid);
log.info(“insert:” + result1.toString());
SendResult result2 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “update”, uuid);
log.info(“update:” + result2.toString());
SendResult result3 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “delete”, uuid);
log.info(“delete:” + result3.toString());
消費者
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = “mayikt-group20”, topic = “topic_seq”, consumeMode = ConsumeMode.ORDERLY
)
public class RocketMQConsumer01 implements RocketMQListener {
@Override
public void onMessage(String msg) {
try {
Random r = new Random(100);
int i = r.nextInt(500);
Thread.sleep(i);
} catch (Exception e) {
}
結(jié)果
消息存儲結(jié)構(gòu)
在win的安裝rocketmq,消息物理存放在
C:\Users\Administrator\store
commitlog:消息的存儲目錄
config:運行期間一些配置信息
consumequeue:消息消費隊列存儲目錄
index:消息索引文件存儲目錄
abort:如果存在abort文件說明Broker非正常關(guān)閉,該文件默認啟動時創(chuàng)建,正常退出時刪除
checkpoint:文件檢測點。存儲commitlog文件最后一次刷盤時間戳、consumequeue最后一次刷盤時間、index索引文件最后一次刷盤時間戳。
學(xué)習(xí)kafka 的時候,topic主題消息分成n多個不同的partition 分區(qū)存放,
而在我們的rocketmq中,將一個topic主題的消息分成多個不同的Queue存放。
前提條件:commitlog日志文件沒有滿的情況下:
在rocketmq中所有topic主題對應(yīng)的隊列的消息都會存放在同一個commitlog日志文件中,
消費者在消費消息的時候 不會直接與commitlog日志文件打交道。
Rocketmq 提供消費隊列 (邏輯概念)
Commitlog日志文件 存放消息內(nèi)容主體----Commitlogoffset
ConsumeQueue 不存放消息主體,只存放消息的Commitlogoffset、msgsize、msgtag
Queue offset與Commitlogoffset 之間區(qū)別?
Queue offset—消息存放在ConsumeQueue 消費偏移量的位置
Commitlogoffset ----消息物理存放位置
[queueId=2, storeSize=242, queueOffset=25, sysFlag=0, bornTimestamp=1611665447721, bornHost=/192.168.31.1:55706, storeTimestamp=1611665447722, storeHost=/192.168.31.1:10911, msgId=C0A81F0100002A9F0000000000039330, commitLogOffset=234288, bodyCRC=336380854, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_ide_mayikt’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26,
生產(chǎn)者如何投遞消息:
消費者如何消費:
消息的commitlogoffset 如何存放在不同的consumequeue中。
Consumequeue==16
投遞消息 消息key
消息key%16=1
Consumequeue 中 Consumeoffset 對應(yīng) 一條消息(沒有對應(yīng)消息主體)—commitlogoffset
消費者消費我們消息 在(kafka、rocketmq)中 消費成功或者失敗都不會立即將該消息刪除,日志清理策略刪除。
Rocketmq消息存儲結(jié)構(gòu)
1.(commitlog日志文件沒有滿的情況下)在rocketmq中所有的消息日志,都存放在同一個commitlog日志文件中,(默認是1GB來存儲 好處1零拷貝映射 好處2 分段 非常好方式管理清理日志文件 commitlog命名就是上一個commitlog日志文件中最后一個commitlog-offset值)
,ConsumeQueue類似于在kafka中的 partition 分區(qū)模型, 而ConsumeQueue存放的是commitlog開始的commitLogoffset、msgsize、tag。
2.每次消費者讀取消息的時候,先讀取ConsumeQueue中獲取到commitLogoffset,在根據(jù)該
commitLogoffset查找commitLog日志文件獲取到消息體返回給消費者客戶端。
3.與kafka的設(shè)計不同
根據(jù)阿里巴巴消息中間件團隊的測試,如果每個topic中的partition 分區(qū)存儲的消息過多
,可能會影響到磁盤io的讀寫性能,所以采用ConsumeQueue存放少量的數(shù)據(jù),消息讀取還是通過commitlog文件中查找。
Rocketmq讀寫使用:MappedByteBuffer 零拷貝
因為rocketmq 中日志文件存儲采用文件映射機制+mmap 減少用戶態(tài)與內(nèi)核態(tài)來回拷貝的次數(shù),從而可以提高性能
MappedByteBuffer 文件存儲映射,只能映射用戶態(tài)1.5GB-2GB 所以RocketMQ日志文件存儲commitlog默認為1G大小。
commitLog文件
commitlog文件的存儲地址:KaTeX parse error: Undefined control sequence: \store at position 5: HOME\?s?t?o?r?e?\commitlog{fileName},每個文件的大小默認1G =102410241024,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲的時候會順序?qū)懭胛募?#xff0c;當(dāng)文件滿了,寫入下一個文件。
冪等問題
整合方式
消費者冪等問題
當(dāng)消費者消費消息失敗的時候,可以返回繼續(xù)重復(fù)消費,這時候rocketmq會給
該消費者不斷的重試,重試的過程中需要注意冪等性問題。
在rocketmq中,根據(jù)返回ack 狀態(tài):
ConsumeConcurrentlyStatus
CONSUME_SUCCESS --消費成功
RECONSUME_LATER— 消費失敗繼續(xù)重試消費。
Messageid
對我們每條消息生成一個messageid------根據(jù)該id 實現(xiàn)消費者冪等性問題
Messageid 生成規(guī)則:關(guān)聯(lián)存放在那個mq服務(wù)器端、commitlog-offset值。
Messageid --C0A812E0495C18B4AAC276054FC20005 轉(zhuǎn)碼
獲取到commitlog-offset —查找物理對應(yīng)消息位置。
Rocketmq中的 messageid 長度一共占用16個字節(jié),其中包含消息存儲的ip和端口號,
消息commitLogOffset,客戶端發(fā)送請求傳遞messageid 給mq服務(wù)器端,mq服務(wù)器端
根據(jù)該commitlogoffset值查找對應(yīng)的消息返回給客戶端。
消息過濾
1.Tag就是相當(dāng)于給消息打一個標(biāo)簽,比如我的文章 屬于那些標(biāo)簽下,用戶如果訂閱了該標(biāo)簽就可能刷到該文章。
2. 一個消息可能打上了多個tag標(biāo)簽,消費者訂閱該主題和tag標(biāo)簽,標(biāo)簽如果一致的情況下就可以獲取到該消息。
原理:在我們ConsumeQueue中,存儲結(jié)構(gòu)為 commitoffset、size、messagetag 會根據(jù)消費者傳遞的tag hash值與隊列中每條消息的tag hash 做比較,如果相等的情況則獲取該消息,如果不相等的情況下,則不會獲取該消息。
消費者組
與kafka基本原理相同,同一個消費者中,只能有一個消費者消費消息,
多個不同的消費者組可以消費同一條消息。
消費成功之后,提交offset。
分布式事務(wù)
消息的可用性
刷盤:將數(shù)據(jù)從pagecache中刷盤到硬盤中存儲
,避免數(shù)據(jù)的丟失。
刷盤模式:
1.同步刷盤:
生產(chǎn)者必須等待該消息 從pagecache刷盤到硬盤中,在返回
Ack給生產(chǎn)者。
優(yōu)點:可以保證消息不丟失
缺點:影響到整體接口吞吐量
應(yīng)用場景:金融支付類 msg
2.異步刷盤
生產(chǎn)者不需要等待pagecache刷盤到硬盤中結(jié)果,完全采用異步的形式
優(yōu)點:
提高整體接口吞吐量
缺點:
有可能消息會丟失 (概率不是很大)
應(yīng)用場景:行為分析---- 100 -12 98%
刷盤策略
RocketMQ,默認會將消息持久化存放在硬盤,首先會
寫入到系統(tǒng)PageCahe中,讓后再刷盤到硬盤,這樣就可以保證
硬盤與PageCache中數(shù)據(jù)完全一致性。
同步刷盤:
消息真正刷盤到磁盤中,才會返回給生產(chǎn)者,只要磁盤沒有壞,這樣做
可以保證消息不丟失,但是可能會影響整體的吞吐量。
異步刷盤:
讀寫充分利用pagecache,消息寫入到pagecache成功之后,采用異步的形式
刷盤到硬盤中,可以提高系統(tǒng)的吞吐量,但是可能消息會丟失。
RocketMQ解決分布式事務(wù)問題
RocketMQ解決分布式事務(wù)思想與原理
RocketMQ在其消息定義的基礎(chǔ)上,對事務(wù)消息擴展了兩個相關(guān)的概念:
1.Half(Prepare) Message——半消息(預(yù)處理消息)
半消息是一種特殊的消息類型,該狀態(tài)的消息暫時不能被Consumer消費。當(dāng)一條事務(wù)消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發(fā)出的二次確認時,該事務(wù)消息就處于"暫時不可被消費"狀態(tài),該狀態(tài)的事務(wù)消息被稱為半消息。
2.Message Status Check——消息狀態(tài)回查
由于網(wǎng)絡(luò)抖動、Producer重啟等原因,可能導(dǎo)致Producer向Broker發(fā)送的二次確認消息沒有成功送達。如果Broker檢測到某條事務(wù)消息長時間處于半消息狀態(tài),則會主動向Producer端發(fā)起回查操作,查詢該事務(wù)消息在Producer端的事務(wù)狀態(tài)(Commit 或 Rollback)。可以看出,Message Status Check主要用來解決分布式事務(wù)中的超時問題。
RocketMQ的事務(wù)消息是基于兩階段提交實現(xiàn)的,也就是說消息有兩個狀態(tài),prepared和commited。當(dāng)消息執(zhí)行完send方法后,進入的prepared狀態(tài),進入prepared狀態(tài)以后,就要執(zhí)行executeLocalTransaction方法,這個方法的返回值有3個,也決定著這個消息的命運,
COMMIT_MESSAGE:提交消息,這個消息由prepared狀態(tài)進入到commited狀態(tài),消費者可以消費這個消息;
ROLLBACK_MESSAGE:回滾,這個消息將被刪除,消費者不能消費這個消息;
UNKNOW:未知,這個狀態(tài)有點意思,如果返回這個狀態(tài),這個消息既不提交,也不回滾,還是保持prepared狀態(tài),而最終決定這個消息命運的,是checkLocalTransaction這個方法。
RocketMQ實現(xiàn)分布式事務(wù)的原理:
消費。
服務(wù)器端,RocketMQ服務(wù)器端將該消息,推送給消費者消費。
核心思想:確保生產(chǎn)者一定將消息投遞到mq服務(wù)器端,生產(chǎn)者必須先一定執(zhí)行完成,在執(zhí)行消費者。
RocketMQLocalTransactionState.ROLLBACK;—回滾
RocketMQLocalTransactionState.COMMIT—提交
RocketMQLocalTransactionState.UNKNOWN— 不是提交也不是回滾。
存在的一些問題:
發(fā)送ack給rocketmq 回滾該消息即可,不會被消費者消費。
Rocketmq服務(wù)器端 在默認的情況下 每隔60s 檢查本地事務(wù)是否已經(jīng)執(zhí)行過,如果執(zhí)行過的情況下,則提交該提交,如果沒有執(zhí)行該事務(wù)的情況下,則回滾。
常見錯誤
Lock failed,MQ already started
將 broker 的 master 和 slave 節(jié)點放在同一臺機器上,配置的storePath相同導(dǎo)致的,修改配置文件,改為不同的路徑即可解決。
總結(jié)
以上是生活随笔為你收集整理的RocketMQ简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式服务追踪与调用链系统
- 下一篇: properties 转 yml