Kafka rebalance: some partitions left without an owner
Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/6234673.html
Recently a business team reported that when their Kafka consumers went live, some partitions of one topic never had an owner registered. As the monitoring UI showed, partitions 5 and 7 could not be claimed by any consumer, and restarting the client programs to trigger another rebalance still left those same two partitions unconsumed.

Since the business team had just been through a large data-center migration, my first suspicion was network connectivity. But testing each consumer host in turn showed the network was fine, and whether we increased or decreased the number of consumers, even down to a single client, some partitions still had no owner after the rebalance finished. Moreover, the set of owner-less partition ids changed as the number of consumers changed, and throughout the whole rebalance the client programs logged no errors at all.

That left combing through the client logs. With only two clients running, this section turned up:
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], Consumer xxx rebalancing the following partitions: ArrayBuffer(5, 7, 3, 8, 1, 4, 6, 2, 0, 9) for topic onlineAdDemographicPredict with consumers: List(aaa-0, yyy-0, xxx-0)
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 2
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 0
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 9
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 0 for topic onlineAdDemographicPredict
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 9 for topic onlineAdDemographicPredict
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 2 for topic onlineAdDemographicPredict
All 10 partitions of the topic appear in the ArrayBuffer, so the client has no problem reading the full partition list. The problem is in the `with consumers: List(...)` part: at that moment the business team had started only the xxx and yyy clients, yet the consumer obtained three client ids. Based on that list it computed that it should claim exactly three partitions (2, 0 and 9), and the partitions falling to the remaining consumer were left unclaimed.
Following the log lines into the corresponding Kafka source:
```scala
class RangeAssignor() extends PartitionAssignor with Logging {
  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)
      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
        /**
         * Range-partition the sorted partitions to consumers for better locality.
         * The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }
    partitionOwnershipDecision
  }
}
```
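To make the arithmetic in `assign` concrete, here is a minimal Python sketch of the same range computation. The partition order and consumer ids come from the log above; `range_assign` is a name made up for this illustration, not Kafka code. With the three registered ids, the xxx-0 thread at position 2 lands on exactly the partitions it claimed in the log:

```python
def range_assign(partitions, consumers, me):
    """Range assignment as in Kafka 0.8's RangeAssignor: the partition
    list is split into contiguous chunks, one per consumer thread, and
    the first (size % consumers) threads pick up one extra partition."""
    n_per = len(partitions) // len(consumers)
    n_extra = len(partitions) % len(consumers)
    pos = consumers.index(me)
    start = n_per * pos + min(pos, n_extra)
    n_parts = n_per + (1 if pos < n_extra else 0)
    return [partitions[i] for i in range(start, start + n_parts)]

# Values taken from the rebalance log: 10 partitions, 3 registered ids.
partitions = [5, 7, 3, 8, 1, 4, 6, 2, 0, 9]
consumers = ["aaa-0", "yyy-0", "xxx-0"]

print(range_assign(partitions, consumers, "xxx-0"))  # prints: [2, 0, 9]
```

This also explains why the owner-less partition ids moved around whenever the number of clients changed: the chunk boundaries shift with the consumer count.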
```scala
object PartitionAssignor {
  def createInstance(assignmentStrategy: String) = assignmentStrategy match {
    case "roundrobin" => new RoundRobinAssignor()
    case _ => new RangeAssignor()
  }
}

class AssignmentContext(group: String, val consumerId: String,
                        excludeInternalTopics: Boolean, zkClient: ZkClient) {
  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
    myTopicCount.getConsumerThreadIdsPerTopic
  }

  val partitionsForTopic: collection.Map[String, Seq[Int]] =
    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)

  val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
    ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)

  val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
}
```
```scala
class ZKGroupDirs(val group: String) {
  def consumerDir = ConsumersPath
  def consumerGroupDir = consumerDir + "/" + group
  def consumerRegistryDir = consumerGroupDir + "/ids"
  def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
  def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}

def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
  val dirs = new ZKGroupDirs(group)
  val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
  val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
  for (consumer <- consumers) {
    val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
    for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
      for (consumerThreadId <- consumerThreadIdSet)
        consumersPerTopicMap.get(topic) match {
          case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
          case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
        }
    }
  }
  for ((topic, consumerList) <- consumersPerTopicMap)
    consumersPerTopicMap.put(topic, consumerList.sortWith((s, t) => s < t))
  consumersPerTopicMap
}
```
```scala
def constructTopicCount(group: String, consumerId: String, zkUtils: ZkUtils,
                        excludeInternalTopics: Boolean): TopicCount = {
  val dirs = new ZKGroupDirs(group)
  val topicCountString = zkUtils.readData(dirs.consumerRegistryDir + "/" + consumerId)._1
  var subscriptionPattern: String = null
  var topMap: Map[String, Int] = null
  try {
    Json.parseFull(topicCountString) match {
      case Some(m) =>
        val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
        consumerRegistrationMap.get("pattern") match {
          case Some(pattern) => subscriptionPattern = pattern.asInstanceOf[String]
          case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
        }
        consumerRegistrationMap.get("subscription") match {
          case Some(sub) => topMap = sub.asInstanceOf[Map[String, Int]]
          case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
        }
      case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
    }
  } catch {
    case e: Throwable =>
```
Tracing through the code above, the logic by which a consumer learns the group's full client list is: read every consumer id present under the zk path /kafkachroot/consumers/{groupid}/ids, then read each consumer id's topic subscription, and finally build a [topic, List[ConsumerThreadId]] map.
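As a rough illustration of the registration-parsing step in `constructTopicCount`, here is a hedged Python sketch. It assumes the 0.8-era registration JSON with the "pattern" and "subscription" keys visible in the source above, and thread ids formatted as `<consumerid>-<n>` to match the `xxx-0` style ids in the log; the helper name is invented for this post:

```python
import json

def consumer_thread_ids(consumer_id, registration_json):
    # Mimics TopicCount.constructTopicCount: one consumer's zk registration
    # node becomes a topic -> set-of-consumer-thread-ids map.
    reg = json.loads(registration_json)
    if "pattern" not in reg or "subscription" not in reg:
        raise ValueError("error constructing TopicCount : " + registration_json)
    return {topic: {f"{consumer_id}-{i}" for i in range(n_threads)}
            for topic, n_threads in reg["subscription"].items()}

# Hypothetical registration payload for one client with a single stream.
reg = '{"version": 1, "pattern": "static", "subscription": {"onlineAdDemographicPredict": 1}}'
print(consumer_thread_ids("xxx", reg))
# prints: {'onlineAdDemographicPredict': {'xxx-0'}}
```

The group-level map is then just the union of these per-consumer maps, sorted, which is why a stale registration node flows straight into `consumersForTopic`.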
? ? ??于是跑到zk上看節(jié)點結(jié)構(gòu),發(fā)現(xiàn)在出問題的group/ids 路徑下果然存在aaa這個臨時節(jié)點,通知應(yīng)用方發(fā)現(xiàn)原來有個很老的程序之前也用同樣的groupid消費過這個topic,但是現(xiàn)在業(yè)務(wù)程序很久沒人管處在一個半假死的狀態(tài),所以這個臨時節(jié)點一直不過期,導(dǎo)致后來使用同樣group消費同樣的每次都會感知到一個多余的消費段存在,所以每次都有部分分區(qū)無法被消費。
Appendix:
1 The Consumer Rebalance algorithm
2 This post is based on Kafka 0.8.2-beta; newer releases have not been examined and may behave differently.