消息长度_填坑笔记:RocketMQ消息订阅失败问题?
前語:不要為了讀文章而讀文章,一定要帶著問題來讀文章,勤思考。
作者:kinnylee ??來源:http://1t.click/g26
# 背景介紹
項目組使用阿里RocketMQ,對同一個消費組設置不同的tag訂閱關系,出現消息丟失的問題,本文從rocketmq源碼研究消息發布與訂閱原理,并分析導致該問題的原因。
# 官方說明
告訴使用者:同一個消費組,必須保持訂閱關系一致
為什么?它沒有說!只能從源碼找答案
# 問題復現
啟動消費者1,消費組為group1,訂閱topicA的消息,tag設置為tag1 || tag2
啟動消費者2,消費組也為group1,也訂閱topicA的消息,但是tag設置為tag3
啟動生產者,生產者發送含有tag1,tag2,tag3的消息各10條
消費者1沒有收到任何消息,消費者2收到部分消息
# 結論
同一個消費組中,設置不同tag時,后啟動的消費者會覆蓋先啟動的消費者設置的tag
tag決定了消息過濾的條件,經過服務端和客戶端兩層過濾,最后只有后啟動的消費者才能收到部分消息
# 原理說明
1、消息如何保存
CommitLog
保存所有topic的原始消息
CommitLog分為多個文件,每個文件默認最大為1G
每條記錄包括:消息長度和消息文本(消息體,屬性,uid等等)
因每條消息長度不一致,每個commitLog的記錄長度也不一致
ConsumerQueue
保存某個Topic下某個Queue的索引信息
每條記錄包括:消息在commitLog中的offset,消息大小,消息tag的哈希值
每條記錄長度固定為20byte
producer發送消息后,先保存到commitLog,再異步建立該條消息對應的topic + queue對應的ConsumerQueue索引
第三部分的Hash(tag)是服務端過濾消息的重要依據
2、consumer如何訂閱消息?
注冊訂閱信息
consumer訂閱時,會將訂閱信息注冊到到服務端
保存訂閱信息的是Map類,key為topic,value主要是tag
subVersion取當前時間。
這里的key是topic,subVersion版本號,這兩點很關鍵!后面有用到!
拉取消息并過濾
拉取消息時,首先從服務端獲取訂閱關系,得到tag的hash集合codeSet
然后從ConsumerQueue獲取一條記錄,判斷記錄的hashCode是否在codeSet中,以達到消息過濾的目的,決定是否將該消息發送給consumer
總之一句話:tag決定了消息是否發到客戶端
3、消息過濾
服務端過濾
過濾:tag的hash值過濾
優點:
減少不必要消息占用流量
缺點:
Hash存在沖突,過濾不完全準確
客戶端過濾
服務端過濾存在不準確性,客戶端再次精確過濾
客戶度過濾:tag的字符串值做對比。不相等的不返回給消費者
原因總結
同一個consumer group的訂閱關系,保存在RebalanceImpl類的Map中。key為topic
不同的消費者啟動后,依次注冊訂閱關系,因為tag不一樣,導致Map中同一topic的tag被覆蓋。比如:消費者1訂閱tag1,消費者2訂閱tag2。最后map中只保存tag2.
過濾的核心是是tag,tag被更新,過濾條件被改變。服務端過濾后只返回tag2的消息
客戶端接收消息后,再次過濾。先啟動的消費者1訂閱tagA,但是服務端返回tag2,所以消費者1收不到任何消息。消費者2能收到一半的消息(集群模式,假設消息平均分配,另外一半分給tag2)
# 源碼分析
1、訂閱關系數據結構
2、消費者1啟動時注冊的訂閱關系
3、消費者2后啟動覆蓋訂閱關系
4、服務端過濾時取出ConsumerQueue的Hash(tag)
5、對比消息的Hash(tag)和之前保存的訂閱關系
7、客戶端過濾
熱文推薦
這份5G PPT這幾天在我的朋友圈刷屏了。
作為一名Java程序員,你竟然不知道Intrumentation!
總結
以上是生活随笔為你收集整理的消息长度_填坑笔记:RocketMQ消息订阅失败问题?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 收藏模板:软件日报告模板(参考)
- 下一篇: deepin安装卡死在蓝色背景_求大神帮