日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka Shell Lag

發布時間:2023/12/15 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Shell Lag 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Kafka Shell Lag

kafka 版本:2.1.0

前言

在生產環境中,比如你正在使用group kafka-lag消費某topic內的數據。目前你沒有搭建對應的監控系統,你如何去查看對應partition 的堆積信息呢?很多人都會去使用這個命令:

# Normal usage: describe the committed offsets / lag of consumer group "default"
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default
# Usage on a cluster with Kerberos authentication: pass the client security settings
# via --command-config (see client.properties below)
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default --command-config /home/xiahu/client.properties

client.properties

# Security settings handed to kafka-consumer-groups via --command-config.
# NOTE(review): PLAINTEXT with GSSAPI is unusual — SASL_PLAINTEXT is the protocol
# normally paired with Kerberos; verify against the cluster configuration.
security.protocol=PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

沒錯,今天我們就來研究一下這個命令,先從kafka-consumer-groups啟動腳本看起

1. kafka-consumer-groups.sh

# This script simply delegates to kafka-run-class.sh, passing along all arguments
# and naming kafka.admin.ConsumerGroupCommand as the class to run.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

2. kafka-run-class.sh

# kafka-run-class.sh contains a lot more than this; the only part that matters here is:
# running on the command line:
#   kafka-consumer-groups --bootstrap-server master:9092 --describe --group default
# ultimately invokes:
#   kafka.admin.ConsumerGroupCommand --bootstrap-server master:9092 --describe --group default
# so the code to read is the kafka.admin.ConsumerGroupCommand class.
# NOTE(review): this excerpt looks simplified — the real kafka-run-class.sh launches
# daemon mode with nohup rather than echo; confirm against the actual script.
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  echo $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

3. ConsumerGroupCommand

// Entry point of kafka.admin.ConsumerGroupCommand (excerpt quoted from Kafka 2.1.0 source;
// the body shown here is truncated — the delete/reset branches and the catch/finally are omitted).
def main(args: Array[String]) {
  val opts = new ConsumerGroupCommandOptions(args)
  if (args.length == 0)
    CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
  // Exactly one of the four action flags must be present.
  val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has)
  if (actions != 1)
    CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets")
  // Validate the remaining arguments.
  opts.checkArgs()
  // Build the ConsumerGroupService from the parsed options.
  val consumerGroupService = new ConsumerGroupService(opts)
  try {
    if (opts.options.has(opts.listOpt))
      consumerGroupService.listGroups().foreach(println(_))
    else if (opts.options.has(opts.describeOpt))
      // Since we are investigating kafka lag data, this is the method to focus on.
      consumerGroupService.describeGroup()
    else if (opts.options.has(opts.deleteOpt))
      ...
}

4. describeGroup()

// Dispatches --describe to the right sub-report (offsets, members, or state).
// Quoted from Kafka 2.1.0 ConsumerGroupService.
def describeGroup(): Unit = {
  // Pull the sub-options out of the parsed configuration.
  val group = opts.options.valuesOf(opts.groupOpt).asScala.head
  val membersOptPresent = opts.options.has(opts.membersOpt)
  val stateOptPresent = opts.options.has(opts.stateOpt)
  val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
  val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)
  if (subActions == 0 || offsetsOptPresent) {
    // Default path (no sub-option, or --offsets): the lag query lives in collectGroupOffsets().
    val offsets = collectGroupOffsets()
    printOffsets(group, offsets._1, offsets._2)
  } else if (membersOptPresent) {
    val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
    printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
  } else
    printState(group, collectGroupState())
}

5. collectGroupOffsets()

// Collects (group state, per-partition assignment rows) for one consumer group.
// Quoted from Kafka 2.1.0 ConsumerGroupService.
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
  val groupId = opts.options.valueOf(opts.groupOpt)
  // First use the AdminClient to fetch the ConsumerGroupDescription for this groupId.
  // (Background on AdminClient: https://blog.csdn.net/zc0565/article/details/102791488)
  // ConsumerGroupDescription: A detailed description of a single consumer group in the cluster.
  val consumerGroup = adminClient.describeConsumerGroups(List(groupId).asJava,
    withTimeoutMs(new DescribeConsumerGroupsOptions())).describedGroups.get(groupId).get
  val state = consumerGroup.state
  // For this groupId, fetch a Map[TopicPartition, OffsetAndMetadata]:
  //   TopicPartition      — wraps topic + partition
  //   OffsetAndMetadata   — the group's current committed offset (+ metadata) on that partition
  // e.g. topic: kafka_lag_test, partition: 0, groupId: kafka-lag.
  // (topic, partition, groupId) uniquely identifies a currentOffset.
  val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
  var assignedTopicPartitions = ListBuffer[TopicPartition]()
  // Keep only members with a non-empty assignment (largest assignment first) and
  // pair each assigned TopicPartition with its committed offset.
  val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
    .sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
    .flatMap { consumerSummary =>
      val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
      assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
      val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala.map { topicPartition =>
        topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
      }.toMap
      // The interesting method — see collectConsumerAssignment below.
      collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
        partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
        Some(s"${consumerSummary.clientId}"))
    }
  // Partitions with committed offsets but no live consumer get placeholder columns.
  val rowsWithoutConsumer = committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
    case (topicPartition, offset) =>
      collectConsumerAssignment(
        groupId,
        Option(consumerGroup.coordinator),
        Seq(topicPartition),
        Map(topicPartition -> Some(offset.offset)),
        Some(MISSING_COLUMN_VALUE),
        Some(MISSING_COLUMN_VALUE),
        Some(MISSING_COLUMN_VALUE))
  }
  (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))
}

6. collectConsumerAssignment

// Returns PartitionAssignmentState rows for one consumer's assignment.
// Quoted from Kafka 2.1.0 ConsumerGroupService.
private def collectConsumerAssignment(group: String,
                                      coordinator: Option[Node],
                                      topicPartitions: Seq[TopicPartition],
                                      getPartitionOffset: TopicPartition => Option[Long],
                                      consumerIdOpt: Option[String],
                                      hostOpt: Option[String],
                                      clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
  // Empty assignment: emit a single all-None placeholder row.
  // (NOTE(review): the original Chinese comment claimed this is the common case — in
  // practice members usually do have partitions; verify before relying on that claim.)
  if (topicPartitions.isEmpty) {
    Array[PartitionAssignmentState](
      PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None))
  }
  else
    // The interesting method — see describePartitions below.
    describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, consumerIdOpt, hostOpt, clientIdOpt)
}

7. describePartitions

// Builds one PartitionAssignmentState per partition, computing lag from the
// committed offset and the log-end offset. Quoted from Kafka 2.1.0 ConsumerGroupService.
private def describePartitions(group: String,
                               coordinator: Option[Node],
                               topicPartitions: Seq[TopicPartition],
                               getPartitionOffset: TopicPartition => Option[Long],
                               consumerIdOpt: Option[String],
                               hostOpt: Option[String],
                               clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {

  // Combine the committed offset (from getPartitionOffset) with the log-end offset into one row.
  def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]): PartitionAssignmentState = {
    val offset = getPartitionOffset(topicPartition)
    PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition), offset,
      getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
  }

  // getLogEndOffsets:
  // 1. builds a KafkaConsumer from bootstrap-server + groupId,
  // 2. calls KafkaConsumer.endOffsets on the TopicPartitions to get each partition's max offset,
  // 3. the difference against the previously fetched currentOffset is the Lag,
  //    packaged into a PartitionAssignmentState.
  getLogEndOffsets(topicPartitions).map {
    case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
    case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
  }.toArray
}

說明

在kafka內,有以下幾個概念

  • broker
  • topic
  • partition
  • group
  • offset
  • 分別說明:

    1. broker

    broker可以理解為一臺安裝kafka的機器,多個broker構成kafka集群,如果只有一個broker,那么這個kafka服務是單機的

    2. topic

    topic 翻譯過來為主題. 一個kafka集群下有多個topic

    3. partition

    partition翻譯為分區,hive里面就有分區的概念,與hive的分區類似,一個topic 內有多個partition

    4. groupId

    結合實際說明:
    目前,我有 topic: kafka_lag ,該topic有兩個partition,目前往topic內生產10000條數據,按照默認的分區測試,partition 0,partition 1 分別有 5000 條數據.
    除此之外,我有兩個group:kafka-consumer-lag-1,kafka-consumer-lag-2

    首先:我使用 kafka-consumer-lag-1 去消費topic內的數據,假如 partition 0、1 分別消費了 2000 條,則offset 如下:

    | groupId | topic | partition | currentOffset | lag | endOffset |
    | --- | --- | --- | --- | --- | --- |
    | kafka-consumer-lag-1 | kafka_lag | 0 | 2000 | 3000 | 5000 |
    | kafka-consumer-lag-1 | kafka_lag | 1 | 2000 | 3000 | 5000 |
    | kafka-consumer-lag-2 | kafka_lag | 0 | 0 | 5000 | 5000 |
    | kafka-consumer-lag-2 | kafka_lag | 1 | 0 | 5000 | 5000 |

    然后,我用 kafka-consumer-lag-2 去消費topic內的數據,partition 0,1 分區消費4000 ,則offset如下:

    | groupId | topic | partition | currentOffset | lag | endOffset |
    | --- | --- | --- | --- | --- | --- |
    | kafka-consumer-lag-1 | kafka_lag | 0 | 2000 | 3000 | 5000 |
    | kafka-consumer-lag-1 | kafka_lag | 1 | 2000 | 3000 | 5000 |
    | kafka-consumer-lag-2 | kafka_lag | 0 | 4000 | 1000 | 5000 |
    | kafka-consumer-lag-2 | kafka_lag | 1 | 4000 | 1000 | 5000 |

    總結

    由上面的數據展示可知:

    topic + partition 對應唯一的endOffset

    topic + partition + group 對應唯一的currentOffset

    其實kafka 提供的 kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)就是使用的這個原理

  • 構造AdminClient,使用AdminClient 的listConsumerGroupOffsets() 根據groupid 獲取每一個 topic + partition + groupId 對應的唯一的currentOffset
  • 實例化KafkaConsumer對象,根據topic + partiton 組成的TopicPartition 對象集合,獲取 topic + partition 對應的唯一的endOffset
  • 通過一系列計算(endOffset - currentOffset),獲取到了groupID 對應的Lag ,最終打印呈現
  • 由于kafka 源碼是使用scala寫的,沒了解過scala的人看起來會比較困難,我用java重新給邏輯實現了一遍,代碼如下:

    package com.clb.lag;import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.*; import java.util.function.Consumer;/*** @author Xiahu* @create 2021/1/11*/ public class KafkaOffsetTool {private AdminClient adminClient;private static final String MISSING_COLUMN_VALUE = "-";private KafkaConsumer consumer;public KafkaOffsetTool() {Properties properties = new Properties();properties.put("bootstrap.servers", "node2:9092");//kerberos認證需要自己實現if (false) {properties.put("sasl.kerberos.service.name", "kafka");properties.put("sasl.mechanism", "GSSAPI");properties.put("security.protocol", "PLAINTEXT");}this.adminClient = AdminClient.create(properties);}public List<PartitionOffsetState> collectGroupOffsets(String group) throws Exception {List<PartitionOffsetState> result = new ArrayList<>();List<String> groupId = Arrays.asList(group);Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = adminClient.describeConsumerGroups(groupId).describedGroups();ConsumerGroupDescription consumerGroup = describedGroups.get(group).get();ConsumerGroupState state = consumerGroup.state();Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommitsOffsets(group);Collection<MemberDescription> memberDescriptions = consumerGroup.members();Set<MemberDescription> memberDescriptionSet = new HashSet<>();Iterator<MemberDescription> iterator = memberDescriptions.iterator();while (iterator.hasNext()) {MemberDescription memberDescription = iterator.next();if (null != 
memberDescription.assignment().topicPartitions()) {memberDescriptionSet.add(memberDescription);}}memberDescriptionSet.stream().sorted(new Comparator<MemberDescription>() {@Overridepublic int compare(MemberDescription o1, MemberDescription o2) {if (o1.assignment().topicPartitions().size() >= o2.assignment().topicPartitions().size()) {return 1;} else {return -1;}}}).forEach(new Consumer<MemberDescription>() {@Overridepublic void accept(MemberDescription memberDescription) {Set<TopicPartition> topicPartitions = memberDescription.assignment().topicPartitions();for (TopicPartition tp : topicPartitions) {long offset = committedOffsets.get(tp).offset();PartitionOffsetState partitionOffsetState = new PartitionOffsetState();partitionOffsetState.setGroup(group);partitionOffsetState.setCoordinator(consumerGroup.coordinator().toString());partitionOffsetState.setHost(memberDescription.host());partitionOffsetState.setClientId(memberDescription.clientId());partitionOffsetState.setConsumerId(memberDescription.consumerId());partitionOffsetState.setPartition(tp.partition());partitionOffsetState.setTopic(tp.topic());partitionOffsetState.setOffset(offset);result.add(partitionOffsetState);}}});//封裝committedOffsetsIterator<Map.Entry<TopicPartition, OffsetAndMetadata>> entryIterator = committedOffsets.entrySet().iterator();while (entryIterator.hasNext()) {Map.Entry<TopicPartition, OffsetAndMetadata> entry = entryIterator.next();PartitionOffsetState partitionOffsetState = new PartitionOffsetState();partitionOffsetState.setGroup(group);partitionOffsetState.setCoordinator(consumerGroup.coordinator().toString());partitionOffsetState.setHost(MISSING_COLUMN_VALUE);partitionOffsetState.setClientId(MISSING_COLUMN_VALUE);partitionOffsetState.setConsumerId(MISSING_COLUMN_VALUE);partitionOffsetState.setPartition(entry.getKey().partition());partitionOffsetState.setTopic(entry.getKey().topic());partitionOffsetState.setOffset(entry.getValue().offset());result.add(partitionOffsetState);}return 
result;}private Map<TopicPartition, OffsetAndMetadata> getCommitsOffsets(String groupId) throws Exception {Map<TopicPartition, OffsetAndMetadata> result = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();return result;}public List<PartitionOffsetState> getLag(List<PartitionOffsetState> partitionOffsetStateList,String groupId) {getConsumer(new Properties(), groupId);List<TopicPartition> topicPartitionList = new ArrayList<>();for (PartitionOffsetState partitionOffset : partitionOffsetStateList) {topicPartitionList.add(new TopicPartition(partitionOffset.getTopic(), partitionOffset.getPartition()));}Map<TopicPartition, Long> map = consumer.endOffsets(topicPartitionList);for (PartitionOffsetState partitionOffset : partitionOffsetStateList) {for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {if (entry.getKey().topic().equals(partitionOffset.getTopic()) && entry.getKey().partition() == partitionOffset.getPartition()) {partitionOffset.setLag(entry.getValue() - partitionOffset.getOffset());partitionOffset.setLogEndOffset(entry.getValue());}}}return partitionOffsetStateList;}private KafkaConsumer getConsumer(Properties prop, String groupId) {if (consumer == null) {createConsumer(prop, groupId);}return consumer;}public void createConsumer(Properties prop, String groupId) {//kerberos認證需要自己實現if (false) {System.setProperty("java.security.krb5.conf", prop.getProperty(NuwaConstant.KERBEROS_KRB5));System.setProperty("java.security.auth.login.config", prop.getProperty(NuwaConstant.KERBEROS_LOGIN_CONFIG));prop.put(NuwaConstant.KAFKA_SECURITY_PROTOCOL, prop.getProperty(NuwaConstant.KAFKA_SECURITY_PROTOCOL));prop.put(NuwaConstant.KAFKA_SASL_MECHANISM, prop.getProperty(NuwaConstant.KAFKA_SASL_MECHANISM));prop.put(NuwaConstant.KAFKA_SASL_KERBEROS_SERVICE_NAME, prop.getProperty(NuwaConstant.KAFKA_SASL_KERBEROS_SERVICE_NAME));}String deserializer = StringDeserializer.class.getName();String broker = 
"node1:9092";prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);consumer = new KafkaConsumer(prop);}public static void main(String[] args) throws Exception {KafkaOffsetTool kafkaOffsetTool = new KafkaOffsetTool();List<PartitionOffsetState> partitionOffsetStates = kafkaOffsetTool.collectGroupOffsets("kafka-lag");partitionOffsetStates = kafkaOffsetTool.getLag(partitionOffsetStates,"kafka-lag");System.out.println(partitionOffsetStates);} }

    PartitionOffsetState

package com.clb.lag;

import lombok.Data;

/**
 * One row of the lag report: the committed offset, log-end offset and lag of a
 * single (group, topic, partition), plus the consuming member's identity columns.
 * Lombok @Data generates getters/setters/equals/hashCode/toString.
 *
 * @author Xiahu
 * @create 2021/1/11
 */
@Data
public class PartitionOffsetState {
    private String group;          // consumer group id
    private String coordinator;    // group coordinator node (Node.toString())
    private String topic;
    private int partition;
    private Long offset;           // committed (current) offset; null if never committed
    private Long lag;              // logEndOffset - offset
    private String consumerId;     // "-" when no live consumer owns the partition
    private String host;           // "-" when no live consumer owns the partition
    private String clientId;       // "-" when no live consumer owns the partition
    private Long logEndOffset;     // partition's end offset from KafkaConsumer.endOffsets
}

    總結

    以上是生活随笔為你收集整理的Kafka Shell Lag的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。