kafka重复消费问题
開(kāi)篇提示:kafka重復(fù)消費(fèi)的根本原因就是“數(shù)據(jù)消費(fèi)了,但是offset沒(méi)更新”!而我們要探究一般什么情況下會(huì)導(dǎo)致offset沒(méi)更新?
今天查看Elasticsearch索引的時(shí)候發(fā)現(xiàn)有一個(gè)索引莫名的多了20w+的數(shù)據(jù),頓時(shí)心里一陣驚訝,然后趕緊打開(kāi)訂閱服務(wù)的日志(消費(fèi)者),眼前的一幕讓我驚呆了,我的消費(fèi)服務(wù)的控制臺(tái)一直在不斷的刷著消費(fèi)日志(剛開(kāi)始我并沒(méi)有意識(shí)到這是重復(fù)消費(fèi)造成的),我還傻傻的以為是因?yàn)榻裉煊腥嗽谒?#xff0c;所以導(dǎo)致日志狂刷,畢竟之前也遇到過(guò)有人用自動(dòng)交易軟件瘋狂刷單的,所以當(dāng)時(shí)也沒(méi)在意;等過(guò)了幾分鐘,又去瞅了一眼控制臺(tái)仍然在瘋狂的刷著日志,媽呀!頓時(shí)隱隱感覺(jué)不對(duì)勁,趕緊看了一眼es索引,我滴天一下子多了幾萬(wàn)的數(shù)據(jù),突然在想是不是程序出問(wèn)題了(因?yàn)轭^一天晚上發(fā)了一個(gè)版本),然后就開(kāi)始死盯這日志看,發(fā)現(xiàn)了一個(gè)奇葩的問(wèn)題:tmd怎么日志打印的數(shù)據(jù)都是重復(fù)的呀!這才恍然大悟,不用想了絕逼是kakfa重復(fù)消費(fèi)了,好吧!能有什么辦法了,開(kāi)始瘋狂的尋找解決的辦法......
既然之前沒(méi)有問(wèn)題,那就是我昨天發(fā)版所導(dǎo)致的,那么我昨天究竟改了什么配置呢?對(duì)照了之前的版本比較了一下,發(fā)現(xiàn)這個(gè)參數(shù)enable-auto-commit被改成了true,即自動(dòng)提交,理論上在數(shù)據(jù)并發(fā)不大,以及數(shù)據(jù)處理不耗時(shí)的情況下設(shè)置自動(dòng)提交是沒(méi)有什么問(wèn)題的,但是我的情況恰恰相反,可能突然會(huì)并發(fā)很大(畢竟交易流水不好說(shuō)的),所以可能在規(guī)定的時(shí)間(session.time.out默認(rèn)30s)內(nèi)沒(méi)有消費(fèi)完,就會(huì)可能導(dǎo)致re-blance重平衡,導(dǎo)致一部分offset自動(dòng)提交失敗,然后重平衡后重復(fù)消費(fèi)(這種很常見(jiàn));或者關(guān)閉kafka時(shí),如果在close之前,調(diào)用consumer.unsubscribe()則可能有部分offset沒(méi)提交,下次重啟會(huì)重復(fù)消費(fèi)
try {
consumer.unsubscribe();
} catch (Exception e) {
}
try {
consumer.close();
} catch (Exception e) {
}
?
所以一般情況下我們?cè)O(shè)置offset自動(dòng)提交為false!
解決方法:
1.設(shè)置
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest2.就是修改offset為最新的偏移量唄!我們都知道offset是存在zookeeper中的,所以我就不贅述了!
我的解決方法:
我并沒(méi)有去修改offset偏移量,畢竟生產(chǎn)環(huán)境還是不直接改這個(gè)了;
我重新指定了一個(gè)消費(fèi)組(group.id=order_consumer_group),然后指定auto-offset-reset=latest這樣我就只需要重啟我的服務(wù)了,而不需要?jiǎng)觡afka和zookeeper了!
?
#consumer spring.kafka.consumer.group-id=order_consumer_group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest注:如果你想要消費(fèi)者從頭開(kāi)始消費(fèi)某個(gè)topic的全量數(shù)據(jù),可以重新指定一個(gè)全新的group.id=new_group,然后指定auto-offset-reset=earliest即可
?
補(bǔ)充:
在kafka0.9.0版本的時(shí)候,開(kāi)始啟用了新的consumer config,這個(gè)新的consumer config采用bootstrap.servers替代之前版本的zookeeper.connect,主要是要漸漸弱化zk的依賴,把zk依賴隱藏到broker背后。
group coordinator
使用bootstrap.servers替代之前版本的zookeeper.connect,相關(guān)的有如下兩個(gè)改動(dòng):
從0.8.2版本開(kāi)始Kafka開(kāi)始支持將consumer的位移信息保存在Kafka內(nèi)部的topic中(從0.9.0版本開(kāi)始默認(rèn)將offset存儲(chǔ)到系統(tǒng)topic中)
Coordinator一般指的是運(yùn)行在broker上的group Coordinator,用于管理Consumer Group中各個(gè)成員,每個(gè)KafkaServer都有一個(gè)GroupCoordinator實(shí)例,管理多個(gè)消費(fèi)者組,主要用于offset位移管理和Consumer Rebalance。
rebalance時(shí)機(jī)
在如下條件下,partition要在consumer中重新分配:
__consumer_offsets
Consumer通過(guò)發(fā)送OffsetCommitRequest請(qǐng)求到指定broker(偏移量管理者)提交偏移量。這個(gè)請(qǐng)求中包含一系列分區(qū)以及在這些分區(qū)中的消費(fèi)位置(偏移量)。偏移量管理者會(huì)追加鍵值(key-value)形式的消息到一個(gè)指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition組成的,而value是偏移量。
?
參考:https://segmentfault.com/a/1190000011441747
總結(jié)
以上是生活随笔為你收集整理的kafka重复消费问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 博客转移至 https://www.ba
- 下一篇: kafka自动提交offset失败:Au