日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka调优

發布時間:2024/4/18 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka调优 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

配置參數調優

broker配置

num.recovery.threads.per.data.dir

對于如下 3 種情況,Kafka 會使用可配置的線程池來處理日志片段:

  • 服務器正常啟動,用于打開每個分區的日志片段;
  • 服務器崩潰后重啟,用于檢查和截短每個分區的日志片段;
  • 服務器正常關閉,用于關閉日志片段。

默認情況下,每個日志目錄只使用一個線程。因為這些線程只是在服務器啟動和關閉時會用到,所以完全可以設置大量的線程來達到并行操作的目的。特別是對于包含大量分區的服務器來說,一旦發生崩潰,在進行恢復時使用并行操作可能會省下數小時的時間。設置此參數時需要注意,所配置的數字對應的是 log.dirs 指定的單個日志目錄。也就是說,如果 num.recovery.threads.per.data.dir 被設為 8,并且 log.dir 指定了 3 個路徑,那么總共需要 24 個線程。

主題的默認配置

num.partitions

num.partitions 參數指定了新創建的主題將包含多少個分區。如果啟用了主題自動創建功能(該功能默認是啟用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。Kafka 集群通過分區對主題進行橫向擴展,所以當有新的 broker 加入集群時,可以通過分區個數來實現集群的負載均衡。擁有大量消息的主題如果要進行負載分散,就需要大量的分區。

如何選定分區數量

你需要很多分區,但不能太多。因為分區越多,占用的內存越多,完成首領選舉需要的時間也越長。如果你估算出主題的吞吐量和消費者吞吐量,可以用主題吞吐量除以消費者吞吐量算出分區的個數。也就是說,如果每秒鐘要從主題上寫入和讀取 1GB 的數據,并且每個消費者每秒鐘可以處理 50MB 的數據,那么至少需要 20 個分區。這樣就可以讓 20 個消費者同時讀取這些分區,從而達到每秒鐘 1GB 的吞吐量。

log.retention.ms

Kafka 通常根據時間來決定數據可以被保留多久。默認使用 log.retention.hours 參數來配置時間,默認值為 168 小時,也就是一周。除此以外,還有其他兩個參數 log.retention.minutes 和?log.retention.ms?。這 3 個參數的作用是一樣的,都是決定消息多久以后會被刪除,不過還是推薦使用?log.retention.ms?。如果指定了不止一個參數,Kafka 會優先使用具有最小值的那個參數。

log.retention.bytes

另一種方式是通過保留的消息字節數來判斷消息是否過期。它的值通過參數 log.retention.bytes 來指定,作用在每一個分區上。也就是說,如果有一個包含 8 個分區的主題,并且 log.retention.bytes 被設為 1GB,那么這個主題最多可以保留 8GB 的數據。所以,當主題的分區個數增加時,整個主題可以保留的數據也隨之增加。

根據字節大小和時間保留數據

如果同時指定了 log.retention.bytes 和?log.retention.ms?(或者另一個時間參數),只要任意一個條件得到滿足,消息就會被刪除。例如,假設?log.retention.ms?設置為 86 400 000(也就是 1 天),log.retention.bytes 設置為 1 000 000 000(也就是 1GB),如果消息字節總數在不到一天的時間就超過了 1GB,那么多出來的部分就會被刪除。相反,如果消息字節總數小于 1GB,那么一天之后這些消息也會被刪除,盡管分區的數據總量小于 1GB。

log.segment.bytes

以上的設置都作用在日志片段上,而不是作用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日志片段上。當日志片段大小達到 log.segment.bytes 指定的上限(默認是 1GB)時,當前日志片段就會被關閉,一個新的日志片段被打開。如果一個日志片段被關閉,就開始等待過期。這個參數的值越小,就會越頻繁地關閉和分配新文件,從而降低磁盤寫入的整體效率。

如果主題的消息量不大,那么如何調整這個參數的大小就變得尤為重要。例如,如果一個主題每天只接收 100MB 的消息,而 log.segment.bytes 使用默認設置,那么需要 10 天時間才能填滿一個日志片段。因為在日志片段被關閉之前消息是不會過期的,所以如果?log.retention.ms?被設為 604 800 000(也就是 1 周),那么日志片段最多需要 17 天才會過期。

這是因為關閉日志片段需要 10 天的時間,而根據配置的過期時間,還需要再保留 7 天時間(要等到日志片段里的最后一個消息過期才能被刪除)。
簡單來說,對于使用時間戳獲取偏移量的操作來說,日志片段越小,結果越準確。(這句沒懂)

log.segment.ms

另一個可以控制日志片段關閉時間的參數是?log.segment.ms?,它指定了多長時間之后日志片段會被關閉。就像 log.retention.bytes 和?log.retention.ms?這兩個參數一樣,log.segment.bytes 和?log.retention.ms?這兩個參數之間也不存在互斥問題。日志片段會在大小或時間達到上限時被關閉,就看哪個條件先得到滿足。默認情況下,log.segment.ms?沒有設定值,所以只根據大小來關閉日志片段。
在使用基于時間的日志片段時,要著重考慮并行關閉多個日志片段對磁盤性能的影響。如果多個分區的日志片段永遠不能達到大小的上限,就會發生這種情況,因為 broker 在啟動之后就開始計算日志片段的過期時間,對于那些數據量小的分區來說,日志片段的關閉操作總是同時發生。

message.max.bytes

broker 通過設置 message.max.bytes 參數來限制單個消息的大小,默認值是 1 000 000,也就是 1MB。如果生產者嘗試發送的消息超過這個大小,不僅消息不會被接收,還會收到 broker 返回的錯誤信息。跟其他與字節相關的配置參數一樣,該參數指的是壓縮后的消息大小,也就是說,只要壓縮后的消息小于 message.max.bytes 指定的值,消息的實際大小可以遠大于這個值。

這個值對性能有顯著的影響。值越大,那么負責處理網絡連接和請求的線程就需要花越多的時間來處理這些請求。它還會增加磁盤寫入塊的大小,從而影響 IO 吞吐量。

在服務端和客戶端之間協調消息大小的配置

消費者客戶端設置的 fetch.message.max.bytes 必須與服務器端設置的消息大小進行協調。如果這個值比 message.max.bytes 小,那么消費者就無法讀取比較大的消息,導致出現消費者被阻塞的情況。在為集群里的 broker 配置 replica.fetch.max.bytes 參數時,也遵循同樣的原則。

kafka集群配置

需要多少個broker

一個 Kafka 集群需要多少個 broker 取決于以下幾個因素。首先,需要多少磁盤空間來保留數據,以及單個 broker 有多少空間可用。如果整個集群需要保留 10TB 的數據,每個 broker 可以存儲 2TB,那么至少需要 5 個 broker。如果啟用了數據復制,那么至少還需要一倍的空間,不過這要取決于配置的復制系數是多少(將在第 6 章介紹)。也就是說,如果啟用了數據復制,那么這個集群至少需要 10 個 broker。

第二個要考慮的因素是集群處理請求的能力。這通常與網絡接口處理客戶端流量的能力有關,特別是當有多個消費者存在或者在數據保留期間流量發生波動(比如高峰時段的流量爆發)時。如果單個 broker 的網絡接口在高峰時段可以達到 80% 的使用量,并且有兩個消費者,那么消費者就無法保持峰值,除非有兩個 broker。如果集群啟用了復制功能,則要把這個額外的消費者考慮在內。因磁盤吞吐量低和系統內存不足造成的性能問題,也可以通過擴展多個 broker 來解決。(這里沒看懂)

操作系統調優

虛擬內存

一般來說,Linux 的虛擬內存會根據系統的工作負荷進行自動調整。我們可以對交換分區的處理方式和內存臟頁進行調整,從而讓 Kafka 更好地處理工作負載。對于大多數依賴吞吐量的應用程序來說,要盡量避免內存交換。內存頁和磁盤之間的交換對 Kafka 各方面的性能都有重大影響。Kafka 大量地使用系統頁面緩存,如果虛擬內存被交換到磁盤,說明已經沒有多余內存可以分配給頁面緩存了。

一種避免內存交換的方法是不設置任何交換分區。內存交換不是必需的,不過它確實能夠在系統發生災難性錯誤時提供一些幫助。進行內存交換可以防止操作系統由于內存不足而突然終止進程。基于上述原因,建議把 vm.swappiness 參數的值設置得小一點,比如 1。該參數指明了虛擬機的子系統將如何使用交換分區,而不是只把內存頁從頁面緩存里移除。要優先考慮減小頁面緩存,而不是進行內存交換。

臟頁會被沖刷到磁盤上,調整內核對臟頁的處理方式可以讓我們從中獲益。Kafka 依賴 I/O 性能為生產者提供快速的響應。這就是為什么日志片段一般要保存在快速磁盤上,不管是單個快速磁盤(如 SSD)還是具有 NVRAM 緩存的磁盤子系統(如 RAID)。這樣一來,在后臺刷新進程將臟頁寫入磁盤之前,可以減少臟頁的數量,這個可以通過將 vm.dirty_background_ratio 設為小于 10 的值來實現。該值指的是系統內存的百分比,大部分情況下設為 5 就可以了。它不應該被設為 0,因為那樣會促使內核頻繁地刷新頁面,從而降低內核為底層設備的磁盤寫入提供緩沖的能力。

通過設置 vm.dirty_ratio 參數可以增加被內核進程刷新到磁盤之前的臟頁數量,可以將它設為大于 20 的值(這也是系統內存的百分比)。這個值可設置的范圍很廣,60~80 是個比較合理的區間。不過調整這個參數會帶來一些風險,包括未刷新磁盤操作的數量和同步刷新引起的長時間 I/O 等待。如果該參數設置了較高的值,建議啟用 Kafka 的復制功能,避免因系統崩潰造成數據丟失。

為了給這些參數設置合適的值,最好是在 Kafka 集群運行期間檢查臟頁的數量,不管是在生存環境還是模擬環境??梢栽?/proc/vmstat 文件里查看當前臟頁數量。

這些都是可控制的選項,根據工作負載和數據,你可以決定如何設置它們:

$ sysctl -a | grep dirty vm.dirty_background_bytes = 0 vm.dirty_background_ratio = 10 vm.dirty_bytes = 0 vm.dirty_ratio = 20 vm.dirty_writeback_centisecs = 500 vm.dirty_expire_centisecs = 3000 vm.dirtytime_expire_seconds = 43200
  • vm.dirty_background_ratio?是內存可以填充臟數據的百分比。這些臟數據稍后會寫入磁盤,后臺進程會稍后清理臟數據。比如,我有32G內存,那么有3.2G的臟數據可以待著內存里,超過3.2G的話就會有后臺進程來清理。
  • vm.dirty_ratio是可以用臟數據填充的絕對最大系統內存量,當系統到達此點時,必須將所有臟數據提交到磁盤,同時所有新的I/O塊都會被阻塞,直到臟數據被寫入磁盤。這通常是長I/O卡頓的原因,但這也是保證內存中不會存在過量臟數據的保護機制。
  • vm.dirty_background_bytes和vm.dirty_bytes是另一種指定這些參數的方法。如果設置_bytes版本,則_ratio版本將變為0,反之亦然。
  • vm.dirty_expire_centisecs?指定臟數據能存活的時間。在這里它的值是30秒。當?pdflush/flush/kdmflush?在運行的時候,他們會檢查是否有數據超過這個時限,如果有則會把它異步地寫到磁盤中。畢竟數據在內存里待太久也會有丟失風險。
  • vm.dirty_writeback_centisecs?指定多長時間?pdflush/flush/kdmflush?這些進程會喚醒一次,然后檢查是否有緩存需要清理。

可以通過下面方式看內存中有多少臟數據:一共有106頁的臟數據:

$ cat /proc/vmstat | egrep "dirty|writeback" nr_dirty 106 nr_writeback 0 nr_writeback_temp 0 nr_dirty_threshold 3934012 nr_dirty_background_threshold 1964604

方法2:增加緩存

在某些情況下,顯著提高緩存對性能有積極的影響。在這些情況下,Linux客戶機上包含的數據不是關鍵的,可能會丟失,而且應用程序通常會重復或以可重復的方式寫入相同的文件。理論上,通過允許內存中存在更多臟頁,你將在緩存中一遍又一遍地重寫相同的塊,只需要每隔一段時間向實際磁盤寫一次。為此,我們提出了以下參數:

vm.dirty_background_ratio = 50 vm.dirty_ratio = 80

有時候還會提高vm.dirty_expire_centisecs?這個參數的值,來允許臟數據更長時間地停留。除了增加數據丟失的風險之外,如果緩存已滿并需要同步,還會有長時間I/O卡頓的風險,因為在大型虛擬機緩存中有大量數據。

方法3:增減都用

有時候系統需要應對突如其來的高峰數據,它可能會拖慢磁盤。比如說:每小時或者午夜進行批處理作業、在Raspberry Pi上寫SD卡等等。這種情況下,我們可以允許大量的寫I/O存儲在緩存中,這樣后臺刷新操作就可以慢慢異步處理它:

vm.dirty_background_ratio = 5 vm.dirty_ratio = 80

這個時候,系統后臺進程在臟數據達到5%時就開始異步清理,但在80%之前系統不會強制同步寫磁盤。在此基礎上,你只需要調整RAM和vm.dirty_ratio大小以便能緩存所有的寫數據。當然,磁盤上的數據一致性也存在一定風險。

無論你選擇哪種方式,都應該始終收集數據來支持你的更改,并幫助你確定是在改進還是變得更糟。我們可以從應用程序,/proc/vmstat,?/proc/meminfo,?iostat,?vmstat?以及/proc/sys/vm里面獲得大量有用信息。

為什么不把 vm.swappiness 設為零

先前,人們建議盡量把 vm.swapiness 設為 0,它意味著“除非發生內存溢出,否則不要進行內存交換”。直到 Linux 內核 3.5-rc1 版本發布,這個值的意義才發生了變化。這個變化被移植到其他的發行版上,包括 Red Hat 企業版內核 2.6.32-303。在發生變化之后,0 意味著“在任何情況下都不要發生交換”。所以現在建議把這個值設為 1。

磁盤

除了選擇合適的磁盤硬件設備和使用 RAID 外,文件系統是影響性能的另一個重要因素。有很多種文件系統可供選擇,不過對于本地文件系統來說,EXT4(第四代可擴展文件系統)和 XFS 最為常見。近來,XFS 成為很多 Linux 發行版默認的文件系統,因為它只需要做少量調優就可以承擔大部分的工作負荷,比 EXT4 具有更好的表現。EXT4 也可以做得很好,但需要做更多的調優,存在較大的風險。其中就包括設置更長的提交間隔(默認是 5),以便降低刷新的頻率。EXT4 還引入了塊分配延遲,一旦系統崩潰,更容易造成數據丟失和文件系統毀壞。XFS 也使用了分配延遲算法,不過比 EXT4 的要安全些。XFS 為 Kafka 提供了更好的性能,除了由文件系統提供的自動調優之外,無需額外的調優。批量磁盤寫入具有更高的效率,可以提升整體的 I/O 吞吐量。

不管使用哪一種文件系統來存儲日志片段,最好要對掛載點的 noatime 參數進行合理的設置。文件元數據包含 3 個時間戳:創建時間(ctime)、最后修改時間(mtime)以及最后訪問時間(atime)。默認情況下,每次文件被讀取后都會更新 atime,這會導致大量的磁盤寫操作,而且 atime 屬性的用處不大,除非某些應用程序想要知道某個文件在最近一次修改后有沒有被訪問過(這種情況可以使用 realtime )。Kafka 用不到該屬性,所以完全可以把它禁用掉。為掛載點設置 noatime 參數可以防止更新 atime,但不會影響 ctime 和 mtime。

網絡

默認情況下,系統內核沒有針對快速的大流量網絡傳輸進行優化,所以對于應用程序來說,一般需要對 Linux 系統的網絡棧進行調優,以實現對大流量的支持。實際上,調整 Kafka 的網絡配置與調整其他大部分 Web 服務器和網絡應用程序的網絡配置是一樣的。首先可以對分配給 socket 讀寫緩沖區的內存大小作出調整,這樣可以顯著提升網絡的傳輸性能。socket 讀寫緩沖區對應的參數分別是 net.core.wmem_default 和 net.core.rmem_default ,合理的值是 131 072(也就是 128KB)。讀寫緩沖區最大值對應的參數分別是 net.core.wmem_max 和 net.core.rmem_max ,合理的值是 2 097 152(也就是 2MB)。要注意,最大值并不意味著每個 socket 一定要有這么大的緩沖空間,只是說在必要的情況下才會達到這個值。

除了設置 socket 外,還需要設置 TCP socket 的讀寫緩沖區,它們的參數分別是 net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem 。這些參數的值由 3 個整數組成,它們使用空格分隔,分別表示最小值、默認值和最大值。最大值不能大于 net.core.wmem_max 和 net.core.rmem_max 指定的大小。例如,“4096 65536 2048000”表示最小值是 4KB、默認值是 64KB、最大值是 2MB。根據 Kafka 服務器接收流量的實際情況,可能需要設置更高的最大值,為網絡連接提供更大的緩沖空間。

還有其他一些有用的網絡參數。例如,把 net.ipv4.tcp_window_scaling 設為 1,啟用 TCP 時間窗擴展,可以提升客戶端傳輸數據的效率,傳輸的數據可以在服務器端進行緩沖。把 net.ipv4.tcp_max_syn_backlog 設為比默認值 1024 更大的值,可以接受更多的并發連接。把 net.core.netdev_max_backlog 設為比默認值 1000 更大的值,有助于應對網絡流量的爆發,特別是在使用千兆網絡的情況下,允許更多的數據包排隊等待內核處理。

垃圾回收器選項

為應用程序調整 Java 垃圾回收參數就像是一門藝術,我們需要知道應用程序是如何使用內存的,還需要大量的觀察和試錯。幸運的是,Java 7 為我們帶來了 G1 垃圾回收器,讓這種狀況有所改觀。在應用程序的整個生命周期,G1 會自動根據工作負載情況進行自我調節,而且它的停頓時間是恒定的。它可以輕松地處理大塊的堆內存,把堆內存分為若干小塊的區域,每次停頓時并不會對整個堆空間進行回收。

正常情況下,G1 只需要很少的配置就能完成這些工作。以下是 G1 的兩個調整參數。

MaxGCPauseMillis :

該參數指定每次垃圾回收默認的停頓時間。該值不是固定的,G1 可以根據需要使用更長的時間。它的默認值是 200ms。也就是說,G1 會決定垃圾回收的頻率以及每一輪需要回收多少個區域,這樣算下來,每一輪垃圾回收大概需要 200ms 的時間。

InitiatingHeapOccupancyPercent :

該參數指定了在 G1 啟動新一輪垃圾回收之前可以使用的堆內存百分比,默認值是 45。也就是說,在堆內存的使用率達到 45% 之前,G1 不會啟動垃圾回收。這個百分比包括新生代和老年代的內存。

Kafka 對堆內存的使用率非常高,容易產生垃圾對象,所以可以把這些值設得小一些。如果一臺服務器有 64GB 內存,并且使用 5GB 堆內存來運行 Kafka,那么可以參考以下的配置:MaxGCPauseMillis 可以設為 20ms;InitiatingHeapOccupancyPercent 可以設為 35,這樣可以讓垃圾回收比默認的要早一些啟動。

Kafka 的啟動腳本并沒有啟用 G1 回收器,而是使用了 Parallel New 和 CMS( Concurrent Mark-Sweep,并發標記和清除)垃圾回收器。不過它可以通過環境變量來修改。本章前面的內容使用 start 命令來修改它:

# export JAVA_HOME=/usr/java/jdk1.8.0_51 # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #

數據中心布局

在開發階段,人們并不會太關心 Kafka 服務器在數據中心所處的物理位置,因為即使集群在短時間內出現局部或完全不可用,也不會造成太大影響。但是,在生產環境,服務不可用意味著金錢的損失,具體表現為無法為用戶提供服務或者不知道用戶正在做什么。這個時候,使用 Kafka 集群的復制功能就變得尤為重要(請參考第 6 章),而服務器在數據中心所處的物理位置也變得重要起來。如果在部署 Kafka 之前沒有考慮好這個問題,那么在后續的維護過程中,移動服務器需要耗費更高的成本。

在為 broker 增加新的分區時,broker 并無法獲知機架的信息。也就是說,兩個 broker 有可能是在同一個機架上,或者在同一個可用區域里(如果運行在像 AWS 這樣的的云服務上),所以,在為分區添加副本的時候,這些副本很可能被分配給同一個機架上的 broker,它們使用相同的電源和網絡連接。如果該機架出了問題,這些分區就會離線,客戶端就無法訪問到它們。更糟糕的是,如果發生不完整的主節點選舉,那么在恢復時就有可能丟失數據(第 6 章將介紹更多細節)。

所以,最好把集群的 broker 安裝在不同的機架上,至少不要讓它們共享可能出現單點故障的基礎設施,比如電源和網絡。也就是說,部署服務器需要至少兩個電源連接(兩個不同的回路)和兩個網絡交換器(保證可以進行無縫的故障切換)。除了這些以外,最好還要把 broker 安放在不同的機架上。因為隨著時間的推移,機架也需要進行維護,而這會導致機器離線(比如移動機器或者重新連接電源)。

共享Zookeeper

Kafka 使用 Zookeeper 來保存 broker、主題和分區的元數據信息。對于一個包含多個節點的 Zookeeper 群組來說,Kafka 集群的這些流量并不算多,那些寫操作只是用于構造消費者群組或集群本身。實際上,在很多部署環境里,會讓多個 Kafka 集群共享一個 Zookeeper 群組(每個集群使用一個 chroot 路徑)。
不過,消費者和 Zookeeper 之間還是有一個值得注意的地方,消費者可以選擇將偏移量提交到 Zookeeper 或 Kafka,還可以選擇提交偏移量的時間間隔。如果消費者將偏移量提交到 Zookeeper,那么在每個提交時間點上,消費者將會為每一個消費的分區往 Zookeeper 寫入一次偏移量。合理的提交間隔是 1 分鐘,因為這剛好是消費者群組的某個消費者發生失效時能夠讀取到重復消息的時間。值得注意的是,這些提交對于 Zookeeper 來說流量不算小,特別是當集群里有多個消費者的時候。如果 Zookeeper 群組無法處理太大的流量,就有必要使用長一點的提交時間間隔。不過不管怎樣,還是建議使用最新版本的 Kafka,讓消費者把偏移量提交到 Kafka 服務器上,消除對 Zookeeper 的依賴。

雖然多個 Kafka 集群可以共享一個 Zookeeper 群組,但如果有可能的話,不建議把 Zookeeper 共享給其他應用程序。Kafka 對 Zookeeper 的延遲和超時比較敏感,與 Zookeeper 群組之間的一個通信異常就可能導致 Kafka 服務器出現無法預測的行為。這樣很容易讓多個 broker 同時離線,如果它們與 Zookeeper 之間斷開連接,也會導致分區離線。這也會給集群控制器帶來壓力,在服務器離線一段時間之后,當控制器嘗試關閉一個服務器時,會表現出一些細小的錯誤。其他的應用程序因重度使用或進行不恰當的操作給 Zookeeper 群組帶來壓力,所以最好讓它們使用自己的 Zookeeper 群組。

生產者配置

生產者有很多可配置的參數,在 Kafka 文檔里都有說明,它們大部分都有合理的默認值,所以沒有必要去修改它們。不過有幾個參數在內存使用、性能和可靠性方面對生產者影響比較大,接下來我們會一一說明。

acks

acks 參數指定了必須要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。該參數有如下選項。

  • 如果 acks=0 ,生產者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現了問題,導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發送消息,從而達到很高的吞吐量。
  • 如果 acks=1 ,只要集群的首領節點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達首領節點(比如首領節點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發送還是異步發送。如果讓發送客戶端等待服務器的響應(通過調用 Future 對象的 get() 方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生產者在收到服務器響應之前可以發送多少個消息)。
  • 如果 acks=all ,只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發生崩潰,整個集群仍然可以運行(第 5 章將討論更多的細節)。不過,它的延遲比 acks=1 時更高,因為我們要等待不只一個服務器節點接收消息。

buffer.memory

該參數用來設置生產者內存緩沖區的大小,生產者用它緩沖要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候, send() 方法調用要么被阻塞,要么拋出異常,取決于如何設置 block.on.buffer.full 參數(在 0.9.0.0 版本里被替換成了?max.block.ms?,表示在拋出異常之前可以阻塞一段時間)。

compression.type

默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappy 、gzip 或 lz4 ,它指定了消息被發送給 broker 之前使用哪一種壓縮算法進行壓縮。snappy 壓縮算法由 Google 發明,它占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發送消息的瓶頸所在。

retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領)。在這種情況下,retries 參數的值決定了生產者可以重發消息的次數,如果達到這個次數,生產者會放棄重試并返回錯誤。默認情況下,生產者會在每次重試之間等待 100ms,不過可以通過?retry.backoff.ms?參數來改變這個時間間隔。建議在設置重試次數和重試時間間隔之前,先測試一下恢復一個崩潰節點需要多少時間(比如所有分區選舉出首領需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如“消息太大”錯誤)。一般情況下,因為生產者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤或重試次數超出上限的情況。

batch.size

當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算(而不是消息個數)。當批次被填滿,批次里的所有消息會被發送出去。不過生產者并不一定都會等到批次被填滿才發送,半滿的批次,甚至只包含一個消息的批次也有可能被發送。所以就算把批次大小設置得很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產者需要更頻繁地發送消息,會增加一些額外的開銷。

linger.ms

該參數指定了生產者在發送批次之前等待更多消息加入批次的時間。KafkaProducer 會在批次填滿或?linger.ms?達到上限時把批次發送出去。默認情況下,只要有可用的線程,生產者就會把消息發送出去,就算批次里只有一個消息。把?linger.ms?設置成比 0 大的數,讓生產者在發送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發送更多的消息,每個消息的開銷就變小了)。

max.in.flight.requests.per.connection

該參數指定了生產者在收到服務器響應之前可以發送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設為 1 可以保證消息是按照發送的順序寫入服務器的,即使發生了重試。

timeout.ms、request.timeout.ms?和?metadata.fetch.timeout.ms

request.timeout.ms?指定了生產者在發送數據時等待服務器返回響應的時間,metadata.fetch.timeout.ms?指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發送數據,要么返回一個錯誤(拋出異?;驁绦谢卣{)。timeout.ms?指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配——如果在指定時間內沒有收到同步副本的確認,那么 broker 就會返回一個錯誤。

max.block.ms

該參數指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數據時生產者的阻塞時間。當生產者的發送緩沖區已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到?max.block.ms?時,生產者會拋出超時異常。

max.request.size

該參數用于控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發送的單個最大消息為 1MB,或者生產者可以在單個請求里發送一個批次,該批次包含了 1000 個消息,每個消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes ),所以兩邊的配置最好可以匹配,避免生產者發送的消息被 broker 拒絕。

receive.buffer.bytes 和 send.buffer.bytes

這兩個參數分別指定了 TCP socket 接收和發送數據包的緩沖區大小。如果它們被設為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處于不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

順序保證

Kafka 可以保證同一個分區里的消息是有序的。也就是說,如果生產者按照一定的順序發送消息,broker 就會按照這個順序把它們寫入分區,消費者也會按照同樣的順序讀取它們。在某些情況下,順序是非常重要的。例如,往一個賬戶存入 100 元再取出來,這個與先取錢再存錢是截然不同的!不過,有些場景對順序不是很敏感。
如果把 retries 設為非零整數,同時把 max.in.flight.requests.per.connection 設為比 1 大的數,那么,如果第一個批次消息寫入失敗,而第二個批次寫入成功,broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功,那么兩個批次的順序就反過來了。
一般來說,如果某些場景要求消息是有序的,那么消息是否寫入成功也是很關鍵的,所以不建議把 retries 設為 0。可以把 max.in.flight.requests.per.connection 設為 1,這樣在生產者嘗試發送第一批消息時,就不會有其他的消息發送給 broker。不過這樣會嚴重影響生產者的吞吐量,所以只有在對消息的順序有嚴格要求的情況下才能這么做。

消費者配置

fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。broker 在收到消費者的數據請求時,如果可用的數據量小于 fetch.min.bytes 指定的大小,那么它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題不是很活躍的時候(或者一天里的低谷時段)就不需要來來回回地處理消息。如果沒有很多可用數據,但消費者的 CPU 使用率卻很高,那么就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值設置得大一點可以降低 broker 的工作負載。

fetch.max.wait.ms

我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時才把它返回給消費者。而?feth.max.wait.ms?則用于指定 broker 的等待時間,默認是 500ms。如果沒有足夠的數據流入 Kafka,消費者獲取最小數據量的要求就得不到滿足,最終導致 500ms 的延遲。如果要降低潛在的延遲(為了滿足 SLA),可以把該參數值設置得小一些。如果?fetch.max.wait.ms?被設為 100ms,并且 fetch.min.bytes 被設為 1MB,那么 Kafka 在收到消費者的請求后,要么返回 1MB 數據,要么在 100ms 后返回所有可用的數據,就看哪個條件先得到滿足。

max.partition.fetch.bytes

該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值是 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節。如果一個主題有 20 個分區和 5 個消費者,那么每個消費者需要至少 4MB 的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組里有消費者發生崩潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節數(通過 max.message.size 屬性配置)大,否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。在設置該屬性時,另一個需要考慮的因素是消費者處理數據的時間。消費者需要頻繁調用 poll() 方法來避免會話過期和發生分區再均衡,如果單次調用 poll() 返回的數據太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

session.timeout.ms

該屬性指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s。如果消費者沒有在?session.timeout.ms?指定的時間內發送心跳給群組協調器,就被認為已經死亡,協調器就會觸發再均衡,把它的分區分配給群組里的其他消費者。該屬性與?heartbeat.interval.ms?緊密相關。heartbeat.interval.ms?指定了 poll() 方法向協調器發送心跳的頻率,session.timeout.ms?則指定了消費者可以多久不發送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms?必須比?session.timeout.ms?小,一般是?session.timeout.ms?的三分之一。如果?session.timeout.ms?是 3s,那么?heartbeat.interval.ms?應該是 1s。把?session.timeout.ms?值設得比默認值小,可以更快地檢測和恢復崩潰的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。把該屬性的值設置得大一些,可以減少意外的再均衡,不過檢測節點崩潰需要更長的時間。

auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時并被刪除)該作何處理。它的默認值是 latest ,意思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)。另一個值是 earliest ,意思是說,在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄。

enable.auto.commit

該屬性指定了消費者是否自動提交偏移量,默認值是 true 。為了盡量避免出現重復數據和數據丟失,可以把它設為 false ,由自己控制何時提交偏移量。如果把它設為 true ,還可以通過配置?auto.commit.interval.ms?屬性來控制提交的頻率。

partition.assignment.strategy

我們知道,分區會被分配給群組里的消費者。PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者。Kafka 有兩個默認的分配策略。可以通過設置partition.assignment.strategy 來選擇分區策略。默認使用的是 org.apache.kafka.clients.consumer.RangeAssignor ,這個類實現了 Range 策略,不過也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor 。我們還可以使用自定義策略,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字。
max.poll.records

Range
該策略會把主題的若干個連續的分區分配給消費者。假設消費者 C1 和消費者 C2 同時訂閱了主題 T1 和主題 T2,并且每個主題有 3 個分區。那么消費者 C1 有可能分配到這兩個主題的分區 0 和分區 1,而消費者 C2 分配到這兩個主題的分區 2。因為每個主題擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區。只要使用了 Range 策略,而且分區數量無法被消費者數量整除,就會出現這種情況。

RoundRobin
該策略把主題的所有分區逐個分配給消費者。如果使用 RoundRobin 策略來給消費者 C1 和消費者 C2 分配分區,那么消費者 C1 將分到主題 T1 的分區 0 和分區 2 以及主題 T2 的分區 1,消費者 C2 將分配到主題 T1 的分區 1 以及主題 T2 的分區 0 和分區 2。一般來說,如果所有消費者都訂閱相同的主題(這種情況很常見),RoundRobin 策略會給所有消費者分配相同數量的分區(或最多就差一個分區)。

max.poll.records

該屬性用于控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量。

receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩沖區也可以設置大小。如果它們被設為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處于不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

以上內容摘抄至:《Kafka權威指南》

總結

以上是生活随笔為你收集整理的Kafka调优的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。