Kafka Controller Redesign 方案
轉載自??Kafka Controller Redesign 方案
Kafka Controller 是 Kafka 的核心組件,在前面的文章中,已經詳細講述過 Controller 部分的內容。在過去的幾年根據大家在生產環境中應用的反饋,Controller 也積累了一些比較大的問題,而針對這些問題的修復,代碼的改動量都是非常大的,無疑是一次重構,因此,社區準備在新版的系統里對 Controller 做一些相應的優化(0.11.0及以后的版本),相應的設計方案見:Kafka Controller Redesign,本文的內容就是結合這篇文章做一個簡單的總結。
https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko
?
Controller 功能
在一個 Kafka 中,Controller 要處理的事情總結如下表所示:
?
?
Controller 目前存在的問題
之所以要重新設計 Controller,是因為現在的 Controller 積累了一些比較難解決的問題,這些問題解決起來,代碼改動量都是巨大的,甚至需要改變 controller 部門的設計,基本就跟重構差不多了,下面我們先來了看一下 controller 之前(主要是 0.11.0 之前的版本)存在的一些問題。
目前遇到的比較大的問題有以下幾個:
Partition 級別同步 zk 寫;
sequential per-partition controller-to-broker requests;
Controller 復雜的并發語義;
代碼組織混亂;
控制類請求與數據類請求未分離;
Controller 給 broker 的請求中沒有 broker 的 generation信息;
ZkClient 阻礙 Client 的狀態管理。
?
Partition 級別同步 zk 寫
zookeeper 的同步寫意味著在下次寫之前需要等待前面整個過程的結束,而且由于它們都是 partition 粒度的(一個 Partition 一個 Partition 的去執行寫操作),對于 Partition 非常多的集群來說,需要等待的時間會更長,Controller 通常會在下面這兩個地方做 Partition 級別 zookeeper 同步寫操作:
PartitionStateMachine 在進行觸發 leader 選舉(partition 目的狀態是 OnlinePartition),將會觸發上面的操作;
ReplicaStateMachine 更新 LeaderAndIsr 信息到 zk(replica 狀態轉變為 OfflineReplica),這種情況也觸發這種情況,它既阻礙了 Controller 進程,也有可能會 zk 造成壓力。
?
sequential per-partition controller-to-broker requests
Controller 在向 Broker 發送請求,有些情況下也是 Partition 粒度去發送的,效率非常低,比如在 Controller 處理 broker shutdown 請求時,這里是按 Partition 級別處理,每處理一個 Partition 都會執行 Partition、Replica 狀態變化以及 Metadata 更新,并且調用 sendRequestsToBrokers() 向 broker 發送請求,這樣的話,效率將變得非常低。
?
Controller 復雜的并發語義
Controller 需要在多個線程之間共享狀態信息,這些線程有:
IO threads handling controlled shutdown requests
The ZkClient org.I0Itec.zkclient.ZkEventThread processing zookeeper callbacks sequentially;
The TopicDeletionManager kafka.controller.DeleteTopicsThread;
Per-broker RequestSendThread within ControllerChannelManager.
所有這些線程都需要訪問或修改狀態信息(ControllerContext),現在它們是通過 ControllerContext 的 controllerLock(排它鎖)實現的,Controller 的并發變得虛弱無力。
?
代碼組織混亂
KafkaController 部分的代碼組織(KafkaController、PartitionStateMachine 和 ReplicaStateMachine)不是很清晰,比如,下面的問題就很難回答:
where and when does zookeeper get updated?
where and when does a controller-to-broker request get formed?
what impact does a failing zookeeper update or controller-to-broker request have on the cluster state?
這也導致了這部分很多開發者不敢輕易去改動。
?
控制類請求與數據類請求未分離
現在 broker 收到的請求,有來自 client、broker 和 controller 的請求,這些請求都會被放到同一個 requestQueue 中,它們有著同樣的優先級,所以來自 client 的請求很可能會影響來自 controller 請求的處理(如果是 leader 變動的請求,ack 設置的不是 all,這種情況有可能會導致數據丟失)。
Controller 給 broker 的請求中沒有 broker 的 generation信息
這里的 Broker generation 代表著一個標識,每當它重新加入集群時,這個標識都會變化。如果 Controller 的請求沒有這個信息的話,可能會導致一個重啟的 Broker 收到之前的請求,讓 Broker 進入到一個錯誤的狀態。
比如,Broker 收到之前的 StopReplica 請求,可能會導致副本同步線程退出。
?
ZkClient 阻礙 Client 的狀態管理
這里的狀態管理指的是當 Client 發生重連或會話過期時,Client 可以監控這種狀態變化,并做出一些處理,因為開源版的 ZKClient 在處理 notification 時,是線性處理的,一些 notification 會被先放到 ZkEventThread’s queue 中,這樣會導致一些最新的 notification 不能及時被處理,特別是與 zk 連接斷開重連的情況。
?
Controller 改進方案
關于上述問題,Kafka 提出了一些改進方案,有些已經在最新版的系統中實現,有的還在規劃中。
?
使用異步的 zk API
Zookeeper 的 client 提供三種執行請求的方式:
同步調用,意味著下次請求需要等待當前當前請求的完成;
異步調用,意味著不需要等待當前請求的完成就可以開始下次請求的執行,并且我們可以通過回調機制去處理請求返回的結果;
單請求的 batch 調用,意味著 batch 內的所有請求都會在一次事務處理中完成,這里需要關注的是 zookeeper 的 server 對單請求的大小是有限制的(jute.maxbuffer)。
文章中給出了三種請求的測試結果,Kafka 最后選取的是異步處理機制,因為對于單請求處理,異步處理更加簡潔,并且相比于同步處理還可以保持一個更好的寫性能。
improve controller-to-broker request batching
這個在設計文檔還是 TODO 狀態,具體的方案還沒確定,不過基本可以猜測一下,因為目的是提高 batch 發送能力,那么只能是在調用對每個 broker 的 RequestSenderThread 線程發送請求之前,做一下檢測,而不是來一個請求立馬就發送,這是一個性能與時間的權衡,如果不是立馬發送請求,那么可能會帶來 broker 短時 metadata 信息的不一致,這個不一致時間不同的應用場景要求是不一樣的。
?
單線程的事件處理模型
采用單線程的時間處理模型將極大簡化 Controller 的并發實現,只允許這個線程訪問和修改 Controller 的本地狀態信息,因此在 Controller 部分也就不需要到處加鎖來保證線程安全了。
目前 1.1.0 的實現中,Controller 使用了一個 ControllerEventThread 線程來處理所有的 event,目前可以支持13種不同類型事件:
Idle:代表當前 ControllerEventThread 處理空閑狀態;
ControllerChange:Controller 切換處理;
BrokerChange:Broker 變動處理,broker 可能有上線或掉線;
TopicChange:Topic 新增處理;
TopicDeletion:Topic 刪除處理;
PartitionReassignment:Partition 副本遷移處理;
AutoLeaderBalance:自動 rebalance 處理;
ManualLeaderBalance:最優 leader 選舉處理,這里叫做手動 rebalance,手動去切流量;
ControlledShutdown:優雅關閉 broker;
IsrChange:Isr 變動處理;
LeaderAndIsrResponseReceived;
LogDirChange:Broker 某個目錄失敗后的處理(比如磁盤壞掉等);
ControllerShutdown:ControllerEventThread 處理這個事件時,會關閉當前線程。
重構集群狀態管理
這部分的改動,目前社區也沒有一個很好的解決思路,重構這部分的目的是希望 Partition、Replica 的狀態管理變得更清晰一些,讓我們從代碼中可以清楚地明白狀態是在什么時間、什么地方、什么條件下被觸發的。這個優化其實是跟上面那個有很大關聯,采用單線程的事件處理模型,可以讓狀態管理也變得更清晰。
prioritize controller requests
我們想要把控制類請求與數據類請求分開,提高 controller 請求的優先級,這樣的話即使 Broker 中請求有堆積,Broker 也會優先處理控制類的請求。
這部分的優化可以在網絡層的 RequestChannel 中做,RequestChannel 可以根據請求的 id 信息把請求分為正常的和優先的,如果請求是 UpdateMetadataRequest、LeaderAndIsrRequest 或者 StopReplicaRequest,那么這個請求的優先級應該提高。實現方案有以下兩種:
在請求隊列中增加一個優先級隊列,優先級高的請求放到 the prioritized request queue 中,優先級低的放到普通請求隊列中,但是無論使用一個定時拉取(poll)還是2個定時拉取,都會帶來其他的問題,要么是增大普通請求的處理延遲,要么是增大了優先級高請求的延遲;
直接使用優先級隊列代替現在的普通隊列,設計上更傾向與這一種。
?
目前這部分在1.1.0中還未實現。
Controller 發送請求中添加 broker 的 generation 信息
generation 信息是用來標識當前 broker 加入集群 epoch 信息,每當 broker 重新加入集群中,該 broker.id 對應的 generation 都應該變化(要求遞增),目前有兩種實現方案:
為 broker 分配的一個全局唯一的 id,由 controller 廣播給其他 broker;
直接使用 zookeeper 的 zxid 信息(broker.id 注冊時的 zxid)。
?
直接使用原生的 Zookeeper client
Client 端的狀態管理意味著當 Client 端發生狀態變化(像連接中斷或回話超時)時,我們有能力做一些操作。其中,zookeeper client 有效的狀態(目前的 client 比下面又多了幾種狀態,這里先不深入)是:
-
NOT_CONNECTED: the initial state of the client;
-
CONNECTING: the client is establishing a connection to zookeeper;
-
CONNECTED: the client has established a connection and session to zookeeper;
-
CLOSED: the session has closed or expired。
有效的狀態轉移是:
-
NOT_CONNECTED > CONNECTING
-
CONNECTING > CONNECTED
-
CONNECTING > CLOSED
-
CONNECTED > CONNECTING
-
CONNECTED > CLOSED
最開始的設想是直接使用原生 Client 的異步調用方式,這樣的話依然可以通過回調方法監控到狀態的變化(像連接中斷或回話超時),同樣,在每次事件處理時,可以通過檢查狀態信息來監控到 Client 狀態的變化,及時做一些處理。
當一個 Client 接收到連接中斷的 notification(Client 狀態變成了 CONNECTING 狀態),它意味著 Client 不能再從 zookeeper 接收到任何 notification 了。如果斷開連接,對于 Controller 而言,無論它現在正在做什么它都應該先暫停,因為可能集群的 Controller 已經切換到其他機器上了,只是它還沒接收到通知,它如果還在工作,可能會導致集群狀態不一致。當連接斷開后,Client 可以重新建立連接(re-establish,狀態變為 CONNECTED)或者會話過期(狀態變為 CLOSED,會話過期是由 zookeeper Server 來決定的)。如果變成了 CONNECTED 狀態,Controller 應該重新開始這些暫停的操作,而如果狀態變成了 CLOSED 狀態,舊的 Controller 就會知道它不再是 controller,應該丟棄掉這些任務。
?
參考
-
Kafka Controller Redesign;
https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko
-
Kafka controller重設計。
https://www.cnblogs.com/huxi2b/p/6980045.html
總結
以上是生活随笔為你收集整理的Kafka Controller Redesign 方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从零开始用好 Maven : 从 Hel
- 下一篇: 相机领域的御三家,各自产品都有什么特点?