Kafka一些参数配置
Producer消息發(fā)送
producer.send(msg); // 用類(lèi)似這樣的方式去發(fā)送消息,就會(huì)把消息給你均勻的分布到各個(gè)分區(qū)上去
producer.send(key, msg); // 訂單id,或者是用戶id,他會(huì)根據(jù)這個(gè)key的hash值去分發(fā)到某個(gè)分區(qū)上去,他可以保證相同的key會(huì)路由分發(fā)到同一個(gè)分區(qū)上去。
每次發(fā)送消息都必須先把數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象,里面包含了要發(fā)送的topic,具體在哪個(gè)分區(qū),分區(qū)key,消息內(nèi)容,timestamp時(shí)間戳,然后這個(gè)對(duì)象交給序列化器,變成自定義協(xié)議格式的數(shù)據(jù),接著把數(shù)據(jù)交給partitioner分區(qū)器,對(duì)這個(gè)數(shù)據(jù)選擇合適的分區(qū),默認(rèn)就輪詢所有分區(qū),或者根據(jù)key來(lái)hash路由到某個(gè)分區(qū),這個(gè)topic的分區(qū)信息,都是在客戶端會(huì)有緩存的,當(dāng)然會(huì)提前跟broker去獲取。接著這個(gè)數(shù)據(jù)會(huì)被發(fā)送到producer內(nèi)部的一塊緩沖區(qū)里,然后producer內(nèi)部有一個(gè)Sender線程,會(huì)從緩沖區(qū)里提取消息封裝成一個(gè)一個(gè)的batch,然后每個(gè)batch發(fā)送給分區(qū)的leader副本所在的broker。
常見(jiàn)異常處理
常見(jiàn)的異常如下:
1)LeaderNotAvailableException:某臺(tái)機(jī)器掛了,此時(shí)leader副本不可用,會(huì)導(dǎo)致你寫(xiě)入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫(xiě)入,此時(shí)可以重試發(fā)送即可。
2)NotControllerException:這個(gè)也是同理,如果說(shuō)Controller所在Broker掛了,那么此時(shí)會(huì)有問(wèn)題,需要等待Controller重新選舉,此時(shí)也是一樣就是重試即可
3)NetworkException:網(wǎng)絡(luò)異常,重試即可
參數(shù):retries 默認(rèn)值是3
參數(shù):retry.backoff.ms 兩次重試之間的時(shí)間間隔
提升消息吞吐量
1)buffer.memory:設(shè)置發(fā)送消息的緩沖區(qū),默認(rèn)值是33554432,就是32MB
如果發(fā)送消息出去的速度小于寫(xiě)入消息進(jìn)去的速度,就會(huì)導(dǎo)致緩沖區(qū)寫(xiě)滿,此時(shí)生產(chǎn)消息就會(huì)阻塞住,所以說(shuō)這里就應(yīng)該多做一些壓測(cè),盡可能保證說(shuō)這塊緩沖區(qū)不會(huì)被寫(xiě)滿導(dǎo)致生產(chǎn)行為被阻塞住
producer.send(record, new Callback() {
@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {// 消息發(fā)送成功System.out.println("消息發(fā)送成功"); } else {// 消息發(fā)送失敗,需要重新發(fā)送}}});Long endTime=System.currentTime();If(endTime - startTime > 100){//說(shuō)明內(nèi)存被壓滿了說(shuō)明有問(wèn)題}
2)compression.type,默認(rèn)是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會(huì)加大producer端的cpu開(kāi)銷(xiāo)
3)batch.size,設(shè)置meigebatch的大小,如果batch太小,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請(qǐng)求,吞吐量下降;如果batch太大,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,而且會(huì)讓內(nèi)存緩沖區(qū)有很大壓力,過(guò)多數(shù)據(jù)緩沖在內(nèi)存里
默認(rèn)值是:16384,就是16kb,也就是一個(gè)batch滿了16kb就發(fā)送出去,一般在實(shí)際生產(chǎn)環(huán)境,這個(gè)batch的值可以增大一些來(lái)提升吞吐量,可以自己壓測(cè)一下
4)linger.ms,這個(gè)值默認(rèn)是0,意思就是消息必須立即被發(fā)送,但是這是不對(duì)的,一般設(shè)置一個(gè)100毫秒之類(lèi)的,這樣的話就是說(shuō),這個(gè)消息被發(fā)送出去后進(jìn)入一個(gè)batch,如果100毫秒內(nèi),這個(gè)batch滿了16kb,自然就會(huì)發(fā)送出去。但是如果100毫秒內(nèi),batch沒(méi)滿,那么也必須把消息發(fā)送出去了,不能讓消息的發(fā)送延遲時(shí)間太長(zhǎng),也避免給內(nèi)存造成過(guò)大的一個(gè)壓力。
請(qǐng)求超時(shí)
1)max.request.size:這個(gè)參數(shù)用來(lái)控制發(fā)送出去的消息的大小,默認(rèn)是1048576字節(jié),也就1mb,這個(gè)一般太小了,很多消息可能都會(huì)超過(guò)1mb的大小,所以需要自己優(yōu)化調(diào)整,把他設(shè)置更大一些(企業(yè)一般設(shè)置成10M)
2)request.timeout.ms:這個(gè)就是說(shuō)發(fā)送一個(gè)請(qǐng)求出去之后,他有一個(gè)超時(shí)的時(shí)間限制,默認(rèn)是30秒,如果30秒都收不到響應(yīng),那么就會(huì)認(rèn)為異常,會(huì)拋出一個(gè)TimeoutException來(lái)讓我們進(jìn)行處理
ACK參數(shù)
acks參數(shù),其實(shí)是控制發(fā)送出去的消息的持久化機(jī)制的
1)如果acks=0,那么producer根本不管寫(xiě)入broker的消息到底成功沒(méi)有,發(fā)送一條消息出去,立馬就可以發(fā)送下一條消息,這是吞吐量最高的方式,但是可能消息都丟失了,你也不知道的,但是說(shuō)實(shí)話,你如果真是那種實(shí)時(shí)數(shù)據(jù)流分析的業(yè)務(wù)和場(chǎng)景,就是僅僅分析一些數(shù)據(jù)報(bào)表,丟幾條數(shù)據(jù)影響不大的。會(huì)讓你的發(fā)送吞吐量會(huì)提升很多,你發(fā)送弄一個(gè)batch出,不需要等待人家leader寫(xiě)成功,直接就可以發(fā)送下一個(gè)batch了,吞吐量很大的,哪怕是偶爾丟一點(diǎn)點(diǎn)數(shù)據(jù),實(shí)時(shí)報(bào)表,折線圖,餅圖。
2)acks=all,或者acks=-1:這個(gè)leader寫(xiě)入成功以后,必須等待其他ISR中的副本都寫(xiě)入成功,才可以返回響應(yīng)說(shuō)這條消息寫(xiě)入成功了,此時(shí)你會(huì)收到一個(gè)回調(diào)通知
3)acks=1:只要leader寫(xiě)入成功,就認(rèn)為消息成功了,默認(rèn)給這個(gè)其實(shí)就比較合適的,還是可能會(huì)導(dǎo)致數(shù)據(jù)丟失的,如果剛寫(xiě)入leader,leader就掛了,此時(shí)數(shù)據(jù)必然丟了,其他的follower沒(méi)收到數(shù)據(jù)副本,變成leader
如果要想保證數(shù)據(jù)不丟失,得如下設(shè)置:
a)min.insync.replicas = 2,ISR里必須有2個(gè)副本,一個(gè)leader和一個(gè)follower,最最起碼的一個(gè),不能只有一個(gè)leader存活,連一個(gè)follower都沒(méi)有了
b)acks = -1,每次寫(xiě)成功一定是leader和follower都成功才可以算做成功,leader掛了,follower上是一定有這條數(shù)據(jù),不會(huì)丟失
c) retries = Integer.MAX_VALUE,無(wú)限重試,如果上述兩個(gè)條件不滿足,寫(xiě)入一直失敗,就會(huì)無(wú)限次重試,保證說(shuō)數(shù)據(jù)必須成功的發(fā)送給兩個(gè)副本,如果做不到,就不停的重試,除非是面向金融級(jí)的場(chǎng)景,面向企業(yè)大客戶,或者是廣告計(jì)費(fèi),跟錢(qián)的計(jì)算相關(guān)的場(chǎng)景下,才會(huì)通過(guò)嚴(yán)格配置保證數(shù)據(jù)絕對(duì)不丟失
重試亂序
消息重試是可能導(dǎo)致消息的亂序的,因?yàn)榭赡芘旁谀愫竺娴南⒍及l(fā)送出去了,你現(xiàn)在收到回調(diào)失敗了才在重試,此時(shí)消息就會(huì)亂序,所以可以使用“max.in.flight.requests.per.connection”參數(shù)設(shè)置為1,這樣可以保證producer同一時(shí)間只能發(fā)送一條消息
Consumer架構(gòu)
Offset管理
每個(gè)consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對(duì)每個(gè)topic的每個(gè)分區(qū)的消費(fèi)offset,定期會(huì)提交offset,老版本是寫(xiě)入zk,但是那樣高并發(fā)請(qǐng)求zk是不合理的架構(gòu)設(shè)計(jì),zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級(jí)的元數(shù)據(jù)存儲(chǔ),不能負(fù)責(zé)高并發(fā)讀寫(xiě),作為數(shù)據(jù)存儲(chǔ)。所以后來(lái)就是提交offset發(fā)送給內(nèi)部topic:__consumer_offsets,提交過(guò)去的時(shí)候,key是group.id+topic+分區(qū)號(hào),value就是當(dāng)前offset的值,每隔一段時(shí)間,kafka內(nèi)部會(huì)對(duì)這個(gè)topic進(jìn)行compact。也就是每個(gè)group.id+topic+分區(qū)號(hào)就保留最新的那條數(shù)據(jù)即可。而且因?yàn)檫@個(gè) __consumer_offsets可能會(huì)接收高并發(fā)的請(qǐng)求,所以默認(rèn)分區(qū)50個(gè),這樣如果你的kafka部署了一個(gè)大的集群,比如有50臺(tái)機(jī)器,就可以用50臺(tái)機(jī)器來(lái)抗offset提交的請(qǐng)求壓力,就好很多。
Coordinator
Coordinator的作用
每個(gè)consumer group都會(huì)選擇一個(gè)broker作為自己的coordinator,他是負(fù)責(zé)監(jiān)控這個(gè)消費(fèi)組里的各個(gè)消費(fèi)者的心跳,以及判斷是否宕機(jī),然后開(kāi)啟rebalance,
根據(jù)內(nèi)部的一個(gè)選擇機(jī)制,會(huì)挑選一個(gè)對(duì)應(yīng)的Broker,Kafka總會(huì)把你的各個(gè)消費(fèi)組均勻分配給各個(gè)Broker作為coordinator來(lái)進(jìn)行管理的,consumer group中的每個(gè)consumer剛剛啟動(dòng)就會(huì)跟選舉出來(lái)的這個(gè)consumer group對(duì)應(yīng)的coordinator所在的broker進(jìn)行通信,然后由coordinator分配分區(qū)給你的這個(gè)consumer來(lái)進(jìn)行消費(fèi)。coordinator會(huì)盡可能均勻的分配分區(qū)給各個(gè)consumer來(lái)消費(fèi)。
如何選擇哪臺(tái)是coordinator
首先對(duì)消費(fèi)組的groupId進(jìn)行hash,接著對(duì)__consumer_offsets的分區(qū)數(shù)量取模,默認(rèn)是50,可以通過(guò)offsets.topic.num.partitions來(lái)設(shè)置,找到你的這個(gè)consumer group的offset要提交到__consumer_offsets的哪個(gè)分區(qū)。比如說(shuō):groupId,“membership-consumer-group” -> hash值(數(shù)字)-> 對(duì)50取模 -> 就知道這個(gè)consumer group下的所有的消費(fèi)者提交offset的時(shí)候是往哪個(gè)分區(qū)去提交offset,找到__consumer_offsets的一個(gè)分區(qū),__consumer_offset的分區(qū)的副本數(shù)量默認(rèn)來(lái)說(shuō)1,只有一個(gè)leader,然后對(duì)這個(gè)分區(qū)找到對(duì)應(yīng)的leader所在的broker,這個(gè)broker就是這個(gè)consumer group的coordinator了,consumer接著就會(huì)維護(hù)一個(gè)Socket連接跟這個(gè)Broker進(jìn)行通信。
Rebalance策略
比如我們消費(fèi)的一個(gè)主題有12個(gè)分區(qū):
p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假設(shè)我們的消費(fèi)者組里面有三個(gè)消費(fèi)者
1.range策略
range策略就是按照partiton的序號(hào)范圍
p0~3 consumer1
p4~7 consumer2
p8~11 consumer3
默認(rèn)就是這個(gè)策略;
2.round-robin策略
consumer1:0,3,6,9
consumer2:1,4,7,10
consumer3:2,5,8,11
但是前面的這兩個(gè)方案有個(gè)問(wèn)題:
假設(shè)consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3
這樣的話,原本在consumer2上的的p6,p7分區(qū)就被分配到了 consumer3上。
3.sticky策略
最新的一個(gè)sticky策略,就是說(shuō)盡可能保證在rebalance的時(shí)候,讓原本屬于這個(gè)consumer
的分區(qū)還是屬于他們,
然后把多余的分區(qū)再均勻分配過(guò)去,這樣盡可能維持原來(lái)的分區(qū)分配的策略
consumer1:0-3
consumer2: 4-7
consumer3: 8-11
假設(shè)consumer3掛了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11
Rebalance分代機(jī)制
在rebalance的時(shí)候,可能你本來(lái)消費(fèi)了partition3的數(shù)據(jù),結(jié)果有些數(shù)據(jù)消費(fèi)了還沒(méi)提交offset,結(jié)果此時(shí)rebalance,把partition3分配給了另外一個(gè)cnosumer了,此時(shí)你如果提交partition3的數(shù)據(jù)的offset,能行嗎?必然不行,所以每次rebalance會(huì)觸發(fā)一次consumer group generation,分代,每次分代會(huì)加1,然后你提交上一個(gè)分代的offset是不行的,那個(gè)partiton可能已經(jīng)不屬于你了,大家全部按照新的partiton分配方案重新消費(fèi)數(shù)據(jù)。
Consumer核心參數(shù)
【heartbeat.interval.ms】
consumer心跳時(shí)間,必須得保持心跳才能知道consumer是否故障了,然后如果故障之后,就會(huì)通過(guò)心跳下發(fā)rebalance的指令給其他的consumer通知他們進(jìn)行rebalance的操作
【session.timeout.ms】
kafka多長(zhǎng)時(shí)間感知不到一個(gè)consumer就認(rèn)為他故障了,默認(rèn)是10秒
【max.poll.interval.ms】
如果在兩次poll操作之間,超過(guò)了這個(gè)時(shí)間,那么就會(huì)認(rèn)為這個(gè)consume處理能力太弱了,會(huì)被踢出消費(fèi)組,分區(qū)分配給別人去消費(fèi),一遍來(lái)說(shuō)結(jié)合你自己的業(yè)務(wù)處理的性能來(lái)設(shè)置就可以了
【fetch.max.bytes】
獲取一條消息最大的字節(jié)數(shù),一般建議設(shè)置大一些
【max.poll.records】
一次poll返回消息的最大條數(shù),默認(rèn)是500條
【connection.max.idle.ms】
consumer跟broker的socket連接如果空閑超過(guò)了一定的時(shí)間,此時(shí)就會(huì)自動(dòng)回收連接,但是下次消費(fèi)就要重新建立socket連接,這個(gè)建議設(shè)置為-1,不要去回收
【auto.offset.reset】
earliest
當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)
topica -> partition0:1000
partitino1:2000
latest
當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從當(dāng)前位置開(kāi)始消費(fèi)
none
topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
【enable.auto.commit】
這個(gè)就是開(kāi)啟自動(dòng)提交唯一
【auto.commit.ineterval.ms】
這個(gè)指的是多久條件一次偏移量
總結(jié)
以上是生活随笔為你收集整理的Kafka一些参数配置的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 阿里京东滴滴等大厂面试题汇总
- 下一篇: TextRank、BM25算法提取关键字