Kafka解析之topic创建(2)
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-2/
前言
上一篇Kafka解析之topic創建(1)中的介紹了怎樣創建一個topic以及對應的replica-assignment參數的一些使用細節,本文繼續來講述一下自動分配方案的具體算法實現,包括未指定機架的分配策略和指定機架的分配策略。
承接
如果在創建topic的時候并沒有指定replica-assignment參數,那么就需要采用kafka默認的分區副本分配策略來創建topic。主要的是以下這6行代碼:
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) val partitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabledelse RackAwareMode.Enforced AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)第一行的作用就是驗證一下執行kafka-topics.sh時參數列表中是否包含有partitions和replication-factor這兩個參數,如果沒有包含則報出:Missing required argument "[partitions]"或者Missing required argument “[replication-factor]”,并給出參數的提示信息列表。
第2-5行的作用是獲取paritions、replication-factor參數所對應的值以及驗證是否包含disable-rack-aware這個參數。從0.10.x版本開始,kafka可以支持指定broker的機架信息,如果指定了機架信息則在副本分配時會盡可能地讓分區的副本分不到不同的機架上。指定機架信息是通過kafka的配置文件config/server.properties中的broker.rack參數來配置的,比如配置當前broker所在的機架為“RACK1”:
broker.rack=RACK1最后一行通過AdminUtils.createTopic方法來繼續創建,至此代碼流程又進入到下一個無底洞,不過暫時不用擔心,下面是這個方法的詳細內容,看上去只有幾行而已:
def createTopic(zkUtils: ZkUtils,topic: String,partitions: Int,replicationFactor: Int,topicConfig: Properties = new Properties,rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig) }總共只有三行,最后一行還是見過的,在使用replica-assignment參數解析驗證之后調用的,主要用來在/brokers/topics路徑下寫入相應的節點。回過頭來看第一句,它是用來獲取集群中每個broker的brokerId和機架信息(Option[String]類型)信息的列表,為下面的 AdminUtils.assignReplicasToBrokers()方法做分區副本分配前的準備工作。AdminUtils.assignReplicasToBrokers()首先是做一些簡單的驗證工作:分區個數partitions不能小于等于0、副本個數replicationFactor不能小于等于0以及副本個數replicationFactor不能大于broker的節點個數,其后的步驟就是方法最重要的兩大核心:assignReplicasToBrokersRackUnaware和assignReplicasToBrokersRackAware,看這個名字也應該猜出個一二來,前者用來針對不指定機架信息的情況,而后者是用來針對指定機架信息的情況,后者更加復雜一點。
未指定機架的分配策略
為了能夠循序漸進的說明問題,這里先來講解assignReplicasToBrokersRackUnaware,對應的代碼如下:
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,replicationFactor: Int,brokerList: Seq[Int],fixedStartIndex: Int,startPartitionId: Int): Map[Int, Seq[Int]] = {val ret = mutable.Map[Int, Seq[Int]]()val brokerArray = brokerList.toArrayval startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)var currentPartitionId = math.max(0, startPartitionId)var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)for (_ <- 0 until nPartitions) {if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))nextReplicaShift += 1val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.lengthval replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))for (j <- 0 until replicationFactor - 1)replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))ret.put(currentPartitionId, replicaBuffer)currentPartitionId += 1}ret }主構造函數參數列表中的fixedStartIndex和startPartitionId的值是從上游AdminUtils.assignReplicasToBrokers()方法調用傳下來,都是-1,分別表示第一個副本分配的位置和起始分區編號。assignReplicasToBrokers這個方法的核心是遍歷每個分區partition然后從brokerArray(brokerId的列表)中選取replicationFactor個brokerId分配給這個partition。
方法首先創建一個可變的Map用來存放本方法將要返回的結果,即分區partition和分配副本的映射關系。由于fixedStartIndex為-1,所以startIndex是一個隨機數,用來計算一個起始分配的brokerId,同時由于startPartitionId為-1,所以currentPartitionId的值為0,可見默認創建topic時總是從編號為0的分區依次輪詢進行分配。nextReplicaShift表示下一次副本分配相對于前一次分配的位移量,這個字面上理解有點繞,不如舉個例子:假設集群中有3個broker節點,即代碼中的brokerArray,創建某topic有3個副本和6個分區,那么首先從partitionId(partition的編號)為0的分區開始進行分配,假設第一次計算(由rand.nextInt(brokerArray.length)隨機)到nextReplicaShift為1,第一次隨機到的startIndex為2,那么partitionId為0的第一個副本的位置(這里指的是brokerArray的數組下標)firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length = (0+2)%3 = 2,第二個副本的位置為replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length) = replicaIndex(2, nextReplicaShift+1,0, 3)=?,這里引入了一個新的方法replicaIndex,不過這個方法很簡單,具體如下:
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)(firstReplicaIndex + shift) % nBrokers }繼續計算 replicaIndex(2, nextReplicaShift+1,0, 3) = replicaIndex(2, 2,0, 3)= (2+(1+(2+0)%(3-1)))%3=0。繼續計算下一個副本的位置replicaIndex(2, 2,1, 3) = (2+(1+(2+1)%(3-1)))%3=1。所以partitionId為0的副本分配位置列表為[2,0,1],如果brokerArray正好是從0開始編號,也正好是順序不間斷的,即brokerArray為[0,1,2]的話,那么當前partitionId為0的副本分配策略為[2,0,1]。如果brokerId不是從零開始,也不是順序的(有可能之前集群的其中broker幾個下線了),最終的brokerArray為[2,5,8],那么partitionId為0的分區的副本分配策略為[8,2,5]。為了便于說明問題,可以簡單的假設brokerArray就是[0,1,2]。
同樣計算下一個分區,即partitionId為1的副本分配策略。此時nextReplicaShift還是為2,沒有滿足自增的條件。這個分區的firstReplicaIndex = (1+2)%3=0。第二個副本的位置replicaIndex(0,2,0,3) = (0+(1+(2+0)%(3-1)))%3 = 1,第三個副本的位置replicaIndex(0,2,1,3) = 2,最終partitionId為2的分區分配策略為[0,1,2]。
以此類推,更多的分配細節可以參考下面的demo,topic-test4的分區分配策略和上面陳述的一致:
[root@node3 kafka_2.12-1.0.0]# bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test4 --replication-factor 3 --partitions 6 Created topic "topic-test4". [root@node3 kafka_2.12-1.0.0]# bin/kafka-topics.sh --describe --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test4 Topic:topic-test4 PartitionCount:6 ReplicationFactor:3 Configs:Topic: topic-test4 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1Topic: topic-test4 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: topic-test4 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0Topic: topic-test4 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0Topic: topic-test4 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1Topic: topic-test4 Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2我們無法預先獲知startIndex和nextReplicaShift的值,因為都是隨機產生的。startIndex和nextReplicaShift的值可以通過最終的分區分配方案來反推,比如上面的topic-test4,第一個分區(即partitionId=0的分區)的第一個副本為2,那么可由2 = (0+startIndex)%3推斷出startIndex為2。之所以startIndex隨機是因為這樣可以在多個topic的情況下盡可能的均勻分布分區副本,如果這里固定為一個特定值,那么每次的第一個副本都是在這個broker上,進而就會導致少數幾個broker所分配到的分區副本過多而其余broker分配到的過少,最終導致負載不均衡。尤其是某些topic的副本數和分區數都比較少,甚至都為1的情況下,所有的副本都落到了那個指定的broker上。與此同時,在分配時位移量nextReplicaShift也可以更好的使得分區副本分配的更加均勻。
指定機架的分配策略
下面我們再來看一下指定機架信息的副本分配情況,即方法assignReplicasToBrokersRackAware,注意assignReplicasToBrokersRackUnaware的執行前提是所有的broker都沒有配置機架信息,而assignReplicasToBrokersRackAware的執行前提是所有的broker都配置了機架信息,如果出現部分broker配置了機架信息而另一部分沒有配置的話,則會拋出AdminOperationException的異常,如果還想要順利創建topic的話,此時需加上“–disable-rack-aware”,詳細demo如下:
[root@node2 kafka_2.12-1.0.0]# bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test5 --replication-factor 2 --partitions 4 Error while executing topic command : Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information. [2018-02-06 00:19:07,213] ERROR kafka.admin.AdminOperationException: Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.at kafka.admin.AdminUtils$.getBrokerMetadatas(AdminUtils.scala:443)at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:461)at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:110)at kafka.admin.TopicCommand$.main(TopicCommand.scala:63)at kafka.admin.TopicCommand.main(TopicCommand.scala)(kafka.admin.TopicCommand$) [root@node2 kafka_2.12-1.0.0]# bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test5 --replication-factor 2 --partitions 4 --disable-rack-aware Created topic "topic-test5". [root@node2 kafka_2.12-1.0.0]#assignReplicasToBrokersRackAware方法的詳細內容如下,這段代碼內容偏多,僅供參考,看得辣眼睛的小伙伴可以習慣性的忽略,后面會做詳細的文字介紹。
private def assignReplicasToBrokersRackAware(nPartitions: Int,replicationFactor: Int,brokerMetadatas: Seq[BrokerMetadata],fixedStartIndex: Int,startPartitionId: Int): Map[Int, Seq[Int]] = {val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>id -> rack}.toMapval numRacks = brokerRackMap.values.toSet.size//統計機架個數val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)//基于機架信息生成一個Broker列表,不同機架上的Broker交替出現val numBrokers = arrangedBrokerList.sizeval ret = mutable.Map[Int, Seq[Int]]()val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)var currentPartitionId = math.max(0, startPartitionId)var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)for (_ <- 0 until nPartitions) {if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))nextReplicaShift += 1val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.sizeval leader = arrangedBrokerList(firstReplicaIndex)val replicaBuffer = mutable.ArrayBuffer(leader)//每個分區的副本分配列表val racksWithReplicas = mutable.Set(brokerRackMap(leader))//每個分區中所分配的機架的列表集val brokersWithReplicas = mutable.Set(leader)//每個分區所分配的brokerId的列表集,和racksWithReplicas一起用來做一層篩選處理var k = 0for (_ <- 0 until replicationFactor - 1) {var done = falsewhile (!done) {val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))val rack = brokerRackMap(broker)if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {replicaBuffer += brokerracksWithReplicas += rackbrokersWithReplicas += brokerdone = true}k += 1}}ret.put(currentPartitionId, replicaBuffer)currentPartitionId += 1}ret }第一步獲得brokerId和rack信息的映射關系列表brokerRackMap ,之后調用getRackAlternatedBrokerList()方法對brokerRackMap做進一步的處理生成一個brokerId的列表,這么解釋比較拗口,不如舉個demo。假設目前有3個機架rack1、rack2和rack3,以及9個broker,分別對應關系如下:
rack1: 0, 1, 2 rack2: 3, 4, 5 rack3: 6, 7, 8那么經過getRackAlternatedBrokerList()方法處理過后就變成了[0, 3, 6, 1, 4, 7, 2, 5, 8]這樣一個列表,顯而易見的這是輪詢各個機架上的broker而產生的,之后你可以簡單的將這個列表看成是brokerId的列表,對應assignReplicasToBrokersRackUnaware()方法中的brokerArray,但是其中包含了簡單的機架分配信息。之后的步驟也和未指定機架信息的算法類似,同樣包含startIndex、currentPartiionId, nextReplicaShift的概念,循環為每一個分區分配副本。分配副本時處理第一個副本之外,其余的也調用replicaIndex方法來獲得一個broker,但是這里和assignReplicasToBrokersRackUnaware()不同的是,這里不是簡單的將這個broker添加到當前分區的副本列表之中,還要經過一層的篩選,滿足以下任意一個條件的broker不能被添加到當前分區的副本列表之中:
無論是帶機架信息的策略還是不帶機架信息的策略,上層調用方法AdminUtils.assignReplicasToBrokers()最后都是獲得一個[Int, Seq[Int]]類型的副本分配列表,其最后作為kafka zookeeper節點/brokers/topics/{topic-name}節點數據。至此kafka的topic創建就講解完了,有些同學會感到很疑問,全文通篇(包括上一篇)都是在講述如何分配副本,最后得到的也不過是個分配的方案,并沒有真正創建這些副本的環節,其實這個觀點沒有任何問題,對于通過kafka提供的kafka-topics.sh腳本創建topic的方法來說,它只是提供一個副本的分配方案,并在kafka zookeeper中創建相應的節點而已。kafka broker的服務會注冊監聽/brokers/topics/目錄下是否有節點變化,如果有新節點創建就會監聽到,然后根據其節點中的數據(即topic的分區副本分配方案)來創建對應的副本,具體的細節筆者會在后面的副本管理中有詳細介紹。
既然整個kafka-topics.sh腳本的作用就只是創建一個zookeeper的節點,并且寫上一些分配的方案數據而已,那么我們直接創建一個zookeeper節點來創建一個topic可不可以呢?答案是可以的。在開啟的kafka broker的情況下(如果未開啟kafka服務的情況下創建zk節點的話,待kafka啟動之后是不會再創建實際副本的,只有watch到當前通知才可以),通過zkCli創建一個與topic-test1副本分配方案相同的topic-test6,詳細如下:
[zk: localhost:2181(CONNECTED) 8] create /kafka100/brokers/topics/topic-test6 {"version":1,"partitions":{"2":[0,1],"1":[1,0],"3":[1,0],"0":[0,1]}} Created /kafka100/brokers/topics/topic-test6這里再來進一步check下topic-test1和topic-test6是否完全相同:
[root@node1 kafka_2.12-1.0.0]# bin/kafka-topics.sh --describe --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test1,topic-test6 Topic:topic-test1 PartitionCount:4 ReplicationFactor:2 Configs:Topic: topic-test1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic-test1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0Topic: topic-test1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic-test1 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic:topic-test6 PartitionCount:4 ReplicationFactor:2 Configs:Topic: topic-test6 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic-test6 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0Topic: topic-test6 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic-test6 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0答案顯而易見。前面的篇幅也提到了通過kafka-topics.sh腳本的創建方式會對副本的分配有大堆的合格性的校驗,但是直接創建zk節點的方式沒有這些校驗,比如創建一個topic-test7,這個topic節點的數據為:{“version”:1,“partitions”:{“2”:[0,1],“1”:[1],“3”:[1,0],“0”:[0,1]}},可以看出paritionId為1的分區只有一個副本,我們來檢測下是否創建成功:
[root@node1 kafka_2.12-1.0.0]# bin/kafka-topics.sh --describe --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test7 Topic:topic-test7 PartitionCount:4 ReplicationFactor:2 Configs:Topic: topic-test7 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic-test7 Partition: 1 Leader: 1 Replicas: 1 Isr: 1Topic: topic-test7 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: topic-test7 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0結果也是顯而易見的,不過如果沒有特殊需求不宜用這種方式,這種方式把控不好就像斷了線的風箏一樣難以把控。如果又不想用auto.create.topics.enable=true的這種方式,也不想用kafka-topics.sh的這種方式,就像用類似java的編程語言在代碼中內嵌創建topic,以便更好的與公司內部的系統結合怎么辦?
我們上篇文章中知道kafka-topics.sh內部就是調用了一下kafka.admin.TopicCommand而已,那么我們也調用一下這個可不可以?Of course,下面舉一個簡單的demo,創建一個副本數為2,分區數為4的topic-test8:
public class CreateTopicDemo {public static void main(String[] args) {//demo: 創建一個副本數為2,分區數為4的主題:topic-test8String[] options = new String[]{"--create","--zookeeper","192.168.0.2:2181/kafka100","--replication-factor", "2","--partitions", "4","--topic", "topic-test8"};kafka.admin.TopicCommand.main(options);} }可以看到這種方式和kafka-topics.sh的方式如出一轍,可以用這種方式繼承到自動化系統中以創建topic,當然對于topic的刪、改、查等都可以通過這種方法來實現,具體的篇幅限制就不一一細表了。
有關kafka的topic的創建細節其實并沒有介紹完全,比如create.topic.policy.class.name參數的具體含義與用法,這個會在后面介紹KafkaApis的時候再做具體的介紹,所以為了不迷路,為了漲知識不如關注一波公眾號,然后watch。。。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-2/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka解析之topic创建(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka解惑之Old Producer
- 下一篇: 提升你的代码——Lambda!