
Getting Kafka lag in Java: monitoring Kafka consumer offset lag


This article discusses approaches to monitoring Kafka consumer offset lag.

Approaches

- Use the official tooling classes: ConsumerOffsetChecker or ConsumerGroupCommand
- Use the JMX metrics that Kafka itself exposes

ConsumerOffsetChecker

In version 0.8.2.2 it looks like this:

kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala

object ConsumerOffsetChecker extends Logging {

  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()

  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
    //...
  }

  private def processPartition(zkClient: ZkClient,
                               group: String, topic: String, pid: Int) {
    //...
  }

  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
    topicPidMap.get(topic) match {
      case Some(pids) =>
        pids.sorted.foreach {
          pid => processPartition(zkClient, group, topic, pid)
        }
      case None => // ignore
    }
  }

  private def printBrokerInfo() {
    println("BROKER INFO")
    for ((bid, consumerOpt) <- consumerMap)
      consumerOpt match {
        case Some(consumer) =>
          println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
        case None => // ignore
      }
  }

  def main(args: Array[String]) {
    //...
    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

      val topicList = topics match {
        case Some(x) => x.split(",").view.toList
        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList
      }

      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
      val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq

      // fetch the group's committed offsets from its offset manager (Kafka-based offset storage)
      val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      debug("Received offset fetch response %s.".format(offsetFetchResponse))

      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsetMap.put(topicAndPartition, offset)
          } catch {
            case z: ZkNoNodeException =>
              if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
                offsetMap.put(topicAndPartition, -1)
              else
                throw z
          }
        }
        else if (offsetAndMetadata.error == ErrorMapping.NoError)
          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
        else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
      channel.disconnect()

      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
      topicList.sorted.foreach {
        topic => processTopic(zkClient, group, topic)
      }

      if (options.has("broker-info"))
        printBrokerInfo()

      for ((_, consumerOpt) <- consumerMap)
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
    }
    catch {
      case t: Throwable =>
        println("Exiting due to: %s.".format(t.getMessage))
    }
    finally {
      for (consumerOpt <- consumerMap.values) {
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
      }
      if (zkClient != null)
        zkClient.close()

      if (channel != null)
        channel.disconnect()
    }
  }
}

The drawback is that this class is designed for command-line use: each invocation creates a new ZkClient, which is not a good fit for continuous monitoring. It needs some refactoring, such as extracting the ZkClient so it is created once and reused, as sketched below.
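A minimal sketch of that idea in Java, assuming a long-running monitor process: the ZkClient is created once and shared by a scheduled task. The LagMonitor class and its collectLag() method are hypothetical placeholders for logic ported from ConsumerOffsetChecker.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.ZkClient;

// Hypothetical long-running lag monitor: one ZkClient for the lifetime of the
// process, instead of a new one per invocation as in the command-line tool.
public class LagMonitor implements AutoCloseable {

    private final ZkClient zkClient;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public LagMonitor(String zkConnect) {
        // same 30s session/connection timeouts as ConsumerOffsetChecker;
        // a string-based ZkSerializer would be needed to read Kafka's ZK nodes (omitted here)
        this.zkClient = new ZkClient(zkConnect, 30000, 30000);
    }

    public void start(String group) {
        // collect and report lag once a minute
        scheduler.scheduleAtFixedRate(() -> collectLag(group), 0, 60, TimeUnit.SECONDS);
    }

    private void collectLag(String group) {
        // placeholder: port the offset-fetch / log-size logic from
        // ConsumerOffsetChecker here, reusing this.zkClient
    }

    @Override
    public void close() {
        scheduler.shutdown();
        zkClient.close();
    }
}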

ConsumerGroupCommand

Versions after 0.8.2.2 replace ConsumerOffsetChecker with ConsumerGroupCommand.

kafka_2.11-0.10.2.1-sources.jar!/kafka/admin/ConsumerGroupCommand.scala

object ConsumerGroupCommand extends Logging {
  //...

  def main(args: Array[String]) {
    val opts = new ConsumerGroupCommandOptions(args)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")

    // should have exactly one action
    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
    if (actions != 1)
      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")

    opts.checkArgs()

    val consumerGroupService = {
      if (opts.useOldConsumer) {
        System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n")
        new ZkConsumerGroupService(opts)
      } else {
        System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n")
        new KafkaConsumerGroupService(opts)
      }
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.listGroups().foreach(println(_))
      else if (opts.options.has(opts.describeOpt)) {
        val (state, assignments) = consumerGroupService.describeGroup()
        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
        assignments match {
          case None =>
            // applies to both old and new consumer
            printError(s"The consumer group '$groupId' does not exist.")
          case Some(assignments) =>
            if (opts.useOldConsumer)
              printAssignment(assignments, false)
            else
              state match {
                case Some("Dead") =>
                  printError(s"Consumer group '$groupId' does not exist.")
                case Some("Empty") =>
                  System.err.println(s"Consumer group '$groupId' has no active members.")
                  printAssignment(assignments, true)
                case Some("PreparingRebalance") | Some("AwaitingSync") =>
                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                  printAssignment(assignments, true)
                case Some("Stable") =>
                  printAssignment(assignments, true)
                case other =>
                  // the control should never reach here
                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
              }
        }
      }
      else if (opts.options.has(opts.deleteOpt)) {
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.deleteGroups()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
        }
      }
    } catch {
      case e: Throwable =>
        printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
    } finally {
      consumerGroupService.close()
    }
  }
}

This, too, is designed around command-line invocation.
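If the goal is a long-running monitor rather than a one-off command, the same lag numbers can also be computed directly with the Java consumer API (0.10+). The following is only a rough sketch: the bootstrap servers, group id and topic are example values, and the reporting call is left as a placeholder.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagReporter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // example broker address
        props.put("group.id", "my-group");                  // the group whose lag we want to inspect
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            String topic = "my-topic";  // example topic
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            // log-end offset per partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);
                long committedOffset = committed == null ? 0L : committed.offset();
                long lag = endOffsets.get(tp) - committedOffset;
                // placeholder: push the value to statsd / Prometheus instead of printing
                System.out.printf("%s lag=%d%n", tp, lag);
            }
        }
    }
}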

JMX

This approach reads the metrics that Kafka itself registers via JMX, so there is no need to connect and fetch the data yourself the way ConsumerOffsetChecker does. For example:

// obtain an MBeanServer; here the platform server of the current JVM
// (a remote connection via JMXConnectorFactory could be used instead)
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

ObjectName oName = new ObjectName("kafka.producer:*");
Set<ObjectName> metricsBeans = mBeanServer.queryNames(oName, null);
for (ObjectName mBeanName : metricsBeans) {
    MBeanInfo metricsBean = mBeanServer.getMBeanInfo(mBeanName);
    MBeanAttributeInfo[] metricsAttrs = metricsBean.getAttributes();
    for (MBeanAttributeInfo metricsAttr : metricsAttrs) {
        // get value
        Object value = mBeanServer.getAttribute(mBeanName, metricsAttr.getName());
        // process ...
    }
}
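For consumer lag specifically, the Java consumer registers fetch metrics such as records-lag-max under kafka.consumer:type=consumer-fetch-manager-metrics. Below is a small sketch of reading it, assuming the consumer runs in the same JVM and that my-client is its client.id (both are example assumptions; attribute names can differ across client versions).

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConsumerLagJmx {
    public static void main(String[] args) throws Exception {
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        // "my-client" is an example client.id; adjust it to the consumer being monitored
        ObjectName name = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my-client");
        // records-lag-max: maximum record lag over the partitions assigned to this consumer
        Object lagMax = mBeanServer.getAttribute(name, "records-lag-max");
        System.out.println("records-lag-max = " + lagMax);
    }
}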

Summary

You can adapt ConsumerOffsetChecker or ConsumerGroupCommand yourself and report the results to statsd or Prometheus. Of course, using JMX is the least effort.

