Kafka Controller Redesign 方案
轉(zhuǎn)載自??Kafka Controller Redesign 方案
Kafka Controller 是 Kafka 的核心組件,在前面的文章中,已經(jīng)詳細(xì)講述過 Controller 部分的內(nèi)容。在過去的幾年根據(jù)大家在生產(chǎn)環(huán)境中應(yīng)用的反饋,Controller 也積累了一些比較大的問題,而針對這些問題的修復(fù),代碼的改動量都是非常大的,無疑是一次重構(gòu),因此,社區(qū)準(zhǔn)備在新版的系統(tǒng)里對 Controller 做一些相應(yīng)的優(yōu)化(0.11.0及以后的版本),相應(yīng)的設(shè)計(jì)方案見:Kafka Controller Redesign,本文的內(nèi)容就是結(jié)合這篇文章做一個簡單的總結(jié)。
https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko
?
Controller 功能
在一個 Kafka 中,Controller 要處理的事情總結(jié)如下表所示:
?
?
Controller 目前存在的問題
之所以要重新設(shè)計(jì) Controller,是因?yàn)楝F(xiàn)在的 Controller 積累了一些比較難解決的問題,這些問題解決起來,代碼改動量都是巨大的,甚至需要改變 controller 部門的設(shè)計(jì),基本就跟重構(gòu)差不多了,下面我們先來了看一下 controller 之前(主要是 0.11.0 之前的版本)存在的一些問題。
目前遇到的比較大的問題有以下幾個:
Partition 級別同步 zk 寫;
sequential per-partition controller-to-broker requests;
Controller 復(fù)雜的并發(fā)語義;
代碼組織混亂;
控制類請求與數(shù)據(jù)類請求未分離;
Controller 給 broker 的請求中沒有 broker 的 generation信息;
ZkClient 阻礙 Client 的狀態(tài)管理。
?
Partition 級別同步 zk 寫
zookeeper 的同步寫意味著在下次寫之前需要等待前面整個過程的結(jié)束,而且由于它們都是 partition 粒度的(一個 Partition 一個 Partition 的去執(zhí)行寫操作),對于 Partition 非常多的集群來說,需要等待的時間會更長,Controller 通常會在下面這兩個地方做 Partition 級別 zookeeper 同步寫操作:
PartitionStateMachine 在進(jìn)行觸發(fā) leader 選舉(partition 目的狀態(tài)是 OnlinePartition),將會觸發(fā)上面的操作;
ReplicaStateMachine 更新 LeaderAndIsr 信息到 zk(replica 狀態(tài)轉(zhuǎn)變?yōu)?OfflineReplica),這種情況也觸發(fā)這種情況,它既阻礙了 Controller 進(jìn)程,也有可能會 zk 造成壓力。
?
sequential per-partition controller-to-broker requests
Controller 在向 Broker 發(fā)送請求,有些情況下也是 Partition 粒度去發(fā)送的,效率非常低,比如在 Controller 處理 broker shutdown 請求時,這里是按 Partition 級別處理,每處理一個 Partition 都會執(zhí)行 Partition、Replica 狀態(tài)變化以及 Metadata 更新,并且調(diào)用 sendRequestsToBrokers() 向 broker 發(fā)送請求,這樣的話,效率將變得非常低。
?
Controller 復(fù)雜的并發(fā)語義
Controller 需要在多個線程之間共享狀態(tài)信息,這些線程有:
IO threads handling controlled shutdown requests
The ZkClient org.I0Itec.zkclient.ZkEventThread processing zookeeper callbacks sequentially;
The TopicDeletionManager kafka.controller.DeleteTopicsThread;
Per-broker RequestSendThread within ControllerChannelManager.
所有這些線程都需要訪問或修改狀態(tài)信息(ControllerContext),現(xiàn)在它們是通過 ControllerContext 的 controllerLock(排它鎖)實(shí)現(xiàn)的,Controller 的并發(fā)變得虛弱無力。
?
代碼組織混亂
KafkaController 部分的代碼組織(KafkaController、PartitionStateMachine 和 ReplicaStateMachine)不是很清晰,比如,下面的問題就很難回答:
where and when does zookeeper get updated?
where and when does a controller-to-broker request get formed?
what impact does a failing zookeeper update or controller-to-broker request have on the cluster state?
這也導(dǎo)致了這部分很多開發(fā)者不敢輕易去改動。
?
控制類請求與數(shù)據(jù)類請求未分離
現(xiàn)在 broker 收到的請求,有來自 client、broker 和 controller 的請求,這些請求都會被放到同一個 requestQueue 中,它們有著同樣的優(yōu)先級,所以來自 client 的請求很可能會影響來自 controller 請求的處理(如果是 leader 變動的請求,ack 設(shè)置的不是 all,這種情況有可能會導(dǎo)致數(shù)據(jù)丟失)。
Controller 給 broker 的請求中沒有 broker 的 generation信息
這里的 Broker generation 代表著一個標(biāo)識,每當(dāng)它重新加入集群時,這個標(biāo)識都會變化。如果 Controller 的請求沒有這個信息的話,可能會導(dǎo)致一個重啟的 Broker 收到之前的請求,讓 Broker 進(jìn)入到一個錯誤的狀態(tài)。
比如,Broker 收到之前的 StopReplica 請求,可能會導(dǎo)致副本同步線程退出。
?
ZkClient 阻礙 Client 的狀態(tài)管理
這里的狀態(tài)管理指的是當(dāng) Client 發(fā)生重連或會話過期時,Client 可以監(jiān)控這種狀態(tài)變化,并做出一些處理,因?yàn)殚_源版的 ZKClient 在處理 notification 時,是線性處理的,一些 notification 會被先放到 ZkEventThread’s queue 中,這樣會導(dǎo)致一些最新的 notification 不能及時被處理,特別是與 zk 連接斷開重連的情況。
?
Controller 改進(jìn)方案
關(guān)于上述問題,Kafka 提出了一些改進(jìn)方案,有些已經(jīng)在最新版的系統(tǒng)中實(shí)現(xiàn),有的還在規(guī)劃中。
?
使用異步的 zk API
Zookeeper 的 client 提供三種執(zhí)行請求的方式:
同步調(diào)用,意味著下次請求需要等待當(dāng)前當(dāng)前請求的完成;
異步調(diào)用,意味著不需要等待當(dāng)前請求的完成就可以開始下次請求的執(zhí)行,并且我們可以通過回調(diào)機(jī)制去處理請求返回的結(jié)果;
單請求的 batch 調(diào)用,意味著 batch 內(nèi)的所有請求都會在一次事務(wù)處理中完成,這里需要關(guān)注的是 zookeeper 的 server 對單請求的大小是有限制的(jute.maxbuffer)。
文章中給出了三種請求的測試結(jié)果,Kafka 最后選取的是異步處理機(jī)制,因?yàn)閷τ趩握埱筇幚?#xff0c;異步處理更加簡潔,并且相比于同步處理還可以保持一個更好的寫性能。
improve controller-to-broker request batching
這個在設(shè)計(jì)文檔還是 TODO 狀態(tài),具體的方案還沒確定,不過基本可以猜測一下,因?yàn)槟康氖翘岣?batch 發(fā)送能力,那么只能是在調(diào)用對每個 broker 的 RequestSenderThread 線程發(fā)送請求之前,做一下檢測,而不是來一個請求立馬就發(fā)送,這是一個性能與時間的權(quán)衡,如果不是立馬發(fā)送請求,那么可能會帶來 broker 短時 metadata 信息的不一致,這個不一致時間不同的應(yīng)用場景要求是不一樣的。
?
單線程的事件處理模型
采用單線程的時間處理模型將極大簡化 Controller 的并發(fā)實(shí)現(xiàn),只允許這個線程訪問和修改 Controller 的本地狀態(tài)信息,因此在 Controller 部分也就不需要到處加鎖來保證線程安全了。
目前 1.1.0 的實(shí)現(xiàn)中,Controller 使用了一個 ControllerEventThread 線程來處理所有的 event,目前可以支持13種不同類型事件:
Idle:代表當(dāng)前 ControllerEventThread 處理空閑狀態(tài);
ControllerChange:Controller 切換處理;
BrokerChange:Broker 變動處理,broker 可能有上線或掉線;
TopicChange:Topic 新增處理;
TopicDeletion:Topic 刪除處理;
PartitionReassignment:Partition 副本遷移處理;
AutoLeaderBalance:自動 rebalance 處理;
ManualLeaderBalance:最優(yōu) leader 選舉處理,這里叫做手動 rebalance,手動去切流量;
ControlledShutdown:優(yōu)雅關(guān)閉 broker;
IsrChange:Isr 變動處理;
LeaderAndIsrResponseReceived;
LogDirChange:Broker 某個目錄失敗后的處理(比如磁盤壞掉等);
ControllerShutdown:ControllerEventThread 處理這個事件時,會關(guān)閉當(dāng)前線程。
重構(gòu)集群狀態(tài)管理
這部分的改動,目前社區(qū)也沒有一個很好的解決思路,重構(gòu)這部分的目的是希望 Partition、Replica 的狀態(tài)管理變得更清晰一些,讓我們從代碼中可以清楚地明白狀態(tài)是在什么時間、什么地方、什么條件下被觸發(fā)的。這個優(yōu)化其實(shí)是跟上面那個有很大關(guān)聯(lián),采用單線程的事件處理模型,可以讓狀態(tài)管理也變得更清晰。
prioritize controller requests
我們想要把控制類請求與數(shù)據(jù)類請求分開,提高 controller 請求的優(yōu)先級,這樣的話即使 Broker 中請求有堆積,Broker 也會優(yōu)先處理控制類的請求。
這部分的優(yōu)化可以在網(wǎng)絡(luò)層的 RequestChannel 中做,RequestChannel 可以根據(jù)請求的 id 信息把請求分為正常的和優(yōu)先的,如果請求是 UpdateMetadataRequest、LeaderAndIsrRequest 或者 StopReplicaRequest,那么這個請求的優(yōu)先級應(yīng)該提高。實(shí)現(xiàn)方案有以下兩種:
在請求隊(duì)列中增加一個優(yōu)先級隊(duì)列,優(yōu)先級高的請求放到 the prioritized request queue 中,優(yōu)先級低的放到普通請求隊(duì)列中,但是無論使用一個定時拉取(poll)還是2個定時拉取,都會帶來其他的問題,要么是增大普通請求的處理延遲,要么是增大了優(yōu)先級高請求的延遲;
直接使用優(yōu)先級隊(duì)列代替現(xiàn)在的普通隊(duì)列,設(shè)計(jì)上更傾向與這一種。
?
目前這部分在1.1.0中還未實(shí)現(xiàn)。
Controller 發(fā)送請求中添加 broker 的 generation 信息
generation 信息是用來標(biāo)識當(dāng)前 broker 加入集群 epoch 信息,每當(dāng) broker 重新加入集群中,該 broker.id 對應(yīng)的 generation 都應(yīng)該變化(要求遞增),目前有兩種實(shí)現(xiàn)方案:
為 broker 分配的一個全局唯一的 id,由 controller 廣播給其他 broker;
直接使用 zookeeper 的 zxid 信息(broker.id 注冊時的 zxid)。
?
直接使用原生的 Zookeeper client
Client 端的狀態(tài)管理意味著當(dāng) Client 端發(fā)生狀態(tài)變化(像連接中斷或回話超時)時,我們有能力做一些操作。其中,zookeeper client 有效的狀態(tài)(目前的 client 比下面又多了幾種狀態(tài),這里先不深入)是:
-
NOT_CONNECTED: the initial state of the client;
-
CONNECTING: the client is establishing a connection to zookeeper;
-
CONNECTED: the client has established a connection and session to zookeeper;
-
CLOSED: the session has closed or expired。
有效的狀態(tài)轉(zhuǎn)移是:
-
NOT_CONNECTED > CONNECTING
-
CONNECTING > CONNECTED
-
CONNECTING > CLOSED
-
CONNECTED > CONNECTING
-
CONNECTED > CLOSED
最開始的設(shè)想是直接使用原生 Client 的異步調(diào)用方式,這樣的話依然可以通過回調(diào)方法監(jiān)控到狀態(tài)的變化(像連接中斷或回話超時),同樣,在每次事件處理時,可以通過檢查狀態(tài)信息來監(jiān)控到 Client 狀態(tài)的變化,及時做一些處理。
當(dāng)一個 Client 接收到連接中斷的 notification(Client 狀態(tài)變成了 CONNECTING 狀態(tài)),它意味著 Client 不能再從 zookeeper 接收到任何 notification 了。如果斷開連接,對于 Controller 而言,無論它現(xiàn)在正在做什么它都應(yīng)該先暫停,因?yàn)榭赡芗旱?Controller 已經(jīng)切換到其他機(jī)器上了,只是它還沒接收到通知,它如果還在工作,可能會導(dǎo)致集群狀態(tài)不一致。當(dāng)連接斷開后,Client 可以重新建立連接(re-establish,狀態(tài)變?yōu)?CONNECTED)或者會話過期(狀態(tài)變?yōu)?CLOSED,會話過期是由 zookeeper Server 來決定的)。如果變成了 CONNECTED 狀態(tài),Controller 應(yīng)該重新開始這些暫停的操作,而如果狀態(tài)變成了 CLOSED 狀態(tài),舊的 Controller 就會知道它不再是 controller,應(yīng)該丟棄掉這些任務(wù)。
?
參考
-
Kafka Controller Redesign;
https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko
-
Kafka controller重設(shè)計(jì)。
https://www.cnblogs.com/huxi2b/p/6980045.html
總結(jié)
以上是生活随笔為你收集整理的Kafka Controller Redesign 方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从零开始用好 Maven : 从 Hel
- 下一篇: 一文理解Netty模型架构