日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kafka重复消费问题

發(fā)布時(shí)間:2023/12/15 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka重复消费问题 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

開(kāi)篇提示:kafka重復(fù)消費(fèi)的根本原因就是“數(shù)據(jù)消費(fèi)了,但是offset沒(méi)更新!而我們要探究一般什么情況下會(huì)導(dǎo)致offset沒(méi)更新?

今天查看Elasticsearch索引的時(shí)候發(fā)現(xiàn)有一個(gè)索引莫名的多了20w+的數(shù)據(jù),頓時(shí)心里一陣驚訝,然后趕緊打開(kāi)訂閱服務(wù)的日志(消費(fèi)者),眼前的一幕讓我驚呆了,我的消費(fèi)服務(wù)的控制臺(tái)一直在不斷的刷著消費(fèi)日志(剛開(kāi)始我并沒(méi)有意識(shí)到這是重復(fù)消費(fèi)造成的),我還傻傻的以為是因?yàn)榻裉煊腥嗽谒?#xff0c;所以導(dǎo)致日志狂刷,畢竟之前也遇到過(guò)有人用自動(dòng)交易軟件瘋狂刷單的,所以當(dāng)時(shí)也沒(méi)在意;等過(guò)了幾分鐘,又去瞅了一眼控制臺(tái)仍然在瘋狂的刷著日志,媽呀!頓時(shí)隱隱感覺(jué)不對(duì)勁,趕緊看了一眼es索引,我滴天一下子多了幾萬(wàn)的數(shù)據(jù),突然在想是不是程序出問(wèn)題了(因?yàn)轭^一天晚上發(fā)了一個(gè)版本),然后就開(kāi)始死盯這日志看,發(fā)現(xiàn)了一個(gè)奇葩的問(wèn)題:tmd怎么日志打印的數(shù)據(jù)都是重復(fù)的呀!這才恍然大悟,不用想了絕逼是kakfa重復(fù)消費(fèi)了,好吧!能有什么辦法了,開(kāi)始瘋狂的尋找解決的辦法......

既然之前沒(méi)有問(wèn)題,那就是我昨天發(fā)版所導(dǎo)致的,那么我昨天究竟改了什么配置呢?對(duì)照了之前的版本比較了一下,發(fā)現(xiàn)這個(gè)參數(shù)enable-auto-commit被改成了true,即自動(dòng)提交,理論上在數(shù)據(jù)并發(fā)不大,以及數(shù)據(jù)處理不耗時(shí)的情況下設(shè)置自動(dòng)提交是沒(méi)有什么問(wèn)題的,但是我的情況恰恰相反,可能突然會(huì)并發(fā)很大(畢竟交易流水不好說(shuō)的),所以可能在規(guī)定的時(shí)間(session.time.out默認(rèn)30s)內(nèi)沒(méi)有消費(fèi)完,就會(huì)可能導(dǎo)致re-blance重平衡,導(dǎo)致一部分offset自動(dòng)提交失敗,然后重平衡后重復(fù)消費(fèi)(這種很常見(jiàn));或者關(guān)閉kafka時(shí),如果在close之前,調(diào)用consumer.unsubscribe()則可能有部分offset沒(méi)提交,下次重啟會(huì)重復(fù)消費(fèi)

try {
consumer.unsubscribe();
} catch (Exception e) {
}

try {
consumer.close();
} catch (Exception e) {
}

?

所以一般情況下我們?cè)O(shè)置offset自動(dòng)提交為false!

解決方法:

1.設(shè)置

spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest

2.就是修改offset為最新的偏移量唄!我們都知道offset是存在zookeeper中的,所以我就不贅述了!

我的解決方法:

我并沒(méi)有去修改offset偏移量,畢竟生產(chǎn)環(huán)境還是不直接改這個(gè)了;

我重新指定了一個(gè)消費(fèi)組(group.id=order_consumer_group),然后指定auto-offset-reset=latest這樣我就只需要重啟我的服務(wù)了,而不需要?jiǎng)觡afka和zookeeper了!

?

#consumer spring.kafka.consumer.group-id=order_consumer_group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消費(fèi)者從頭開(kāi)始消費(fèi)某個(gè)topic的全量數(shù)據(jù),可以重新指定一個(gè)全新的group.id=new_group,然后指定auto-offset-reset=earliest即可

?

補(bǔ)充:

在kafka0.9.0版本的時(shí)候,開(kāi)始啟用了新的consumer config,這個(gè)新的consumer config采用bootstrap.servers替代之前版本的zookeeper.connect,主要是要漸漸弱化zk的依賴,把zk依賴隱藏到broker背后。
group coordinator
使用bootstrap.servers替代之前版本的zookeeper.connect,相關(guān)的有如下兩個(gè)改動(dòng):

1.在 Server 端增加了 GroupCoordinator 這個(gè)角色 2.將 topic 的 offset 信息由之前存儲(chǔ)在 zookeeper(/consumers/<group.id>/offsets/<topic>/<partitionId>,zk寫操作性能不高) 上改為存儲(chǔ)到一個(gè)特殊的 topic 中(__consumer_offsets)

從0.8.2版本開(kāi)始Kafka開(kāi)始支持將consumer的位移信息保存在Kafka內(nèi)部的topic中(從0.9.0版本開(kāi)始默認(rèn)將offset存儲(chǔ)到系統(tǒng)topic中)
Coordinator一般指的是運(yùn)行在broker上的group Coordinator,用于管理Consumer Group中各個(gè)成員,每個(gè)KafkaServer都有一個(gè)GroupCoordinator實(shí)例,管理多個(gè)消費(fèi)者組,主要用于offset位移管理和Consumer Rebalance。
rebalance時(shí)機(jī)
在如下條件下,partition要在consumer中重新分配:

條件1:有新的consumer加入 條件2:舊的consumer掛了 條件3:coordinator掛了,集群選舉出新的coordinator 條件4:topic的partition新加 條件5:consumer調(diào)用unsubscrible(),取消topic的訂閱

__consumer_offsets
Consumer通過(guò)發(fā)送OffsetCommitRequest請(qǐng)求到指定broker(偏移量管理者)提交偏移量。這個(gè)請(qǐng)求中包含一系列分區(qū)以及在這些分區(qū)中的消費(fèi)位置(偏移量)。偏移量管理者會(huì)追加鍵值(key-value)形式的消息到一個(gè)指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition組成的,而value是偏移量。

?

參考:https://segmentfault.com/a/1190000011441747

總結(jié)

以上是生活随笔為你收集整理的kafka重复消费问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。