日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka java获取topic_通过编程方式获取Kafka中Topic的Metadata信息

發(fā)布時間:2025/3/11 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka java获取topic_通过编程方式获取Kafka中Topic的Metadata信息 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

如果我們需要通過編程的方式來獲取到TopicMetadataRequest請求到 def findLeader(topic: String): Unit = {

val consumer = connect("www.iteblog.com", 9092)

val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,

0, kafkaGroupId, List(topic))

val topicMetadataResponse = consumer.send(req)

val topicsMetadataSet = topicMetadataResponse.topicsMetadata

topicsMetadataSet.foreach { topicMetadata =>

println(topicMetadata.topic)

val metadataSet = topicMetadata.partitionsMetadata

metadataSet.foreach { metadata =>

val partitionId = metadata.partitionId

val isr = metadata.isr.map(_.connectionString).mkString("[", ",", "]")

val replicas = metadata.replicas.map(_.connectionString).mkString("[", ",", "]")

val leader = metadata.leader.map (_.connectionString).get

println(s"\tPartition: $partitionId, Leader: $leader Replicas: $replicas ISR: $isr")

}

}

}

TopicMetadataRequest是一個case class,其各個參數(shù)如下: case class TopicMetadataRequest(val versionId: Short,

val correlationId: Int,

val clientId: String,

val topics: Seq[String])

構(gòu)造完成TopicMetadataRequest之后,通過SimpleConsumer的send方法發(fā)送請求,然后返回TopicMetadataResponse對象,其中就包含了Topic各個分區(qū)的相關(guān)信息,我們運行這個函數(shù),可以得到以下的信息: iteblog

Partition: 0, Leader: www.iteblog.com:9091 Replicas: [www.iteblog.com:9091] ISR: [www.iteblog.com:9091]

Partition: 1, Leader: www.iteblog.com:9097 Replicas: [www.iteblog.com:9097] ISR: [www.iteblog.com:9097]

Partition: 2, Leader: www.iteblog.com:9095 Replicas: [www.iteblog.com:9095] ISR: [www.iteblog.com:9095]

Partition: 3, Leader: www.iteblog.com:9096 Replicas: [www.iteblog.com:9096] ISR: [www.iteblog.com:9096]

Partition: 4, Leader: www.iteblog.com:9094 Replicas: [www.iteblog.com:9094] ISR: [www.iteblog.com:9094]

Partition: 5, Leader: www.iteblog.com:9092 Replicas: [www.iteblog.com:9092] ISR: [www.iteblog.com:9092]

Partition: 6, Leader: www.iteblog.com:9093 Replicas: [www.iteblog.com:9093] ISR: [www.iteblog.com:9093]

這個輸出是不是很熟悉,是的,輸出的結(jié)果類似于運行以下的Kafka自帶系統(tǒng)命令: [iteblog@www.iteblog.com kafka]$ ./bin/kafka-topics.sh --topic iteblog --describe \

--zookeeper www.iteblog.com

Topic:iteblog PartitionCount:7 ReplicationFactor:2 Configs:

Topic: iteblog Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Topic: iteblog Partition: 1 Leader: 7 Replicas: 7 Isr: 7

Topic: iteblog Partition: 2 Leader: 5 Replicas: 5 Isr: 5

Topic: iteblog Partition: 3 Leader: 6 Replicas: 6 Isr: 6

Topic: iteblog Partition: 4 Leader: 4 Replicas: 4 Isr: 4

Topic: iteblog Partition: 5 Leader: 2 Replicas: 2 Isr: 2

Topic: iteblog Partition: 6 Leader: 3 Replicas: 3 Isr: 3

如果我們設(shè)置空的topic的列表,如:TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, kafkaGroupId, Seq()),那么我們可以獲取Kafka server中所有Topic的信息。

總結(jié)

以上是生活随笔為你收集整理的kafka java获取topic_通过编程方式获取Kafka中Topic的Metadata信息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。