kafka消费组与重平衡机制详解
1.消費(fèi)者組
1.1 介紹
消費(fèi)者組,即 Consumer Group,應(yīng)該算是 Kafka 比較有亮點(diǎn)的設(shè)計(jì)了。
那么何謂 Consumer Group 呢?
Consumer Group 是 Kafka 提供的可擴(kuò)展且具有容錯性的消費(fèi)者機(jī)制。既然是一個組,那么組內(nèi)必然可以有多個消費(fèi)者或消費(fèi)者實(shí)例(Consumer Instance),它們共享一個公共的 ID,這個 ID 被稱為 Group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題(Subscribed Topics)的所有分區(qū)(Partition)。當(dāng)然,每個分區(qū)只能由同一個消費(fèi)者組內(nèi)的一個 Consumer 實(shí)例來消費(fèi)。
大概可以總結(jié)為以下三點(diǎn):
-
Consumer Group 下可以有一個或多個 Consumer 實(shí)例。這里的實(shí)例可以是一個單獨(dú)的進(jìn)程,也可以是同一進(jìn)程下的線程。在實(shí)際場景中,使用進(jìn)程更為常見一些。
-
Group ID 是一個字符串,在一個 Kafka 集群中,它標(biāo)識唯一的一個 Consumer Group。
-
Consumer Group 下所有實(shí)例訂閱的主題的單個分區(qū),只能分配給組內(nèi)的某個 Consumer 實(shí)例消費(fèi)。這個分區(qū)當(dāng)然也可以被其他的 Group 消費(fèi)。
Consumer Group 之間彼此獨(dú)立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。kafka可以利用這一機(jī)制,同時實(shí)現(xiàn)消息引擎的兩大模型:點(diǎn)對點(diǎn)模型和發(fā)布/訂閱模型:如果所有實(shí)例都屬于同一個 Group,那么它實(shí)現(xiàn)的就是消息隊(duì)列模型;如果所有實(shí)例分別屬于不同的 Group,那么它實(shí)現(xiàn)的就是發(fā)布 / 訂閱模型。
1.2 實(shí)例數(shù)量
在實(shí)際使用場景中,我怎么知道一個 Group 下該有多少個 Consumer 實(shí)例呢?理想情況下,Consumer 實(shí)例的數(shù)量應(yīng)該等于該 Group 訂閱主題的分區(qū)總數(shù)。
舉個簡單的例子,假設(shè)一個 Consumer Group 訂閱了 3 個主題,分別是 A、B、C,它們的分區(qū)數(shù)依次是 1、2、3,那么通常情況下,為該 Group 設(shè)置 6 個 Consumer 實(shí)例是比較理想的情形,因?yàn)樗茏畲笙薅鹊貙?shí)現(xiàn)高伸縮性。
如果你有 3 個實(shí)例,那么平均下來每個實(shí)例大約消費(fèi) 2 個分區(qū)(6 / 3 = 2);如果你設(shè)置了 8 個實(shí)例,那么很遺憾,有 2 個實(shí)例(8 – 6 = 2)將不會被分配任何分區(qū),它們永遠(yuǎn)處于空閑狀態(tài)。因此,在實(shí)際使用過程中一般不推薦設(shè)置大于總分區(qū)數(shù)的 Consumer 實(shí)例。設(shè)置多余的實(shí)例只會浪費(fèi)資源,而沒有任何好處。
2.重平衡機(jī)制
2.1 介紹
Rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個 Consumer Group 下的所有 Consumer 如何達(dá)成一致,來分配訂閱 Topic 的每個分區(qū)。比如某個 Group 下有 20 個 Consumer 實(shí)例,它訂閱了一個具有 100 個分區(qū)的 Topic。正常情況下,Kafka 平均會為每個 Consumer 分配 5 個分區(qū)。這個分配的過程就叫 Rebalance。
那么 Consumer Group 何時進(jìn)行 Rebalance 呢?Rebalance 的觸發(fā)條件有 3 個。
組成員數(shù)發(fā)生變更。比如有新的 Consumer 實(shí)例加入組或者離開組,抑或是有 Consumer 實(shí)例崩潰被“踢出”組。
訂閱主題數(shù)發(fā)生變更。Consumer Group 可以使用正則表達(dá)式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結(jié)尾的主題。在 Consumer Group 的運(yùn)行過程中,你新創(chuàng)建了一個滿足這樣條件的主題,那么該 Group 就會發(fā)生 Rebalance。
訂閱主題的分區(qū)數(shù)發(fā)生變更。Kafka 當(dāng)前只能允許增加一個主題的分區(qū)數(shù)。當(dāng)分區(qū)數(shù)增加時,就會觸發(fā)訂閱該主題的所有 Group 開啟 Rebalance。
Rebalance 發(fā)生時,Group 下所有的 Consumer 實(shí)例都會協(xié)調(diào)在一起共同參與。你可能會問,每個 Consumer 實(shí)例怎么知道應(yīng)該消費(fèi)訂閱主題的哪些分區(qū)呢?這就需要分配策略的協(xié)助了。
當(dāng)前 Kafka 默認(rèn)提供了 3 種分配策略,每種策略都有一定的優(yōu)勢和劣勢。
三種策略具體介紹:https://blog.csdn.net/fy_java1995/article/details/106405169
2.2 注意點(diǎn)
首先,Rebalance 過程對 Consumer Group 消費(fèi)過程有極大的影響。如果你了解 JVM 的垃圾回收機(jī)制,你一定聽過萬物靜止的收集方式,即著名的 stop the world,簡稱 STW。
Java中Stop-The-World機(jī)制簡稱STW,是在執(zhí)行垃圾收集算法時,Java應(yīng)用程序的其他所有線程都被掛起(除了垃圾收集幫助器之外)。Java中一種全局暫停現(xiàn)象,全局停頓,所有Java代碼停止,native代碼可以執(zhí)行,但不能與JVM交互;這些現(xiàn)象多半是由于gc引起。
在 STW 期間,所有應(yīng)用線程都會停止工作,表現(xiàn)為整個應(yīng)用程序僵在那邊一動不動。Rebalance 過程也和這個類似,在 Rebalance 過程中,所有 Consumer 實(shí)例都會停止消費(fèi),等待 Rebalance 完成。這是 Rebalance 為人詬病的一個方面。
所以,我們應(yīng)該盡量避免ReBalance。
在實(shí)際情況中,大部分情況下,都是由于Consumer實(shí)例的增加或減少導(dǎo)致的ReBalance。
當(dāng) Consumer Group 完成 Rebalance 之后,每個 Consumer 實(shí)例都會定期地向 Coordinator 發(fā)送心跳請求,表明它還存活著。如果某個 Consumer 實(shí)例不能及時地發(fā)送這些心跳請求,Coordinator 就會認(rèn)為該 Consumer 已經(jīng)“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個參數(shù),叫 session.timeout.ms,就是被用來表征此事的。該參數(shù)的默認(rèn)值是 10 秒,即如果 Coordinator 在 10 秒之內(nèi)沒有收到 Group 下某 Consumer 實(shí)例的心跳,它就會認(rèn)為這個 Consumer 實(shí)例已經(jīng)掛了。可以這么說,session.timeout.ms 決定了 Consumer 存活性的時間間隔。
除了這個參數(shù),Consumer 還提供了一個允許你控制發(fā)送心跳請求頻率的參數(shù),就是 heartbeat.interval.ms。這個值設(shè)置得越小,Consumer 實(shí)例發(fā)送心跳請求的頻率就越高。頻繁地發(fā)送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當(dāng)前是否開啟 Rebalance,因?yàn)?#xff0c;目前 Coordinator 通知各個 Consumer 實(shí)例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標(biāo)志封裝進(jìn)心跳請求的響應(yīng)體中。
除了以上兩個參數(shù),Consumer 端還有一個參數(shù),用于控制 Consumer 實(shí)際消費(fèi)能力對 Rebalance 的影響,即 max.poll.interval.ms 參數(shù)。它限定了 Consumer 端應(yīng)用程序兩次調(diào)用 poll 方法的最大時間間隔。它的默認(rèn)值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費(fèi)完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
2.3 如何通知到其它消費(fèi)者
重平衡過程是如何通知到其他消費(fèi)者實(shí)例的?答案就是,靠消費(fèi)者端的心跳線程(Heartbeat Thread)。
Kafka Java 消費(fèi)者需要定期地發(fā)送心跳請求(Heartbeat Request)到 Broker 端的協(xié)調(diào)者,以表明它還存活著。在 Kafka 0.10.1.0 版本之前,發(fā)送心跳請求是在消費(fèi)者主線程完成的,也就是你寫代碼調(diào)用 KafkaConsumer.poll 方法的那個線程。
這樣做有諸多弊病,最大的問題在于,消息處理邏輯也是在這個線程中完成的。因此,一旦消息處理消耗了過長的時間,心跳請求將無法及時發(fā)到協(xié)調(diào)者那里,導(dǎo)致協(xié)調(diào)者“錯誤地”認(rèn)為該消費(fèi)者已“死”。自 0.10.1.0 版本開始,社區(qū)引入了一個單獨(dú)的心跳線程來專門執(zhí)行心跳請求發(fā)送,避免了這個問題。
但這和重平衡又有什么關(guān)系呢?其實(shí),重平衡的通知機(jī)制正是通過心跳線程來完成的。當(dāng)協(xié)調(diào)者決定開啟新一輪重平衡后,它會將“REBALANCE_IN_PROGRESS”封裝進(jìn)心跳請求的響應(yīng)中,發(fā)還給消費(fèi)者實(shí)例。當(dāng)消費(fèi)者實(shí)例發(fā)現(xiàn)心跳響應(yīng)中包含了“REBALANCE_IN_PROGRESS”,就能立馬知道重平衡又開始了,這就是重平衡的通知機(jī)制。
重平衡一旦開啟,Broker 端的協(xié)調(diào)者組件就要開始忙了,主要涉及到控制消費(fèi)者組的狀態(tài)流轉(zhuǎn)。當(dāng)前,Kafka 設(shè)計(jì)了一套消費(fèi)者組狀態(tài)機(jī)(State Machine),來幫助協(xié)調(diào)者完成整個重平衡流程。嚴(yán)格來說,這套狀態(tài)機(jī)屬于非常底層的設(shè)計(jì),Kafka 官網(wǎng)上壓根就沒有提到過,但你最好還是了解一下,因?yàn)樗軌驇椭愀愣M(fèi)者組的設(shè)計(jì)原理,比如消費(fèi)者組的過期位移(Expired Offsets)刪除等。
目前,Kafka 為消費(fèi)者組定義了 5 種狀態(tài),它們分別是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。那么,這 5 種狀態(tài)的含義是什么呢?我們一起來看看下面這張表格。
?
狀態(tài)流轉(zhuǎn)圖如下:
?
一個消費(fèi)者組最開始是 Empty 狀態(tài),當(dāng)重平衡過程開啟后,它會被置于 PreparingRebalance 狀態(tài)等待成員加入,之后變更到 CompletingRebalance 狀態(tài)等待分配方案,最后流轉(zhuǎn)到 Stable 狀態(tài)完成重平衡。
當(dāng)有新成員加入或已有成員退出時,消費(fèi)者組的狀態(tài)從 Stable 直接跳到 PreparingRebalance 狀態(tài),此時,所有現(xiàn)存成員就必須重新申請加入組。當(dāng)所有成員都退出組后,消費(fèi)者組狀態(tài)變更為 Empty。Kafka 定期自動刪除過期位移的條件就是,組要處于 Empty 狀態(tài)。因此,如果你的消費(fèi)者組停掉了很長時間(超過 7 天),那么 Kafka 很可能就把該組的位移數(shù)據(jù)刪除了。
重平衡的完整流程需要消費(fèi)者端和協(xié)調(diào)者組件共同參與才能完成。我們先從消費(fèi)者的視角來審視一下重平衡的流程。
2.4 消費(fèi)者端重平衡流程
在消費(fèi)者端,重平衡分為兩個步驟:分別是加入組和等待領(lǐng)導(dǎo)者消費(fèi)者(Leader Consumer)分配方案。這兩個步驟分別對應(yīng)兩類特定的請求:JoinGroup 請求和 SyncGroup 請求。
當(dāng)組內(nèi)成員加入組時,它會向協(xié)調(diào)者發(fā)送 JoinGroup 請求。在該請求中,每個成員都要將自己訂閱的主題上報(bào),這樣協(xié)調(diào)者就能收集到所有成員的訂閱信息。一旦收集了全部成員的 JoinGroup 請求后,協(xié)調(diào)者會從這些成員中選擇一個擔(dān)任這個消費(fèi)者組的領(lǐng)導(dǎo)者。
通常情況下,第一個發(fā)送 JoinGroup 請求的成員自動成為領(lǐng)導(dǎo)者。你一定要注意區(qū)分這里的領(lǐng)導(dǎo)者和之前我們介紹的領(lǐng)導(dǎo)者副本,它們不是一個概念。這里的領(lǐng)導(dǎo)者是具體的消費(fèi)者實(shí)例,它既不是副本,也不是協(xié)調(diào)者。領(lǐng)導(dǎo)者消費(fèi)者的任務(wù)是收集所有成員的訂閱信息,然后根據(jù)這些信息,制定具體的分區(qū)消費(fèi)分配方案。
選出領(lǐng)導(dǎo)者之后,協(xié)調(diào)者會把消費(fèi)者組訂閱信息封裝進(jìn) JoinGroup 請求的響應(yīng)體中,然后發(fā)給領(lǐng)導(dǎo)者,由領(lǐng)導(dǎo)者統(tǒng)一做出分配方案后,進(jìn)入到下一步:發(fā)送 SyncGroup 請求。
在這一步中,領(lǐng)導(dǎo)者向協(xié)調(diào)者發(fā)送 SyncGroup 請求,將剛剛做出的分配方案發(fā)給協(xié)調(diào)者。值得注意的是,其他成員也會向協(xié)調(diào)者發(fā)送 SyncGroup 請求,只不過請求體中并沒有實(shí)際的內(nèi)容。這一步的主要目的是讓協(xié)調(diào)者接收分配方案,然后統(tǒng)一以 SyncGroup 響應(yīng)的方式分發(fā)給所有成員,這樣組內(nèi)所有成員就都知道自己該消費(fèi)哪些分區(qū)了。
接下來,我用一張圖來形象地說明一下 JoinGroup 請求的處理過程。
?
就像前面說的,JoinGroup 請求的主要作用是將組成員訂閱信息發(fā)送給領(lǐng)導(dǎo)者消費(fèi)者,待領(lǐng)導(dǎo)者制定好分配方案后,重平衡流程進(jìn)入到 SyncGroup 請求階段。
下面這張圖描述的是 SyncGroup 請求的處理流程。
?
SyncGroup 請求的主要目的,就是讓協(xié)調(diào)者把領(lǐng)導(dǎo)者制定的分配方案下發(fā)給各個組內(nèi)成員。當(dāng)所有成員都成功接收到分配方案后,消費(fèi)者組進(jìn)入到 Stable 狀態(tài),即開始正常的消費(fèi)工作。
2.5 Broker端重平衡流程
要剖析協(xié)調(diào)者端處理重平衡的全流程,我們必須要分幾個場景來討論。這幾個場景分別是新成員加入組、組成員主動離組、組成員崩潰離組、組成員提交位移。
場景一:新成員入組
新成員入組是指組處于 Stable 狀態(tài)后,有新成員加入。如果是全新啟動一個消費(fèi)者組,Kafka 是有一些自己的小優(yōu)化的,流程上會有些許的不同。我們這里討論的是,組穩(wěn)定了之后有新成員加入的情形。
當(dāng)協(xié)調(diào)者收到新的 JoinGroup 請求后,它會通過心跳請求響應(yīng)的方式通知組內(nèi)現(xiàn)有的所有成員,強(qiáng)制它們開啟新一輪的重平衡。具體的過程和之前的客戶端重平衡流程是一樣的。現(xiàn)在,我用一張時序圖來說明協(xié)調(diào)者一端是如何處理新成員入組的。
?
場景二:組成員主動離組。
何謂主動離組?就是指消費(fèi)者實(shí)例所在線程或進(jìn)程調(diào)用 close() 方法主動通知協(xié)調(diào)者它要退出。這個場景就涉及到了第三類請求:LeaveGroup 請求。協(xié)調(diào)者收到 LeaveGroup 請求后,依然會以心跳響應(yīng)的方式通知其他成員,因此我就不再贅述了,還是直接用一張圖來說明。
?
場景三:組成員奔潰離組。
崩潰離組是指消費(fèi)者實(shí)例出現(xiàn)嚴(yán)重故障,突然宕機(jī)導(dǎo)致的離組。它和主動離組是有區(qū)別的,因?yàn)楹笳呤侵鲃影l(fā)起的離組,協(xié)調(diào)者能馬上感知并處理。但崩潰離組是被動的,協(xié)調(diào)者通常需要等待一段時間才能感知到,這段時間一般是由消費(fèi)者端參數(shù) session.timeout.ms 控制的。也就是說,Kafka 一般不會超過 session.timeout.ms 就能感知到這個崩潰。當(dāng)然,后面處理崩潰離組的流程與之前是一樣的,我們來看看下面這張圖。
?
場景四:重平衡時協(xié)調(diào)者對組內(nèi)成員提交位移的處理。
正常情況下,每個組內(nèi)成員都會定期匯報(bào)位移給協(xié)調(diào)者。當(dāng)重平衡開啟時,協(xié)調(diào)者會給予成員一段緩沖時間,要求每個成員必須在這段時間內(nèi)快速地上報(bào)自己的位移信息,然后再開啟正常的 JoinGroup/SyncGroup 請求發(fā)送。還是老辦法,我們使用一張圖來說明。
?
總結(jié)
以上是生活随笔為你收集整理的kafka消费组与重平衡机制详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka数据存储详解
- 下一篇: Kafka在Spring项目中的实战演练