kafka技术内幕(一)
文章目錄
- 概念
- 基本概念
- topic,partition
- 副本
- 生產者
- 重要組件
- 發送消息流程
- 元數據的更新
- 消費者
- 消費者與消費組
- 客戶端
- 訂閱主題與分區
- 消息消費
- 位移提交
- 控制或關閉消費
- 指定位移消費
- 再均衡
- 消費者攔截器
- 多線程實現
- 主題和分區
- 主題的管理
- KafkaAdminClient
- 分區的管理
- 選擇合適的分區數
- 日志存儲
- 文件目錄布局
- 日志格式
- 日志索引
- 偏移量索引
- 時間戳索引
- 日志清理
- 日志刪除
- 基于時間
- 基于日志大小
- 基于日志起始偏移量
- 日志壓縮
- 磁盤存儲
- 頁緩存
- 零拷貝
- 附錄
- 服務端參數配置
- **`server.properties`**配置
- 必配置屬性
- 參數說明
- Leader,replicas配置參數
- zookeeper參數配置
- listener配置
- 生產者客戶端參數
- 消費者客戶端參數
- 主題參數
概念
Kafka相比較其他消息中間件,扮演了三大角色:
- 消息系統: Kafka 和傳統的消息系統(也稱作消息中間件〉都具備系統解稿、冗余存儲、流量削峰、緩沖、異步通信、擴展性、可恢復性等功能。與此同時, Kafka 還提供了大多數消息系統難以實現的消息順序性保障及回溯消費的功能。
- 存儲系統: Kafka 把消息持久化到磁盤,相比于其他基于內存存儲的系統而言,有效地降低了數據丟失的風險。也正是得益于Kafka 的消息持久化功能和多副本機制,我們可以把Kafka 作為長期的數據存儲系統來使用
- 流式處理平臺: Kafka 不僅為每個流行的流式處理框架提供了可靠的數據來源,還提供了一個完整的流式處理類庫,比如窗口、連接、變換和聚合等各類操作。
基本概念
Kafka 體系架構包括以下概念:
- Producer:發布消息
- Consumer :消息消費。
- Broker:服務代理節點
- ZooKeeper :負責集群元數據的管理、控制器的選舉等操作的
- topic:主題。消息處理的歸類的單位。生產者負責將消息發送到特定的主題,而消費者負責訂閱主題并進行消費。
- partition:主題是個邏輯概念,可以細分為多個分區。
- Replica :副本,容災。
topic,partition
主題是一個邏輯上的概念, 它還可以細分為多個分區, 一個分區只屬于單個主題, 很多時候也會把分區稱為主題分區(Topic-Partition)。同一主題下的不同分區包含的消息是不同的,分區在存儲層面可以看作一個可追加的日志(Log)文件, 消息在被追加到分區日志、文件的時候都會分配一個特定的偏移量( offset)。offset 是消息在分區中的唯一標識, Kafka通過它來保證消息在分區內的順序性, 不過offset 并不跨越分區, 也就是說, Kafka保證的是分區有序而不是主題有序。
副本
Kafka 為分區引入了多副本( Replica ) 機制, 通過增加副本數量可以提升容災能力。同一分區的不同副本中保存的是相同的消息(在同一時刻,副本之間并非完全一樣),副本之間是“ 一主多從”的關系,其中**leader** 副本負責處理讀寫請求, follower 副本只負責與leader 副本的消息同步。副本處于不同的broker 中,當leader 副本出現故障時,從follower 副本中重新選舉新的leader 副本對外提供服務。Kafka 通過多副本機制實現了故障的自動轉移,當Kafka 集群中某個broker 失效時仍然能保證服務可用。
分區中的所有副本統稱為AR ( Assigned Replicas ) 。所有與leader 副本保持一定程度同步的副本(包括leader 副本在內〕組成**ISR** (In-Sync Replicas ) , ISR 集合是AR 集合中的一個子集。消息會先發送到leader副本,然后follower 副本才能從leader 副本中拉取消息進行同步,同步期間內follower 副本相對于leader 副本而言會有一定程度的滯后。前面所說的“ 一定程度的同步”是指可忍受的滯后范圍,這個范圍可以通過參數進行配置。與leader 副本同步滯后過多的副本(不包括leader 副本)組成**OSR** ( Out-of-Sync Replicas ),由此可見, AR=ISR+OSR 。在正常情況下, 所有的follower 副本都應該與leader 副本保持一定程度的同步,即AR=ISR,OSR 集合為空。
leader 副本負責維護和跟蹤ISR 集合中所有follower 副本的滯后狀態, 當follower 副本落后太多或失效時, leader 副本會把它從ISR 集合中剔除。如果OSR 集合中有follower 副本“追上’p了leader 副本,那么leader 副本會把它從OSR 集合轉移至ISR 集合。默認情況下, 當leader 副
本發生故障時,只有在ISR 集合中的副本才有資格被選舉為新的leader, 而在OSR 集合中的副本則沒有任何機會(不過這個原則也可以通過修改相應的參數配置來改變) 。
ISR 與HW 和LEO 也有緊密的關系。HW 是**High Watermark** 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量( offset ),消費者只能拉取到這個offset 之前的消息。LEO 是Log End Offset 的縮寫,它標識當前日志文件中下一條待寫入消息的offset ,
分區中各種偏移量的說明Kafka 的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,同步復制要求所有能工作的follower 副本都復制完,這條消息才會被確認為已成功提交,這種復制方式極大地影響了性能。而在異步復制方式下, follower 副本異步地從leader 副本中復制數
據,數據只要被leader 副本寫入就被認為已經成功提交。在這種情況下,如果follower 副本都還沒有復制完而落后于leader 副本,突然leader 副本著機,則會造成數據丟失。Kafka 使用的這種ISR 的方式則有效地權衡了數據可靠性和性能之間的關系。
生產者
重要組件
序列化器:用于對key和value進行序列化。
分區器:如果沒有指定分區字段,則通過分區器計算出partition值。
生產者攔截器:生產者攔截器既可以用來在消息發送前做一些準備工作, 比如按照某個規則過濾不符合要求的消息、修改消息的內容等, 也可以用來在發送回調邏輯前做一些定制化的需求,比如統計類工作。KafkaProducer 在將消息序列化和計算分區之前會調用生產者攔截器的onSend()方法來對消息進行相應的定制化操作
發送消息流程
生產者客戶端架構整個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和Sender 線程(發送線程)。在主線程中由KafkaProducer 創建消息,然后通過可能的攔截器、序列化器和分區器的作用之后緩存到消息累加器( RecordAccumulator ,也稱為消息收集器〉中。Sender 線程負責從
RecordAccumulator 中獲取消息并將其發送到Kafka 中。
RecordAccumulator 主要用來緩存消息以便Sender 線程可以批量發送,進而減少網絡傳輸的資源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產者客戶端參數buffer.memory 配置,默認值為33554432B ,即32M。如果生產者發送消息的速度超過發送到服務器的速度,則會導致生產者空間不足,這個時候KafkaProducer 的send() 方法調用要么被阻塞,要么拋出異常,這個取決于參數max.block.ms 的配置,此參數的默認值為6 0000,即60 秒。
主線程中發送過來的消息都會被追加到RecordAccumulator 的某個雙端隊列( Deque )中,在RecordAccumulator 的內部為每個分區都維護了一個雙端隊列,隊列中的內容就是ProducerBatch ,即Deque<ProducerBatch>。消息寫入緩存時,追加到雙端隊列的尾部; Sender讀取消息時,從雙端隊列的頭部讀取。注意ProducerBatch 不是ProducerRecord, ProducerBatch中可以包含一至多個ProducerRecord 。通俗地說, ProducerRecord 是生產者中創建的消息,而ProducerBatch 是指一個消息批次, ProducerRecord 會被包含在ProducerBatch 中,這樣可以使字節的使用更加緊湊。與此同時,將較小的ProducerRecord 拼湊成一個較大的ProducerBatch ,也可以減少網絡請求的次數以提升整體的吞吐量。ProducerBatch 和消息的具體格式有關。如果生產者客戶端需要向很多分區發送消息, 則可以將buffer.memory 參數適當調大以增加整體的吞吐量。
消息在網絡上都是以字節(Byte)的形式傳輸的,在發送之前需要創建一塊內存區域來保存對應的消息。在Kafka 生產者客戶端中,通過java.io.ByteBuffer 實現消息內存的創建和釋放。不過頻繁的創建和釋放是比較耗費資源的,在RecordAccumulator 的內部還有一個BufferPool,
它主要用來實現ByteBuffer 的復用,以實現緩存的高效利用。不過BufferPool 只針對特定大小的ByteBuffer 進行管理,而其他大小的ByteBuffer 不會緩存進BufferPool 中,這個特定的大小由batch.size 參數來指定,默認值為16384B ,即16KB 。我們可以適當地調大batch.size參數以便多緩存一些消息。
ProducerBatch 的大小和batch.size 參數也有著密切的關系。當一條消息( ProducerRecord )流入RecordAccumulator 時,會先尋找與消息分區所對應的雙端隊列(如果沒有則新建),再從這個雙端隊列的尾部獲取一個ProducerBatch (如果沒有則新建),查看ProducerBatch 中是否
還可以寫入這個ProducerRecord ,如果可以則寫入,如果不可以則需要創建一個新的ProducerBatch 。在新建ProducerBatch 時評估這條消息的大小是否超過batch.size 參數的大小,如果不超過,那么就以batch.size 參數的大小來創建ProducerBatch,這樣在使用完這段內存區域之后,可以通過BufferPool 的管理來進行復用;如果超過,那么就以評估的大小來創建ProducerBatch , 這段內存區域不會被復用。
Sender 從RecordAccumulator 中獲取緩存的消息之后,會進一步將原本<分區, Deque<Producer Batch>>的保存形式轉變成<Node, List< ProducerBatch>>的形式,其中Node 表示Kafka集群的broker 節點。對于網絡連接來說,生產者客戶端是與具體的broker 節點建立的連接,也
就是向具體的broker 節點發送消息,而并不關心消息屬于哪一個分區;而對于KafkaProducer的應用邏輯而言,我們只關注向哪個分區中發送哪些消息,所以在這里需要做一個應用邏輯層面到網絡I/O層面的轉換。
在轉換成<Node, List<ProducerBatch>>的形式之后, Sender 還會進一步封裝成<Node,Request>的形式,這樣就可以將Request 請求發往各個Node了, 這里的Request 是指Kafka 的各種協議請求,對于消息發送而言就是指具體的ProduceRequest。
請求在從Sender 線程發往Kafka 之前還會保存到InFlightRequests 中, InFlightRequests 保存對象的具體形式為**Map<Nodeld, Deque<Request>>,它的主要作用是緩存了已經發出去但還沒有收到響應的請求**( Nodeld 是一個String 類型,表示節點的id 編號)。與此同時,InFlightRequests 還提供了許多管理類的方法,并且通過配置參數還可以限制每個連接(也就是客戶端與Node 之間的連接)最多緩存的請求數。這個配置參數為max.in.flight.requests.per.connection ,默認值為5 ,即每個連接最多只能緩存5 個未響應的請求,超過該數值之后就不能再向這個連接發送更多的請求了,除非有緩存的請求收到了響應( Response )。通過比較Deque<Request>的size 與這個參數的大小來判斷對應的Node 中是否己經堆積了很多未響應的消息,如果真是如此,那么說明這個Node 節點負載較大或網絡連接有問題,再繼續向其發送請求會增大請求超時的可能。
元數據的更新
InFlightRequests 還可以獲得leastLoadedNode ,即所有Node 中負載最小的那一個。這里的負載最小是通過每個Node 在InFlightRequests 中還未確認的請求決定的,未確認的請求越多則認為負載越大。
KafkaProducer 需要知道目標分區的leader 副本所在的broker 節點的地址、端口等信息才能建立連接,最終才能將消息發送到Kafka,在這一過
程中所需要的信息都屬于元數據信息。bootstrap.servers 參數只需要配置部分broker 節點的地址即可,不需要配置所有broker 節點的地址,因為客戶端可以自己發現其他broker 節點的地址, 這一過程也屬于元數據相關的更新操作。與此同時,分區數量及leader 副本的分布都會動態地變化, 客戶端也需要動態地捕捉這些變化。
元數據是指Kafka 集群的元數據,這些元數據具體記錄了集群中有哪些主題,這些主題有哪些分區,每個分區的leader 副本分配在哪個節點上, follower 副本分配在哪些節點上,哪些副本在AR 、ISR 等集合中,集群中有哪些節點,控制器節點又是哪一個等信息。當客戶端中沒有需要使用的元數據信息時,比如沒有指定的主題信息,或者超過metadata.max.age.ms 時間沒有更新元數據都會引起元數據的更新操作。客戶端參數
metadata.rnax.age.ms 的默認值為300000 ,即5 分鐘。元數據的更新操作是在客戶端內部進行的,對客戶端的外部使用者不可見。當需要更新元數據時,會先挑選出leastLoadedNode,然后向這個Node 發送MetadataRequest 請求來獲取具體的元數據信息。這個更新操作是由Sender線程發起的, 在創建完MetadataRequest 之后同樣會存入InFlightRequests ,之后的步驟就和發送消息時的類似。元數據雖然由Sender 線程負責更新,但是主線程也需要讀取這些信息,這里的數據同步通過synchronized 和final 關鍵字來保障。
消費者
消費者與消費組
每一個分區只能被一個消費組中的一個消費者所消費。如果消費者過多,出現了消費者的個數大于分區個數的清況,就會有消費者分配不到任何分區。默認分配邏輯都是基于默認的分區分配策略進行分析的,可以通過消費者客戶端參數partition.assignment.strategy來設置消費者與訂閱主題之間的分區分配策略。
一般有兩種消息投遞模式:點對點(P2P, Point-to-Point)模式和發布/訂閱(Pub/Sub)模式。Kaflca 同時支待兩種消息投遞模式,而這正是得益千消費者與消費組模型的契合:
? 如果所有的消費者都隸屬于同一個消費組,那么所有的消息都會被均衡地投遞給每一個消費者,即每條消息只會被一個消費者處理,這就相當于點對點模式的應用。
? 如果所有的消費者都隸屬于不同的消費組,那么所有的消息都會被廣播給所有的消費者,即每條消息會被所有的消費者處理,這就相當于發布/訂閱模式的應用。
消費組是一個邏輯上的概念, 它將旗下的消費者歸為一類,每一個消費者只隸屬于一個消費組。每一個消費組都會有一個固定的名稱,消費者在進行消費前需要指定其所屬消費組的名稱,這個可以通過消費者客戶端參數group.id來配置,默認值為空字符串。
消費者并非邏輯上的概念,它是實際的應用實例,它可以是一個線程,也可以是一個進程。同一個消費組內的消費者既可以部署在同一臺機器上, 也可以部署在不同的機器上。
客戶端
訂閱主題與分區
一個消費者可以訂閱一個或多個主題。如果前后兩次訂閱了不同的主題, 那么消費者以最后一次的為準。如果消費者采用的是正則表達式的方式(subscribe(Pattern))訂閱, 在之后的過程中, 如果有人又創建了新的主題,并且主題的名字與正則表達式相匹配,那么這個消費者就可以消費到新添加的主題中的消息。如果應用程序需要消費多個主題,并且可以處理不同的類型,那么這種訂閱方式就很有效。
消費者不僅可以通過KafkaConsumer.subscribe()方法訂閱主題,還可以直接訂閱某些主題的特定分區,在KafkaConsumer中還提供了一個assign()方法來實現這些功能。,可以使用KafkaConsumer中的unsubscribe ()方法來取消主題的訂閱。如果將subscribe(Collection)或assign(Collection)中的集合參數設置為空集合,那么作用等同于unsubscribe()方法。
集合訂閱的方式subscribe(Collection)、正則表達式訂閱的方式subscribe(Pattem)和指定分區的訂閱方式assign(Collection)分表代表了三種不同的訂閱狀態: AUTO_OPICS 、AUTO_PATTERN和USER_ASSIGNED (如果沒有訂閱,那么訂閱狀態為NONE)。然而這三種狀態是互斥的,在一個消費者中只能使用其中的一種,否則會報出IllegalStateException異常:
通過subscribe()方法訂閱主題具有消費者自動再均衡的功能,而通過assign()方法訂閱分區時,是不具備消費者自動均衡的功能的,兩種類型的subscribe()都有ConsumerRebalanceListener類型參數的方法,而assign()方法卻沒有。
消息消費
Kafka中的消費是基于拉模式的。消息的消費一般有兩種模式:推模式和拉模式。推模式是服務端主動將消息推送給消費者, 而拉模式是消費者主動向服務端發起請求來拉取消息。Kafka中的消息消費是一個不斷輪詢的過程,消費者所要做的就是**重復地調用poll()**方法,而poll()方法返回的是所訂閱的主題(分區)上的一組消息。
poll()方法的返回值類型是ConsumerRecords, 它用來表示一次拉取操作所獲得的消息集,內部包含了若干ConsumerRecord, 它提供了一個iterator()方法來循環遍歷消息集內部的消息,poll ()方法其內部邏輯涉及消費位移、消費者協調器、組協調器、消費者的選舉、分區分配的分發、再均衡的邏輯、心跳等內容。
位移提交
Kafka 中的分區的每條消息都有唯一的offset ,用來表示消息在分區中對應的位置。對于消費者而言, 它也有一個offset 的概念,消費者使用offset 來表示消費到分區中某個消息所在的位置。
在每次調用poll()方法時,它返回的是還沒有被消費過的消息集(當然這個前提是消息己經存儲在Kafka 中了,并且暫不考慮異常情況的發生) , 要做到這一點,就需要記錄上一次消費時的消費位移。并且這個消費位移必須做持久化保存,而不是單單保存在內存中,否則消費者重啟之后就無法知曉之前的消費位移。再考慮一種情況,當有新的消費者加入時,那么必然會有再均衡的動作, 對于同一分區而言,它可能在再均衡動作之后分配給新的消費者, 如果不持久化保存消費位移,那么這個新的消費者也無法知曉之前的消費位移。
在舊消費者客戶端中,消費位移是存儲在ZooKeeper 中的。而在新消費者客戶端中,消費位移存儲在Kafka 內部的主題__consumer_offsets 中。這里把將消費位移存儲起來(持久化)的動作稱為“提交’ ,消費者在消費完消息之后需要執行消費位移的提交。當前消費者需要提交的消費位移表示下一條需要拉取的消息的位置。
3個offset:
- position:下一次需要拉取的消息位置
- lastConsumedOffset:消費的最后一條消息的offset。
- committed offset:已經提交過的消費位移。
一般有:position = committed offset = lastConsumedOffset + 1
在Kafka 中默認的消費位移的提交方式是自動提交,這個由消費者客戶端參數enable.auto.commit 配置,默認值為true。當然這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數auto.commit.interval.ms配置,默認值為5 秒,此參數生效的前提是enable.auto.commit 參數為true 。
在默認的方式下,消費者每隔5 秒會將拉取到的每個分區中最大的消息位移進行提交。自動位移提交的動作是在poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移。
在Kafka 中還提供了手動位移提交的方式,手動提交可以細分為同步提交和異步提交,對應于KafkaConsumer 中的commitSync()和
commitAsync()兩種類型的方法。offset信息是在partitionRecord中通過offset()方法獲取。
控制或關閉消費
KafkaConsumer中使用pause()和resume()方法來分別實現暫停某些分區在拉取操作時返回數據給客戶端和恢復某些分區向客戶端返回數據的操作。還有一種方式是調用KafkaConsumer 的wakeup ()方法,wakeup ()方法是KafkaConsumer 中唯一可以從其他線程里安全調用的方法( KafkaConsumer 是非線程安全的〉,調用wakeup()方法后可以退出poll() 的邏輯,并拋出WakeupException 的異常, 我們也不需要處理WakeupException 的異常,它只是一種跳出循環的方式。跳出循環以后一定要顯式地執行關閉動作以釋放運行過程中占用的各種系統資源,包括內
存資源、Socket 連接等。KafkaConsumer 提供了close()方法來實現關閉。
指定位移消費
在Kafka 中每當消費者查找不到所記錄的消費位移時, 就會根據消費者客戶端參數auto.offset.reset 的配置來決定從何處開始進行消費,這個參數的默認值為“ latest ”,表示從分區末尾開始消費消息。如果將auto.offset.reset參數配置為“ earliest”,那么消費者會從起始處開始消費。
除了查找不到消費位移,位移越界也會觸發auto.offset.reset 參數的執行。
auto.offset.reset 參數還有一個可配置的值:“ none ”,配置為此值就意味著出現查到不到消費位移的時候,既不從最新的消息位置處開始消費,也不從最早的消息位置處開始消費,此時會報出NoOffsetForPartitionException 異常,
KafkaConsumer 中的seek()方法讓我們從特定的位移處開始拉取消息。
再均衡
在再均衡發生期間, 消費組內的消費者是無法讀取消息的。也就是說, 在再均衡發生期間的這一小段時間內, 消費組會變得不可用。
消費者攔截器
消費者攔截器主要在消費到消息或在提交消費位移時進行一些定制化的操作。與生產者攔截器對應的, 消費者攔截器需要自定義實現org.apache.kafka.clients.consumer.Consumerlnterceptor接口
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) public void close()KaflcaConsumer會在poll()方法返回之前調用攔截器的onConsume()方法來對消息進行相應的定制化操作,比如修改返回的消息內容、按照某種規則過濾消息(可能會減少poll()方法返回的消息的個數)。如果onConsume()方法中拋出異常, 那么會被捕獲并記錄到日志中, 但是異常不會再向上傳遞。KaflcaConsumer會在提交完消費位移之后調用攔截器的onCommit()方法, 可以使用這個方法來記錄跟蹤所提交的位移信息,比如當消費者使用cornmitSync的無參方法時,我們不知道提交的消費位移的具體細節, 而使用攔截器的onCornmit()方法卻可以做到這一點。
多線程實現
KatkaProducer是線程安全的, 然而KafkaConsumer卻是非線程安全的。KafkaConsumer中定義了一個acquire()方法, 用來檢測當前是否只有一個線程在操作, 若有其他線程正在操作則會拋出ConcurrentModifcationException異常。
acquire()方法和我們通常所說的鎖(synchronized、Lock等)不同,它不會造成阻塞等待,我們可以將其看作一個輕量級鎖,它僅通過線程操作計數標記的方式來檢測線程是否發生了并發操作,以此保證只有一個線程在操作。acquire()方法和release()方法成對出現,表示相應的加鎖和解鎖操作。
一個線程對應一個KafkaConsumer實例, 我們可以稱之為消費線程。一個消費線程可以消費一個或多個分區中的消息, 所有的消費線程都隸屬于同一個消費組。這種實現方式的并發度受限于分區的實際個數,當消費線程的個數大于分區數時, 就有部分消費線程一直處于空閑的狀態。
主題和分區
主題的管理
kafka-topics.sh工具。略
KafkaAdminClient
分區的管理
選擇合適的分區數
日志存儲
文件目錄布局
Kafka 中的消息是以主題為基本單位進行歸類的, 各個主題在邏輯上相互獨立。每個主題又可以分為一個或多個分區, 分區的數量可以在主題創建的時候指定,也可以在之后修改。每條消息在發送的時候會根據分區規則被追加到指定的分區中, 分區中的每條消息都會被分配一個唯一的序列號, 也就是通常所說的偏移量(offset )。一個分區對應一個日志(Log)。為了防止Log過大,Kafka又引入了日志分段(LogSegment)的概念, 將Log切分為多個LogSegment, 相當于一個巨型文件被平均分配為多個相對較小的文件, 這樣也便于消息的維護和清理。事實上, Log 和LogSegnient也不是純粹物理意義上的概念, Log在物理上只以文件夾的形式存儲, 而每個LogSegment對應于磁盤上的一個日志文件和兩個索引文件, 以及可能的其他文件(比如以" . txnindex"為后綴的事務索引文件)。
日志文件目錄結構向Log中追加消息時是順序寫入的, 只有最后一個LogSegment才能執行寫入操作, 在此之前所有的LogSegment都不能寫入數據。為了方便描述, 我們將最后一個LogSegment稱為"activeSegment" , 即表示當前活躍的日志分段。隨著消息的不斷寫入, 當activeSegment滿足一定的條件時, 就需要創建新的activeSegment, 之后追加的消息將寫入新的activeSegment。
為了便于消息的檢索,每個LogSegment中的日志文件(以" .log"為文件后綴)都有對應的兩個索引文件:偏移量索引文件(以".index"為文件后綴)和時間戳索引文件(以". timeindex"為文件后綴) 。每個LogSegment都有一個基準偏移量baseOffset, 用來表示當前LogSegment中第一條消息的offset。偏移量是一個64位的長整型數,日志文件和兩個索引文件都是根據基準偏移量(baseOffset)命名的, 名稱固定為20 位數字,沒有達到的位數則用0填充。比如第一個LogSegment的基準偏移量為0, 對應的日志文件為00000000000000000000.log。
注意:
每個LogSegment中不只包含" .log"".index" “.timeindex"這3種文件, 還可能包含”. deleted " " .cleaned " " .swap “等臨時文件, 以及可能的”. snapshot",".txnindex “,” leader-epoch-checkpoint"等文件。
當一個Katka服務第一次啟動的時候, 默認的根目錄下就會創建以下5個文件:
cleaner-offset-checkpoint,log-start-offset-checkpoint,meta.properties,recovery-point-offset-checkpoint,replication-offset-checkpoint
日志格式布局日志格式
略
日志索引
每個日志分段文件對應了兩個索引文件,主要用來提高查找消息的效率。偏移量索引文件用來建立消息偏移量( offset )到物理地址之間的映射關系,方便快速定位消息所在的物理文件位置;時間戳索引文件則根據指定的時間戳( timestamp )來查找對應的偏移量信息。
Kafka 中的索引文件以稀疏索引( sparse index )的方式構造消息的索引,它并不保證每個消息在索引文件中都有對應的索引頁。每當寫入一定量(由broker 端參數log.index.interval.bytes 指定,默認值為4096 ,即4KB )的消息時,偏移量索引文件和時間戳索引文件分別增加一個偏移量索引項和時間戳索引項,增大或減小log.index.interval.bytes的值,對應地可以增加或縮小索引項的密度。
稀疏索引通過MappedByteBuffer 將索引文件映射到內存中,以加快索引的查詢速度。2個索引文件都是單調遞增的。
日志分段文件達到一定的條件時需要進行切分,那么其對應的索引文件也需要進行切分。日志分段文件切分包含以下幾個條件,滿足其一即可。
( 1) 當前日志分段文件的大小超過了broker 端參數log.segment.bytes 配置的值。默認值為1073741824 ,即 1 GB 。
(2 )當前日志分段中消息的最大時間戳與當前系統的時間戳的差值大于log.roll.ms或log.roll.hours 參數配置的值。如果同時配置了log.roll.ms 和log.roll.hours 參數,那么log.roll.ms 的優先級高。默認情況下,只配置了log.roll.hours參數,其值為168,即7 天。
(3 )偏移量索引文件或時間戳索引文件的大小達到broker 端參數log.index.size.max.bytes 配置的值。默認值為10485760 ,即 10 MB 。
(4 )追加的消息的偏移量與當前日志分段的偏移量之間的差值大于Integer.MAX_VALUE,即要追加的消息的偏移量不能轉變為相對偏移量( offset - baseOffset > Integer.MAX_VALUE )。
對非當前活躍的日志分段而言,其對應的索引文件內容己經固定而不需要再寫入索引項,所以會被設定為只讀。
偏移量索引
偏移量索引項的格式如圖5 - 8 所示。每個索引項占用8 個字節,分為兩個部分。
( 1 ) relativeOffset:相對偏移量,表示消息相對于baseOffset 的偏移量,占用4 個字節,當前索引文件的文件名即為baseOffset 的值。
( 2) position :物理地址,也就是消息在日志分段文件中對應的物理位置,占用4 個字節。
時間戳索引
每個索引項占用12 個字節,分為兩個部分。
( 1 ) timestamp : 當前日志分段最大的時間戳。
( 2) relativeOffset :時間戳所對應的消息的相對偏移量。(需要通過偏移量offset找到物理位置)
我們己經知道每當寫入一定量的消息時, 就會在偏移量索引文件和時間戳索引文件中分別增加一個偏移量索引項和時間戳索引項。兩個文件增加索引項的操作是同時進行的,但并不意味著偏移量索引中的relativeOffset 和時間戳索引項中的relativeOffset 是同一個值。
日志清理
Kafka 中每一個分區副本都對應一個Log, 而Log又可以分為多個日志分段, 這樣也便于日志的清理操作。Kafka提供了兩種日志清理策略。
(1)日志刪除(Log Retention) : 按照一定的保留策略直接刪除不符合條件的日志分段。
(2)日志壓縮(Log Compaction) : 針對每個消息的key進行整合, 對于有相同key的不同value 值, 只保留最后一個版本。
我們可以通過broker端參數log.cleanup.policy來設置日志清理策略,此參數的默認值為"delete " , 即采用日志刪除的清理策略。如果要采用日志壓縮的清理策略, 就需要將log.cleanup.policy設置為"compact" , 并且還需要將log.cleaner.enable (默認值為true )設定為true。通過將log.cleanup.policy參數設置為"delete,compact" , 還可以同時支持日志刪除和日志壓縮兩種策略。日志清理的粒度可以控制到主題級別, 比如與
log.cleanup.policy對應的主題級別的參數為cleanup.policy。
日志刪除
在Kafka 的日志管理器中會有一個專門的日志刪除任務來周期性地檢測和刪除不符合保留條件的日志分段文件,這個周期可以通過broker端參數log.retention.check.interval.ms來配置,默認值為300000, 即5分鐘。當前日志分段的保留策略有3 種:基于時間的保留策略、基于日志大小的保留策略和基于日志起始偏移量的保留策略。
基于時間
日志刪除任務會檢查當前日志文件中是否有保留時間超過設定的闕值(retentionMs)來尋找可刪除的日志分段文件集合(deletable Segments), 如圖5-13 所示。retentionMs可以通過broker端參數log.retention.hours、log.retentiion.minutes和log.retention.ms來配置, 優先級依次增加。
查找過期的日志分段文件,并不是簡單地根據日志分段的最近修改時間lastModifiedTime來計算的, 而是根據日志分段中最大的時間戳largestTimeStamp來計算的。largestTimeStamp從時間戳索引文件中,查找最大值。
刪除日志分段時,首先會從Log對象中所維護日志分段的跳躍表中移除待刪除的日志分段,以保證沒有線程對這些日志分段進行讀取操作。然后將日志分段所對應的所有文件添加上".deleted"的后綴(當然也包括對應的索引文件)。最后交由一個以"delete-file"命名的延遲任務來刪除這些以 ".deleted "為后綴的文件, 這個任務的延遲執行時間可以通過file.delete.delay.ms參數來調配, 此參數的默認值為60000 , 即1 分鐘。
基于日志大小
日志刪除任務會檢查當前日志的大小是否超過設定的閥值(retentionSize)來尋找可刪除的日志分段的文件集合(deletableSegments), 如圖5-14所示。retentionSize可以通過broker端參數log.retention.bytes來配置,默認值為-1 , 表示無窮大。注意log.retention.bytes配置的是Log中所有日志文件的總大小, 而不是單個日志分段(確切地說應該為log 日志文件)的大小。單個日志分段的大小由broker 端參數log.segment.bytes 來限制, 默認值為1073741824, 即 1 GB。
基于日志起始偏移量
一般情況下, 日志文件的起始偏移量logStartOffset 等于第一個日志分段的baseOffset, 但這并不是絕對的, logStartOffset 的值可以通過DeleteRecordsRequest請求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh腳本)、日志的清理和截斷等操作進行修改。
基于日志起始偏移量的保留策略的判斷依據是某日志分段的下一個日志分段的起始偏移量baseOffset 是否小于等于logStartOffset, 若是, 則可以刪除此日志分段。(logStartOffset通過其他途徑被修改了)
日志壓縮
略
磁盤存儲
頁緩存
Kafka 中大量使用了頁緩存, 這是Kafka 實現高吞吐的重要因素之一。雖然消息都是先被寫入頁緩存, 然后由操作系統負責具體的刷盤任務的, 但在Kafka中同樣提供了同步刷盤及間斷性強制刷盤( fsync )的功能,這些功能可以通過log.flush.interval.messages 、log.flush.interval.ms 等參數來控制。同步刷盤可以提高消息的可靠性,防止由于機器掉電等異常造成處于頁緩存而沒有及時寫入磁盤的消息丟失。
零拷貝
略
附錄
服務端參數配置
在config目錄下有以下配置文件:
connect-console-sink.properties connect-console-source.properties connect-distributed.properties connect-file-sink.properties connect-file-source.properties connect-log4j.properties connect-mirror-maker.properties connect-standalone.properties consumer.properties log4j.properties producer.properties server.properties tools-log4j.properties trogdor.conf zookeeper.properties**server.properties**配置
必配置屬性
默認必須配置的屬性如下:
broker.id=0 num.network.threads=2 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 log.retention.hours=168log.segment.bytes=536870912 log.retention.check.interval.ms=60000 log.cleaner.enable=falsezookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=1000000參數說明
| broker.id=0 | 每一個broker在集群中的唯一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息情況 |
| log.dirs=/data/kafka-logs | kafka數據的存放地址,多個地址的話用逗號分割,多個目錄分布在不同磁盤上可以提高讀寫性能 /data/kafka-logs-1,/data/kafka-logs-2 |
| port =9092 | broker server服務端口 |
| message.max.bytes =6525000 | 表示消息體的最大大小,單位是字節 |
| num.network.threads =4 | broker處理消息的最大線程數,一般情況下數量為cpu核數 |
| num.io.threads =8 | broker處理磁盤IO的線程數,數值為cpu核數2倍 |
| background.threads =4 | 一些后臺任務處理的線程數,例如過期消息文件的刪除等,一般情況下不需要去做修改 |
| queued.max.requests =500 | 等待IO線程處理的請求隊列最大數,若是等待IO的請求超過這個數值,那么會停止接受外部消息,應該是一種自我保護機制。 |
| hostname | broker的主機地址,若是設置了,那么會綁定到這個地址上,若是沒有,會綁定到所有的接口上,并將其中之一發送到ZK,一般不設置 |
| socket.send.buffer.bytes=100*1024 | socket的發送緩沖區,socket的調優參數SO_SNDBUFF |
| socket.receive.buffer.bytes =100*1024 | socket的接受緩沖區,socket的調優參數SO_RCVBUFF |
| socket.request.max.bytes =100*1024*1024 | socket請求的最大數值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,會被topic創建時的指定參數覆蓋 |
| log.segment.bytes =1024*1024*1024 | topic的分區是以一堆segment文件存儲的,這個控制每個segment的大小,會被topic創建時的指定參數覆蓋 |
| log.roll.hours =24*7 | 這個參數會在日志segment沒有達到log.segment.bytes設置的大小,也會強制新建一個segment會被 topic創建時的指定參數覆蓋 |
| log.cleanup.policy = delete | 日志清理策略選擇有:delete和compact主要針對過期數據的處理,或是日志文件達到限制的額度,會被 topic創建時的指定參數覆蓋 |
| log.retention.minutes=300 或 log.retention.hours=24 | 數據文件保留多長時間, 存儲的最大時間超過這個時間會根據log.cleanup.policy設置數據清除策略log.retention.bytes和log.retention.minutes或log.retention.hours任意一個達到要求,都會執行刪除有2刪除數據文件方式:按照文件大小刪除:log.retention.bytes,按照2中不同時間粒度刪除:分別為分鐘,小時 |
| log.retention.bytes=-1 | topic每個分區的最大文件大小,一個topic的大小限制 = 分區數*log.retention.bytes。-1沒有大小限log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic創建時的指定參數覆蓋 |
| log.retention.check.interval.ms=5minutes | 文件大小檢查的周期時間,是否處罰 log.cleanup.policy中設置的策略 |
| log.cleaner.enable=false | 是否開啟日志清理 |
| log.cleaner.threads = 2 | 日志清理運行的線程數 |
| log.cleaner.io.max.bytes.per.second=None | 日志清理時候處理的最大大小 |
| log.cleaner.dedupe.buffer.size=500*1024*1024 | 日志清理去重時候的緩存空間,在空間允許的情況下,越大越好 |
| log.cleaner.io.buffer.size=512*1024 | 日志清理時候用到的IO塊大小一般不需要修改 |
| log.cleaner.io.buffer.load.factor =0.9 | 日志清理中hash表的擴大因子一般不需要修改 |
| log.cleaner.backoff.ms =15000 | 檢查是否觸發日志清理的間隔 |
| log.cleaner.min.cleanable.ratio=0.5 | 日志清理的頻率控制,越大意味著更高效的清理,同時會存在一些空間上的浪費,會被topic創建時的指定參數覆蓋 |
| log.cleaner.delete.retention.ms =1 day | 對于壓縮的日志保留的最長時間,也是客戶端消費消息的最長時間,同log.retention.minutes的區別在于一個控制未壓縮數據,一個控制壓縮后的數據。會被topic創建時的指定參數覆蓋 |
| log.index.size.max.bytes =10*1024*1024 | 對于segment日志的索引文件大小限制,會被topic創建時的指定參數覆蓋 |
| log.index.interval.bytes=4096 | 當執行一個fetch操作后,需要一定的空間來掃描最近的offset大小,設置越大,代表掃描速度越快,但是也更好內存,一般情況下不需要搭理這個參數 |
| log.flush.interval.messages=None | 例如log.flush.interval.messages=1000表示每當消息記錄數達到1000時flush一次數據到磁盤,log文件”sync”到磁盤之前累積的消息條數,因為磁盤IO操作是一個慢操作,但又是一個”數據可靠性"的必要手段,所以此參數的設置,需要在"數據可靠性"與"性能"之間做必要的權衡.如果此值過大,將會導致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導致"fsync"的次數較多,這也意味著整體的client請求有一定的延遲.物理server故障,將會導致沒有fsync的消息丟失 |
| log.flush.scheduler.interval.ms=3000 | 檢查是否需要固化到硬盤的時間間隔 |
| log.flush.interval.ms= None | 例如:log.flush.interval.ms=1000,表示每間隔1000毫秒flush一次數據到磁盤僅僅通過interval來控制消息的磁盤寫入時機,是不足的.此參數用于控制"fsync"的時間間隔,如果消息量始終沒有達到閥值,但是離上一次磁盤同步的時間間隔達到閥值,也將觸發. |
| log.delete.delay.ms=60000 | 文件在索引中清除后保留的時間一般不需要去修改 |
| log.flush.offset.checkpoint.interval.ms=60000 | 控制上次固化硬盤的時間點,以便于數據恢復一般不需要去修改 |
| auto.create.topics.enable =true | 是否允許自動創建topic,若是false,就需要通過命令創建topic |
| default.replication.factor =1 | 對replica的數目進行配置,默認值為1,表示不對topic進行備份 |
| num.partitions =1 | 每個topic的分區個數,若是在topic創建時候沒有指定的話會被topic創建時的指定參數覆蓋 |
Leader,replicas配置參數
| controller.socket.timeout.ms=30000 | partition leader與replicas之間通訊時,socket的超時時間 |
| controller.message.queue.size=10 | partition leader與replicas數據同步時,消息的隊列尺寸 |
| replica.lag.time.max.ms=10000 | replicas響應partition leader的最長等待時間,若是超過這個時間,就將replicas列入ISR(in-sync replicas),并認為它是死的,不會再加入管理中 |
| replica.lag.max.messages =4000 | 如果follower落后與leader太多,將會認為此follower[或者說partition relicas]已經失效.##通常,在follower與leader通訊時,因為網絡延遲或者鏈接斷開,總會導致replicas中消息同步滯后##如果消息之后太多,leader將認為此follower網絡延遲較大或者消息吞吐能力有限,將會把此replicas遷移到其他follower中.##在broker數量較少,或者網絡不足的環境中,建議提高此值. |
| replica.socket.timeout.ms=30*1000 | follower與leader之間的socket超時時間 |
| replica.socket.receive.buffer.bytes=64*1024 | leader復制時候的socket緩存大小 |
| replica.fetch.max.bytes =1024*1024 | replicas每次獲取數據的最大大小 |
| replica.fetch.wait.max.ms=500 | replicas同leader之間通信的最大等待時間,失敗了會重試 |
| replica.fetch.min.bytes =1 | fetch的最小數據尺寸,如果leader中尚未同步的數據不足此值,將會阻塞,直到滿足條件 |
| num.replica.fetchers=1 | leader進行復制的線程數,增大這個數值會增加follower的IO |
| replica.high.watermark.checkpoint.interval.ms=5000 | 每個replica檢查是否將最高水位進行固化的頻率 |
| controlled.shutdown.enable =false | 是否允許控制器關閉broker ,若是設置為true,會關閉所有在這個broker上的leader,并轉移到其他broker |
| controlled.shutdown.max.retries =3 | 控制器關閉的嘗試次數 |
| controlled.shutdown.retry.backoff.ms=5000 | 每次關閉嘗試的時間間隔 |
| leader.imbalance.per.broker.percentage =10 | leader的不平衡比例,若是超過這個數值,會對分區進行重新的平衡 |
| leader.imbalance.check.interval.seconds =300 | 檢查leader是否不平衡的時間間隔 |
| offset.metadata.max.bytes | 客戶端保留offset信息的最大空間大小 |
zookeeper參數配置
| zookeeper.connect = localhost:2181 | zookeeper集群的地址,可以是多個,多個之間用逗號分割hostname1:port1,hostname2:port2,hostname3:port3 |
| zookeeper.session.timeout.ms=6000 | ZooKeeper的最大超時時間,就是心跳的間隔,若是沒有反應,那么認為已經死了,不易過大 |
| zookeeper.connection.timeout.ms=6000 | ZooKeeper的連接超時時間 |
| zookeeper.sync.time.ms=2000 | ZooKeeper集群中leader和follower之間的同步時間 |
listener配置
| listeners | 服務監聽地址,如果沒有配置,則使用java.net.InetAddress.getCanonicalHostName()的返回值。 格式:listeners = listener_name://host_name:port, 示例:listeners = PLAINTEXT://your.host.name:9092 |
| advertised.listeners | 用于外部訪問的地址。即注冊到zookeeper,其他服務能夠訪問。 格式: listener_name://host_name:port, 示例: PLAINTEXT://your.host.name:9092 |
| inter.broker.listener.name | 專門用于Kafka集群中Broker之間的通信,name即通過listeners配置的監聽名稱。 |
| listener.security.protocol.map | 配置監聽者的安全協議的,比如PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL |
示例:
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT listeners=EXTERNAL://192.168.11.103:9092,INTERNAL://192.168.11.103:9093 inter.broker.listener.name=INTERNAL #advertised.listeners=EXTERNAL://192.168.11.103:9094,INTERNAL://192.168.11.103:9093#listeners:是kafka真正bind的地址 #advertised.listeners:是暴露給外部的listeners,如果沒有設置,會用listeners生產者客戶端參數
參考:http://kafka.apache.org/documentation/#producerconfigs
| bootstrap.servers | |
| key.serializer | 指定key序列化操作的序列化器,無默認值。類的全限定名。 |
| value.serializer | 指定value序列化操作的序列化器,無默認值。類的全限定名。 |
| client.id | 設定kafkaProducer對應的客戶端id,默認值為“”,如果不設置,會自動生成一個非空字符串,內容形式如:“producer-1”,“producer-2”… |
| retries | 配置生產者重試次數,對于可重試異常,那么只要在規定的次數內自行恢復了,就不會拋出異常,默認是0。 |
| retry.backoff.ms | 用來設定兩次重試之間的時間間隔,默認值100 ms。 |
| partitioner.class | 顯示配置使用哪個分區器。 |
| interceptor.classes | 指定自定義攔截器,多個傳List集合。 |
| buffer.memory | 生產者客戶端RecordAccumulator緩存大小,默認值為33554432B,即32M。 |
| batch.size | ProducerBatch可以復用內存區域的大小 |
| max.block.ms | 最大阻塞時間,RecordAccumulator緩存不足時或者沒有可用的元數據時,KafkaProducer的send()方法調用要么被阻塞,要么拋出異常,此參數的默認值為60000,即60s。 |
| metadata.max.age.ms | 當客戶端超過這個時間間隔時就會更新元數據信息默認值300000,即5分鐘。元數據指集群中有哪些主題,主題有哪些分區,每個分區leader副本在哪個節點上,follower副本在哪個節點上,哪些副本在AR,ISR等集合中,集群中有哪些節點等等。 |
| acks | 用來指定必須要多少個副本收到這條消息,之后生產者才會認為這條消息成功寫入。acks參數有三種類型的值:字符串類型,不是整型。 1:只要分區的leader副本寫入成功,生產者就會收到來自服務端的成功響應。如果再被其它follower副本拉取前leader副本崩潰,那么此時消息還是會丟失。 0:生產者發送消息之后不需要等待任何服務端的響應。如果在消息發送到寫入kafka的過程中出現了某些異常,導致kafka并沒有收到這條消息,那么生產者也無從得知,消息會丟失。 -1或者all:生產者發送消息之后,需要等待ISR中所有副本成功寫入消息之后才能收到來自服務端的成功響應。 |
| max.request.size | 用來限制生產者客戶端能發送的消息的最大值,默認值為1048576B,即1MB。這個參數涉及到其它參數的聯動,比如broker端的message.max.bytes參數。對kafka沒有足夠把控的時候不要更改此參數。 |
| compression.type | 指定消息的壓縮方式,默認值為"none",可以配置為"gzip",“snappy”和“lz4”。 |
| connections.max.idle.ms | 用來指定多久之后關閉閑置的連接,默認值540000(ms),即9min |
| linger.ms | 生產者發送ProducerBatch之前等待更多消息(ProducerRecoder)加入ProducerBatch的時間,默認值為0。生產者客戶端會在ProducerBatch被填滿或者等待時間超過linger.ms時發送出去。增大這個參數的值會增加消息的延遲,但同時會提高吞吐量。 |
| receive.buffer.bytes | 用來設置socket接收緩沖區的大小,默認值為32768(B),即32KB,如果設置為-1,則使用操作系統的默認值。 |
| send.buffer.bytes | 用來設置socket發送緩沖區的大小,默認值為131072(B),即128KB,如果設置為-1,則使用操作系統默認值。 |
| request.timeout.ms | 配置Producer等待請求響應的最長時間,默認值為30000(ms),請求超時之后可以進行重試。注意這個參數需要比broker端參數replica.lag.time.max.ms的值要大,這樣可以減少因客戶端重試而引起的消息重復的概率。 |
| enable.idempotence | 是否開啟冪等性功能,默認值false |
| max.in.flight.requests.per.connection | 限制每個連接,也就是客戶端與Node之間的連接最多緩存請求數,默認值5 |
| transactional.id | 設置事物id,必須唯一,默認值null |
消費者客戶端參數
參考:http://kafka.apache.org/documentation/#consumerconfigs
| bootstrap.servers | |
| key.descrializer | Message record 的key的反序列化類。 |
| value.descrializer | Message record 的value的反序列化類。 |
| group.id | 用于表示該consumer想要加入到哪個group中。默認值是 “”。 |
| heartbeat.interval.ms | 心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是確定consumer存活,加入或者退出group的有效手段。這個值必須設置的小于session.timeout.ms,因為:當Consumer由于某種原因不能發Heartbeat到coordinator時,并且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。通常設置的值要低于session.timeout.ms的1/3。默認值是:3000 (3s) |
| session.timeout.ms | Consumer session 過期時間。這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。其默認值是:10000 (10 s) |
| enable.auto.commit | Consumer 在commit offset時有兩種模式:自動提交,手動提交。手動提交在前面已經說過。自動提交:是Kafka Consumer會在后臺周期性的去commit。默認值是true。 |
| auto.commit.interval.ms | 自動提交間隔。范圍:[0,Integer.MAX],默認值是 5000 (5 s) |
| auto.offset.reset | 這個配置項,是告訴Kafka Broker在發現kafka在沒有初始offset,或者當前的offset是一個不存在的值(如果一個record被刪除,就肯定不存在了)時,該如何處理。它有4種處理方式: 1) earliest:自動重置到最早的offset。 2) latest:看上去重置到最晚的offset。 3) none:如果邊更早的offset也沒有的話,就拋出異常給consumer,告訴consumer在整個consumer group中都沒有發現有這樣的offset。 4) 如果不是上述3種,只拋出異常給consumer。 默認值是latest。 |
| connections.max.idle.ms | 連接空閑超時時間。因為consumer只與broker有連接(coordinator也是一個broker),所以這個配置的是consumer到broker之間的。默認值是:540000 (9 min) |
| fetch.max.wait.ms | Fetch請求發給broker后,在broker中可能會被阻塞的(當topic中records的總size小于fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。 |
| fetch.min.bytes | 當consumer向一個broker發起fetch請求時,broker返回的records的大小最小值。如果broker中數據量不夠的話會wait,直到數據大小滿足這個條件。取值范圍是:[0, Integer.Max],默認值是1。默認值設置為1的目的是:使得consumer的請求能夠盡快的返回。 |
| fetch.max.bytes | 一次fetch請求,從一個broker中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。取值范圍是:[0, Integer.Max],默認值是:52428800 (5 MB) |
| max.partition.fetch.bytes | 一次fetch請求,從一個partition中取得的records最大大小。如果在從topic中第一個非空的partition取消息時,如果取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片情況下,只會返回這一條record。broker、topic都會對producer發給它的message size做限制。所以在配置這值時,可以參考broker的message.max.bytes 和 topic的max.message.bytes的配置。 |
| max.poll.interval.ms | 前面說過要求程序中不間斷的調用poll()。如果長時間沒有調用poll,且間隔超過這個值時,就會認為這個consumer失敗了。 |
| max.poll.records | Consumer每次調用poll()時取到的records的最大數。 |
| receive.buffer.byte | Consumer receiver buffer (SO_RCVBUF)的大小。這個值在創建Socket連接時會用到。取值范圍是:[-1, Integer.MAX]。默認值是:65536 (64 KB)如果值設置為-1,則會使用操作系統默認的值。 |
| request.timeout.ms | 請求發起后,并不一定會很快接收到響應信息。這個配置就是來配置請求超時時間的。默認值是:305000 (305 s) |
| client.id | Consumer進程的標識。如果設置一個人為可讀的值,跟蹤問題會比較方便。 |
| interceptor.classes | 用戶自定義interceptor。 |
| metadata.max.age.ms | Metadata數據的刷新間隔。即便沒有任何的partition訂閱關系變更也行執行。范圍是:[0, Integer.MAX],默認值是:300000 (5 min) |
| group.max.session.timeout.ms | session超時時間,默認值:1800000 (30 minutes) |
| group.min.session.timeout.ms | 最小session超時時間,默認值:6000 (6 seconds) |
主題參數
| cleanup.policy | “delete” 或 “compact”的字符串。這個字符串指派了用在老的日志片段的保存策略。默認的策略(“delete”)會在它們的保留時間或是大小超出限制時丟棄老的片段。設置為”compact”將在主題上啟用日志壓縮。 | 列表 | delete | [compact, delete] | log.cleanup.policy | 中 |
| compression.type | 為一個給定的主題指定最終的壓縮類型。這個配置接受標準的壓縮編碼器(‘gzip’, ‘snappy’, lz4)。它額外的接受’uncompressed’,等于不壓縮;還有’producer’ 表示使用生產者設置的原始壓縮編碼器。 | 字符 | producer | [uncompressed, snappy, lz4, gzip, producer] | compression.type | 中 |
| delete.retention.ms | 為日志壓縮主題保留刪除墓碑標記的時間。這個設置也給出了一個消費者必須完成一個讀取的邊界,如果它們開始從偏移0 到確保它可以得到一個可用的最尾端的快照(否則在它們完成掃描之前刪除墓碑會被收集) | 長整型 | 86400000 | [0,…] | log.cleaner.delete.retention.ms | 中 |
| file.delete.delay.ms | 從文件系統刪除一個文件之前的等待時間。 | 長整型 | 60000 | [0,…] | log.segment.delete.delay.ms | 中 |
| flush.messages | 這個設置允許指定一個時間間隔,它將用于寫入到日志的數據強制進行文件同步。例如:如果這個值被設置為1我們將在每條消息之后進行文件同步。如果是5我將在每5條消息后進行文件同步。通常我們推薦你不要設置這個值并使用副本實現持久性和允許使用操作系統的后臺刷新能力,因為它更高效。此設置可以按每個主題覆蓋(參見每個主題配置部分)。 | 長整型 | 9223372036854775807 | [0,…] | log.flush.interval.messages | 中 |
| flush.ms | 這個設置允許指定一個時間間隔,它將用于寫入到日志的數據強制進行文件同步。例如:如果這個值被設置成1000我們將在1000毫秒之后進行文件同步。通常我們推薦你不要設置這個值并使用副本實現持久性和允許使用操作系統的后臺刷新能力,因為它更高效。 | 長整型 | 9223372036854775807 | [0,…] | log.flush.interval.ms | 中 |
| follower.replication.throttled.replicas | 應該被限制在跟隨者端的日志副本列表。這個列表應該描述成副本的一個集合,格式為 [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或是可選的通配符“*”,可以用來限制這個主題的所有副本。 | 列表 | “” | [partitionId],[brokerId]:[partitionId],[brokerId]:… | follower.replication.throttled.replicas | 中 |
| index.interval.bytes | 這個設置控制了 Kafka添加一個索引入口到它的偏移索引的頻率。默認設置是確保我們的索引信息大約每4096字節。更多的索引允許在日志中讀取時跳躍到離期望的位置更近的地方,但是這樣會讓索引更大。你很可能不需要更改這個值。 | 列表 | 4096 | [0,…] | log.index.interval.bytes | 中 |
| leader.replication.throttled.replicas | 應該被限制在服務器端的日志副本列表。這個列表應該描述成副本的一個集合,格式為 [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:…或是可選的通配符“*”,可以用來限制這個主題的所有副本。 | 列表 | “” | [partitionId],[brokerId]:[partitionId],[brokerId]:… | leader.replication.throttled.replicas | 中 |
| max.message.bytes | Kafka允許的最大的記錄批次大小。如果這是增長的并且有老于0.10.2的消費者,消費者獲取大小也必須是增長的以便它們可以這個大小的記錄批次。在最新的消息格式版本中,記錄常常分組為批次來實現更高效。在先前的消息格式版本中,未壓縮的記錄沒有按批次分組,并且在這種情況下這樣的局限僅適用于一個單一的記錄。 | 整型 | 1000012 | [0,…] | message.max.bytes | 中 |
| message.format.version | 指定了broker將用于追加消息到日志的消息格式的版本。這個值應該是一個有效的API版本。例如:0.8.2, 0.9.0.0, 0.10.0,更多詳情請查看API版本。通過設置一個獨特的消息格式版本,用戶可以證明所有在磁盤上已存在的消息版本都小或等于指定版本。如果這個值沒有被正確的設置將導致有更老版本的消費者崩潰,它們將收到帶有一個它們不認識的格式的消息。 | 字符 | 0.11.0-IV2 | log.message.format.version | 中 | |
| message.timestamp.difference.max.ms | 一個broker收到一個消息時的時間戳與在消息中指定的時間戳之間允許的最大差異。如果message.timestamp.type=CreateTime,消息會被拒絕,如果時間戳的差異超過了這個閾值。如果message.timestamp.type=LogAppendTime這個配置會被忽略。 | 長整型 | 9223372036854775807 | [0,…] | log.message.timestamp.difference.max.ms | 中 |
| message.timestamp.type | 定義消息的時間戳是消息的創建時間或是日志的追加時間。這個值應該是CreateTime 或 LogAppendTime | 字符 | CreateTime | log.message.timestamp.type | 中 | |
| min.cleanable.dirty.ratio | 這個配置控制著日志壓縮器將要嘗試清理日志的頻率(假設日志壓縮已啟用)。默認情況下,我們將避免清理超過50%的已被壓縮的日志。這個比率限制了日志中重復的最大空間(最多50%個日志中的50%個可能是重復的)。更高的比率意味著更少、更高效的清潔,但也意味著在日志中有更多的空間浪費。 | 雙精度型 | 0.5 | [0,…,1] | log.cleaner.min.cleanable.ratio | 中 |
| min.compaction.lag.ms | 一條消息在日志里保持未壓縮的最小時間。只對壓縮的日志有效。 | 長整型 | 0 | [0,…] | log.cleaner.min.compaction.lag.ms | 中 |
| min.insync.replicas | 當一個生產者設置acks 為 “all”或是 “-1”。這個配置指定了必須被認為是成功的一個寫入的副本的最小數量。如果這個最小數量不能被滿足,那么生產者將拋出一個異常( NotEnoughReplicas 或 NotEnoughReplicasAfterAppend)。當min.insync.replicas 和 acks 一起使用時,允許你實現更強的耐久性保證。一個典型的情況是與3個復制因子創建主題,設置 min.insync.replicas 為2,并且生產者的acks 為 “all”。這將確保生產者拋出一個意外,如果一個主要的副本沒有接收到一個寫入。 | 整型 | 1 | [1,…] | min.insync.replicas | 中 |
| preallocate | 設為true時我們應當在創建一個新的日志片段時預分配文件在磁盤上。 | 布爾 | FALSE | log.preallocate | 中 | |
| retention.bytes | 如果我們使用”delete”保留策略,這個配置控制了丟棄一個老的日志片段來釋放之前保留一個日志文件可以增長到的最大在小。默認情況下沒有大小限制,只有時間限制。 | 長整型 | -1 | log.retention.bytes | 中 | |
| retention.ms | 如果我們使用”delete”保留策略,這個配置控制了丟棄一個老的日志片段來釋放之前保留一個日志文件最大時間。這代表在服務級別協議之上,一個消費者必須多快地讀取他們的數據。 | 長整型 | 604800000 | log.retention.ms | 中 | |
| segment.bytes | 這個配置控制了日志片段文件。保留或清理通常是一次一個文件的進行的。所以一個大的片段大小意味著更少的文件,但是保留上的控制會有更少的顆粒。 | 整型 | 1073741824 | [14,…] | log.segment.bytes | 中 |
| segment.index.bytes | 這個配置控制了到文件位置的映射偏移索引的大小。我們預分配這個索引文件并且只在日志展開后收縮。通常情況下不需要改這個設置。 | 整型 | 10485760 | [0,…] | log.index.size.max.bytes | 中 |
| segment.jitter.ms | 計劃的片段展開時間減少的最大隨機抖動,以免發生迅速集中的片段展開。 | 長整型 | 0 | [0,…] | log.roll.jitter.ms | 中 |
| segment.ms | 這個配置控置了時間周期,在這之后 kafka將會強制日志展開,即使這個片段文件沒有滿到確保保留可以刪除或是壓縮舊的數據。 | 長整型 | 604800000 | [0,…] | log.roll.ms | 中 |
| unclean.leader.election.enable | 指示是否開啟不在ISR里的副本設置為選舉為領導者作為最后的手段,盡管這樣做會導致數據丟失。 | 布爾 | FALSE | unclean.leader.election.enable | 中 |
總結
以上是生活随笔為你收集整理的kafka技术内幕(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka常用命令及问题解决
- 下一篇: kafka技术内幕(二)