kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?
問題描述
今天現(xiàn)場實(shí)施同事說Kafka Manager上顯示有3500w條記錄,但使用我們的平臺落地后,一統(tǒng)計(jì)發(fā)現(xiàn)只有2200w條記錄,這是不是說明我們的平臺存在丟數(shù)據(jù)的可能。
經(jīng)了解,對接方是通過如下界面來判斷topic中的記錄條數(shù)的。
?????上圖是Kafka Manager的其中一個(gè)界面,該界面顯示了Kafka Topic的分區(qū)數(shù),Broker的分布情況,以及每個(gè)Topic中Recent Offset之和。(在各個(gè)分區(qū)中,Offset值都是從0開始往后遞增的)
在很久以前,我們團(tuán)隊(duì)其實(shí)已經(jīng)考慮到了數(shù)據(jù)丟失的可能,于是,利用StreamingListener接口去監(jiān)聽StreamingListenerBatchCompleted事件,只有監(jiān)聽到該事件,才會去提交offset。理論上來說,我們的平臺是可以保證“至少一次”的語義。竟然敢懷疑我維護(hù)了四年的大數(shù)據(jù)平臺!不能忍,我得證明不是我的問題啊~
在開啟甩鍋大法之前,首先介紹一下與本題有一定關(guān)聯(lián)度的知識點(diǎn)——Kafka的老化機(jī)制。
kafka老化機(jī)制
有兩種情況,會觸發(fā)Kafka的老化機(jī)制
一、根據(jù)設(shè)定時(shí)間觸發(fā)Kafka老化。
相關(guān)的配置項(xiàng)有
log.retention.hourslog.retention.minuteslog.retention.ms這三個(gè)參數(shù)都是用來設(shè)置老化時(shí)間的,只是時(shí)間單位不太一樣。這些參數(shù)都是topic級別的,既可以通過命令的方式對特定的topic設(shè)置老化時(shí)間,也可以在server.properties文件里配置。
命令方式設(shè)置老化時(shí)間如下:
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name aypayp1 --alter --add-config retention.ms=300000默認(rèn)情況下,配置文件中設(shè)置了log.retention.hours為168,另外兩個(gè)沒有給初始值,所以默認(rèn)情況下,kafka老化時(shí)間是7天。
二、根據(jù)每個(gè)分區(qū)保存數(shù)據(jù)大小。?
log.retention.bytes?該參數(shù)指定了每個(gè)分區(qū)保留多少字節(jié)數(shù)據(jù),數(shù)據(jù)量超過該值,便可觸發(fā)老化的動作。該參數(shù)值默認(rèn)為-1,是沒有限制的。
????另外,還要介紹一個(gè)配置項(xiàng)
log.retention.check.interval.ms默認(rèn)值300000????該參數(shù)表示kafka進(jìn)程中起了一個(gè)定時(shí)線程,該線程5分鐘掃描本機(jī)器管理的所有partition數(shù)據(jù),是否有分區(qū)滿足老化條件,從而觸發(fā)真正的老化動作。
????綜上:
????????默認(rèn)情況下,kafka僅僅根據(jù)時(shí)間來觸發(fā)老化動作。
Kafka?Manager介紹
Kafka Manager是Yahoo開源的項(xiàng)目,也是為了更方便去維護(hù)Yahoo的kafka集群。在我的平常使用中,我可以用它來做如下的工作:
1、觀察kafka topic 吞吐量,即每秒進(jìn)入多少條數(shù)據(jù)。根據(jù)這個(gè)信息,我們就可以判斷Spark Streaming要達(dá)到什么樣的處理速度,才能夠及時(shí)處理數(shù)據(jù)。
2、觀察kafka中最新的offset值之和,一般可以大致知道topic中的數(shù)據(jù)量。
3、觀察消費(fèi)組的消費(fèi)情況,消費(fèi)組消費(fèi)了各個(gè)分區(qū)消費(fèi)了多少條,滯留了多少條。這個(gè)信息,也可以判斷自己系統(tǒng)的處理能力。
4、觀察各個(gè)分區(qū)的數(shù)據(jù)是否均衡。
甩鍋過程
????下面過程用來證明Kafka Manager列出的summed recent offset值,在老化的情況下,并不能代表topic總條數(shù)。
1、向topic aypayp1 發(fā)送10條數(shù)據(jù),并在kafka Manager中確定summed recent offset值也是10。
2、用測試代碼,重頭消費(fèi)aypayp1 ?topic中的數(shù)據(jù)。
結(jié)果如下:
證明數(shù)據(jù)也是進(jìn)去了。
3、對該topic設(shè)置老化時(shí)間,命令如下:
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name aypayp1 --alter --add-config retention.ms=300000查看是否生效
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic aypayp1 --describe4、五分鐘后,再往該topic中發(fā)送10條消息,并用Kafka Manager確認(rèn)summed recent offset值為20。
5、再過五分鐘,重頭消費(fèi)該topic,消費(fèi)結(jié)果如下:
????通過上述過程,可以證明對接方的觀點(diǎn)是錯(cuò)誤的。
總結(jié)
1、kafka manager上顯示的summed recent offset值,只是表征該topic接收到多少條消息,并不能代表去消費(fèi)時(shí),就能消費(fèi)出這么多的消息。
2、Kafka提供了kafka-run-class.sh腳本,利用該腳本也可以查看topic每個(gè)partition的offset值區(qū)間范圍,該腳本的使用方式
# 查看各個(gè)partition的最小offset值。sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic --time -2 --broker-list host1:9092,host2:9092,host3:9092#?查看各個(gè)partition的最新offset值。sh kafka-run-class.sh kafka.tools.GetOffsetShell --topic mytopic--time?-1?--broker-list?host1:9092,host2:9092,host3:9092????查看GetOffsetShell類可知,--time設(shè)為-1時(shí),表示使用latest方式;--time設(shè)為-2時(shí),表示使用earliest方式。通過?該類的底層實(shí)現(xiàn)可知,其實(shí)就是用latest或earliest方式去消費(fèi)該topic,從而拿到這個(gè)區(qū)間范圍。
總結(jié)
以上是生活随笔為你收集整理的kafka查看topic中的数据_实战!Kafka Manager能统计出Topic中的记录条数吗?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 支付宝支付回调异常_支付宝崩了是怎么回事
- 下一篇: 与基础事务管理器的通信失败 存货申请_金