Kafka参数详解及调优--生产者
在實(shí)際的kafka開(kāi)發(fā)中,我們會(huì)發(fā)現(xiàn),無(wú)論是生產(chǎn)者還是消費(fèi)者,都需要構(gòu)建一個(gè)Properties對(duì)象,里面設(shè)置了很多參數(shù)。對(duì)于很多初學(xué)者來(lái)說(shuō),會(huì)看不懂這些參數(shù)分別代表什么含義。
在本篇文章我們就來(lái)詳細(xì)地了解一下這些參數(shù)的作用,并探討下如何使用合理的配置去優(yōu)化提高生產(chǎn)/消費(fèi)效率
kafka生產(chǎn)者參數(shù)
Properties props = new Properties(); props.put("bootstrap.servers", "192.168.137.200:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> procuder = new KafkaProducer<String,String>(props);在這段代碼中有很多常用的參數(shù)配置,在線上使用時(shí),我們要根據(jù)實(shí)際的數(shù)據(jù)量和數(shù)據(jù)大小來(lái)決定這些配置的具體值。下面來(lái)挑出其中比較重要的幾個(gè)參數(shù)來(lái)詳細(xì)解析一下。
1 bootstrap.servers
host/port列表,用于初始化建立和Kafka集群的連接。列表格式為host1:port1,host2:port2,....,無(wú)需添加所有的集群地址,kafka會(huì)根據(jù)提供的地址發(fā)現(xiàn)其他的地址(你可以多提供幾個(gè),以防提供的服務(wù)器關(guān)閉)
2 acks
生產(chǎn)者需要leader確認(rèn)請(qǐng)求完成之前接收的應(yīng)答數(shù)。此配置控制了發(fā)送消息的耐用性,支持以下配置:
- acks=0 如果設(shè)置為0,那么生產(chǎn)者將不等待任何消息確認(rèn)。消息將立刻添加到socket緩沖區(qū)并考慮發(fā)送。在這種情況下不能保障消息被服務(wù)器接收到。并且重試機(jī)制不會(huì)生效(因?yàn)榭蛻?hù)端不知道故障了沒(méi)有)。每個(gè)消息返回的offset始終設(shè)置為-1。
- acks=1,這意味著leader寫(xiě)入消息到本地日志就立即響應(yīng),而不等待所有follower應(yīng)答。在這種情況下,如果響應(yīng)消息之后但follower還未復(fù)制之前l(fā)eader立即故障,那么消息將會(huì)丟失。
- acks=all 這意味著leader將等待所有副本同步后應(yīng)答消息。此配置保障消息不會(huì)丟失(只要至少有一個(gè)同步的副本)。這是最強(qiáng)壯的可用性保障。等價(jià)于acks=-1。
3 buffer.memory
生產(chǎn)者用來(lái)緩存等待發(fā)送到服務(wù)器的消息的內(nèi)存總字節(jié)數(shù)。如果消息發(fā)送比可傳遞到服務(wù)器的快,生產(chǎn)者將阻塞max.block.ms之后,拋出異常。
此設(shè)置應(yīng)該大致的對(duì)應(yīng)生產(chǎn)者將要使用的總內(nèi)存,但不是硬約束,因?yàn)樯a(chǎn)者所使用的所有內(nèi)存都用于緩沖。一些額外的內(nèi)存將用于壓縮(如果啟動(dòng)壓縮),以及用于保持發(fā)送中的請(qǐng)求。
首先要明確一點(diǎn),那就是在內(nèi)存緩沖里大量的消息會(huì)緩沖在里面,形成一個(gè)一個(gè)的Batch,每個(gè)Batch里包含多條消息。然后KafkaProducer有一個(gè)Sender線程會(huì)把多個(gè)Batch打包成一個(gè)Request發(fā)送到Kafka服務(wù)器上去。 那么如果要是內(nèi)存設(shè)置的太小,可能導(dǎo)致一個(gè)問(wèn)題:消息快速的寫(xiě)入內(nèi)存緩沖里面,但是Sender線程來(lái)不及把Request發(fā)送到Kafka服務(wù)器。這樣是不是會(huì)造成內(nèi)存緩沖很快就被寫(xiě)滿(mǎn)?一旦被寫(xiě)滿(mǎn),就會(huì)阻塞用戶(hù)線程,不讓繼續(xù)往Kafka寫(xiě)消息了。
所以對(duì)于“buffer.memory”這個(gè)參數(shù)應(yīng)該結(jié)合自己的實(shí)際情況來(lái)進(jìn)行壓測(cè),你需要測(cè)算一下在生產(chǎn)環(huán)境,你的用戶(hù)線程會(huì)以每秒多少消息的頻率來(lái)寫(xiě)入內(nèi)存緩沖。比如說(shuō)每秒300條消息,那么你就需要壓測(cè)一下,假設(shè)內(nèi)存緩沖就32MB,每秒寫(xiě)300條消息到內(nèi)存緩沖,是否會(huì)經(jīng)常把內(nèi)存緩沖寫(xiě)滿(mǎn)?經(jīng)過(guò)這樣的壓測(cè),你可以調(diào)試出來(lái)一個(gè)合理的內(nèi)存大小。
4 batch.size
當(dāng)多個(gè)消息要發(fā)送到相同分區(qū)的時(shí),生產(chǎn)者嘗試將消息批量打包在一起,以減少請(qǐng)求交互。這樣有助于客戶(hù)端和服務(wù)端的性能提升。該配置的默認(rèn)批次大小(以字節(jié)為單位):
不會(huì)打包大于此配置大小的消息。
發(fā)送到broker的請(qǐng)求將包含多個(gè)批次,每個(gè)分區(qū)一個(gè),用于發(fā)送數(shù)據(jù)。
較小的批次大小有可能降低吞吐量(批次大小為0則完全禁用批處理)。一個(gè)非常大的批次大小可能更浪費(fèi)內(nèi)存。因?yàn)槲覀儠?huì)預(yù)先分配這個(gè)資源。
比如說(shuō)發(fā)送消息的頻率就是每秒300條,那么如果比如“batch.size”調(diào)節(jié)到了32KB,或者64KB,是否可以提升發(fā)送消息的整體吞吐量。
因?yàn)槔碚撋蟻?lái)說(shuō),提升batch的大小,可以允許更多的數(shù)據(jù)緩沖在里面,那么一次Request發(fā)送出去的數(shù)據(jù)量就更多了,這樣吞吐量可能會(huì)有所提升。
但是這個(gè)東西也不能無(wú)限的大,過(guò)于大了之后,要是數(shù)據(jù)老是緩沖在Batch里遲遲不發(fā)送出去,那么豈不是你發(fā)送消息的延遲就會(huì)很高。
比如說(shuō),一條消息進(jìn)入了Batch,但是要等待5秒鐘Batch才湊滿(mǎn)了64KB,才能發(fā)送出去。那這條消息的延遲就是5秒鐘。
所以需要在這里按照生產(chǎn)環(huán)境的發(fā)消息的速率,調(diào)節(jié)不同的Batch大小自己測(cè)試一下最終出去的吞吐量以及消息的 延遲,設(shè)置一個(gè)最合理的參數(shù)。
5 compression.type
數(shù)據(jù)壓縮的類(lèi)型。默認(rèn)為空(就是不壓縮)。有效的值有 none,gzip,snappy, 或 lz4。壓縮全部的數(shù)據(jù)批,因此批的效果也將影響壓縮的比率(更多的批次意味著更好的壓縮)。
6 retries
設(shè)置一個(gè)比零大的值,客戶(hù)端如果發(fā)送失敗則會(huì)重新發(fā)送。注意,這個(gè)重試功能和客戶(hù)端在接到錯(cuò)誤之后重新發(fā)送沒(méi)什么不同。如果max.in.flight.requests.per.connection沒(méi)有設(shè)置為1,有可能改變消息發(fā)送的順序,因?yàn)槿绻?個(gè)批次發(fā)送到一個(gè)分區(qū)中,并第一個(gè)失敗了并重試,但是第二個(gè)成功了,那么第二個(gè)批次將超過(guò)第一個(gè)。
“retries”和“retries.backoff.ms”決定了重試機(jī)制,也就是如果一個(gè)請(qǐng)求失敗了可以重試幾次,每次重試的間隔是多少毫秒。這個(gè)大家適當(dāng)設(shè)置幾次重試的機(jī)會(huì),給一定的重試間隔即可,比如給100ms的重試間隔。
7 client.id
當(dāng)發(fā)出請(qǐng)求時(shí)傳遞給服務(wù)器的id字符串。這樣做的目的是允許服務(wù)器請(qǐng)求記錄記錄這個(gè)【邏輯應(yīng)用名】,這樣能夠追蹤請(qǐng)求的源,而不僅僅只是ip/prot
8 linger.ms
生產(chǎn)者組將發(fā)送的消息組合成單個(gè)批量請(qǐng)求。正常情況下,只有消息到達(dá)的速度比發(fā)送速度快的情況下才會(huì)出現(xiàn)。但是,在某些情況下,即使在適度的負(fù)載下,客戶(hù)端也可能希望減少請(qǐng)求數(shù)量。此設(shè)置通過(guò)添加少量人為延遲來(lái)實(shí)現(xiàn)。- 也就是說(shuō),不是立即發(fā)出一個(gè)消息,生產(chǎn)者將等待一個(gè)給定的延遲,以便和其他的消息可以組合成一個(gè)批次。這類(lèi)似于Nagle在TCP中的算法。此設(shè)置給出批量延遲的上限:一旦我們達(dá)到分區(qū)的batch.size值的記錄,將立即發(fā)送,不管這個(gè)設(shè)置如何,但是,如果比這個(gè)小,我們將在指定的“l(fā)inger”時(shí)間內(nèi)等待更多的消息加入。此設(shè)置默認(rèn)為0(即無(wú)延遲)。假設(shè),設(shè)置 linger.ms=5,將達(dá)到減少發(fā)送的請(qǐng)求數(shù)量的效果,但對(duì)于在沒(méi)有負(fù)載情況,將增加5ms的延遲。
舉個(gè)例子:首先假設(shè)你的Batch是32KB,那么你得估算一下,正常情況下,一般多久會(huì)湊夠一個(gè)Batch,比如正常來(lái)說(shuō)可能20ms就會(huì)湊夠一個(gè)Batch。
那么你的linger.ms就可以設(shè)置為25ms,也就是說(shuō),正常來(lái)說(shuō),大部分的Batch在20ms內(nèi)都會(huì)湊滿(mǎn),但是你的linger.ms可以保證,哪怕遇到低峰時(shí)期,20ms湊不滿(mǎn)一個(gè)Batch,還是會(huì)在25ms之后強(qiáng)制Batch發(fā)送出去。
如果要是你把linger.ms設(shè)置的太小了,比如說(shuō)默認(rèn)就是0ms,或者你設(shè)置個(gè)5ms,那可能導(dǎo)致你的Batch雖然設(shè)置了32KB,但是經(jīng)常是還沒(méi)湊夠32KB的數(shù)據(jù),5ms之后就直接強(qiáng)制Batch發(fā)送出去,這樣也不太好其實(shí),會(huì)導(dǎo)致你的Batch形同虛設(shè),一直湊不滿(mǎn)數(shù)據(jù)。
9 max.request.size
請(qǐng)求的最大大小(以字節(jié)為單位)。此設(shè)置將限制生產(chǎn)者的單個(gè)請(qǐng)求中發(fā)送的消息批次數(shù),以避免發(fā)送過(guò)大的請(qǐng)求。這也是最大消息批量大小的上限。請(qǐng)注意,服務(wù)器擁有自己的批量大小,可能與此不同。
10 max.block.ms
該配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 將阻塞多長(zhǎng)時(shí)間。此外這些方法被阻止,也可能是因?yàn)榫彌_區(qū)已滿(mǎn)或元數(shù)據(jù)不可用。在用戶(hù)提供的序列化程序或分區(qū)器中的鎖定不會(huì)計(jì)入此超時(shí)。默認(rèn)為60000ms。
總結(jié)
以上是生活随笔為你收集整理的Kafka参数详解及调优--生产者的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 程序员,你想不想进大厂?
- 下一篇: 62岁程序员植入逻辑炸弹, 面临10年监