日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

如何获取Kafka的消费者详情——从Scala到Java的切换

發布時間:2024/4/11 java 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何获取Kafka的消费者详情——从Scala到Java的切换 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/how-to-get-kafka-consumer-details/


前文摘要

在上一篇文章《Kafka的Lag計算誤區及正確實現》中介紹了如何計算消費者的消費滯后量(Lag),并且講解了如何調用Kafka的kafka.admin.ConsumerGroupCommand文件中的KafkaConsumerGroupService來發送OffsetRequest和OffsetFetchRequest兩個請求,進而通過兩個請求結果之間的差值來獲得結果。不過如果你不想修改kafka-core的代碼并重新編譯的話,這種實現方式無法成功,所以本文的主要目的就是通過調用更底層的API來實現不修改kafka-core的代碼來實現KafkaConsumerGroupService的功能,即通過Java調用Scala的代碼來實現獲取Kafka的消費者詳情的功能。

目標及實現

實現如同 bin/kafka-consumer-group.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID的效果:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic-test1 0 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 1 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 2 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID topic-test1 3 1648 1648 0 CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261 /192.168.92.1 CLIENT_ID

KafkaConsumerGroupService的核心方法是CollectGroupAssignment,其方法參數為一個consumer group的groupId,方法輸出為上面示例中的列表信息。CollectGroupAssignment方法主要有以下幾個步驟:

  • 根據groupId調用describeConsumerGroup方法(內部原理是發送DescribeGroupsRequest請求)來獲取consumer group的基本信息,參考上面示例中的CONSUMER-ID、HOST、CLIENT-ID以及TopicPartition信息,但是沒有CURRENT-OFFSET、LOG-END-OFFSET、LAG信息。注意這里的LOG-END-OFFSET是消費者可見的LEO,不是生產者可見的LEO,也就是通俗意義上的HW。
  • 根據groupId調用listGroupOffsets方法(內部原理是發送OffsetFetchRequest請求)來獲取各個分區(Partition)的對應的消費位移CURRENT-OFFSET。
  • 通過調用KafkaConsumer的endOffsets方法來獲取TopicPartition對應的HW,即示例中的LOG-END-OFFSET。
  • 計算Lag并組合成信息列表List<PartitionAssignmentState>。
  • 改造

    對應Java版的KafkaConsumerGroupService改造代碼可以參見代碼,目錄結構如下圖所示:

    其中model中的ConsumerGroupSummary、ConsumerSummary和PartitionAssignmentState是簡單的JavaBean, PartitionAssignmentState是用來保存每個TopicPartition的消費者信息的,具體內容參考如下。KafkaConsumerGroupCustomService就是本文所要陳述的Java改造辦的KafkaConsumerGroupSerivice,ConsumerGroupUtils用來存放一些公用的代碼。

    @Data @Builder public class PartitionAssignmentState {private String group; // groupIdprivate Node coordinator; // consumer coodinator節點信息private String topic;private int partition;private long offset;private long lag;private String consumerId;private String host;private String clientId;private long logEndOffset; }

    初始化KafkaConsumerGroupCustomService需要Kafka的服務端地址,然后初始化AdminClient和KafkaConsumer,AdminClient中包含了眾多管理類方法,主要是通過發送各種自定義協議請求來完成,上面步驟中所說的describeConsumerGroup和listGroupOffsets方法也是通過AdminClient來實現的;KafkaConsumer主要是用來獲取TopicPartition對應的HW(消費者可見的LogEndOffsets)的。

    KafkaConsumerGroupCustomService中與scala版對應的collectGroupAssignment方法如下(詳細步驟參考代碼注釋):

    public List<PartitionAssignmentState> collectGroupAssignment(AdminClient adminClient, KafkaConsumer<String, String> consumer,String group) {//1. 獲取consumer group的基本信息,包括CONSUMER-ID、HOST、// CLIENT-ID以及TopicPartition信息AdminClient.ConsumerGroupSummary consumerGroupSummary= adminClient.describeConsumerGroup(group, 0);List<TopicPartition> assignedTopicPartitions = new ArrayList<>();List<PartitionAssignmentState> rowsWithConsumer = new ArrayList<>();scala.collection.immutable.List<AdminClient.ConsumerSummary> consumers= consumerGroupSummary.consumers().get();if (consumers != null) {//2. 獲取各個分區(Partition)的對應的消費位移CURRENT-OFFSETscala.collection.immutable.Map<TopicPartition, Object> offsets= adminClient.listGroupOffsets(group);if (offsets.nonEmpty()) {String state = consumerGroupSummary.state();// 3. 還有一個狀態是Dead表示"group"對應的consumer group不存在if (state.equals("Stable") || state.equals("Empty")|| state.equals("PreparingRebalance")|| state.equals("AwaitingSync")) {List<ConsumerSummary> consumerList = changeToJavaList(consumers);// 4. 獲取當前有消費者的消費信息,即包含CONSUMER-ID、HOST、CLIENT-IDrowsWithConsumer = getRowsWithConsumer(consumerGroupSummary, offsets,consumer, consumerList, assignedTopicPartitions, group);}}//5. 獲取當前沒有消費者的消費信息List<PartitionAssignmentState> rowsWithoutConsumer =getRowsWithoutConsumer(consumerGroupSummary,offsets, consumer, assignedTopicPartitions, group);//6. 合并結果rowsWithConsumer.addAll(rowsWithoutConsumer);}return rowsWithConsumer; }

    KafkaConsumerGroupCustomService類中包含有getRowsWithConsumer()、getRowsWithoutConsumer()、changeToJavaList等私有方法也都是在Scala語言與Java語言之間進行切換,這樣可以不需要修改kafka-core的原生代碼而通過外部的封裝調用既可以實現獲取Kafka消費者詳情的功能。光看代碼比較抽象,建議對此感興趣的同學可以親自對比一下kafka-core包中kafka.admin.ConsumerGroupCommand的KafkaConsumerGroupSerivice與筆者自定義的KafkaConsumerGroupCustomService的實現來了解下Scala語言到Java語言的轉換。

    如果需要打印詳情可以調用KafkaConsumerGroupCustomService同目錄的ConsumerGroupUtils類中的printPasList(List list)方法。注意要運行這些代碼需要JDK8的環境,筆者為了讓代碼顯得“騷氣”一點就用來一點Java8的語法,如果需要Java7的代碼實現可以關注私聊。

    或許有些同學對于Scala和Java交叉的代碼并不感冒,想要尋求一種存Java式的實現方式,那么在這里怎么實現呢?答案是通過KafkaAdminClient,它是AdminClient的Java版實現,從Kafka0.11.0.0版本開始引入的,不過KafkaAdminClient本身并沒有提供describeConsumerGroup、listGroupOffsets之類的方法給我們直接使用,擴展一下也很方便,由于篇幅限制,這部分的內容將在下一篇文章中進行介紹,如果想要先一睹為快,可以參考下代碼實現,詳細的邏輯解析敬請期待….

    歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/how-to-get-kafka-consumer-details/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    總結

    以上是生活随笔為你收集整理的如何获取Kafka的消费者详情——从Scala到Java的切换的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。