




Java Kafka monitoring: counting the message volume of topics — the Summed Recent Offsets of each topic

Published: 2023/12/8
This article, collected and organized by 生活随笔, introduces monitoring the message count of Kafka topics in Java — the Summed Recent Offsets of each topic — and is shared here for reference.

This post uses Kafka's Java API to monitor the message count of selected topics: query the latest offset of every partition on a schedule, sum them per topic, and then compute the difference between successive totals to get each topic's increment per interval. Kafka monitoring usually tracks throughput (the size of the data), not the message count; here we deliberately track the count — the per-topic sum of the latest partition offsets, i.e. the Summed Recent Offsets.
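The "compute the difference" step above can be sketched as plain Java: given two snapshots of the per-topic totals (such as those returned by KafkaMonitorTools.monitor below, taken one interval apart), the per-interval increment is the element-wise difference. The class and method names here are illustrative, not part of the original code:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: compute per-topic message increments between two snapshots
 * of Summed Recent Offsets taken one monitoring interval apart.
 */
public class OffsetDelta {
    /** For each topic in the current snapshot, subtract the previous total (0 if unseen). */
    public static Map<String, Long> delta(Map<String, Long> previous, Map<String, Long> current) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : current.entrySet()) {
            long before = previous.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), e.getValue() - before);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> t0 = new HashMap<>();
        t0.put("a", 100L);
        t0.put("b", 50L);
        Map<String, Long> t1 = new HashMap<>();
        t1.put("a", 130L);
        t1.put("b", 50L);
        // Per-interval message count per topic for the interval t0 -> t1.
        System.out.println(delta(t0, t1));
    }
}
```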

Jar dependency used:

compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'
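Note that kafka_2.10 0.8.0 is a very old client, and the SimpleConsumer API used below has been removed from modern releases. If your broker is recent, the equivalent dependency with the modern client would look roughly like this — the exact version is an assumption; pick one compatible with your cluster:

```groovy
// Assumed modern equivalent: kafka-clients replaces the old kafka_2.10 artifact.
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.2'
```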

Java code:

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Kafka monitoring: message counts of topics
 *
 * @author LiXuekai on 2020/9/16
 */
public class KafkaMonitorTools {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMonitorTools.class);

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            LOGGER.error("Error fetching Offset Data from the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * @param brokers broker addresses
     * @param topic   topic
     * @return map<partition id, partition metadata>
     */
    public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
        Map<Integer, PartitionMetadata> map = Maps.newHashMap();
        for (String broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                String[] hostAndPort = broker.split(":");
                consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Lists.newArrayList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return map;
    }

    public static Map<String, Long> monitor(List<String> brokers, List<String> topics) {
        if (brokers == null || brokers.isEmpty()) {
            return null;
        }
        if (topics == null || topics.isEmpty()) {
            return null;
        }
        Map<String, Long> map = Maps.newTreeMap();
        for (String topicName : topics) {
            Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName);
            long size = 0L;
            for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) {
                int partition = entry.getKey();
                String leadBroker = entry.getValue().leader().host();
                String clientName = "Client_" + topicName + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
                long readOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                size += readOffset;
                consumer.close();
            }
            map.put(topicName, size);
        }
        return map;
    }
}

Test code:

    private final String topics = "a,b,c,d,e,f";
    private final String server = "1.1.1.11:92";

    @Test
    public void monitor() {
        Map<String, Long> monitor = KafkaMonitorTools.monitor(Lists.newArrayList(server), Lists.newArrayList(topics.split(",")));
        monitor.forEach((k, v) -> System.out.println("topic:" + k + " \tSummed Recent Offsets:" + v));
    }

Screenshot of the Kafka version used (image not included here).

Summary

The above is the full content of "Java Kafka monitoring: counting the message volume of topics — the Summed Recent Offsets of each topic" as collected by 生活随笔. Hopefully this article helps you solve the problem you ran into.
