rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?
上兩篇文章都在討論順序消息的一些知識,看到有個讀者的留言如下:
這個問題問得非常棒,由于在之前的文章中并沒有提及到,因此我在這篇文章中單獨講解,本文將從消費順序性這個問題出發(fā),深度剖析 Kafka/RocketMQ 消費線程模型。
Kafka
kafka 的消費類 KafkaConsumer 是非線程安全的,因此用戶無法在多線程中共享一個 KafkaConsumer 實例,且 KafkaConsumer 本身并沒有實現(xiàn)多線程消費邏輯,如需多線程消費,還需要用戶自行實現(xiàn),在這里我會講到 Kafka 兩種多線程消費模型。
1、每個線程維護(hù)一個 KafkaConsumer
這樣相當(dāng)于一個進(jìn)程內(nèi)擁有多個消費者,也可以說消費組內(nèi)成員是有多個線程內(nèi)的 KafkaConsumer 組成的。
但其實這個消費模型是存在很大問題的,從消費消費模型可看出每個 KafkaConsumer 會負(fù)責(zé)固定的分區(qū),因此無法提升單個分區(qū)的消費能力,如果一個主題分區(qū)數(shù)量很多,只能通過增加 KafkaConsumer 實例提高消費能力,這樣一來線程數(shù)量過多,導(dǎo)致項目 Socket 連接開銷巨大,項目中一般不用該線程模型去消費。
2、單 KafkaConsumer 實例 + 多 worker 線程
針對第一個線程模型的缺點,我們可采取 KafkaConsumer 實例與消息消費邏輯解耦,把消息消費邏輯放入單獨的線程中去處理,線程模型如下:
從消費線程模型可看出,當(dāng) KafkaConsumer 實例與消息消費邏輯解耦后,我們不需要創(chuàng)建多個 KafkaConsumer 實例就可進(jìn)行多線程消費,還可根據(jù)消費的負(fù)載情況動態(tài)調(diào)整 worker 線程,具有很強(qiáng)的獨立擴(kuò)展性,在公司內(nèi)部使用的多線程消費模型就是用的單 KafkaConsumer 實例 + 多 worker 線程模型。
但這個消費模型由于消費邏輯是利用多線程進(jìn)行消費的,因此并不能保證其消息的消費順序,在這里我們可以引入阻塞隊列的模型,一個 woker 線程對應(yīng)一個阻塞隊列,線程不斷輪訓(xùn)從阻塞隊列中獲取消息進(jìn)行消費,對具有相同 key 的消息進(jìn)行取模,并放入相同的隊列中,實現(xiàn)順序消費, 消費模型如下:
但是以上兩個消費線程模型,存在一個問題:
在消費過程中,如果 Kafka 消費組發(fā)生重平衡,此時的分區(qū)被分配給其它消費組了,如果拉取回來的消息沒有被消費,雖然 Kakfa 可以實現(xiàn) ConsumerRebalanceListener 接口,在新一輪重平衡前主動提交消費偏移量,但這貌似解決不了未消費的消息被打亂順序的可能性?
因此在消費前,還需要主動進(jìn)行判斷此分區(qū)是否被分配給其它消費者處理,并且還需要鎖定該分區(qū)在消費當(dāng)中不能被分配到其它消費者中(但 kafka 目前做不到這一點)。
參考 RocketMQ 的做法:
在消費前主動調(diào)用 ProcessQueue#isDropped 方法判斷隊列是否已過期,并且對該隊列進(jìn)行加鎖處理(向 broker 端請求該隊列加鎖)。
RocketMQ
RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已為你準(zhǔn)備好了你的需求,它本身的消費模型就是單 consumer 實例 + 多 worker 線程模型,有興趣的小伙伴可以從以下方法觀摩 RocketMQ 的消費邏輯:
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
RocketMQ 會為每個隊列分配一個 PullRequest,并將其放入 pullRequestQueue,PullMessageService 線程會不斷輪詢從 pullRequestQueue 中取出 PullRequest 去拉取消息,接著將拉取到的消息給到 ConsumeMessageService 處理,ConsumeMessageService 有兩個子接口:
// 并發(fā)消息消費邏輯實現(xiàn)類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 順序消息消費邏輯實現(xiàn)類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
其中,ConsumeMessageConcurrentlyService 內(nèi)部有一個線程池,用于并發(fā)消費,同樣地,如果需要順序消費,那么 RocketMQ 提供了 ConsumeMessageOrderlyService 類進(jìn)行順序消息消費處理。
經(jīng)過對 Kafka 消費線程模型的思考之后,從 ConsumeMessageOrderlyService 源碼中能夠看出 RocketMQ 能夠?qū)崿F(xiàn)局部消費順序,我認(rèn)為主要有以下兩點:
1)RocketMQ 會為每個消息隊列建一個對象鎖,這樣只要線程池中有該消息隊列在處理,則需等待處理完才能進(jìn)行下一次消費,保證在當(dāng)前 Consumer 內(nèi),同一隊列的消息進(jìn)行串行消費。
2)向 Broker 端請求鎖定當(dāng)前順序消費的隊列,防止在消費過程中被分配給其它消費者處理從而打亂消費順序。
總結(jié)
經(jīng)過這篇文章的分析后,嘗試回答文章開頭的那個問題:
1)多分區(qū)的情況下:
如果想要保證 Kafka 在消費時要保證消費的順序性,可以使用每個線程維護(hù)一個 KafkaConsumer 實例,并且是一條一條地去拉取消息并進(jìn)行消費(防止重平衡時有可能打亂消費順序);對于能容忍消息短暫亂序的業(yè)務(wù)(話說回來, Kafka 集群也不能保證嚴(yán)格的消息順序),可以使用單 KafkaConsumer 實例 + 多 worker 線程 + 一條線程對應(yīng)一個阻塞隊列消費線程模型。
1)單分區(qū)的情況下:
由于單分區(qū)不存在重平衡問題,以上兩個線程模型的都可以保證消費的順序性。
另外如果是 RocketMQ,使用 MessageListenerOrderly 監(jiān)聽消費可保證消息消費順序。
很多人也有這個疑問:既然 Kafka 和 RocketMQ 都不能保證嚴(yán)格的順序消息,那么順序消費還有意義嗎?
一般來說普通的的順序消息能夠滿足大部分業(yè)務(wù)場景,如果業(yè)務(wù)能夠容忍集群異常狀態(tài)下消息短暫不一致的情況,則不需要嚴(yán)格的順序消息。
如果你對文章還有什么疑問和補(bǔ)充或者發(fā)現(xiàn)文中有錯誤的地方,歡迎留言,我們一起探討。
原文 張程輝
總結(jié)
以上是生活随笔為你收集整理的rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 编写图形界面程序,显示一个红色反弹球的程
- 下一篇: FIFO,LRU,OPT的命中、调换过程