日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?

發(fā)布時(shí)間:2025/3/19 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗? 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

問題描述

今天現(xiàn)場實(shí)施同事說Kafka Manager上顯示有3500w條記錄,但使用我們的平臺落地后,一統(tǒng)計(jì)發(fā)現(xiàn)只有2200w條記錄,這是不是說明我們的平臺存在丟數(shù)據(jù)的可能。

經(jīng)了解,對接方是通過如下界面來判斷topic中的記錄條數(shù)的。

?????上圖是Kafka Manager的其中一個(gè)界面,該界面顯示了Kafka Topic的分區(qū)數(shù),Broker的分布情況,以及每個(gè)Topic中Recent Offset之和。(在各個(gè)分區(qū)中,Offset值都是從0開始往后遞增的)

在很久以前,我們團(tuán)隊(duì)其實(shí)已經(jīng)考慮到了數(shù)據(jù)丟失的可能,于是,利用StreamingListener接口去監(jiān)聽StreamingListenerBatchCompleted事件,只有監(jiān)聽到該事件,才會去提交offset。理論上來說,我們的平臺是可以保證“至少一次”的語義。竟然敢懷疑我維護(hù)了四年的大數(shù)據(jù)平臺!不能忍,我得證明不是我的問題啊~

在開啟甩鍋大法之前,首先介紹一下與本題有一定關(guān)聯(lián)度的知識點(diǎn)——Kafka的老化機(jī)制。

kafka老化機(jī)制

有兩種情況,會觸發(fā)Kafka的老化機(jī)制

一、根據(jù)設(shè)定時(shí)間觸發(fā)Kafka老化。

相關(guān)的配置項(xiàng)有

log.retention.hourslog.retention.minuteslog.retention.ms

這三個(gè)參數(shù)都是用來設(shè)置老化時(shí)間的,只是時(shí)間單位不太一樣。這些參數(shù)都是topic級別的,既可以通過命令的方式對特定的topic設(shè)置老化時(shí)間,也可以在server.properties文件里配置。

命令方式設(shè)置老化時(shí)間如下:

./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name aypayp1 --alter --add-config retention.ms=300000

默認(rèn)情況下,配置文件中設(shè)置了log.retention.hours為168,另外兩個(gè)沒有給初始值,所以默認(rèn)情況下,kafka老化時(shí)間是7天。

二、根據(jù)每個(gè)分區(qū)保存數(shù)據(jù)大小。?

log.retention.bytes

?該參數(shù)指定了每個(gè)分區(qū)保留多少字節(jié)數(shù)據(jù),數(shù)據(jù)量超過該值,便可觸發(fā)老化的動作。該參數(shù)值默認(rèn)為-1,是沒有限制的。

????另外,還要介紹一個(gè)配置項(xiàng)

log.retention.check.interval.ms默認(rèn)值300000

????該參數(shù)表示kafka進(jìn)程中起了一個(gè)定時(shí)線程,該線程5分鐘掃描本機(jī)器管理的所有partition數(shù)據(jù),是否有分區(qū)滿足老化條件,從而觸發(fā)真正的老化動作。

????綜上:

????????默認(rèn)情況下,kafka僅僅根據(jù)時(shí)間來觸發(fā)老化動作。

Kafka?Manager介紹

Kafka Manager是Yahoo開源的項(xiàng)目,也是為了更方便去維護(hù)Yahoo的kafka集群。在我的平常使用中,我可以用它來做如下的工作:

1、觀察kafka topic 吞吐量,即每秒進(jìn)入多少條數(shù)據(jù)。根據(jù)這個(gè)信息,我們就可以判斷Spark Streaming要達(dá)到什么樣的處理速度,才能夠及時(shí)處理數(shù)據(jù)。

2、觀察kafka中最新的offset值之和,一般可以大致知道topic中的數(shù)據(jù)量。

3、觀察消費(fèi)組的消費(fèi)情況,消費(fèi)組消費(fèi)了各個(gè)分區(qū)消費(fèi)了多少條,滯留了多少條。這個(gè)信息,也可以判斷自己系統(tǒng)的處理能力。

4、觀察各個(gè)分區(qū)的數(shù)據(jù)是否均衡。

甩鍋過程

????下面過程用來證明Kafka Manager列出的summed recent offset值,在老化的情況下,并不能代表topic總條數(shù)。

1、向topic aypayp1 發(fā)送10條數(shù)據(jù),并在kafka Manager中確定summed recent offset值也是10。

2、用測試代碼,重頭消費(fèi)aypayp1 ?topic中的數(shù)據(jù)。

結(jié)果如下:

證明數(shù)據(jù)也是進(jìn)去了。

3、對該topic設(shè)置老化時(shí)間,命令如下:

./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name aypayp1 --alter --add-config retention.ms=300000

查看是否生效

./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic aypayp1 --describe

4、五分鐘后,再往該topic中發(fā)送10條消息,并用Kafka Manager確認(rèn)summed recent offset值為20。

5、再過五分鐘,重頭消費(fèi)該topic,消費(fèi)結(jié)果如下:

????通過上述過程,可以證明對接方的觀點(diǎn)是錯(cuò)誤的。

總結(jié)

1、kafka manager上顯示的summed recent offset值,只是表征該topic接收到多少條消息,并不能代表去消費(fèi)時(shí),就能消費(fèi)出這么多的消息。

2、Kafka提供了kafka-run-class.sh腳本,利用該腳本也可以查看topic每個(gè)partition的offset值區(qū)間范圍,該腳本的使用方式

# 查看各個(gè)partition的最小offset值。sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic --time -2 --broker-list host1:9092,host2:9092,host3:9092#?查看各個(gè)partition的最新offset值。sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic--time?-1?--broker-list?host1:9092,host2:9092,host3:9092

????查看GetOffsetShell類可知,--time設(shè)為-1時(shí),表示使用latest方式;--time設(shè)為-2時(shí),表示使用earliest方式。通過?該類的底層實(shí)現(xiàn)可知,其實(shí)就是用latest或earliest方式去消費(fèi)該topic,從而拿到這個(gè)區(qū)間范圍。

總結(jié)

以上是生活随笔為你收集整理的kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。