Kafka controller重设计
本文主要參考社區(qū)0.11版本Controller的重設(shè)計(jì)方案,試圖給大家梳理一下Kafka controller這個(gè)組件在設(shè)計(jì)上的一些重要思考。眾所周知,Kafka中有個(gè)關(guān)鍵組件叫controller,負(fù)責(zé)管理和協(xié)調(diào)Kafka集群。網(wǎng)上關(guān)于controller的源碼分析也有很多,本文就不再大段地列出代碼重復(fù)做這件事情了。實(shí)際上,對(duì)于controller的代碼我一直覺(jué)得寫的非常混亂,各種調(diào)用關(guān)系十分復(fù)雜,想要完整地理解它的工作原理確實(shí)不易。好在我們就是普通的使用者,大致了解controller的工作原理即可。下面我就帶各位簡(jiǎn)要了解一下當(dāng)前Kafka controller的原理架構(gòu)以及社區(qū)為什么要在大改controller的設(shè)計(jì)。
Controller是做什么的
“負(fù)責(zé)管理和協(xié)調(diào)Kafka集群”的說(shuō)法實(shí)在沒(méi)有什么營(yíng)養(yǎng),上點(diǎn)干貨吧——具體來(lái)說(shuō)Controller目前主要提供多達(dá)10種的Kafka服務(wù)功能的實(shí)現(xiàn),它們分別是:
- UpdateMetadataRequest:更新元數(shù)據(jù)請(qǐng)求。topic分區(qū)狀態(tài)經(jīng)常會(huì)發(fā)生變更(比如leader重新選舉了或副本集合變化了等)。由于當(dāng)前clients只能與分區(qū)的leader broker進(jìn)行交互,那么一旦發(fā)生變更,controller會(huì)將最新的元數(shù)據(jù)廣播給所有存活的broker。具體方式就是給所有broker發(fā)送UpdateMetadataRequest請(qǐng)求
- CreateTopics: 創(chuàng)建topic請(qǐng)求。當(dāng)前不管是通過(guò)API方式、腳本方式抑或是CreateTopics請(qǐng)求方式來(lái)創(chuàng)建topic,做法幾乎都是在Zookeeper的/brokers/topics下創(chuàng)建znode來(lái)觸發(fā)創(chuàng)建邏輯,而controller會(huì)監(jiān)聽該path下的變更來(lái)執(zhí)行真正的“創(chuàng)建topic”邏輯
- DeleteTopics:刪除topic請(qǐng)求。和CreateTopics類似,也是通過(guò)創(chuàng)建Zookeeper下的/admin/delete_topics/<topic>節(jié)點(diǎn)來(lái)觸發(fā)刪除topic,controller執(zhí)行真正的邏輯
- 分區(qū)重分配:即kafka-reassign-partitions腳本做的事情。同樣是與Zookeeper結(jié)合使用,腳本寫入/admin/reassign_partitions節(jié)點(diǎn)來(lái)觸發(fā),controller負(fù)責(zé)按照方案分配分區(qū)
- Preferred leader分配:preferred leader選舉當(dāng)前有兩種觸發(fā)方式:1. 自動(dòng)觸發(fā)(auto.leader.rebalance.enable = true);2.?kafka-preferred-replica-election腳本觸發(fā)。兩者“玩法”相同,向Zookeeper的/admin/preferred_replica_election寫數(shù)據(jù),controller提取數(shù)據(jù)執(zhí)行preferred leader分配
- 分區(qū)擴(kuò)展:即增加topic分區(qū)數(shù)。標(biāo)準(zhǔn)做法也是通過(guò)kafka-reassign-partitions腳本完成,不過(guò)用戶可直接往Zookeeper中寫數(shù)據(jù)來(lái)實(shí)現(xiàn),比如直接把新增分區(qū)的副本集合寫入到/brokers/topics/<topic>下,然后controller會(huì)為你自動(dòng)地選出leader并增加分區(qū)
- 集群擴(kuò)展:新增broker時(shí)Zookeeper中/brokers/ids下會(huì)新增znode,controller自動(dòng)完成服務(wù)發(fā)現(xiàn)的工作
- broker崩潰:同樣地,controller通過(guò)Zookeeper可實(shí)時(shí)偵測(cè)broker狀態(tài)。一旦有broker掛掉了,controller可立即感知并為受影響分區(qū)選舉新的leader
- ControlledShutdown:broker除了崩潰,還能“優(yōu)雅”地退出。broker一旦自行終止,controller會(huì)接收到一個(gè)ControlledShudownRequest請(qǐng)求,然后controller會(huì)妥善處理該請(qǐng)求并執(zhí)行各種收尾工作
- Controller leader選舉:controller必然要提供自己的leader選舉以防這個(gè)全局唯一的組件崩潰宕機(jī)導(dǎo)致服務(wù)中斷。這個(gè)功能也是通過(guò)Zookeeper的幫助實(shí)現(xiàn)的
Controller當(dāng)前設(shè)計(jì)
當(dāng)前controller啟動(dòng)時(shí)會(huì)為集群中所有broker創(chuàng)建一個(gè)各自的連接。這么說(shuō)吧,假設(shè)你的集群中有100臺(tái)broker,那么controller啟動(dòng)時(shí)會(huì)創(chuàng)建100個(gè)Socket連接(也包括與它自己的連接!)。當(dāng)前新版本的Kafka統(tǒng)一使用了NetworkClient類來(lái)建模底層的網(wǎng)絡(luò)連接(有興趣研究源碼的可以去看下這個(gè)類,它主要依賴于Java NIO的Selector)。Controller會(huì)為每個(gè)連接都創(chuàng)建一個(gè)對(duì)應(yīng)的請(qǐng)求發(fā)送線程,專門負(fù)責(zé)給對(duì)應(yīng)的broker發(fā)送請(qǐng)求。也就是說(shuō),如果還是那100臺(tái)broker,那么controller啟動(dòng)時(shí)還會(huì)創(chuàng)建100個(gè)RequestSendThread線程。當(dāng)前的設(shè)計(jì)中Controller只能給broker發(fā)送三類請(qǐng)求,它們是:
- UpdateMetadataRequest:更新元數(shù)據(jù)
- LeaderAndIsrRequest:創(chuàng)建分區(qū)、副本以及完成必要的leader和/或follower角色的工作
- StopReplicaRequest:停止副本請(qǐng)求,還可能刪除分區(qū)副本
Controller通常都是發(fā)送請(qǐng)求給broker的,只有上面談到的controller 10大功能中的ControlledShutdownRequest請(qǐng)求是例外:這個(gè)請(qǐng)求是待關(guān)閉的broker通過(guò)RPC發(fā)送給controller的,即它的方向是反的。另外這個(gè)請(qǐng)求還有一個(gè)特別之處就是其他所有功能或是請(qǐng)求都是通過(guò)Zookeeper間接與controller交互的,只有它是直接與controller進(jìn)行交互的。
Controller組成
構(gòu)成controller的組件太多了,多到我已經(jīng)不想用文字表達(dá)了,直接上圖吧:
其中比較重要的組件包括:
- ControllerContext:可以說(shuō)是controller的緩存。當(dāng)前controller為人詬病的原因之一就是用了大量的同步機(jī)制來(lái)保護(hù)這個(gè)東西。ControllerContext的構(gòu)成如下圖所示:
緩存內(nèi)容十分豐富,這也是controller可以協(xié)調(diào)管理整個(gè)cluster的基礎(chǔ)。
- TopicDeletionManager:負(fù)責(zé)刪除topic的組件
- ****Selector:controller提供的各種功能的leader選舉器
- ****Listener:controller注冊(cè)的各種Zookeeper監(jiān)聽器。想要讓controller無(wú)所不能,必然要注冊(cè)各種"觸角" 才能實(shí)時(shí)感知各種變化
Controller當(dāng)前問(wèn)題
?不謙虛地說(shuō),我混跡社區(qū)也有些日子了。在里面碰到過(guò)很多關(guān)于controller的bug。社區(qū)對(duì)于這些bug有個(gè)很共性的特點(diǎn),那就是沒(méi)有什么人愿意(敢去)改這部分代碼,因?yàn)樗鼘?shí)在是太復(fù)雜了。具體的問(wèn)題包括:
1. 需要在多線程間共享狀態(tài)
編寫正確的多線程程序一直是Java開發(fā)者的痛點(diǎn)。在Controller的實(shí)現(xiàn)類KafkaController中創(chuàng)建了很多線程,比如之前提到的RequestSendThread線程,另外ZkClient也會(huì)創(chuàng)建單獨(dú)的線程來(lái)處理zookeeper回調(diào),這還不算TopicDeletionManager創(chuàng)建的線程和其他IO線程等。幾乎所有這些線程都需要訪問(wèn)ControllerContext(RequestSendThread只操作它們專屬的請(qǐng)求隊(duì)列,不會(huì)訪問(wèn)ControllerContext),因此必要的多線程同步機(jī)制是一定需要的。當(dāng)前是使用controllerLock鎖來(lái)實(shí)現(xiàn)的,因此可以說(shuō)沒(méi)有并行度可言。
2. 代碼組織混亂
看過(guò)源代碼的人相信對(duì)這一點(diǎn)深有體會(huì)。KafkaController、PartitionStateMachine和ReplicaStateMachine每個(gè)都是500+行的大類且彼此混調(diào)的現(xiàn)象明顯,比如KafkaController的stopOldReplicasOfReassignedPartition方法調(diào)用ReplicaStateMachine的handleStateChanges方法,而后者又會(huì)調(diào)用KafkaController的remoteReplicaFromIsr方法。類似的情況還發(fā)生在KafkaController和ControllerChannelManager之間。
3. 管理類請(qǐng)求與數(shù)據(jù)類請(qǐng)求未分離
當(dāng)前broker對(duì)入站請(qǐng)求類型不做任何優(yōu)先級(jí)處理,不論是PRODUCE請(qǐng)求、FETCH請(qǐng)求還是Controller類的請(qǐng)求。這就可能造成一個(gè)問(wèn)題:即clients發(fā)送的數(shù)據(jù)類請(qǐng)求積壓導(dǎo)致controller推遲了管理類請(qǐng)求的處理。設(shè)想這樣的場(chǎng)景,假設(shè)controller向broker廣播了leader發(fā)生變更。于是新leader開始接收clients端請(qǐng)求,而同時(shí)老leader所在的broker由于出現(xiàn)了數(shù)據(jù)類請(qǐng)求的積壓使得它一直忙于處理這些請(qǐng)求而無(wú)法處理controller發(fā)來(lái)的LeaderAndIsrRequest請(qǐng)求,因此這是就會(huì)出現(xiàn)“雙主”的情況——也就是所謂的腦裂。此時(shí)倘若client發(fā)送的一個(gè)PRODUCE請(qǐng)求未指定acks=-1,那么因?yàn)槿罩舅唤財(cái)嗟木壒蔬@個(gè)請(qǐng)求包含的消息就可能“丟失”了。現(xiàn)在社區(qū)中關(guān)于controller丟失數(shù)據(jù)的bug大多是因?yàn)檫@個(gè)原因造成的。
4. Controller同步寫Zookeeper且是一個(gè)分區(qū)一個(gè)分區(qū)地寫
當(dāng)前controller操作Zookeeper是通過(guò)ZkClient來(lái)完成的。ZkClient目前是同步寫入Zookeeper,而同步通常意味著性能不高。更為嚴(yán)重的是,controller是一個(gè)分區(qū)一個(gè)分區(qū)進(jìn)行寫入的,對(duì)于分區(qū)數(shù)很多的集群來(lái)說(shuō),這無(wú)疑是個(gè)巨大的性能瓶頸。如果用戶仔細(xì)查看源代碼,可以發(fā)現(xiàn)PartitionStateMachine的electLeaderForPartition就是一個(gè)分區(qū)一個(gè)分區(qū)地選舉的。
5. Controller按照一個(gè)分區(qū)一個(gè)分區(qū)的發(fā)送請(qǐng)求
Controller當(dāng)前發(fā)送請(qǐng)求都是按照分區(qū)級(jí)別發(fā)送的,即一個(gè)分區(qū)一個(gè)分區(qū)地發(fā)送。沒(méi)有任何batch或并行可言,效率很低。
6. Controller給broker的請(qǐng)求無(wú)版本號(hào)信息
這里的版本號(hào)類似于new consumer的generation,總之是要有一種機(jī)制告訴controller broker的版本信息。因?yàn)橛行┣闆r下broker會(huì)處理本已過(guò)期或失效的請(qǐng)求導(dǎo)致broker狀態(tài)不一致。舉個(gè)例子,如果一個(gè)broker正常關(guān)閉過(guò)程中“宕機(jī)”了,那么重啟之后這個(gè)broker就有可能處理之前controller發(fā)送過(guò)來(lái)的StopReplicaRequest,導(dǎo)致某些副本被置成offline從而無(wú)法使用。而這肯定不是我們希望看到的結(jié)果,對(duì)吧?
7. ZkClient阻礙狀態(tài)管理
Contoller目前是使用了ZkClient這個(gè)開源工具,它可以自動(dòng)重建會(huì)話并使用特有的線程順序處理所有的Zookeeper監(jiān)聽消息。因?yàn)槭琼樞蛱幚?#xff0c;它就有可能無(wú)法及時(shí)響應(yīng)最新的狀態(tài)變更導(dǎo)致Kafka集群狀態(tài)的不一致。
Controller改進(jìn)方案
1. 單線程事件模型
和new consumer類似,controller摒棄多線程的模型,采用單線程的事件隊(duì)列模型。這樣簡(jiǎn)化了設(shè)計(jì)同時(shí)也避免了復(fù)雜的同步機(jī)制。各位在最新的trunk分支上已然可以看到這種變化:增加了ControllerEventManager類以及對(duì)應(yīng)的ControllerEventThread線程類專門負(fù)責(zé)處理ControllerEvent。目前總共有9種controller event,它們分別是:
- Idle
- ControllerChange
- BrokerChange
- TopicChange
- TopicDeletion
- PartitionReassignment
- AutoLeaderBalance
- ManualLeaderBalance
- ControlledShutdown
- IsrChange
我們基本上可以從名字就能判斷出它們分別代表了什么事件。
2. 使用Zookeeper的async API
將所有同步操作Zookeeper的地方都改成異步調(diào)用+回調(diào)的方式。實(shí)際上Apache Zookeeper客戶端執(zhí)行請(qǐng)求的方式有三種:同步、異步和batch。通常以batch性能最好,但Kafka社區(qū)目前還是傾向于用async替換sync。畢竟實(shí)現(xiàn)起來(lái)相對(duì)簡(jiǎn)單同時(shí)性能上也能得到不少提升。
3. 重構(gòu)狀態(tài)管理
可能摒棄之前狀態(tài)機(jī)的方式,采用和GroupCoordinator類似的方式,讓controller保存所有的狀態(tài)并且負(fù)責(zé)狀態(tài)的流轉(zhuǎn)以及狀態(tài)流轉(zhuǎn)過(guò)程中的邏輯。當(dāng)然,具體的實(shí)現(xiàn)還要再結(jié)合0.11最終代碼才能確定。
4. 對(duì)請(qǐng)求排定優(yōu)先級(jí)
對(duì)管理類請(qǐng)求和數(shù)據(jù)類請(qǐng)求區(qū)分優(yōu)先級(jí)。比如使用優(yōu)先級(jí)隊(duì)列替換現(xiàn)有的BlockingQueue——社區(qū)應(yīng)該已經(jīng)實(shí)現(xiàn)了這個(gè)功能,開發(fā)了一個(gè)叫PrioritizationAwareBlockingQueue的類來(lái)做這件事情,后續(xù)大家可以看下這個(gè)類的源代碼
5. 為controller發(fā)送的請(qǐng)求匹配broker版本信息
為broker設(shè)定版本號(hào)(generation id)。如果controller發(fā)送過(guò)來(lái)的請(qǐng)求中包含的generation與broker自己的generation不匹配, 那么broker會(huì)拒絕該請(qǐng)求。
6. 拋棄ZkClient,使用原生Zookeeper client
ZkClient是同步順序處理ZK事件的,而原生Zookeeper client支持async方式。另外使用原生API還能夠在接收到狀態(tài)變更通知時(shí)便馬上開始處理,而ZkClient的特定線程則必須要在隊(duì)列中順序處理到這條變更消息時(shí)才能處理。
結(jié)語(yǔ)
以上就是關(guān)于Kafka controller的一些討論,包括了它當(dāng)前的組件構(gòu)成、設(shè)計(jì)問(wèn)題以及對(duì)應(yīng)的改進(jìn)方案。有很多地方可能理解的還不是透徹,期待著在Kafka 0.11正式版本中可以看到全新的controller組件。
總結(jié)
以上是生活随笔為你收集整理的Kafka controller重设计的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: day2编写购物商城(1)
- 下一篇: 【码云周刊第 24 期】超实用 Ando