Kafka controller重设计
本文主要參考社區(qū)0.11版本Controller的重設(shè)計方案,試圖給大家梳理一下Kafka controller這個組件在設(shè)計上的一些重要思考。眾所周知,Kafka中有個關(guān)鍵組件叫controller,負責(zé)管理和協(xié)調(diào)Kafka集群。網(wǎng)上關(guān)于controller的源碼分析也有很多,本文就不再大段地列出代碼重復(fù)做這件事情了。實際上,對于controller的代碼我一直覺得寫的非常混亂,各種調(diào)用關(guān)系十分復(fù)雜,想要完整地理解它的工作原理確實不易。好在我們就是普通的使用者,大致了解controller的工作原理即可。下面我就帶各位簡要了解一下當(dāng)前Kafka controller的原理架構(gòu)以及社區(qū)為什么要在大改controller的設(shè)計。
Controller是做什么的
“負責(zé)管理和協(xié)調(diào)Kafka集群”的說法實在沒有什么營養(yǎng),上點干貨吧——具體來說Controller目前主要提供多達10種的Kafka服務(wù)功能的實現(xiàn),它們分別是:
- UpdateMetadataRequest:更新元數(shù)據(jù)請求。topic分區(qū)狀態(tài)經(jīng)常會發(fā)生變更(比如leader重新選舉了或副本集合變化了等)。由于當(dāng)前clients只能與分區(qū)的leader broker進行交互,那么一旦發(fā)生變更,controller會將最新的元數(shù)據(jù)廣播給所有存活的broker。具體方式就是給所有broker發(fā)送UpdateMetadataRequest請求
- CreateTopics: 創(chuàng)建topic請求。當(dāng)前不管是通過API方式、腳本方式抑或是CreateTopics請求方式來創(chuàng)建topic,做法幾乎都是在Zookeeper的/brokers/topics下創(chuàng)建znode來觸發(fā)創(chuàng)建邏輯,而controller會監(jiān)聽該path下的變更來執(zhí)行真正的“創(chuàng)建topic”邏輯
- DeleteTopics:刪除topic請求。和CreateTopics類似,也是通過創(chuàng)建Zookeeper下的/admin/delete_topics/<topic>節(jié)點來觸發(fā)刪除topic,controller執(zhí)行真正的邏輯
- 分區(qū)重分配:即kafka-reassign-partitions腳本做的事情。同樣是與Zookeeper結(jié)合使用,腳本寫入/admin/reassign_partitions節(jié)點來觸發(fā),controller負責(zé)按照方案分配分區(qū)
- Preferred leader分配:preferred leader選舉當(dāng)前有兩種觸發(fā)方式:1. 自動觸發(fā)(auto.leader.rebalance.enable = true);2.?kafka-preferred-replica-election腳本觸發(fā)。兩者“玩法”相同,向Zookeeper的/admin/preferred_replica_election寫數(shù)據(jù),controller提取數(shù)據(jù)執(zhí)行preferred leader分配
- 分區(qū)擴展:即增加topic分區(qū)數(shù)。標準做法也是通過kafka-reassign-partitions腳本完成,不過用戶可直接往Zookeeper中寫數(shù)據(jù)來實現(xiàn),比如直接把新增分區(qū)的副本集合寫入到/brokers/topics/<topic>下,然后controller會為你自動地選出leader并增加分區(qū)
- 集群擴展:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務(wù)發(fā)現(xiàn)的工作
- broker崩潰:同樣地,controller通過Zookeeper可實時偵測broker狀態(tài)。一旦有broker掛掉了,controller可立即感知并為受影響分區(qū)選舉新的leader
- ControlledShutdown:broker除了崩潰,還能“優(yōu)雅”地退出。broker一旦自行終止,controller會接收到一個ControlledShudownRequest請求,然后controller會妥善處理該請求并執(zhí)行各種收尾工作
- Controller leader選舉:controller必然要提供自己的leader選舉以防這個全局唯一的組件崩潰宕機導(dǎo)致服務(wù)中斷。這個功能也是通過Zookeeper的幫助實現(xiàn)的
Controller當(dāng)前設(shè)計
當(dāng)前controller啟動時會為集群中所有broker創(chuàng)建一個各自的連接。這么說吧,假設(shè)你的集群中有100臺broker,那么controller啟動時會創(chuàng)建100個Socket連接(也包括與它自己的連接!)。當(dāng)前新版本的Kafka統(tǒng)一使用了NetworkClient類來建模底層的網(wǎng)絡(luò)連接(有興趣研究源碼的可以去看下這個類,它主要依賴于Java NIO的Selector)。Controller會為每個連接都創(chuàng)建一個對應(yīng)的請求發(fā)送線程,專門負責(zé)給對應(yīng)的broker發(fā)送請求。也就是說,如果還是那100臺broker,那么controller啟動時還會創(chuàng)建100個RequestSendThread線程。當(dāng)前的設(shè)計中Controller只能給broker發(fā)送三類請求,它們是:
- UpdateMetadataRequest:更新元數(shù)據(jù)
- LeaderAndIsrRequest:創(chuàng)建分區(qū)、副本以及完成必要的leader和/或follower角色的工作
- StopReplicaRequest:停止副本請求,還可能刪除分區(qū)副本
Controller通常都是發(fā)送請求給broker的,只有上面談到的controller 10大功能中的ControlledShutdownRequest請求是例外:這個請求是待關(guān)閉的broker通過RPC發(fā)送給controller的,即它的方向是反的。另外這個請求還有一個特別之處就是其他所有功能或是請求都是通過Zookeeper間接與controller交互的,只有它是直接與controller進行交互的。
Controller組成
構(gòu)成controller的組件太多了,多到我已經(jīng)不想用文字表達了,直接上圖吧:
其中比較重要的組件包括:
- ControllerContext:可以說是controller的緩存。當(dāng)前controller為人詬病的原因之一就是用了大量的同步機制來保護這個東西。ControllerContext的構(gòu)成如下圖所示:
緩存內(nèi)容十分豐富,這也是controller可以協(xié)調(diào)管理整個cluster的基礎(chǔ)。
- TopicDeletionManager:負責(zé)刪除topic的組件
- ****Selector:controller提供的各種功能的leader選舉器
- ****Listener:controller注冊的各種Zookeeper監(jiān)聽器。想要讓controller無所不能,必然要注冊各種"觸角" 才能實時感知各種變化
Controller當(dāng)前問題
?不謙虛地說,我混跡社區(qū)也有些日子了。在里面碰到過很多關(guān)于controller的bug。社區(qū)對于這些bug有個很共性的特點,那就是沒有什么人愿意(敢去)改這部分代碼,因為它實在是太復(fù)雜了。具體的問題包括:
1. 需要在多線程間共享狀態(tài)
編寫正確的多線程程序一直是Java開發(fā)者的痛點。在Controller的實現(xiàn)類KafkaController中創(chuàng)建了很多線程,比如之前提到的RequestSendThread線程,另外ZkClient也會創(chuàng)建單獨的線程來處理zookeeper回調(diào),這還不算TopicDeletionManager創(chuàng)建的線程和其他IO線程等。幾乎所有這些線程都需要訪問ControllerContext(RequestSendThread只操作它們專屬的請求隊列,不會訪問ControllerContext),因此必要的多線程同步機制是一定需要的。當(dāng)前是使用controllerLock鎖來實現(xiàn)的,因此可以說沒有并行度可言。
2. 代碼組織混亂
看過源代碼的人相信對這一點深有體會。KafkaController、PartitionStateMachine和ReplicaStateMachine每個都是500+行的大類且彼此混調(diào)的現(xiàn)象明顯,比如KafkaController的stopOldReplicasOfReassignedPartition方法調(diào)用ReplicaStateMachine的handleStateChanges方法,而后者又會調(diào)用KafkaController的remoteReplicaFromIsr方法。類似的情況還發(fā)生在KafkaController和ControllerChannelManager之間。
3. 管理類請求與數(shù)據(jù)類請求未分離
當(dāng)前broker對入站請求類型不做任何優(yōu)先級處理,不論是PRODUCE請求、FETCH請求還是Controller類的請求。這就可能造成一個問題:即clients發(fā)送的數(shù)據(jù)類請求積壓導(dǎo)致controller推遲了管理類請求的處理。設(shè)想這樣的場景,假設(shè)controller向broker廣播了leader發(fā)生變更。于是新leader開始接收clients端請求,而同時老leader所在的broker由于出現(xiàn)了數(shù)據(jù)類請求的積壓使得它一直忙于處理這些請求而無法處理controller發(fā)來的LeaderAndIsrRequest請求,因此這是就會出現(xiàn)“雙主”的情況——也就是所謂的腦裂。此時倘若client發(fā)送的一個PRODUCE請求未指定acks=-1,那么因為日志水位截斷的緣故這個請求包含的消息就可能“丟失”了。現(xiàn)在社區(qū)中關(guān)于controller丟失數(shù)據(jù)的bug大多是因為這個原因造成的。
4. Controller同步寫Zookeeper且是一個分區(qū)一個分區(qū)地寫
當(dāng)前controller操作Zookeeper是通過ZkClient來完成的。ZkClient目前是同步寫入Zookeeper,而同步通常意味著性能不高。更為嚴重的是,controller是一個分區(qū)一個分區(qū)進行寫入的,對于分區(qū)數(shù)很多的集群來說,這無疑是個巨大的性能瓶頸。如果用戶仔細查看源代碼,可以發(fā)現(xiàn)PartitionStateMachine的electLeaderForPartition就是一個分區(qū)一個分區(qū)地選舉的。
5. Controller按照一個分區(qū)一個分區(qū)的發(fā)送請求
Controller當(dāng)前發(fā)送請求都是按照分區(qū)級別發(fā)送的,即一個分區(qū)一個分區(qū)地發(fā)送。沒有任何batch或并行可言,效率很低。
6. Controller給broker的請求無版本號信息
這里的版本號類似于new consumer的generation,總之是要有一種機制告訴controller broker的版本信息。因為有些情況下broker會處理本已過期或失效的請求導(dǎo)致broker狀態(tài)不一致。舉個例子,如果一個broker正常關(guān)閉過程中“宕機”了,那么重啟之后這個broker就有可能處理之前controller發(fā)送過來的StopReplicaRequest,導(dǎo)致某些副本被置成offline從而無法使用。而這肯定不是我們希望看到的結(jié)果,對吧?
7. ZkClient阻礙狀態(tài)管理
Contoller目前是使用了ZkClient這個開源工具,它可以自動重建會話并使用特有的線程順序處理所有的Zookeeper監(jiān)聽消息。因為是順序處理,它就有可能無法及時響應(yīng)最新的狀態(tài)變更導(dǎo)致Kafka集群狀態(tài)的不一致。
Controller改進方案
1. 單線程事件模型
和new consumer類似,controller摒棄多線程的模型,采用單線程的事件隊列模型。這樣簡化了設(shè)計同時也避免了復(fù)雜的同步機制。各位在最新的trunk分支上已然可以看到這種變化:增加了ControllerEventManager類以及對應(yīng)的ControllerEventThread線程類專門負責(zé)處理ControllerEvent。目前總共有9種controller event,它們分別是:
- Idle
- ControllerChange
- BrokerChange
- TopicChange
- TopicDeletion
- PartitionReassignment
- AutoLeaderBalance
- ManualLeaderBalance
- ControlledShutdown
- IsrChange
我們基本上可以從名字就能判斷出它們分別代表了什么事件。
2. 使用Zookeeper的async API
將所有同步操作Zookeeper的地方都改成異步調(diào)用+回調(diào)的方式。實際上Apache Zookeeper客戶端執(zhí)行請求的方式有三種:同步、異步和batch。通常以batch性能最好,但Kafka社區(qū)目前還是傾向于用async替換sync。畢竟實現(xiàn)起來相對簡單同時性能上也能得到不少提升。
3. 重構(gòu)狀態(tài)管理
可能摒棄之前狀態(tài)機的方式,采用和GroupCoordinator類似的方式,讓controller保存所有的狀態(tài)并且負責(zé)狀態(tài)的流轉(zhuǎn)以及狀態(tài)流轉(zhuǎn)過程中的邏輯。當(dāng)然,具體的實現(xiàn)還要再結(jié)合0.11最終代碼才能確定。
4. 對請求排定優(yōu)先級
對管理類請求和數(shù)據(jù)類請求區(qū)分優(yōu)先級。比如使用優(yōu)先級隊列替換現(xiàn)有的BlockingQueue——社區(qū)應(yīng)該已經(jīng)實現(xiàn)了這個功能,開發(fā)了一個叫PrioritizationAwareBlockingQueue的類來做這件事情,后續(xù)大家可以看下這個類的源代碼
5. 為controller發(fā)送的請求匹配broker版本信息
為broker設(shè)定版本號(generation id)。如果controller發(fā)送過來的請求中包含的generation與broker自己的generation不匹配, 那么broker會拒絕該請求。
6. 拋棄ZkClient,使用原生Zookeeper client
ZkClient是同步順序處理ZK事件的,而原生Zookeeper client支持async方式。另外使用原生API還能夠在接收到狀態(tài)變更通知時便馬上開始處理,而ZkClient的特定線程則必須要在隊列中順序處理到這條變更消息時才能處理。
結(jié)語
以上就是關(guān)于Kafka controller的一些討論,包括了它當(dāng)前的組件構(gòu)成、設(shè)計問題以及對應(yīng)的改進方案。有很多地方可能理解的還不是透徹,期待著在Kafka 0.11正式版本中可以看到全新的controller組件。
總結(jié)
以上是生活随笔為你收集整理的Kafka controller重设计的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: day2编写购物商城(1)
- 下一篇: 【码云周刊第 24 期】超实用 Ando