日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

Kafka设计解析(三):Kafka High Availability (下)--转

發(fā)布時間:2025/4/5 68 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka设计解析(三):Kafka High Availability (下)--转 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文地址:http://www.infoq.com/cn/articles/kafka-analysis-part-3?utm_source=infoq&utm_campaign=user_page&utm_medium=link#

Kafka是由LinkedIn開發(fā)的一個分布式的消息系統(tǒng),使用Scala編寫,它以可水平擴(kuò)展和高吞吐率而被廣泛使用。目前越來越多的開源分布式處理系統(tǒng)如Cloudera、Apache Storm、Spark都支持與Kafka集成。InfoQ一直在緊密關(guān)注Kafka的應(yīng)用以及發(fā)展,“Kafka剖析”專欄將會從架構(gòu)設(shè)計(jì)、實(shí)現(xiàn)、應(yīng)用場景、性能等方面深度解析Kafka。

本文在上篇文章基礎(chǔ)上,更加深入講解了Kafka的HA機(jī)制,主要闡述了HA相關(guān)各種場景,如Broker failover、Controller failover、Topic創(chuàng)建/刪除、Broker啟動、Follower從Leader fetch數(shù)據(jù)等詳細(xì)處理過程。同時介紹了Kafka提供的與Replication相關(guān)的工具,如重新分配Partition等。

Broker Failover過程

Controller對Broker failure的處理過程

  • Controller在ZooKeeper的/brokers/ids節(jié)點(diǎn)上注冊Watch。一旦有Broker宕機(jī)(本文用宕機(jī)代表任何讓Kafka認(rèn)為其Broker die的情景,包括但不限于機(jī)器斷電,網(wǎng)絡(luò)不可用,GC導(dǎo)致的Stop The World,進(jìn)程crash等),其在ZooKeeper對應(yīng)的Znode會自動被刪除,ZooKeeper會fire Controller注冊的Watch,Controller即可獲取最新的幸存的Broker列表。
  • Controller決定set_p,該集合包含了宕機(jī)的所有Broker上的所有Partition。
  • 對set_p中的每一個Partition:

    3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該P(yáng)artition當(dāng)前的ISR。

    3.2 決定該P(yáng)artition的新Leader。如果當(dāng)前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當(dāng)前ISR中所有幸存的Replica。否則選擇該P(yáng)artition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數(shù)據(jù)丟失)。如果該P(yáng)artition的所有Replica都宕機(jī)了,則將新的Leader設(shè)置為-1。

    3.3 將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有Controller版本在3.1至3.3的過程中無變化時才會執(zhí)行,否則跳轉(zhuǎn)到3.1。

  • 直接通過RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發(fā)送多個命令從而提高效率。

    Broker failover順序圖如下所示。

  • LeaderAndIsrRequest結(jié)構(gòu)如下

    LeaderAndIsrResponse結(jié)構(gòu)如下

    創(chuàng)建/刪除Topic

  • Controller在ZooKeeper的/brokers/topics節(jié)點(diǎn)上注冊Watch,一旦某個Topic被創(chuàng)建或刪除,則Controller會通過Watch得到新創(chuàng)建/刪除的Topic的Partition/Replica分配。
  • 對于刪除Topic操作,Topic工具會將該Topic名字存于/admin/delete_topics。若delete.topic.enable為true,則Controller注冊在/admin/delete_topics上的Watch被fire,Controller通過回調(diào)向?qū)?yīng)的Broker發(fā)送StopReplicaRequest,若為false則Controller不會在/admin/delete_topics上注冊Watch,也就不會對該事件作出反應(yīng)。
  • 對于創(chuàng)建Topic操作,Controller從/brokers/ids讀取當(dāng)前所有可用的Broker列表,對于set_p中的每一個Partition:

    3.1 從分配給該P(yáng)artition的所有Replica(稱為AR)中任選一個可用的Broker作為新的Leader,并將AR設(shè)置為新的ISR(因?yàn)樵揟opic是新創(chuàng)建的,所以AR中所有的Replica都沒有數(shù)據(jù),可認(rèn)為它們都是同步的,也即都在ISR中,任意一個Replica都可作為Leader)

    3.2 將新的Leader和ISR寫入/brokers/topics/[topic]/partitions/[partition]

  • 直接通過RPC向相關(guān)的Broker發(fā)送LeaderAndISRRequest。

    創(chuàng)建Topic順序圖如下所示。

  • Broker響應(yīng)請求流程

    Broker通過kafka.network.SocketServer及相關(guān)模塊接受各種請求并作出響應(yīng)。整個網(wǎng)絡(luò)通信模塊基于Java NIO開發(fā),并采用Reactor模式,其中包含1個Acceptor負(fù)責(zé)接受客戶請求,N個Processor負(fù)責(zé)讀寫數(shù)據(jù),M個Handler處理業(yè)務(wù)邏輯。

    Acceptor的主要職責(zé)是監(jiān)聽并接受客戶端(請求發(fā)起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的連接請求,并建立和客戶端的數(shù)據(jù)傳輸通道,然后為該客戶端指定一個Processor,至此它對該客戶端該次請求的任務(wù)就結(jié)束了,它可以去響應(yīng)下一個客戶端的連接請求了。其核心代碼如下。

    Processor主要負(fù)責(zé)從客戶端讀取數(shù)據(jù)并將響應(yīng)返回給客戶端,它本身并不處理具體的業(yè)務(wù)邏輯,并且其內(nèi)部維護(hù)了一個隊(duì)列來保存分配給它的所有SocketChannel。Processor的run方法會循環(huán)從隊(duì)列中取出新的SocketChannel并將其SelectionKey.OP_READ注冊到selector上,然后循環(huán)處理已就緒的讀(請求)和寫(響應(yīng))。Processor讀取完數(shù)據(jù)后,將其封裝成Request對象并將其交給RequestChannel。

    RequestChannel是Processor和KafkaRequestHandler交換數(shù)據(jù)的地方,它包含一個隊(duì)列requestQueue用來存放Processor加入的Request,KafkaRequestHandler會從里面取出Request來處理;同時它還包含一個respondQueue,用來存放KafkaRequestHandler處理完Request后返還給客戶端的Response。

    Processor會通過processNewResponses方法依次將requestChannel中responseQueue保存的Response取出,并將對應(yīng)的SelectionKey.OP_WRITE事件注冊到selector上。當(dāng)selector的select方法返回時,對檢測到的可寫通道,調(diào)用write方法將Response返回給客戶端。

    KafkaRequestHandler循環(huán)從RequestChannel中取Request并交給kafka.server.KafkaApis處理具體的業(yè)務(wù)邏輯。

    LeaderAndIsrRequest響應(yīng)過程

    對于收到的LeaderAndIsrRequest,Broker主要通過ReplicaManager的becomeLeaderOrFollower處理,流程如下:

  • 若請求中controllerEpoch小于當(dāng)前最新的controllerEpoch,則直接返回ErrorMapping.StaleControllerEpochCode。
  • 對于請求中partitionStateInfos中的每一個元素,即((topic, partitionId), partitionStateInfo):

    2.1 若partitionStateInfo中的leader epoch大于當(dāng)前ReplicManager中存儲的(topic, partitionId)對應(yīng)的partition的leader epoch,則:

    2.1.1 若當(dāng)前brokerid(或者說replica id)在partitionStateInfo中,則將該partition及partitionStateInfo存入一個名為partitionState的HashMap中

    2.1.2 否則說明該Broker不在該P(yáng)artition分配的Replica list中,將該信息記錄于log中

    2.2 否則將相應(yīng)的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中

  • 篩選出partitionState中Leader與當(dāng)前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
  • 若partitionsTobeLeader不為空,則對其執(zhí)行makeLeaders方。
  • 若partitionsToBeFollower不為空,則對其執(zhí)行makeFollowers方法。
  • 若highwatermak線程還未啟動,則將其啟動,并將hwThreadInitialized設(shè)為true。
  • 關(guān)閉所有Idle狀態(tài)的Fetcher。
  • LeaderAndIsrRequest處理過程如下圖所示

    Broker啟動過程

    Broker啟動后首先根據(jù)其ID在ZooKeeper的/brokers/idszonde下創(chuàng)建臨時子節(jié)點(diǎn)(Ephemeral node),創(chuàng)建成功后Controller的ReplicaStateMachine注冊其上的Broker Change Watch會被fire,從而通過回調(diào)KafkaController.onBrokerStartup方法完成以下步驟:

  • 向所有新啟動的Broker發(fā)送UpdateMetadataRequest,其定義如下。

  • 將新啟動的Broker上的所有Replica設(shè)置為OnlineReplica狀態(tài),同時這些Broker會為這些Partition啟動high watermark線程。
  • 通過partitionStateMachine觸發(fā)OnlinePartitionStateChange。
  • Controller Failover

    Controller也需要Failover。每個Broker都會在Controller Path (/controller)上注冊一個Watch。當(dāng)前Controller失敗時,對應(yīng)的Controller Path會自動消失(因?yàn)樗荅phemeral Node),此時該Watch被fire,所有“活”著的Broker都會去競選成為新的Controller(創(chuàng)建新的Controller Path),但是只會有一個競選成功(這點(diǎn)由ZooKeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上注冊Watch。因?yàn)閆ooKeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注冊。

    Broker成功競選為新Controller后會觸發(fā)KafkaController.onControllerFailover方法,并在該方法中完成如下操作:

  • 讀取并增加Controller Epoch。
  • 在ReassignedPartitions Patch(/admin/reassign_partitions)上注冊Watch。
  • 在PreferredReplicaElection Path(/admin/preferred_replica_election)上注冊Watch。
  • 通過partitionStateMachine在Broker Topics Patch(/brokers/topics)上注冊Watch。
  • 若delete.topic.enable設(shè)置為true(默認(rèn)值是false),則partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注冊Watch。
  • 通過replicaStateMachine在Broker Ids Patch(/brokers/ids)上注冊Watch。
  • 初始化ControllerContext對象,設(shè)置當(dāng)前所有Topic,“活”著的Broker列表,所有Partition的Leader及ISR等。
  • 啟動replicaStateMachine和partitionStateMachine。
  • 將brokerState狀態(tài)設(shè)置為RunningAsController。
  • 將每個Partition的Leadership信息發(fā)送給所有“活”著的Broker。
  • 若auto.leader.rebalance.enable配置為true(默認(rèn)值是true),則啟動partition-rebalance線程。
  • 若delete.topic.enable設(shè)置為true且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic。
  • Partition重新分配

    管理工具發(fā)出重新分配Partition請求后,會將相應(yīng)信息寫到/admin/reassign_partitions上,而該操作會觸發(fā)ReassignedPartitionsIsrChangeListener,從而通過執(zhí)行回調(diào)函數(shù)KafkaController.onPartitionReassignment來完成以下操作:

  • 將ZooKeeper中的AR(Current Assigned Replicas)更新為OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  • 強(qiáng)制更新ZooKeeper中的leader epoch,向AR中的每個Replica發(fā)送LeaderAndIsrRequest。
  • 將RAR - OAR中的Replica設(shè)置為NewReplica狀態(tài)。
  • 等待直到RAR中所有的Replica都與其Leader同步。
  • 將RAR中所有的Replica都設(shè)置為OnlineReplica狀態(tài)。
  • 將Cache中的AR設(shè)置為RAR。
  • 若Leader不在RAR中,則從RAR中重新選舉出一個新的Leader并發(fā)送LeaderAndIsrRequest。若新的Leader不是從RAR中選舉而出,則還要增加ZooKeeper中的leader epoch。
  • 將OAR - RAR中的所有Replica設(shè)置為OfflineReplica狀態(tài),該過程包含兩部分。第一,將ZooKeeper上ISR中的OAR - RAR移除并向Leader發(fā)送LeaderAndIsrRequest從而通知這些Replica已經(jīng)從ISR中移除;第二,向OAR - RAR中的Replica發(fā)送StopReplicaRequest從而停止不再分配給該P(yáng)artition的Replica。
  • 將OAR - RAR中的所有Replica設(shè)置為NonExistentReplica狀態(tài)從而將其從磁盤上刪除。
  • 將ZooKeeper中的AR設(shè)置為RAR。
  • 刪除/admin/reassign_partition。
  • 注意:最后一步才將ZooKeeper中的AR更新,因?yàn)檫@是唯一一個持久存儲AR的地方,如果Controller在這一步之前crash,新的Controller仍然能夠繼續(xù)完成該過程。

    以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配過程中ZooKeeper中的AR和Leader/ISR路徑如下

    ARleader/isrSttep
    {1,2,3}1/{1,2,3}(initial state)
    {1,2,3,4,5,6}1/{1,2,3}(step 2)
    {1,2,3,4,5,6}1/{1,2,3,4,5,6}(step 4)
    {1,2,3,4,5,6}4/{1,2,3,4,5,6}(step 7)
    {1,2,3,4,5,6}4/{4,5,6}(step 8)
    {4,5,6}4/{4,5,6}(step 10)

    Follower從Leader Fetch數(shù)據(jù)

    Follower通過向Leader發(fā)送FetchRequest獲取消息,FetchRequest結(jié)構(gòu)如下

    從FetchRequest的結(jié)構(gòu)可以看出,每個Fetch請求都要指定最大等待時間和最小獲取字節(jié)數(shù),以及由TopicAndPartition和PartitionFetchInfo構(gòu)成的Map。實(shí)際上,Follower從Leader數(shù)據(jù)和Consumer從Broker Fetch數(shù)據(jù),都是通過FetchRequest請求完成,所以在FetchRequest結(jié)構(gòu)中,其中一個字段是clientID,并且其默認(rèn)值是ConsumerConfig.DefaultClientId。

    Leader收到Fetch請求后,Kafka通過KafkaApis.handleFetchRequest響應(yīng)該請求,響應(yīng)過程如下:

  • replicaManager根據(jù)請求讀出數(shù)據(jù)存入dataRead中。
  • 如果該請求來自Follower則更新其相應(yīng)的LEO(log end offset)以及相應(yīng)Partition的High Watermark
  • 根據(jù)dataRead算出可讀消息長度(單位為字節(jié))并存入bytesReadable中。
  • 滿足下面4個條件中的1個,則立即將相應(yīng)的數(shù)據(jù)返回
    • Fetch請求不希望等待,即fetchRequest.macWait <= 0
    • Fetch請求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo為空
    • 有足夠的數(shù)據(jù)可供返回,即bytesReadable >= fetchRequest.minBytes
    • 讀取數(shù)據(jù)時發(fā)生異常
  • 若不滿足以上4個條件,FetchRequest將不會立即返回,并將該請求封裝成DelayedFetch。檢查該DeplayedFetch是否滿足,若滿足則返回請求,否則將該請求加入Watch列表
  • Leader通過以FetchResponse的形式將消息返回給Follower,FetchResponse結(jié)構(gòu)如下

    Replication工具

    Topic Tool

    $KAFKA_HOME/bin/kafka-topics.sh,該工具可用于創(chuàng)建、刪除、修改、查看某個Topic,也可用于列出所有Topic。另外,該工具還可修改某個Topic的以下配置。

    unclean.leader.election.enable delete.retention.ms segment.jitter.ms retention.ms flush.ms segment.bytes flush.messages segment.ms retention.bytes cleanup.policy segment.index.bytes min.cleanable.dirty.ratio max.message.bytes file.delete.delay.ms min.insync.replicas index.interval.bytes

    Replica Verification Tool

    $KAFKA_HOME/bin/kafka-replica-verification.sh,該工具用來驗(yàn)證所指定的一個或多個Topic下每個Partition對應(yīng)的所有Replica是否都同步。可通過topic-white-list這一參數(shù)指定所需要驗(yàn)證的所有Topic,支持正則表達(dá)式。

    Preferred Replica Leader Election Tool

    用途

    有了Replication機(jī)制后,每個Partition可能有多個備份。某個Partition的Replica列表叫作AR(Assigned Replicas),AR中的第一個Replica即為“Preferred Replica”。創(chuàng)建一個新的Topic或者給已有Topic增加Partition時,Kafka保證Preferred Replica被均勻分布到集群中的所有Broker上。理想情況下,Preferred Replica會被選為Leader。以上兩點(diǎn)保證了所有Partition的Leader被均勻分布到了集群當(dāng)中,這一點(diǎn)非常重要,因?yàn)樗械淖x寫操作都由Leader完成,若Leader分布過于集中,會造成集群負(fù)載不均衡。但是,隨著集群的運(yùn)行,該平衡可能會因?yàn)锽roker的宕機(jī)而被打破,該工具就是用來幫助恢復(fù)Leader分配的平衡。

    事實(shí)上,每個Topic從失敗中恢復(fù)過來后,它默認(rèn)會被設(shè)置為Follower角色,除非某個Partition的Replica全部宕機(jī),而當(dāng)前Broker是該P(yáng)artition的AR中第一個恢復(fù)回來的Replica。因此,某個Partition的Leader(Preferred Replica)宕機(jī)并恢復(fù)后,它很可能不再是該P(yáng)artition的Leader,但仍然是Preferred Replica。

    原理

    1. 在ZooKeeper上創(chuàng)建/admin/preferred_replica_election節(jié)點(diǎn),并存入需要調(diào)整Preferred Replica的Partition信息。

    2. Controller一直Watch該節(jié)點(diǎn),一旦該節(jié)點(diǎn)被創(chuàng)建,Controller會收到通知,并獲取該內(nèi)容。

    3. Controller讀取Preferred Replica,如果發(fā)現(xiàn)該Replica當(dāng)前并非是Leader并且它在該P(yáng)artition的ISR中,Controller向該Replica發(fā)送LeaderAndIsrRequest,使該Replica成為Leader。如果該Replica當(dāng)前并非是Leader,且不在ISR中,Controller為了保證沒有數(shù)據(jù)丟失,并不會將其設(shè)置為Leader。

    用法

    $KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

    在包含8個Broker的Kafka集群上,創(chuàng)建1個名為topic1,replication-factor為3,Partition數(shù)為8的Topic,使用如下命令查看其Partition/Replica分布。

    $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181

    查詢結(jié)果如下圖所示,從圖中可以看到,Kafka將所有Replica均勻分布到了整個集群,并且Leader也均勻分布。

    手動停止部分Broker,topic1的Partition/Replica分布如下圖所示。從圖中可以看到,由于Broker 1/2/4都被停止,Partition 0的Leader由原來的1變?yōu)?,Partition 1的Leader由原來的2變?yōu)?,Partition 2的Leader由原來的3變?yōu)?,Partition 3的Leader由原來的4變?yōu)?。

    再重新啟動ID為1的Broker,topic1的Partition/Replica分布如下。可以看到,雖然Broker 1已經(jīng)啟動(Partition 0和Partition5的ISR中有1),但是1并不是任何一個Parititon的Leader,而Broker 5/6/7都是2個Partition的Leader,即Leader的分布不均衡——一個Broker最多是2個Partition的Leader,而最少是0個Partition的Leader。

    運(yùn)行該工具后,topic1的Partition/Replica分布如下圖所示。由圖可見,除了Partition 1和Partition 3由于Broker 2和Broker 4還未啟動,所以其Leader不是其Preferred Repliac外,其它所有Partition的Leader都是其Preferred Replica。同時,與運(yùn)行該工具前相比,Leader的分配更均勻——一個Broker最多是2個Parittion的Leader,最少是1個Partition的Leader。

    啟動Broker 2和Broker 4,Leader分布與上一步相比并未變化,如下圖所示。

    再次運(yùn)行該工具,所有Partition的Leader都由其Preferred Replica承擔(dān),Leader分布更均勻——每個Broker承擔(dān)1個Partition的Leader角色。

    除了手動運(yùn)行該工具使Leader分配均勻外,Kafka還提供了自動平衡Leader分配的功能,該功能可通過將auto.leader.rebalance.enable設(shè)置為true開啟,它將周期性檢查Leader分配是否平衡,若不平衡度超過一定閾值則自動由Controller嘗試將各Partition的Leader設(shè)置為其Preferred Replica。檢查周期由leader.imbalance.check.interval.seconds指定,不平衡度閾值由leader.imbalance.per.broker.percentage指定。

    Kafka Reassign Partitions Tool

    用途

    該工具的設(shè)計(jì)目標(biāo)與Preferred Replica Leader Election Tool有些類似,都旨在促進(jìn)Kafka集群的負(fù)載均衡。不同的是,Preferred Replica Leader Election只能在Partition的AR范圍內(nèi)調(diào)整其Leader,使Leader分布均勻,而該工具還可以調(diào)整Partition的AR。

    Follower需要從Leader Fetch數(shù)據(jù)以保持與Leader同步,所以僅僅保持Leader分布的平衡對整個集群的負(fù)載均衡來說是不夠的。另外,生產(chǎn)環(huán)境下,隨著負(fù)載的增大,可能需要給Kafka集群擴(kuò)容。向Kafka集群中增加Broker非常簡單方便,但是對于已有的Topic,并不會自動將其Partition遷移到新加入的Broker上,此時可用該工具達(dá)到此目的。某些場景下,實(shí)際負(fù)載可能遠(yuǎn)小于最初預(yù)期負(fù)載,此時可用該工具將分布在整個集群上的Partition重裝分配到某些機(jī)器上,然后可以停止不需要的Broker從而實(shí)現(xiàn)節(jié)約資源的目的。

    需要說明的是,該工具不僅可以調(diào)整Partition的AR位置,還可調(diào)整其AR數(shù)量,即改變該Topic的replication factor。

    原理

    該工具只負(fù)責(zé)將所需信息存入ZooKeeper中相應(yīng)節(jié)點(diǎn),然后退出,不負(fù)責(zé)相關(guān)的具體操作,所有調(diào)整都由Controller完成。

    1. 在ZooKeeper上創(chuàng)建/admin/reassign_partitions節(jié)點(diǎn),并存入目標(biāo)Partition列表及其對應(yīng)的目標(biāo)AR列表。

    2. Controller注冊在/admin/reassign_partitions上的Watch被fire,Controller獲取該列表。

    3. 對列表中的所有Partition,Controller會做如下操作:

    • 啟動RAR - AR中的Replica,即新分配的Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
    • 等待新的Replica與Leader同步
    • 如果Leader不在RAR中,從RAR中選出新的Leader
    • 停止并刪除AR - RAR中的Replica,即不再需要的Replica
    • 刪除/admin/reassign_partitions節(jié)點(diǎn)

    用法

    該工具有三種使用模式

    • generate模式,給定需要重新分配的Topic,自動生成reassign plan(并不執(zhí)行)
    • execute模式,根據(jù)指定的reassign plan重新分配Partition
    • verify模式,驗(yàn)證重新分配Partition是否成功

    下面這個例子將使用該工具將Topic的所有Partition重新分配到Broker 4/5/6/7上,步驟如下:

    1. 使用generate模式,生成reassign plan

    指定需要重新分配的Topic ({"topics":[{"topic":"topic1"}],"version":1}),并存入/tmp/topics-to-move.json文件中,然后執(zhí)行如下命令

    $KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "4,5,6,7" --generate

    結(jié)果如下圖所示

    2. 使用execute模式,執(zhí)行reassign plan

    將上一步生成的reassignment plan存入/tmp/reassign-plan.json文件中,并執(zhí)行

    $KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/reassign-plan.json --execute

    此時,ZooKeeper上/admin/reassign_partitions節(jié)點(diǎn)被創(chuàng)建,且其值與/tmp/reassign-plan.json文件的內(nèi)容一致。

    3. 使用verify模式,驗(yàn)證reassign是否完成

    執(zhí)行verify命令

    $KAFKA_HOME/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file /tmp/reassign-plan.json --verify

    結(jié)果如下所示,從圖中可看出topic1的所有Partititon都根據(jù)reassign plan重新分配成功。

    接下來用Topic Tool再次驗(yàn)證。

    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

    結(jié)果如下圖所示,從圖中可看出topic1的所有Partition都被重新分配到Broker 4/5/6/7,且每個Partition的AR與reassign plan一致。

    需要說明的是,在使用execute之前,并不一定要使用generate模式自動生成reassign plan,使用generate模式只是為了方便。事實(shí)上,某些場景下,generate模式生成的reassign plan并不一定能滿足需求,此時用戶可以自己設(shè)置reassign plan。

    State Change Log Merge Tool

    用途

    該工具旨在從整個集群的Broker上收集狀態(tài)改變?nèi)罩?#xff0c;并生成一個集中的格式化的日志以幫助診斷狀態(tài)改變相關(guān)的故障。每個Broker都會將其收到的狀態(tài)改變相關(guān)的的指令存于名為state-change.log的日志文件中。某些情況下,Partition的Leader election可能會出現(xiàn)問題,此時我們需要對整個集群的狀態(tài)改變有個全局的了解從而診斷故障并解決問題。該工具將集群中相關(guān)的state-change.log日志按時間順序合并,同時支持用戶輸入時間范圍和目標(biāo)Topic及Partition作為過濾條件,最終將格式化的結(jié)果輸出。

    用法

    bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log --topic topic1 --partitions 0,1,2,3,4,5,6,7

    轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/7027555.html

    總結(jié)

    以上是生活随笔為你收集整理的Kafka设计解析(三):Kafka High Availability (下)--转的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。