日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

kafka 重新分配节点_Kafka控制器-分区重分配

發布時間:2023/11/30 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 重新分配节点_Kafka控制器-分区重分配 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

分區重分配指的是將分區的副本重新分配到不同的代理節點上。如果ZK節點中分區的副本的新副本集合和當前分區副本集合相同,這個分區就不需要重新分配了。

分區重分配是通過監聽ZK的 /admin/reassign_partitions 節點觸發的,Kafka也提供了相應的腳本工具進行分區重分配,使用方法如下:

./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute

其中?XXX.json是分區重分配的JSON文件,格式如下:

{

"version":1,

"partitions":[

{"topic":"product", "partition":0, "replicas":[4,5,6]},

{"topic":"product", "partition":1, "replicas":[1,2,3]},

{"topic":"product", "partition":4, "replicas":[4,5,6]}

]}

假設主題 product 的分區數只有 {P0, P1},當執行上面的腳本時。此時會發現P4的分區對于 product 主題根本就不存在,此時就會忽略掉P4的副本遷移。對于P0和P1的副本重分配,可以簡單的理解為下面的過程。

分區重分配命令接收

當使用腳本提交分區重分配時,接收命令的是?kafka.admin.ReassignPartitionsCommand#executeAssignment():

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {

val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)

val adminZkClient = new AdminZkClient(zkClient)

val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)

// 如果當前有正在執行中的分區重分配,則終止當前提交

if (zkClient.reassignPartitionsInProgress()) {

println("There is an existing assignment running.")

reassignPartitionsCommand.maybeLimit(throttle)

} else {

printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))

if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)

println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))

/** 更新重分配數據至ZK */

if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {

println("Successfully started reassignment of partitions.")

} else

println("Failed to reassign partitions %s".format(partitionAssignment))

}

}

提交命令時,如果分區重分配還在進行,那么本次無法提交,意味著當前只能有一個執行的分區重分配。

重分配監聽執行整體流程

當?/admin/reassign_partitions 被修改后,監聽器會觸發?PartitionReassignment 事件,其代碼執行鏈如下所示:

下面我們看一下代碼執行流程的展開。

分區重分配流程

控制器事件模型中 PartitionReassignment 事件,會觸發調用processPartitionReassignment()。此時會注冊監聽ZK節點 /admin/reassign_partitions 變化,當重分配策略更新到ZK上時,該監聽器就會被觸發,然后執行分區重分配邏輯。

case PartitionReassignment =>

processPartitionReassignment()

private def processPartitionReassignment(): Unit = {

if (!isActive) return

/** 注冊 /admin/reassign_partitions 節點變化監聽 */

if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {

val partitionReassignment = zkClient.getPartitionReassignment

partitionReassignment.foreach { case (tp, newReplicas) =>

val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)

/** 記錄正在遷移的分區副本 */

controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))

}

maybeTriggerPartitionReassignment(partitionReassignment.keySet)

}

}

前置判斷

對于是否需要分區重分配,在?maybeTriggerPartitionReassignment() 中做了一些判斷取舍,其代碼實現如下:

/**

* 如有下面情況發生,則不進行分區重分配:

* 1. Topic設置了刪除標識;

* 2. 新副本與已經存在的副本相同;

* 3. 分區所有新分配的副本都不存活;

* 上面的情況發生時, 會輸出一條日志, 并從ZK移除該分區副本的重分配記錄

*/

private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {

val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]

topicPartitions.foreach { tp =>

if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {

/** 如果topic已經設置了刪除,不進行重分配(從需要副本遷移的集合中移除) */

partitionsToBeRemovedFromReassignment.add(tp)

} else {

val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {

throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +

s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")

}

val newReplicas = reassignedPartitionContext.newReplicas

val topic = tp.topic

val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)

if (assignedReplicas.nonEmpty) {

if (assignedReplicas == newReplicas) {

/** 新副本與已經存在的副本相同,不進行重分配 */

partitionsToBeRemovedFromReassignment.add(tp)

} else {

try {

/** 注冊ISR變化監聽 */

reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)

/** 設置正在遷移的副本不能刪除 */

topicDeletionManager.markTopicIneligibleForDeletion(Set(topic), reason = "topic reassignment in progress")

/** 執行重分配 */

onPartitionReassignment(tp, reassignedPartitionContext)

} catch {

}

}

} else {

/** 分區副本都不存活,不進行重分配 */

partitionsToBeRemovedFromReassignment.add(tp)

}

}

}

removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)

}

對于前置校驗的流程如下:

1、如果topic已經設置了刪除,不進行重分配(從需要副本遷移的集合中移除);

2、如果分區副本都不存活,不進行重分配;

3、如果新副本與已經存在的副本相同,不進行重分配;

4、注冊ISR變化監聽;

5、設置將要遷移的副本為不能刪除;

6、調用 onPartitionReassignment() 執行重分配。

執行分區重分配

分區重分配的執行是在 onPartitionReassignment() 中實現的,下面說明一下官方給出的幾個技術名詞:

RAR:新分配的副本列表;

OAR:原先的分區副本列表;

AR:當前副本列表,隨著分配過程不斷變化;

RAR-OAR:RAR與OAR的差集,即需要創建、數據遷移的新副本;

OAR-RAR:OAR與RAR的差集,即遷移后需要下線的副本。

重分配的具體代碼實現如下所示:

/**

* 當需要進行分區重分配時, 會在[/admin/reassign_partitions]下創建一個節點來觸發操作

* RAR: 重新分配的副本, OAR: 分區原副本列表, AR: 當前的分配的副本

*/

private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {

val reassignedReplicas = reassignedPartitionContext.newReplicas

if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {

/** 新分配的并沒有全在ISR中 */

/** RAR-OAR */

val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet

/** RAR+OAR */

val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet

/** 1.將AR更新為OAR + RAR */

updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)

/** 2.向上面AR(OAR+RAR)中的所有副本發送LeaderAndIsr請求 */

updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition), newAndOldReplicas.toSeq)

/** 3.新分配的副本狀態更新為NewReplica(第2步中發送LeaderAndIsr時, 新副本會開始創建并且同步數據)*/

startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)

} else {

/** 4.等待所有的RAR都在ISR中 */

/** OAR - RAR */

val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet

/** 5.將副本狀態設置為OnlineReplica */

reassignedReplicas.foreach { replica =>

replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)

}

/** 6.將上下文中的AR設置為RAR */

/** 7.新加入的副本已經同步完成, LeaderAndIsr都更新到最新的結果 */

moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)

/** 8-9.將舊的副本下線 */

stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)

/** 10.將ZK中的AR設置為RAR */

updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)

/** 11.分區重分配完成, 從ZK /admin/reassign_partitions 節點刪除遷移報文 */

removePartitionsFromReassignedPartitions(Set(topicPartition))

/** 12.發送metadata更新請求給所有存活的broker */

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))

/** 如果topic標記了刪除, 此時喚醒刪除線程*/

topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))

}

}

上面代碼執行的過程描述如下:

1.將AR更新為OAR+RAR;

2.向上面AR(OAR+RAR)中的所有副本發送LeaderAndIsr請求;

3.新分配的副本狀態更新為NewReplica(第2步中發送LeaderAndIsr時, 新副本會開始創建并且同步數據);

4.等待所有的RAR都在ISR中;

5.將副本狀態設置為OnlineReplica;

6.將上下文中的AR設置為RAR;

7.新加入的副本已經同步完成, LeaderAndIsr都更新到最新的結果;

8-9.將舊的副本下線;

10.將ZK中的AR設置為RAR;

11.分區重分配完成, 從ZK /admin/reassign_partitions 節點刪除遷移報文;

12.發送metadata更新請求給所有存活的broker;

重分配簡單描述

通過代碼層面看起來不是很好理解,下面簡單描述一下執行過程:

1、創建新的副本,開始同步數據,等所有新副本都加入了ISR后,在RAR中進行Leader選舉;

2、下線不需要的副本(OAR-RAR),下線完成后將AR(即RAR)信息更新到ZK中;

3、發送LeaderAndIsr給存活broker。

假如初始情況下,分區副本在 {1,2,3} 三個 Broker 上;重分配之后在{4,5,6}上,此時變化過程如下圖所示:

參考:《Kafka技術內幕》、《Apache Kafka 源碼剖析》、Kafka源碼

總結

以上是生活随笔為你收集整理的kafka 重新分配节点_Kafka控制器-分区重分配的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。