Kafka调优
配置參數(shù)調(diào)優(yōu)
broker配置
num.recovery.threads.per.data.dir
對于如下 3 種情況,Kafka 會(huì)使用可配置的線程池來處理日志片段:
- 服務(wù)器正常啟動(dòng),用于打開每個(gè)分區(qū)的日志片段;
- 服務(wù)器崩潰后重啟,用于檢查和截短每個(gè)分區(qū)的日志片段;
- 服務(wù)器正常關(guān)閉,用于關(guān)閉日志片段。
默認(rèn)情況下,每個(gè)日志目錄只使用一個(gè)線程。因?yàn)檫@些線程只是在服務(wù)器啟動(dòng)和關(guān)閉時(shí)會(huì)用到,所以完全可以設(shè)置大量的線程來達(dá)到并行操作的目的。特別是對于包含大量分區(qū)的服務(wù)器來說,一旦發(fā)生崩潰,在進(jìn)行恢復(fù)時(shí)使用并行操作可能會(huì)省下數(shù)小時(shí)的時(shí)間。設(shè)置此參數(shù)時(shí)需要注意,所配置的數(shù)字對應(yīng)的是 log.dirs 指定的單個(gè)日志目錄。也就是說,如果 num.recovery.threads.per.data.dir 被設(shè)為 8,并且 log.dir 指定了 3 個(gè)路徑,那么總共需要 24 個(gè)線程。
主題的默認(rèn)配置
num.partitions
num.partitions 參數(shù)指定了新創(chuàng)建的主題將包含多少個(gè)分區(qū)。如果啟用了主題自動(dòng)創(chuàng)建功能(該功能默認(rèn)是啟用的),主題分區(qū)的個(gè)數(shù)就是該參數(shù)指定的值。該參數(shù)的默認(rèn)值是 1。Kafka 集群通過分區(qū)對主題進(jìn)行橫向擴(kuò)展,所以當(dāng)有新的 broker 加入集群時(shí),可以通過分區(qū)個(gè)數(shù)來實(shí)現(xiàn)集群的負(fù)載均衡。擁有大量消息的主題如果要進(jìn)行負(fù)載分散,就需要大量的分區(qū)。
如何選定分區(qū)數(shù)量
你需要很多分區(qū),但不能太多。因?yàn)榉謪^(qū)越多,占用的內(nèi)存越多,完成首領(lǐng)選舉需要的時(shí)間也越長。如果你估算出主題的吞吐量和消費(fèi)者吞吐量,可以用主題吞吐量除以消費(fèi)者吞吐量算出分區(qū)的個(gè)數(shù)。也就是說,如果每秒鐘要從主題上寫入和讀取 1GB 的數(shù)據(jù),并且每個(gè)消費(fèi)者每秒鐘可以處理 50MB 的數(shù)據(jù),那么至少需要 20 個(gè)分區(qū)。這樣就可以讓 20 個(gè)消費(fèi)者同時(shí)讀取這些分區(qū),從而達(dá)到每秒鐘 1GB 的吞吐量。
log.retention.ms
Kafka 通常根據(jù)時(shí)間來決定數(shù)據(jù)可以被保留多久。默認(rèn)使用 log.retention.hours 參數(shù)來配置時(shí)間,默認(rèn)值為 168 小時(shí),也就是一周。除此以外,還有其他兩個(gè)參數(shù) log.retention.minutes 和?log.retention.ms?。這 3 個(gè)參數(shù)的作用是一樣的,都是決定消息多久以后會(huì)被刪除,不過還是推薦使用?log.retention.ms?。如果指定了不止一個(gè)參數(shù),Kafka 會(huì)優(yōu)先使用具有最小值的那個(gè)參數(shù)。
log.retention.bytes
另一種方式是通過保留的消息字節(jié)數(shù)來判斷消息是否過期。它的值通過參數(shù) log.retention.bytes 來指定,作用在每一個(gè)分區(qū)上。也就是說,如果有一個(gè)包含 8 個(gè)分區(qū)的主題,并且 log.retention.bytes 被設(shè)為 1GB,那么這個(gè)主題最多可以保留 8GB 的數(shù)據(jù)。所以,當(dāng)主題的分區(qū)個(gè)數(shù)增加時(shí),整個(gè)主題可以保留的數(shù)據(jù)也隨之增加。
根據(jù)字節(jié)大小和時(shí)間保留數(shù)據(jù)
如果同時(shí)指定了 log.retention.bytes 和?log.retention.ms?(或者另一個(gè)時(shí)間參數(shù)),只要任意一個(gè)條件得到滿足,消息就會(huì)被刪除。例如,假設(shè)?log.retention.ms?設(shè)置為 86 400 000(也就是 1 天),log.retention.bytes 設(shè)置為 1 000 000 000(也就是 1GB),如果消息字節(jié)總數(shù)在不到一天的時(shí)間就超過了 1GB,那么多出來的部分就會(huì)被刪除。相反,如果消息字節(jié)總數(shù)小于 1GB,那么一天之后這些消息也會(huì)被刪除,盡管分區(qū)的數(shù)據(jù)總量小于 1GB。
log.segment.bytes
以上的設(shè)置都作用在日志片段上,而不是作用在單個(gè)消息上。當(dāng)消息到達(dá) broker 時(shí),它們被追加到分區(qū)的當(dāng)前日志片段上。當(dāng)日志片段大小達(dá)到 log.segment.bytes 指定的上限(默認(rèn)是 1GB)時(shí),當(dāng)前日志片段就會(huì)被關(guān)閉,一個(gè)新的日志片段被打開。如果一個(gè)日志片段被關(guān)閉,就開始等待過期。這個(gè)參數(shù)的值越小,就會(huì)越頻繁地關(guān)閉和分配新文件,從而降低磁盤寫入的整體效率。
如果主題的消息量不大,那么如何調(diào)整這個(gè)參數(shù)的大小就變得尤為重要。例如,如果一個(gè)主題每天只接收 100MB 的消息,而 log.segment.bytes 使用默認(rèn)設(shè)置,那么需要 10 天時(shí)間才能填滿一個(gè)日志片段。因?yàn)樵谌罩酒伪魂P(guān)閉之前消息是不會(huì)過期的,所以如果?log.retention.ms?被設(shè)為 604 800 000(也就是 1 周),那么日志片段最多需要 17 天才會(huì)過期。
這是因?yàn)殛P(guān)閉日志片段需要 10 天的時(shí)間,而根據(jù)配置的過期時(shí)間,還需要再保留 7 天時(shí)間(要等到日志片段里的最后一個(gè)消息過期才能被刪除)。
簡單來說,對于使用時(shí)間戳獲取偏移量的操作來說,日志片段越小,結(jié)果越準(zhǔn)確。(這句沒懂)
log.segment.ms
另一個(gè)可以控制日志片段關(guān)閉時(shí)間的參數(shù)是?log.segment.ms?,它指定了多長時(shí)間之后日志片段會(huì)被關(guān)閉。就像 log.retention.bytes 和?log.retention.ms?這兩個(gè)參數(shù)一樣,log.segment.bytes 和?log.retention.ms?這兩個(gè)參數(shù)之間也不存在互斥問題。日志片段會(huì)在大小或時(shí)間達(dá)到上限時(shí)被關(guān)閉,就看哪個(gè)條件先得到滿足。默認(rèn)情況下,log.segment.ms?沒有設(shè)定值,所以只根據(jù)大小來關(guān)閉日志片段。
在使用基于時(shí)間的日志片段時(shí),要著重考慮并行關(guān)閉多個(gè)日志片段對磁盤性能的影響。如果多個(gè)分區(qū)的日志片段永遠(yuǎn)不能達(dá)到大小的上限,就會(huì)發(fā)生這種情況,因?yàn)?broker 在啟動(dòng)之后就開始計(jì)算日志片段的過期時(shí)間,對于那些數(shù)據(jù)量小的分區(qū)來說,日志片段的關(guān)閉操作總是同時(shí)發(fā)生。
message.max.bytes
broker 通過設(shè)置 message.max.bytes 參數(shù)來限制單個(gè)消息的大小,默認(rèn)值是 1 000 000,也就是 1MB。如果生產(chǎn)者嘗試發(fā)送的消息超過這個(gè)大小,不僅消息不會(huì)被接收,還會(huì)收到 broker 返回的錯(cuò)誤信息。跟其他與字節(jié)相關(guān)的配置參數(shù)一樣,該參數(shù)指的是壓縮后的消息大小,也就是說,只要壓縮后的消息小于 message.max.bytes 指定的值,消息的實(shí)際大小可以遠(yuǎn)大于這個(gè)值。
這個(gè)值對性能有顯著的影響。值越大,那么負(fù)責(zé)處理網(wǎng)絡(luò)連接和請求的線程就需要花越多的時(shí)間來處理這些請求。它還會(huì)增加磁盤寫入塊的大小,從而影響 IO 吞吐量。
在服務(wù)端和客戶端之間協(xié)調(diào)消息大小的配置
消費(fèi)者客戶端設(shè)置的 fetch.message.max.bytes 必須與服務(wù)器端設(shè)置的消息大小進(jìn)行協(xié)調(diào)。如果這個(gè)值比 message.max.bytes 小,那么消費(fèi)者就無法讀取比較大的消息,導(dǎo)致出現(xiàn)消費(fèi)者被阻塞的情況。在為集群里的 broker 配置 replica.fetch.max.bytes 參數(shù)時(shí),也遵循同樣的原則。
kafka集群配置
需要多少個(gè)broker
一個(gè) Kafka 集群需要多少個(gè) broker 取決于以下幾個(gè)因素。首先,需要多少磁盤空間來保留數(shù)據(jù),以及單個(gè) broker 有多少空間可用。如果整個(gè)集群需要保留 10TB 的數(shù)據(jù),每個(gè) broker 可以存儲(chǔ) 2TB,那么至少需要 5 個(gè) broker。如果啟用了數(shù)據(jù)復(fù)制,那么至少還需要一倍的空間,不過這要取決于配置的復(fù)制系數(shù)是多少(將在第 6 章介紹)。也就是說,如果啟用了數(shù)據(jù)復(fù)制,那么這個(gè)集群至少需要 10 個(gè) broker。
第二個(gè)要考慮的因素是集群處理請求的能力。這通常與網(wǎng)絡(luò)接口處理客戶端流量的能力有關(guān),特別是當(dāng)有多個(gè)消費(fèi)者存在或者在數(shù)據(jù)保留期間流量發(fā)生波動(dòng)(比如高峰時(shí)段的流量爆發(fā))時(shí)。如果單個(gè) broker 的網(wǎng)絡(luò)接口在高峰時(shí)段可以達(dá)到 80% 的使用量,并且有兩個(gè)消費(fèi)者,那么消費(fèi)者就無法保持峰值,除非有兩個(gè) broker。如果集群啟用了復(fù)制功能,則要把這個(gè)額外的消費(fèi)者考慮在內(nèi)。因磁盤吞吐量低和系統(tǒng)內(nèi)存不足造成的性能問題,也可以通過擴(kuò)展多個(gè) broker 來解決。(這里沒看懂)
操作系統(tǒng)調(diào)優(yōu)
虛擬內(nèi)存
一般來說,Linux 的虛擬內(nèi)存會(huì)根據(jù)系統(tǒng)的工作負(fù)荷進(jìn)行自動(dòng)調(diào)整。我們可以對交換分區(qū)的處理方式和內(nèi)存臟頁進(jìn)行調(diào)整,從而讓 Kafka 更好地處理工作負(fù)載。對于大多數(shù)依賴吞吐量的應(yīng)用程序來說,要盡量避免內(nèi)存交換。內(nèi)存頁和磁盤之間的交換對 Kafka 各方面的性能都有重大影響。Kafka 大量地使用系統(tǒng)頁面緩存,如果虛擬內(nèi)存被交換到磁盤,說明已經(jīng)沒有多余內(nèi)存可以分配給頁面緩存了。
一種避免內(nèi)存交換的方法是不設(shè)置任何交換分區(qū)。內(nèi)存交換不是必需的,不過它確實(shí)能夠在系統(tǒng)發(fā)生災(zāi)難性錯(cuò)誤時(shí)提供一些幫助。進(jìn)行內(nèi)存交換可以防止操作系統(tǒng)由于內(nèi)存不足而突然終止進(jìn)程。基于上述原因,建議把 vm.swappiness 參數(shù)的值設(shè)置得小一點(diǎn),比如 1。該參數(shù)指明了虛擬機(jī)的子系統(tǒng)將如何使用交換分區(qū),而不是只把內(nèi)存頁從頁面緩存里移除。要優(yōu)先考慮減小頁面緩存,而不是進(jìn)行內(nèi)存交換。
臟頁會(huì)被沖刷到磁盤上,調(diào)整內(nèi)核對臟頁的處理方式可以讓我們從中獲益。Kafka 依賴 I/O 性能為生產(chǎn)者提供快速的響應(yīng)。這就是為什么日志片段一般要保存在快速磁盤上,不管是單個(gè)快速磁盤(如 SSD)還是具有 NVRAM 緩存的磁盤子系統(tǒng)(如 RAID)。這樣一來,在后臺(tái)刷新進(jìn)程將臟頁寫入磁盤之前,可以減少臟頁的數(shù)量,這個(gè)可以通過將 vm.dirty_background_ratio 設(shè)為小于 10 的值來實(shí)現(xiàn)。該值指的是系統(tǒng)內(nèi)存的百分比,大部分情況下設(shè)為 5 就可以了。它不應(yīng)該被設(shè)為 0,因?yàn)槟菢訒?huì)促使內(nèi)核頻繁地刷新頁面,從而降低內(nèi)核為底層設(shè)備的磁盤寫入提供緩沖的能力。
通過設(shè)置 vm.dirty_ratio 參數(shù)可以增加被內(nèi)核進(jìn)程刷新到磁盤之前的臟頁數(shù)量,可以將它設(shè)為大于 20 的值(這也是系統(tǒng)內(nèi)存的百分比)。這個(gè)值可設(shè)置的范圍很廣,60~80 是個(gè)比較合理的區(qū)間。不過調(diào)整這個(gè)參數(shù)會(huì)帶來一些風(fēng)險(xiǎn),包括未刷新磁盤操作的數(shù)量和同步刷新引起的長時(shí)間 I/O 等待。如果該參數(shù)設(shè)置了較高的值,建議啟用 Kafka 的復(fù)制功能,避免因系統(tǒng)崩潰造成數(shù)據(jù)丟失。
為了給這些參數(shù)設(shè)置合適的值,最好是在 Kafka 集群運(yùn)行期間檢查臟頁的數(shù)量,不管是在生存環(huán)境還是模擬環(huán)境。可以在 /proc/vmstat 文件里查看當(dāng)前臟頁數(shù)量。
這些都是可控制的選項(xiàng),根據(jù)工作負(fù)載和數(shù)據(jù),你可以決定如何設(shè)置它們:
$ sysctl -a | grep dirty vm.dirty_background_bytes = 0 vm.dirty_background_ratio = 10 vm.dirty_bytes = 0 vm.dirty_ratio = 20 vm.dirty_writeback_centisecs = 500 vm.dirty_expire_centisecs = 3000 vm.dirtytime_expire_seconds = 43200- vm.dirty_background_ratio?是內(nèi)存可以填充臟數(shù)據(jù)的百分比。這些臟數(shù)據(jù)稍后會(huì)寫入磁盤,后臺(tái)進(jìn)程會(huì)稍后清理臟數(shù)據(jù)。比如,我有32G內(nèi)存,那么有3.2G的臟數(shù)據(jù)可以待著內(nèi)存里,超過3.2G的話就會(huì)有后臺(tái)進(jìn)程來清理。
- vm.dirty_ratio是可以用臟數(shù)據(jù)填充的絕對最大系統(tǒng)內(nèi)存量,當(dāng)系統(tǒng)到達(dá)此點(diǎn)時(shí),必須將所有臟數(shù)據(jù)提交到磁盤,同時(shí)所有新的I/O塊都會(huì)被阻塞,直到臟數(shù)據(jù)被寫入磁盤。這通常是長I/O卡頓的原因,但這也是保證內(nèi)存中不會(huì)存在過量臟數(shù)據(jù)的保護(hù)機(jī)制。
- vm.dirty_background_bytes和vm.dirty_bytes是另一種指定這些參數(shù)的方法。如果設(shè)置_bytes版本,則_ratio版本將變?yōu)?,反之亦然。
- vm.dirty_expire_centisecs?指定臟數(shù)據(jù)能存活的時(shí)間。在這里它的值是30秒。當(dāng)?pdflush/flush/kdmflush?在運(yùn)行的時(shí)候,他們會(huì)檢查是否有數(shù)據(jù)超過這個(gè)時(shí)限,如果有則會(huì)把它異步地寫到磁盤中。畢竟數(shù)據(jù)在內(nèi)存里待太久也會(huì)有丟失風(fēng)險(xiǎn)。
- vm.dirty_writeback_centisecs?指定多長時(shí)間?pdflush/flush/kdmflush?這些進(jìn)程會(huì)喚醒一次,然后檢查是否有緩存需要清理。
可以通過下面方式看內(nèi)存中有多少臟數(shù)據(jù):一共有106頁的臟數(shù)據(jù):
$ cat /proc/vmstat | egrep "dirty|writeback" nr_dirty 106 nr_writeback 0 nr_writeback_temp 0 nr_dirty_threshold 3934012 nr_dirty_background_threshold 1964604方法2:增加緩存
在某些情況下,顯著提高緩存對性能有積極的影響。在這些情況下,Linux客戶機(jī)上包含的數(shù)據(jù)不是關(guān)鍵的,可能會(huì)丟失,而且應(yīng)用程序通常會(huì)重復(fù)或以可重復(fù)的方式寫入相同的文件。理論上,通過允許內(nèi)存中存在更多臟頁,你將在緩存中一遍又一遍地重寫相同的塊,只需要每隔一段時(shí)間向?qū)嶋H磁盤寫一次。為此,我們提出了以下參數(shù):
vm.dirty_background_ratio = 50 vm.dirty_ratio = 80有時(shí)候還會(huì)提高vm.dirty_expire_centisecs?這個(gè)參數(shù)的值,來允許臟數(shù)據(jù)更長時(shí)間地停留。除了增加數(shù)據(jù)丟失的風(fēng)險(xiǎn)之外,如果緩存已滿并需要同步,還會(huì)有長時(shí)間I/O卡頓的風(fēng)險(xiǎn),因?yàn)樵诖笮吞摂M機(jī)緩存中有大量數(shù)據(jù)。
方法3:增減都用
有時(shí)候系統(tǒng)需要應(yīng)對突如其來的高峰數(shù)據(jù),它可能會(huì)拖慢磁盤。比如說:每小時(shí)或者午夜進(jìn)行批處理作業(yè)、在Raspberry Pi上寫SD卡等等。這種情況下,我們可以允許大量的寫I/O存儲(chǔ)在緩存中,這樣后臺(tái)刷新操作就可以慢慢異步處理它:
vm.dirty_background_ratio = 5 vm.dirty_ratio = 80這個(gè)時(shí)候,系統(tǒng)后臺(tái)進(jìn)程在臟數(shù)據(jù)達(dá)到5%時(shí)就開始異步清理,但在80%之前系統(tǒng)不會(huì)強(qiáng)制同步寫磁盤。在此基礎(chǔ)上,你只需要調(diào)整RAM和vm.dirty_ratio大小以便能緩存所有的寫數(shù)據(jù)。當(dāng)然,磁盤上的數(shù)據(jù)一致性也存在一定風(fēng)險(xiǎn)。
無論你選擇哪種方式,都應(yīng)該始終收集數(shù)據(jù)來支持你的更改,并幫助你確定是在改進(jìn)還是變得更糟。我們可以從應(yīng)用程序,/proc/vmstat,?/proc/meminfo,?iostat,?vmstat?以及/proc/sys/vm里面獲得大量有用信息。
為什么不把 vm.swappiness 設(shè)為零
先前,人們建議盡量把 vm.swapiness 設(shè)為 0,它意味著“除非發(fā)生內(nèi)存溢出,否則不要進(jìn)行內(nèi)存交換”。直到 Linux 內(nèi)核 3.5-rc1 版本發(fā)布,這個(gè)值的意義才發(fā)生了變化。這個(gè)變化被移植到其他的發(fā)行版上,包括 Red Hat 企業(yè)版內(nèi)核 2.6.32-303。在發(fā)生變化之后,0 意味著“在任何情況下都不要發(fā)生交換”。所以現(xiàn)在建議把這個(gè)值設(shè)為 1。
磁盤
除了選擇合適的磁盤硬件設(shè)備和使用 RAID 外,文件系統(tǒng)是影響性能的另一個(gè)重要因素。有很多種文件系統(tǒng)可供選擇,不過對于本地文件系統(tǒng)來說,EXT4(第四代可擴(kuò)展文件系統(tǒng))和 XFS 最為常見。近來,XFS 成為很多 Linux 發(fā)行版默認(rèn)的文件系統(tǒng),因?yàn)樗恍枰錾倭空{(diào)優(yōu)就可以承擔(dān)大部分的工作負(fù)荷,比 EXT4 具有更好的表現(xiàn)。EXT4 也可以做得很好,但需要做更多的調(diào)優(yōu),存在較大的風(fēng)險(xiǎn)。其中就包括設(shè)置更長的提交間隔(默認(rèn)是 5),以便降低刷新的頻率。EXT4 還引入了塊分配延遲,一旦系統(tǒng)崩潰,更容易造成數(shù)據(jù)丟失和文件系統(tǒng)毀壞。XFS 也使用了分配延遲算法,不過比 EXT4 的要安全些。XFS 為 Kafka 提供了更好的性能,除了由文件系統(tǒng)提供的自動(dòng)調(diào)優(yōu)之外,無需額外的調(diào)優(yōu)。批量磁盤寫入具有更高的效率,可以提升整體的 I/O 吞吐量。
不管使用哪一種文件系統(tǒng)來存儲(chǔ)日志片段,最好要對掛載點(diǎn)的 noatime 參數(shù)進(jìn)行合理的設(shè)置。文件元數(shù)據(jù)包含 3 個(gè)時(shí)間戳:創(chuàng)建時(shí)間(ctime)、最后修改時(shí)間(mtime)以及最后訪問時(shí)間(atime)。默認(rèn)情況下,每次文件被讀取后都會(huì)更新 atime,這會(huì)導(dǎo)致大量的磁盤寫操作,而且 atime 屬性的用處不大,除非某些應(yīng)用程序想要知道某個(gè)文件在最近一次修改后有沒有被訪問過(這種情況可以使用 realtime )。Kafka 用不到該屬性,所以完全可以把它禁用掉。為掛載點(diǎn)設(shè)置 noatime 參數(shù)可以防止更新 atime,但不會(huì)影響 ctime 和 mtime。
網(wǎng)絡(luò)
默認(rèn)情況下,系統(tǒng)內(nèi)核沒有針對快速的大流量網(wǎng)絡(luò)傳輸進(jìn)行優(yōu)化,所以對于應(yīng)用程序來說,一般需要對 Linux 系統(tǒng)的網(wǎng)絡(luò)棧進(jìn)行調(diào)優(yōu),以實(shí)現(xiàn)對大流量的支持。實(shí)際上,調(diào)整 Kafka 的網(wǎng)絡(luò)配置與調(diào)整其他大部分 Web 服務(wù)器和網(wǎng)絡(luò)應(yīng)用程序的網(wǎng)絡(luò)配置是一樣的。首先可以對分配給 socket 讀寫緩沖區(qū)的內(nèi)存大小作出調(diào)整,這樣可以顯著提升網(wǎng)絡(luò)的傳輸性能。socket 讀寫緩沖區(qū)對應(yīng)的參數(shù)分別是 net.core.wmem_default 和 net.core.rmem_default ,合理的值是 131 072(也就是 128KB)。讀寫緩沖區(qū)最大值對應(yīng)的參數(shù)分別是 net.core.wmem_max 和 net.core.rmem_max ,合理的值是 2 097 152(也就是 2MB)。要注意,最大值并不意味著每個(gè) socket 一定要有這么大的緩沖空間,只是說在必要的情況下才會(huì)達(dá)到這個(gè)值。
除了設(shè)置 socket 外,還需要設(shè)置 TCP socket 的讀寫緩沖區(qū),它們的參數(shù)分別是 net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem 。這些參數(shù)的值由 3 個(gè)整數(shù)組成,它們使用空格分隔,分別表示最小值、默認(rèn)值和最大值。最大值不能大于 net.core.wmem_max 和 net.core.rmem_max 指定的大小。例如,“4096 65536 2048000”表示最小值是 4KB、默認(rèn)值是 64KB、最大值是 2MB。根據(jù) Kafka 服務(wù)器接收流量的實(shí)際情況,可能需要設(shè)置更高的最大值,為網(wǎng)絡(luò)連接提供更大的緩沖空間。
還有其他一些有用的網(wǎng)絡(luò)參數(shù)。例如,把 net.ipv4.tcp_window_scaling 設(shè)為 1,啟用 TCP 時(shí)間窗擴(kuò)展,可以提升客戶端傳輸數(shù)據(jù)的效率,傳輸?shù)臄?shù)據(jù)可以在服務(wù)器端進(jìn)行緩沖。把 net.ipv4.tcp_max_syn_backlog 設(shè)為比默認(rèn)值 1024 更大的值,可以接受更多的并發(fā)連接。把 net.core.netdev_max_backlog 設(shè)為比默認(rèn)值 1000 更大的值,有助于應(yīng)對網(wǎng)絡(luò)流量的爆發(fā),特別是在使用千兆網(wǎng)絡(luò)的情況下,允許更多的數(shù)據(jù)包排隊(duì)等待內(nèi)核處理。
垃圾回收器選項(xiàng)
為應(yīng)用程序調(diào)整 Java 垃圾回收參數(shù)就像是一門藝術(shù),我們需要知道應(yīng)用程序是如何使用內(nèi)存的,還需要大量的觀察和試錯(cuò)。幸運(yùn)的是,Java 7 為我們帶來了 G1 垃圾回收器,讓這種狀況有所改觀。在應(yīng)用程序的整個(gè)生命周期,G1 會(huì)自動(dòng)根據(jù)工作負(fù)載情況進(jìn)行自我調(diào)節(jié),而且它的停頓時(shí)間是恒定的。它可以輕松地處理大塊的堆內(nèi)存,把堆內(nèi)存分為若干小塊的區(qū)域,每次停頓時(shí)并不會(huì)對整個(gè)堆空間進(jìn)行回收。
正常情況下,G1 只需要很少的配置就能完成這些工作。以下是 G1 的兩個(gè)調(diào)整參數(shù)。
MaxGCPauseMillis :
該參數(shù)指定每次垃圾回收默認(rèn)的停頓時(shí)間。該值不是固定的,G1 可以根據(jù)需要使用更長的時(shí)間。它的默認(rèn)值是 200ms。也就是說,G1 會(huì)決定垃圾回收的頻率以及每一輪需要回收多少個(gè)區(qū)域,這樣算下來,每一輪垃圾回收大概需要 200ms 的時(shí)間。
InitiatingHeapOccupancyPercent :
該參數(shù)指定了在 G1 啟動(dòng)新一輪垃圾回收之前可以使用的堆內(nèi)存百分比,默認(rèn)值是 45。也就是說,在堆內(nèi)存的使用率達(dá)到 45% 之前,G1 不會(huì)啟動(dòng)垃圾回收。這個(gè)百分比包括新生代和老年代的內(nèi)存。
Kafka 對堆內(nèi)存的使用率非常高,容易產(chǎn)生垃圾對象,所以可以把這些值設(shè)得小一些。如果一臺(tái)服務(wù)器有 64GB 內(nèi)存,并且使用 5GB 堆內(nèi)存來運(yùn)行 Kafka,那么可以參考以下的配置:MaxGCPauseMillis 可以設(shè)為 20ms;InitiatingHeapOccupancyPercent 可以設(shè)為 35,這樣可以讓垃圾回收比默認(rèn)的要早一些啟動(dòng)。
Kafka 的啟動(dòng)腳本并沒有啟用 G1 回收器,而是使用了 Parallel New 和 CMS( Concurrent Mark-Sweep,并發(fā)標(biāo)記和清除)垃圾回收器。不過它可以通過環(huán)境變量來修改。本章前面的內(nèi)容使用 start 命令來修改它:
# export JAVA_HOME=/usr/java/jdk1.8.0_51 # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #數(shù)據(jù)中心布局
在開發(fā)階段,人們并不會(huì)太關(guān)心 Kafka 服務(wù)器在數(shù)據(jù)中心所處的物理位置,因?yàn)榧词辜涸诙虝r(shí)間內(nèi)出現(xiàn)局部或完全不可用,也不會(huì)造成太大影響。但是,在生產(chǎn)環(huán)境,服務(wù)不可用意味著金錢的損失,具體表現(xiàn)為無法為用戶提供服務(wù)或者不知道用戶正在做什么。這個(gè)時(shí)候,使用 Kafka 集群的復(fù)制功能就變得尤為重要(請參考第 6 章),而服務(wù)器在數(shù)據(jù)中心所處的物理位置也變得重要起來。如果在部署 Kafka 之前沒有考慮好這個(gè)問題,那么在后續(xù)的維護(hù)過程中,移動(dòng)服務(wù)器需要耗費(fèi)更高的成本。
在為 broker 增加新的分區(qū)時(shí),broker 并無法獲知機(jī)架的信息。也就是說,兩個(gè) broker 有可能是在同一個(gè)機(jī)架上,或者在同一個(gè)可用區(qū)域里(如果運(yùn)行在像 AWS 這樣的的云服務(wù)上),所以,在為分區(qū)添加副本的時(shí)候,這些副本很可能被分配給同一個(gè)機(jī)架上的 broker,它們使用相同的電源和網(wǎng)絡(luò)連接。如果該機(jī)架出了問題,這些分區(qū)就會(huì)離線,客戶端就無法訪問到它們。更糟糕的是,如果發(fā)生不完整的主節(jié)點(diǎn)選舉,那么在恢復(fù)時(shí)就有可能丟失數(shù)據(jù)(第 6 章將介紹更多細(xì)節(jié))。
所以,最好把集群的 broker 安裝在不同的機(jī)架上,至少不要讓它們共享可能出現(xiàn)單點(diǎn)故障的基礎(chǔ)設(shè)施,比如電源和網(wǎng)絡(luò)。也就是說,部署服務(wù)器需要至少兩個(gè)電源連接(兩個(gè)不同的回路)和兩個(gè)網(wǎng)絡(luò)交換器(保證可以進(jìn)行無縫的故障切換)。除了這些以外,最好還要把 broker 安放在不同的機(jī)架上。因?yàn)殡S著時(shí)間的推移,機(jī)架也需要進(jìn)行維護(hù),而這會(huì)導(dǎo)致機(jī)器離線(比如移動(dòng)機(jī)器或者重新連接電源)。
共享Zookeeper
Kafka 使用 Zookeeper 來保存 broker、主題和分區(qū)的元數(shù)據(jù)信息。對于一個(gè)包含多個(gè)節(jié)點(diǎn)的 Zookeeper 群組來說,Kafka 集群的這些流量并不算多,那些寫操作只是用于構(gòu)造消費(fèi)者群組或集群本身。實(shí)際上,在很多部署環(huán)境里,會(huì)讓多個(gè) Kafka 集群共享一個(gè) Zookeeper 群組(每個(gè)集群使用一個(gè) chroot 路徑)。
不過,消費(fèi)者和 Zookeeper 之間還是有一個(gè)值得注意的地方,消費(fèi)者可以選擇將偏移量提交到 Zookeeper 或 Kafka,還可以選擇提交偏移量的時(shí)間間隔。如果消費(fèi)者將偏移量提交到 Zookeeper,那么在每個(gè)提交時(shí)間點(diǎn)上,消費(fèi)者將會(huì)為每一個(gè)消費(fèi)的分區(qū)往 Zookeeper 寫入一次偏移量。合理的提交間隔是 1 分鐘,因?yàn)檫@剛好是消費(fèi)者群組的某個(gè)消費(fèi)者發(fā)生失效時(shí)能夠讀取到重復(fù)消息的時(shí)間。值得注意的是,這些提交對于 Zookeeper 來說流量不算小,特別是當(dāng)集群里有多個(gè)消費(fèi)者的時(shí)候。如果 Zookeeper 群組無法處理太大的流量,就有必要使用長一點(diǎn)的提交時(shí)間間隔。不過不管怎樣,還是建議使用最新版本的 Kafka,讓消費(fèi)者把偏移量提交到 Kafka 服務(wù)器上,消除對 Zookeeper 的依賴。
雖然多個(gè) Kafka 集群可以共享一個(gè) Zookeeper 群組,但如果有可能的話,不建議把 Zookeeper 共享給其他應(yīng)用程序。Kafka 對 Zookeeper 的延遲和超時(shí)比較敏感,與 Zookeeper 群組之間的一個(gè)通信異常就可能導(dǎo)致 Kafka 服務(wù)器出現(xiàn)無法預(yù)測的行為。這樣很容易讓多個(gè) broker 同時(shí)離線,如果它們與 Zookeeper 之間斷開連接,也會(huì)導(dǎo)致分區(qū)離線。這也會(huì)給集群控制器帶來壓力,在服務(wù)器離線一段時(shí)間之后,當(dāng)控制器嘗試關(guān)閉一個(gè)服務(wù)器時(shí),會(huì)表現(xiàn)出一些細(xì)小的錯(cuò)誤。其他的應(yīng)用程序因重度使用或進(jìn)行不恰當(dāng)?shù)牟僮鹘o Zookeeper 群組帶來壓力,所以最好讓它們使用自己的 Zookeeper 群組。
生產(chǎn)者配置
生產(chǎn)者有很多可配置的參數(shù),在 Kafka 文檔里都有說明,它們大部分都有合理的默認(rèn)值,所以沒有必要去修改它們。不過有幾個(gè)參數(shù)在內(nèi)存使用、性能和可靠性方面對生產(chǎn)者影響比較大,接下來我們會(huì)一一說明。
acks
acks 參數(shù)指定了必須要有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫入是成功的。這個(gè)參數(shù)對消息丟失的可能性有重要影響。該參數(shù)有如下選項(xiàng)。
- 如果 acks=0 ,生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng)。也就是說,如果當(dāng)中出現(xiàn)了問題,導(dǎo)致服務(wù)器沒有收到消息,那么生產(chǎn)者就無從得知,消息也就丟失了。不過,因?yàn)樯a(chǎn)者不需要等待服務(wù)器的響應(yīng),所以它可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息,從而達(dá)到很高的吞吐量。
- 如果 acks=1 ,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。如果消息無法到達(dá)首領(lǐng)節(jié)點(diǎn)(比如首領(lǐng)節(jié)點(diǎn)崩潰,新的首領(lǐng)還沒有被選舉出來),生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng),為了避免數(shù)據(jù)丟失,生產(chǎn)者會(huì)重發(fā)消息。不過,如果一個(gè)沒有收到消息的節(jié)點(diǎn)成為新首領(lǐng),消息還是會(huì)丟失。這個(gè)時(shí)候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務(wù)器的響應(yīng)(通過調(diào)用 Future 對象的 get() 方法),顯然會(huì)增加延遲(在網(wǎng)絡(luò)上傳輸一個(gè)來回的延遲)。如果客戶端使用回調(diào),延遲問題就可以得到緩解,不過吞吐量還是會(huì)受發(fā)送中消息數(shù)量的限制(比如,生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息)。
- 如果 acks=all ,只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來自服務(wù)器的成功響應(yīng)。這種模式是最安全的,它可以保證不止一個(gè)服務(wù)器收到消息,就算有服務(wù)器發(fā)生崩潰,整個(gè)集群仍然可以運(yùn)行(第 5 章將討論更多的細(xì)節(jié))。不過,它的延遲比 acks=1 時(shí)更高,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。
buffer.memory
該參數(shù)用來設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果應(yīng)用程序發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候, send() 方法調(diào)用要么被阻塞,要么拋出異常,取決于如何設(shè)置 block.on.buffer.full 參數(shù)(在 0.9.0.0 版本里被替換成了?max.block.ms?,表示在拋出異常之前可以阻塞一段時(shí)間)。
compression.type
默認(rèn)情況下,消息發(fā)送時(shí)不會(huì)被壓縮。該參數(shù)可以設(shè)置為 snappy 、gzip 或 lz4 ,它指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法進(jìn)行壓縮。snappy 壓縮算法由 Google 發(fā)明,它占用較少的 CPU,卻能提供較好的性能和相當(dāng)可觀的壓縮比,如果比較關(guān)注性能和網(wǎng)絡(luò)帶寬,可以使用這種算法。gzip 壓縮算法一般會(huì)占用較多的 CPU,但會(huì)提供更高的壓縮比,所以如果網(wǎng)絡(luò)帶寬比較有限,可以使用這種算法。使用壓縮可以降低網(wǎng)絡(luò)傳輸開銷和存儲(chǔ)開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。
retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))。在這種情況下,retries 參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待 100ms,不過可以通過?retry.backoff.ms?參數(shù)來改變這個(gè)時(shí)間間隔。建議在設(shè)置重試次數(shù)和重試時(shí)間間隔之前,先測試一下恢復(fù)一個(gè)崩潰節(jié)點(diǎn)需要多少時(shí)間(比如所有分區(qū)選舉出首領(lǐng)需要多長時(shí)間),讓總的重試時(shí)間比 Kafka 集群從崩潰中恢復(fù)的時(shí)間長,否則生產(chǎn)者會(huì)過早地放棄重試。不過有些錯(cuò)誤不是臨時(shí)性錯(cuò)誤,沒辦法通過重試來解決(比如“消息太大”錯(cuò)誤)。一般情況下,因?yàn)樯a(chǎn)者會(huì)自動(dòng)進(jìn)行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯(cuò)誤。你只需要處理那些不可重試的錯(cuò)誤或重試次數(shù)超出上限的情況。
batch.size
當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算(而不是消息個(gè)數(shù))。當(dāng)批次被填滿,批次里的所有消息會(huì)被發(fā)送出去。不過生產(chǎn)者并不一定都會(huì)等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個(gè)消息的批次也有可能被發(fā)送。所以就算把批次大小設(shè)置得很大,也不會(huì)造成延遲,只是會(huì)占用更多的內(nèi)存而已。但如果設(shè)置得太小,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息,會(huì)增加一些額外的開銷。
linger.ms
該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間。KafkaProducer 會(huì)在批次填滿或?linger.ms?達(dá)到上限時(shí)把批次發(fā)送出去。默認(rèn)情況下,只要有可用的線程,生產(chǎn)者就會(huì)把消息發(fā)送出去,就算批次里只有一個(gè)消息。把?linger.ms?設(shè)置成比 0 大的數(shù),讓生產(chǎn)者在發(fā)送批次之前等待一會(huì)兒,使更多的消息加入到這個(gè)批次。雖然這樣會(huì)增加延遲,但也會(huì)提升吞吐量(因?yàn)橐淮涡园l(fā)送更多的消息,每個(gè)消息的開銷就變小了)。
max.in.flight.requests.per.connection
該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息。它的值越高,就會(huì)占用越多的內(nèi)存,不過也會(huì)提升吞吐量。把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。
timeout.ms、request.timeout.ms?和?metadata.fetch.timeout.ms
request.timeout.ms?指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間,metadata.fetch.timeout.ms?指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的首領(lǐng)是誰)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間。如果等待響應(yīng)超時(shí),那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個(gè)錯(cuò)誤(拋出異?;驁?zhí)行回調(diào))。timeout.ms?指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間,與 asks 的配置相匹配——如果在指定時(shí)間內(nèi)沒有收到同步副本的確認(rèn),那么 broker 就會(huì)返回一個(gè)錯(cuò)誤。
max.block.ms
該參數(shù)指定了在調(diào)用 send() 方法或使用 partitionsFor() 方法獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間。當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數(shù)據(jù)時(shí),這些方法就會(huì)阻塞。在阻塞時(shí)間達(dá)到?max.block.ms?時(shí),生產(chǎn)者會(huì)拋出超時(shí)異常。
max.request.size
該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小。它可以指能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請求里所有消息總的大小。例如,假設(shè)這個(gè)值為 1MB,那么可以發(fā)送的單個(gè)最大消息為 1MB,或者生產(chǎn)者可以在單個(gè)請求里發(fā)送一個(gè)批次,該批次包含了 1000 個(gè)消息,每個(gè)消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes ),所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕。
receive.buffer.bytes 和 send.buffer.bytes
這兩個(gè)參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果它們被設(shè)為 -1,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
順序保證
Kafka 可以保證同一個(gè)分區(qū)里的消息是有序的。也就是說,如果生產(chǎn)者按照一定的順序發(fā)送消息,broker 就會(huì)按照這個(gè)順序把它們寫入分區(qū),消費(fèi)者也會(huì)按照同樣的順序讀取它們。在某些情況下,順序是非常重要的。例如,往一個(gè)賬戶存入 100 元再取出來,這個(gè)與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。
如果把 retries 設(shè)為非零整數(shù),同時(shí)把 max.in.flight.requests.per.connection 設(shè)為比 1 大的數(shù),那么,如果第一個(gè)批次消息寫入失敗,而第二個(gè)批次寫入成功,broker 會(huì)重試寫入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫入成功,那么兩個(gè)批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功也是很關(guān)鍵的,所以不建議把 retries 設(shè)為 0。可以把 max.in.flight.requests.per.connection 設(shè)為 1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí),就不會(huì)有其他的消息發(fā)送給 broker。不過這樣會(huì)嚴(yán)重影響生產(chǎn)者的吞吐量,所以只有在對消息的順序有嚴(yán)格要求的情況下才能這么做。
消費(fèi)者配置
fetch.min.bytes
該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請求時(shí),如果可用的數(shù)據(jù)量小于 fetch.min.bytes 指定的大小,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載,因?yàn)樗鼈冊谥黝}不是很活躍的時(shí)候(或者一天里的低谷時(shí)段)就不需要來來回回地處理消息。如果沒有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費(fèi)者的數(shù)量比較多,把該屬性的值設(shè)置得大一點(diǎn)可以降低 broker 的工作負(fù)載。
fetch.max.wait.ms
我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。而?feth.max.wait.ms?則用于指定 broker 的等待時(shí)間,默認(rèn)是 500ms。如果沒有足夠的數(shù)據(jù)流入 Kafka,消費(fèi)者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致 500ms 的延遲。如果要降低潛在的延遲(為了滿足 SLA),可以把該參數(shù)值設(shè)置得小一些。如果?fetch.max.wait.ms?被設(shè)為 100ms,并且 fetch.min.bytes 被設(shè)為 1MB,那么 Kafka 在收到消費(fèi)者的請求后,要么返回 1MB 數(shù)據(jù),要么在 100ms 后返回所有可用的數(shù)據(jù),就看哪個(gè)條件先得到滿足。
max.partition.fetch.bytes
該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。它的默認(rèn)值是 1MB,也就是說,KafkaConsumer.poll() 方法從每個(gè)分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個(gè)主題有 20 個(gè)分區(qū)和 5 個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者需要至少 4MB 的可用內(nèi)存來接收記錄。在為消費(fèi)者分配內(nèi)存時(shí),可以給它們多分配一些,因?yàn)槿绻航M里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置)大,否則消費(fèi)者可能無法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試。在設(shè)置該屬性時(shí),另一個(gè)需要考慮的因素是消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁調(diào)用 poll() 方法來避免會(huì)話過期和發(fā)生分區(qū)再均衡,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多,消費(fèi)者需要更多的時(shí)間來處理,可能無法及時(shí)進(jìn)行下一個(gè)輪詢來避免會(huì)話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會(huì)話過期時(shí)間。
session.timeout.ms
該屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)是 3s。如果消費(fèi)者沒有在?session.timeout.ms?指定的時(shí)間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就被認(rèn)為已經(jīng)死亡,協(xié)調(diào)器就會(huì)觸發(fā)再均衡,把它的分區(qū)分配給群組里的其他消費(fèi)者。該屬性與?heartbeat.interval.ms?緊密相關(guān)。heartbeat.interval.ms?指定了 poll() 方法向協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms?則指定了消費(fèi)者可以多久不發(fā)送心跳。所以,一般需要同時(shí)修改這兩個(gè)屬性,heartbeat.interval.ms?必須比?session.timeout.ms?小,一般是?session.timeout.ms?的三分之一。如果?session.timeout.ms?是 3s,那么?heartbeat.interval.ms?應(yīng)該是 1s。把?session.timeout.ms?值設(shè)得比默認(rèn)值小,可以更快地檢測和恢復(fù)崩潰的節(jié)點(diǎn),不過長時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。把該屬性的值設(shè)置得大一些,可以減少意外的再均衡,不過檢測節(jié)點(diǎn)崩潰需要更長的時(shí)間。
auto.offset.reset
該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下(因消費(fèi)者長時(shí)間失效,包含偏移量的記錄已經(jīng)過時(shí)并被刪除)該作何處理。它的默認(rèn)值是 latest ,意思是說,在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)。另一個(gè)值是 earliest ,意思是說,在偏移量無效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄。
enable.auto.commit
該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)值是 true 。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)為 false ,由自己控制何時(shí)提交偏移量。如果把它設(shè)為 true ,還可以通過配置?auto.commit.interval.ms?屬性來控制提交的頻率。
partition.assignment.strategy
我們知道,分區(qū)會(huì)被分配給群組里的消費(fèi)者。PartitionAssignor 根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者。Kafka 有兩個(gè)默認(rèn)的分配策略。可以通過設(shè)置partition.assignment.strategy 來選擇分區(qū)策略。默認(rèn)使用的是 org.apache.kafka.clients.consumer.RangeAssignor ,這個(gè)類實(shí)現(xiàn)了 Range 策略,不過也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor 。我們還可以使用自定義策略,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字。
max.poll.records
Range
該策略會(huì)把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。假設(shè)消費(fèi)者 C1 和消費(fèi)者 C2 同時(shí)訂閱了主題 T1 和主題 T2,并且每個(gè)主題有 3 個(gè)分區(qū)。那么消費(fèi)者 C1 有可能分配到這兩個(gè)主題的分區(qū) 0 和分區(qū) 1,而消費(fèi)者 C2 分配到這兩個(gè)主題的分區(qū) 2。因?yàn)槊總€(gè)主題擁有奇數(shù)個(gè)分區(qū),而分配是在主題內(nèi)獨(dú)立完成的,第一個(gè)消費(fèi)者最后分配到比第二個(gè)消費(fèi)者更多的分區(qū)。只要使用了 Range 策略,而且分區(qū)數(shù)量無法被消費(fèi)者數(shù)量整除,就會(huì)出現(xiàn)這種情況。
RoundRobin
該策略把主題的所有分區(qū)逐個(gè)分配給消費(fèi)者。如果使用 RoundRobin 策略來給消費(fèi)者 C1 和消費(fèi)者 C2 分配分區(qū),那么消費(fèi)者 C1 將分到主題 T1 的分區(qū) 0 和分區(qū) 2 以及主題 T2 的分區(qū) 1,消費(fèi)者 C2 將分配到主題 T1 的分區(qū) 1 以及主題 T2 的分區(qū) 0 和分區(qū) 2。一般來說,如果所有消費(fèi)者都訂閱相同的主題(這種情況很常見),RoundRobin 策略會(huì)給所有消費(fèi)者分配相同數(shù)量的分區(qū)(或最多就差一個(gè)分區(qū))。
max.poll.records
該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量。
receive.buffer.bytes 和 send.buffer.bytes
socket 在讀寫數(shù)據(jù)時(shí)用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為 -1,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
以上內(nèi)容摘抄至:《Kafka權(quán)威指南》
總結(jié)
- 上一篇: Kafka配置offsets.reten
- 下一篇: 阿里云监控插件安装 | 非阿里云服务器安