Lag 應該算是最最重要的監控指標了。它直接反映了一個消費者的運行情況。一個正常工作的消費者,它的 Lag 值應該很小,甚至是接近于 0 的,這表示該消費者能夠及時地消費生產者生產出來的消息,滯后程度很小。反之,如果一個消費者 Lag 值很大,通常就表明它無法跟上生產者的速度,最終 Lag 會越來越大,從而拖慢下游消息的處理速度。
通常來說,Lag 的單位是消息數,而且我們一般是在主題這個級別上討論 Lag 的,但實際上,Kafka 監控 Lag 的層級是在分區上的。如果要計算主題級別的,你需要手動匯總所有主題分區的 Lag,將它們累加起來,合并成最終的 Lag 值。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);try (AdminClient client = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);try {Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));}} catch (InterruptedException e) {Thread.currentThread().interrupt();// 處理中斷異常// ...return Collections.emptyMap();} catch (ExecutionException e) {// 處理ExecutionException// ...return Collections.emptyMap();} catch (TimeoutException e) {throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);}}}
?
Kafka JMX 監控指標
Kafka 默認提供的 JMX 監控指標來監控消費者的 Lag 值
Kafka 消費者提供了一個名為?kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標,里面有很多屬性。和我們今天所講內容相關的有兩組屬性:records-lag-max?和?records-lead-min,它們分別表示此消費者在測試窗口時間內曾經達到的最大的 Lag 值和最小的 Lead 值。
Lead 值是指消費者最新消費消息的位移與分區當前第一條消息位移的差值。很顯然,Lag 和 Lead 是一體的兩個方面:Lag 越大的話,Lead 就越小,反之也是同理。
Kafka 消費者還在分區級別提供了額外的 JMX 指標,用于單獨監控分區級別的 Lag 和 Lead 值。JMX 名稱為:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”