消息队列MQ
消息隊列理論
作用
基本概念
兩種模型
producer/broker/consumer
- 為了提高并發(fā)度,Topic模式往往還會引入隊列或者分區(qū)的概念,即消息發(fā)往Topic下的某個隊列或分區(qū)中。RocketMq叫隊列,kafka中叫分區(qū)。如某個Topic有5個隊列,那么該主題的并發(fā)度就提高為5,可以有五個消費者并行消費該主題的消息。一般可以采用輪詢或者hash取余的方式將消息分配到不同的隊列當中
- 為了提高消費者的并發(fā)度,消費者通常都有消費者組(consumer group)的概念,消費者都是屬于某個消費者組的。一個消息會發(fā)往多個訂閱了這個topic的消費者組。這個消息實際上是寫到了topic下的某個隊列中,消費者組中的某個消費者對應消費一個隊列的消息,同一消費者組下的應用,可以認為是同一應用的集群,一個topic下的一個隊列,只會對應消費組下的一個實例(或一個線程),如果實例數(shù)大于隊列數(shù),則有的實例是不會分配消息的。
- 物理上一條消息處理副本拷貝以外,一條消息在broker中只會有一份,每個消費組有自己的消費點位(offset)來標識消費到的位置。offset之前的消息是被這個消費組消費過的,這個offset是隊列級的,每個消費組都會維護訂閱的topic下的每個隊列的offset
rocketmq offset的維護
[參考文章](https://blog.csdn.net/agonie201218/article/details/120242146)
- **本地管理模式:**消費者廣播模式下,每個消費者將消費點位以json的方式存儲在本地文件中,每個消費者管理自己的消費點位,各消費者間不存在消費進度的交集
- **遠程管理模式:**消費者集群消費模式下,topic下每個quene的消費點位以json文件的方式持久化在broker的本地文件中,broker內(nèi)存中會形成一個雙層map,[topic, [queneId, offset]]。正常情況下topic下的一個quene只會被一個消費者消費,使用遠程管理的模式,主要是為保證rebalance的機制
- 消費者啟動時,要消費的第一條消息的起始位置有三種:1. CONSUME_FROM_LAST_OFFSET從queue的當前最后一條消息開始消費;2. CONSUME_FROM_FIRST_OFFSET從queue的第一條消息開始消費;3.CONSUME_FROM_TIMESTAMP從指定的具體時間戳位置的消息開始消費。
- offset提交broker支持同步和異步的方式,同步提交offset時,會等待broker響應后再進行下一條消息的消費;異步提交offset時,不用等待就繼續(xù)消費消息。默認異步提交的方式,但如果發(fā)生rebance,則因為消費者消費的消息offset可能大于broker存的offsetid,下一個消費者會從broker 存儲的offset的位置繼續(xù)消費,可能會有消息重復消費的問題
怎么保證消息不丟失
處理重復消息
冪等機制: 版本號、業(yè)務流水號、業(yè)務狀態(tài)等
如何保證消息有序性
處理消息堆積
原因: 生產(chǎn)者的生產(chǎn)速率與消費者的消費速率不匹配
優(yōu)化:1. 優(yōu)化消費者的處理性能;2. 增加topic隊列和消費者數(shù)量,即水平擴容,注意一個topic下的隊列只會分配給一個消費者。
rocketMq消息消費
- 消費者和broker的消息獲取,有推拉兩種模式,activeMq用推的模式。RocketMq和kafka均采用拉的模式,通過長輪詢的方式,通過消費者等待消息,當有消息時broker會直接返回消息,如果沒有消息,則會采用延遲處理的方式,并且為了保證消息實時性,當有新消息到來時,會及時返回消息
RocketMq事務消息
- rocketmq事務消息事務開始時小發(fā)起一個半消息,本地事務執(zhí)行后,根據(jù)執(zhí)行結果發(fā)送消息的提交或者回滾,半消息對消費者是不可見的,消息提交后才放入正常隊列供消費者消費。
- 生產(chǎn)者發(fā)送的提交或回滾有可能會失敗,因此需要producer暴露一個接口,供broker定時查詢事務狀態(tài)。
RocketMq
原理
參考文章
特點
部署
RocketMq由四部分構成:Producer、Consumer、Broker、NameServer
- producer(生產(chǎn)者):可集群部署,先與nameServer中的一臺建立長鏈接,獲取當前要發(fā)送的topic存放在哪個Broker Master上,再與其建立長連接,支持多種負載均衡模式發(fā)送消息。producer會與滿足條件的多個broker建立長連接,輪詢他們來發(fā)送消息,同步發(fā)送消息失敗的情況下,默認(retryTimesWhenSendFailed = 2)會重試其他節(jié)點兩次,異步發(fā)送失敗也會重試兩次(retryTimesWhenSendAsyncFailed = 2),但只會重試當前失敗的節(jié)點。
- consumer(消費者):可集群部署,也會先與nameServer中的一臺建立長鏈接,獲取當前要的消息topic存放在哪個Broker master 、slaver上,然后與他們建立長連接,支持集群消費和廣播消費。消息消費失敗,rocketmq會給每個消費者組設置一個重試隊列,%retry%+consumerGroup,設置了很多重試級別來延遲重試時間。超過一定重試次數(shù)都失敗,會移入死信隊列Topic %DLQ%" + ConsumerGroup,人工處理死信隊列里的消息
- Broker: 負責消息的存儲、查詢消費,支持主從部署,一個master對應多個slave,master支持讀寫,slave只支持讀。**broker會向集群中的每一臺nameServer注冊自己的路由信息。**生產(chǎn)上不建議開啟自動創(chuàng)建主題的配置,可能會導致消息發(fā)送不均衡。多Master多Slave模式,同步雙寫部署模式,會避免丟消息
- nameServer: 是一個簡單的Topic路由注冊中心,支持Broker的動態(tài)注冊發(fā)現(xiàn),保存Topic和Broker的關系。通常也集群部署,但各nameServer間不會相互通信,每個nameServer都有完整的路由信息,即無狀態(tài)。維護集群中存活的broker列表,broker信息,主題和隊列信息
工作過程:
先啟動nameServer集群,各nameServer間不會有信息交互。Broker啟動后,會向所有nameServer定時發(fā)送心跳包(默認30s),包括:ip/port/topicInfo,nameServer會定期掃描broker存活列表,如果超過120s沒有心跳,則移除此broker信息,代表下線。
每個nameServer就知道集群中所有broker的信息,producer上線,從某個nameserver中可以得知他要發(fā)送的topic位于哪個broker上,與對應broker(master角色)建立長連接,發(fā)送消息。
consumer也從nameserver中獲取broker master 、slave的信息,并與他們建立長連接,接收消息
使用建議:一個生產(chǎn)者應用集群使用一個Topic(生產(chǎn)者只關注消息生產(chǎn),而不關心由誰來消費消息,消費者的增減對生產(chǎn)者無感),通過tags標記業(yè)務類型。不同的消費者設置不同的消費者組,對于不是當前消費者能消費的消息,跳過即可
RocketMq性能高的原因:RocketMq采用文件系統(tǒng)存儲消息,采用順序寫的方式寫入消息,使用零拷貝發(fā)送消息,這三者的結合極大地保證了RocketMq的性能
RocketMq消息清理
參考文章
- Broker消息被順序寫入commitLog文件,清理不是以消息的粒度來清理的(消息大小不定,文件順序寫,按消息級清理性能低),而是commitLog文件為單位進行清理
- 清理commitLog文件,可以手工清理,或者滿足一定條件下自動清理(commitlog文件存在一個過期時間,默認為72小時,即三天):
- 對于RocketMQ系統(tǒng)來說,刪除一個1G大小的文件,是一個壓力巨大的IO操作。在刪除過程中,系統(tǒng)性能會驟然下降。默認在訪問量最小的時候刪除;我們應保障磁盤空間的空閑率,不要使系統(tǒng)出現(xiàn)在其它時間點刪除commitlog文件的情況。建議linux文件系統(tǒng)采用ext4(刪除操作ext4比ext3高)
rabbitMq
參考文章
鏡像集群模式
常見消息隊列對比
參考文章
總結
- 上一篇: 企业如何确定是否需要BPM(出自:计世网
- 下一篇: 微型计算机接口技术学啥的,微型计算机接口