Kafka Shell Lag
Kafka Shell Lag
kafka 版本:2.1.0
前言
在生產環境中,比如你正在使用group kafka-lag消費某topic內的數據。目前你沒有搭建對應的監控系統,你如何去查看對應partition 的堆積信息呢?很多人都會去使用這個命令:
# 正常使用
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default
# 系統存在kerberos認證使用
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default --command-config /home/xiahu/client.properties
client.properties
security.protocol=PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
沒錯,今天我們就來研究一下這個命令,先從kafka-consumer-groups啟動腳本看起
1. kafka-consumer-groups.sh
# 該腳本只是簡單的去調用了另外一個腳本kafka-run-class.sh,并將參數傳遞過去 exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"2. kafka-run-class.sh
# 這個腳本內的內容太多了,其他的我也沒看,但是你所需要明白的是: # 在命令行執行: kafka-consumer-groups --bootstrap-server master:9092 --describe --group default # 最終調用:kafka.admin.ConsumerGroupCommand --bootstrap-server master:9092 --describe --group default # 所以主要看源碼:kafka.admin.ConsumerGroupCommand 這個類 if [ "x$DAEMON_MODE" = "xtrue" ]; thenecho $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" elseexec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" fi3. ConsumerGroupCommand
// Entry point of kafka.admin.ConsumerGroupCommand (excerpt, abridged by the author).
def main(args: Array[String]) {
  val opts = new ConsumerGroupCommandOptions(args)
  if (args.length == 0)
    CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
  val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has)
  if (actions != 1)
    CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets")
  // Validate the parsed command-line arguments.
  opts.checkArgs()
  // Build the ConsumerGroupService from the parsed options.
  val consumerGroupService = new ConsumerGroupService(opts)
  try {
    if (opts.options.has(opts.listOpt))
      consumerGroupService.listGroups().foreach(println(_))
    else if (opts.options.has(opts.describeOpt))
      // We are investigating consumer lag, so describeGroup() is the method to follow.
      consumerGroupService.describeGroup()
    else if (opts.options.has(opts.deleteOpt))
      ...
}
4. describeGroup()
// Dispatches --describe to the offsets / members / state sub-views.
def describeGroup(): Unit = {
  // Read the relevant flags from the parsed options.
  val group = opts.options.valuesOf(opts.groupOpt).asScala.head
  val membersOptPresent = opts.options.has(opts.membersOpt)
  val stateOptPresent = opts.options.has(opts.stateOpt)
  val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
  val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)
  if (subActions == 0 || offsetsOptPresent) {
    // The kafka lag information is gathered here.
    val offsets = collectGroupOffsets()
    printOffsets(group, offsets._1, offsets._2)
  } else if (membersOptPresent) {
    val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
    printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
  } else
    printState(group, collectGroupState())
}
5. collectGroupOffsets()
// Returns (group state, one row per partition the group touches).
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
  val groupId = opts.options.valueOf(opts.groupOpt)
  // First, use AdminClient to fetch the ConsumerGroupDescription for groupId.
  // ConsumerGroupDescription: "A detailed description of a single consumer group in the cluster."
  // (For background on AdminClient see: https://blog.csdn.net/zc0565/article/details/102791488)
  val consumerGroup = adminClient.describeConsumerGroups(List(groupId).asJava,
    withTimeoutMs(new DescribeConsumerGroupsOptions())).describedGroups.get(groupId).get
  val state = consumerGroup.state
  // For this groupId, fetch a Map[TopicPartition, OffsetAndMetadata]:
  //   TopicPartition    — wraps topic + partition
  //   OffsetAndMetadata — the group's current offset (and metadata) for that partition
  // e.g. topic: kafka_lag_test, partition: 0, groupId: kafka-lag.
  // As is well known, topic + partition + groupId identify a unique currentOffset.
  val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
  var assignedTopicPartitions = ListBuffer[TopicPartition]()
  // Filter out members with an empty assignment, and pair every assigned
  // TopicPartition with its committed (current) offset.
  val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
    .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size).flatMap { consumerSummary =>
    val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
    assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
    val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala.map { topicPartition =>
      topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
    }.toMap
    // The interesting call — see collectConsumerAssignment below.
    collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
      partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
      Some(s"${consumerSummary.clientId}"))
  }
  // Partitions with a committed offset but no live consumer get placeholder columns.
  val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
    case (topicPartition, offset) =>
      collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), Seq(topicPartition),
        Map(topicPartition -> Some(offset.offset)), Some(MISSING_COLUMN_VALUE),
        Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
  }
  (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
}
6. collectConsumerAssignment
// Returns PartitionAssignmentState rows for the given assignment.
private def collectConsumerAssignment(group: String,
                                      coordinator: Option[Node],
                                      topicPartitions: Seq[TopicPartition],
                                      getPartitionOffset: TopicPartition => Option[Long],
                                      consumerIdOpt: Option[String],
                                      hostOpt: Option[String],
                                      clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
  // Original author's note: "usually topicPartitions is empty" — this branch
  // emits a single all-None placeholder row in that case.
  if (topicPartitions.isEmpty) {
    Array[PartitionAssignmentState](PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None))
  }
  else
    // The interesting path — describe each assigned partition.
    describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, consumerIdOpt, hostOpt, clientIdOpt)
}
7. describePartitions
// Builds the final per-partition rows including the computed lag.
private def describePartitions(group: String,
                               coordinator: Option[Node],
                               topicPartitions: Seq[TopicPartition],
                               getPartitionOffset: TopicPartition => Option[Long],
                               consumerIdOpt: Option[String],
                               hostOpt: Option[String],
                               clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
  // Assembles one row from the committed offset and the (optional) log-end offset.
  def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]): PartitionAssignmentState = {
    val offset = getPartitionOffset(topicPartition)
    PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition), offset,
      getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
  }
  // getLogEndOffsets:
  // 1. builds a KafkaConsumer from bootstrap-server / groupId
  // 2. calls KafkaConsumer.endOffsets for the TopicPartitions to get each partition's latest offset
  // 3. lag = logEndOffset - currentOffset (queried earlier per group), wrapped into PartitionAssignmentState
  getLogEndOffsets(topicPartitions).map {
    case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
    case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
  }.toArray
}
說明
在kafka內,有以下幾個概念
分別說明:
1. broker
broker可以理解為一臺安裝kafka的機器,多個broker構成kafka集群,如果只有一個broker,那么這個kafka服務是單機的
2. topic
topic 翻譯過來為主題. 一個kafka集群下有多個topic
3. partition
partition翻譯為分區,hive里面就有分區的概念,與hive的分區類似,一個topic 內有多個partition
4. groupId
結合實際說明:
目前,我有 topic: kafka_lag ,該topic有兩個partition,目前往topic內生產10000條數據,按照默認的分區測試,partition 0,partition 1 分別有 5000 條數據.
除此之外,我有兩個group:kafka-consumer-lag-1,kafka-consumer-lag-2
首先:我使用kafka-consumer-lag-1 去消費topic內的數據,假如,partition0,1 分別消費2000 ,則offset 如下:
| kafka-consumer-lag-1 | kafka_lag | 0 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-1 | kafka_lag | 1 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 0 | 0 | 5000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 1 | 0 | 5000 | 5000 |
然后,我用 kafka-consumer-lag-2 去消費topic內的數據,partition 0,1 分區消費4000 ,則offset如下:
| kafka-consumer-lag-1 | kafka_lag | 0 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-1 | kafka_lag | 1 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 0 | 4000 | 1000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 1 | 4000 | 1000 | 5000 |
總結
由上面的數據展示可知:
topic + partition 對應唯一的endOffset
topic + partition + group 對應唯一的currentOffset
其實kafka 提供的 kafka-consumer-groups.sh 就是使用的這個原理
由于kafka 源碼是使用scala寫的,沒了解過scala的人看起來會比較困難,我用java重新給邏輯實現了一遍,代碼如下:
package com.clb.lag;import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.*; import java.util.function.Consumer;/*** @author Xiahu* @create 2021/1/11*/ public class KafkaOffsetTool {private AdminClient adminClient;private static final String MISSING_COLUMN_VALUE = "-";private KafkaConsumer consumer;public KafkaOffsetTool() {Properties properties = new Properties();properties.put("bootstrap.servers", "node2:9092");//kerberos認證需要自己實現if (false) {properties.put("sasl.kerberos.service.name", "kafka");properties.put("sasl.mechanism", "GSSAPI");properties.put("security.protocol", "PLAINTEXT");}this.adminClient = AdminClient.create(properties);}public List<PartitionOffsetState> collectGroupOffsets(String group) throws Exception {List<PartitionOffsetState> result = new ArrayList<>();List<String> groupId = Arrays.asList(group);Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = adminClient.describeConsumerGroups(groupId).describedGroups();ConsumerGroupDescription consumerGroup = describedGroups.get(group).get();ConsumerGroupState state = consumerGroup.state();Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommitsOffsets(group);Collection<MemberDescription> memberDescriptions = consumerGroup.members();Set<MemberDescription> memberDescriptionSet = new HashSet<>();Iterator<MemberDescription> iterator = memberDescriptions.iterator();while (iterator.hasNext()) {MemberDescription memberDescription = iterator.next();if (null != 
memberDescription.assignment().topicPartitions()) {memberDescriptionSet.add(memberDescription);}}memberDescriptionSet.stream().sorted(new Comparator<MemberDescription>() {@Overridepublic int compare(MemberDescription o1, MemberDescription o2) {if (o1.assignment().topicPartitions().size() >= o2.assignment().topicPartitions().size()) {return 1;} else {return -1;}}}).forEach(new Consumer<MemberDescription>() {@Overridepublic void accept(MemberDescription memberDescription) {Set<TopicPartition> topicPartitions = memberDescription.assignment().topicPartitions();for (TopicPartition tp : topicPartitions) {long offset = committedOffsets.get(tp).offset();PartitionOffsetState partitionOffsetState = new PartitionOffsetState();partitionOffsetState.setGroup(group);partitionOffsetState.setCoordinator(consumerGroup.coordinator().toString());partitionOffsetState.setHost(memberDescription.host());partitionOffsetState.setClientId(memberDescription.clientId());partitionOffsetState.setConsumerId(memberDescription.consumerId());partitionOffsetState.setPartition(tp.partition());partitionOffsetState.setTopic(tp.topic());partitionOffsetState.setOffset(offset);result.add(partitionOffsetState);}}});//封裝committedOffsetsIterator<Map.Entry<TopicPartition, OffsetAndMetadata>> entryIterator = committedOffsets.entrySet().iterator();while (entryIterator.hasNext()) {Map.Entry<TopicPartition, OffsetAndMetadata> entry = entryIterator.next();PartitionOffsetState partitionOffsetState = new PartitionOffsetState();partitionOffsetState.setGroup(group);partitionOffsetState.setCoordinator(consumerGroup.coordinator().toString());partitionOffsetState.setHost(MISSING_COLUMN_VALUE);partitionOffsetState.setClientId(MISSING_COLUMN_VALUE);partitionOffsetState.setConsumerId(MISSING_COLUMN_VALUE);partitionOffsetState.setPartition(entry.getKey().partition());partitionOffsetState.setTopic(entry.getKey().topic());partitionOffsetState.setOffset(entry.getValue().offset());result.add(partitionOffsetState);}return 
result;}private Map<TopicPartition, OffsetAndMetadata> getCommitsOffsets(String groupId) throws Exception {Map<TopicPartition, OffsetAndMetadata> result = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();return result;}public List<PartitionOffsetState> getLag(List<PartitionOffsetState> partitionOffsetStateList,String groupId) {getConsumer(new Properties(), groupId);List<TopicPartition> topicPartitionList = new ArrayList<>();for (PartitionOffsetState partitionOffset : partitionOffsetStateList) {topicPartitionList.add(new TopicPartition(partitionOffset.getTopic(), partitionOffset.getPartition()));}Map<TopicPartition, Long> map = consumer.endOffsets(topicPartitionList);for (PartitionOffsetState partitionOffset : partitionOffsetStateList) {for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {if (entry.getKey().topic().equals(partitionOffset.getTopic()) && entry.getKey().partition() == partitionOffset.getPartition()) {partitionOffset.setLag(entry.getValue() - partitionOffset.getOffset());partitionOffset.setLogEndOffset(entry.getValue());}}}return partitionOffsetStateList;}private KafkaConsumer getConsumer(Properties prop, String groupId) {if (consumer == null) {createConsumer(prop, groupId);}return consumer;}public void createConsumer(Properties prop, String groupId) {//kerberos認證需要自己實現if (false) {System.setProperty("java.security.krb5.conf", prop.getProperty(NuwaConstant.KERBEROS_KRB5));System.setProperty("java.security.auth.login.config", prop.getProperty(NuwaConstant.KERBEROS_LOGIN_CONFIG));prop.put(NuwaConstant.KAFKA_SECURITY_PROTOCOL, prop.getProperty(NuwaConstant.KAFKA_SECURITY_PROTOCOL));prop.put(NuwaConstant.KAFKA_SASL_MECHANISM, prop.getProperty(NuwaConstant.KAFKA_SASL_MECHANISM));prop.put(NuwaConstant.KAFKA_SASL_KERBEROS_SERVICE_NAME, prop.getProperty(NuwaConstant.KAFKA_SASL_KERBEROS_SERVICE_NAME));}String deserializer = StringDeserializer.class.getName();String broker = 
"node1:9092";prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);consumer = new KafkaConsumer(prop);}public static void main(String[] args) throws Exception {KafkaOffsetTool kafkaOffsetTool = new KafkaOffsetTool();List<PartitionOffsetState> partitionOffsetStates = kafkaOffsetTool.collectGroupOffsets("kafka-lag");partitionOffsetStates = kafkaOffsetTool.getLag(partitionOffsetStates,"kafka-lag");System.out.println(partitionOffsetStates);} }PartitionOffsetState
package com.clb.lag;

import lombok.Data;

/**
 * One row of the describe-group output; getters/setters, equals/hashCode and
 * toString are generated by Lombok's @Data.
 *
 * @author Xiahu
 * @create 2021/1/11
 */
@Data
public class PartitionOffsetState {
    // Consumer group id.
    private String group;
    // Group coordinator node, as Node.toString().
    private String coordinator;
    // Topic name.
    private String topic;
    // Partition number within the topic.
    private int partition;
    // Committed (current) offset of the group for this partition.
    private Long offset;
    // logEndOffset - offset.
    private Long lag;
    // Consumer member id, or "-" when no live consumer owns the partition.
    private String consumerId;
    // Consumer host, or "-".
    private String host;
    // Consumer client id, or "-".
    private String clientId;
    // Latest offset of the partition (end of log).
    private Long logEndOffset;
}
總結
以上是生活随笔為你收集整理的Kafka Shell Lag的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android7.1root工具,And
- 下一篇: IT人必看!2018年上半年云栖大会30