kafka学习梳理
文章目錄
- 1. kafka是一個消息隊列,
- 2. kafka中主要的術語
- 3. kafka broker
- 3.1 topic partition的數據冗余和leader partition
- 3.2 集群中的controller
- 3.3 partition中的leader,follower,in-sync-replica
- 1. leader如何判斷一個follower是否應該屬于in-sync-replica 集合呢
- 2. leader的選舉
- 3. 最小ISR設置
- 3.4 partition中消息的High-Watermark
- 1. 什么是HighWatermark,有什么用
- 2. HW的更新機制:
- 1. LEO ,log-end-offset
- 4 producer
- 1. producer如何直接連接對應的topic所在節點
- 2. 相關配置
- 5 consumer
- 1. consumer的rebalance過程
- 1. 發生rebalance的時機
- 2. rebalance過程
- 2. 消費者的一些配置
- 6. 高性能讀寫的秘密
這一篇簡要總結一下kafka的原理和概念
1. kafka是一個消息隊列,
對應的特性有
- 高吞吐量:吞吐量高達數十萬
- 高并發:支持數千個客戶端同時讀寫
- 低延遲:延遲最低只有幾毫秒
- 消息持久性和可靠性:消息被持久化到本地磁盤,同時支持數據備份
- 集群容錯性:允許部分節點失敗
- 可擴展性:支持集群動態擴展
2. kafka中主要的術語
??kafka 是一個分布式的消息中間件,一條消息就是類似我們往MYSQL中存儲中的任何一條數據一樣,kafka服務以集群的方式存在,集群中的每一個節點叫broker。消息在kafka的歸檔方式是按照topic來進行歸檔的(就像是數據庫中一個表一樣),同時,每個topic又可以被分為一個或者多個partition(可以理解為MYSQL中的分表),用來提高并行度(生產者或者消費者的并行度)。
??消息的生產者被稱為producer,消息的消費者被稱為consumer。生產者將消息發送到broker中的摸個topic的某個partition當中,consumer指定從某一個或者多個topic的partion當中拉取數據。kafka可以保證保證數據的生產和消費的有序性(當然如果想要嚴格保證的話還需要進一步了解和設置它)。kafka的一個簡單圖:
3. kafka broker
kafka broker層面要做的東西可以主要分為兩個比較大的方面,一方面是集群的管理,一方面是數據的管理。
3.1 topic partition的數據冗余和leader partition
??因為kafka是分布式的,所以他會通過一定的數據冗余來對抗集群中的部分節點失敗的情況,通過設置每個topic的replica.factor=3(或其他),每個topic就會有3份存儲,這樣的話當其中的一個或者連個副本所在的broker宕機的話,服務依然能夠提供(根據不同的配置,有可能導致少許數據丟失,后面會聊聊)。因為每個partition有多個副本,而kafka規定了多個副本中只能有一個leader 副本,所有的讀寫都是對這個leader進行的。
??在整個kafka集群中有一個大部分分布式服務都有的master角色,他負責了整個集群的topic的partition的leader選舉,叫controller。
3.2 集群中的controller
??像所有的分布式集群都有master節點一樣(zookeeper有master節點,es也有),kafka也有一個master角色,只是在kafka當中叫controller。controller可以認為承擔了集群的可用性管理,他主要負責每個partition的leader選舉工作。
??那么kafka中的controller是如何選舉的的呢,因為有了zookeeper,所以選舉的功能得到了很大簡化。在啟動的時候大家都是去zookeeper那里搶占式的創建一個相同的臨時節點(/controller)。zookeeper的分布式一致性保證了只有一個請求能夠創建成功,那么創建成功的就成為了controller,其他失敗的節點則在這個節點上添加監聽,在當前controller發生故障失敗的時候這個臨時節點(/controller)會被zookeeper刪除,然后大家就可以再進行一次競爭了。
??controller在每次選舉得到的新的控制器會通過zookeeper確認自己是新的controller,然后會把epoch+=1,這里的epoch是什么呢,就是一個數值,可以簡單理解為皇帝的年號一樣的東西,每個皇帝都有自己的epoch,然后下一任的皇帝的epoch比當前這一任的大。controller每次發消息的時候也會帶上這個epoch,這樣主要是為了防止腦裂,也就是新的controller選出來之后,舊的卡住的controller又活過來了,這個時候他可能還認為自己是controller,進而給其他的節點發號施令,但是他的epoch比較舊,這樣的信息會被其他的節點忽略。
??epoch是分布式中避免腦裂常用的一種手段,在zookeeper和raft當中都有應用。
3.3 partition中的leader,follower,in-sync-replica
leader,follower,in-sync-replica都是運行時的概念,而且會動態變化的,在靜態的存儲中一般都叫replica
??在上面講到,kafka為了提升容錯能力,每個partition會有多個副本(replica)多個副本當中有一個會成為leader, 每次接收producer和consumer的請求總是由leader partition來處理。
??在producer發送消息往leader時,leader會將message存入自己所在的partition當中,同時follower也在源源不斷的從leader拉取數據,也就是同步leader中的數據。在這里面kafka允許部分副本比較慢,這樣可以提升服務的服務性能,kafka維護了一個follower的子集,叫in-sync-replica叫同步副本集,這個集合里面也包括leader本身,也就是leader會認為這些副本是一直在保持和leader進行同步的。
1. leader如何判斷一個follower是否應該屬于in-sync-replica 集合呢
2. leader的選舉
??那么當一個leader掛掉之后,新的leader又是如何被controller選中的呢,他會從 in-sync-replica中選取一個座位leader,是不是有點粗暴,而且,我們也可以看到,因為 in-sync-replica中的follower是有可能落后于leader的,這樣,新的leader選出來以后,其數據是有可能落后于原來的leader的,這就有可能造成數據的丟失(已經給producer返回了成功消息,但是最終消息卻沒有完成真正的持久化,consumer消費不到這條消息)。當然,我們通過一些配置是能夠達到數據不丟失的,需要broker端和producer端的配合。
3. 最小ISR設置
??在broker端還可以配置min.insync.replicas,如果min.insync.replicas=2,那么至少要存在兩個同步副本才能向分區寫入數據。這個時候如果只有一個同步副本,那么Broker就會停止接受生產者的請求,此時Broker變成了只讀,嘗試發送數據的生產者會收到NotEnoughReplicasException異常,但是消費者仍然可以繼續讀取已有的數據。
??這是為了避免發生不完全選舉時數據的寫入和讀取出現非預期的行為,可以看出來,這個參數也是實現高可用的重要一環,假如設置min.insync.replicas=1,那么leader掛了,就無法選出leader了。
3.4 partition中消息的High-Watermark
1. 什么是HighWatermark,有什么用
??kafka中的消息在partition當中是按照先來后到的順序持續存入的,High-Watermark ,高水位,他標識了截止到哪條消息是consumer可以看到的。因為要盡可能滿足數據的一致性,有可能消息只是在partition中的leader存在,還沒有復制給其他的follower,這個時候讓consumer看到這條消息是不安全的。因為這條消息有可能還沒有答應producer已經成功持久化,這個時候如果leader宕機,也會導致數據的不一致,因為這個時候可能給producer返回的是fail,但是實際上consumer卻消費到了這條數據。所以kafka就設計了high-water-mark,標識截止到哪條數據是consumer可以消費的。
2. HW的更新機制:
1. LEO ,log-end-offset
??在聊HW的更新機制之前,需要先了解LEO(log-end-offset),這個是partition的每個replica中存儲的日志的最后一條日志的offset(最后存進來的),offset其實就是日志的進來的順序編號,可以理解為數組的下標。
??producer每發進來一條消息,server端(broker)對應的都會放到某個指定的partition下,每個消息都會產生一個offset。
??同時,leader當中不僅會有自己的LEO,也會有其他follower的LEO信息(這個信息也就是follower在拉取leader的數據的時候傳入的fetch offset ),leader會根據這些LEO信息來完成HW的更新。HW=min(LEO)我們可以通以下幾個問題來回答HW的更新
follower 何時更新LEO
leader 如何更新自己記錄的follower的LEO
leader 端非自己副本對象 LEO值是在leader端處理follower的FETCH請求過程中被更新的。
follower 何時更新HW
leader 何時更新HW
leader 在正常同步時的更新機制 HW的更新過程
4 producer
??這里主要是要了解producer的發送機制,以及一些比較重要的配置。
1. producer如何直接連接對應的topic所在節點
Producers直接發送消息到broker上的leader partition,不需要經過任何中介一系列的路由轉發。為了實現這個特性,
2. 相關配置
acks:
- acks=0 ,生產者把消息發送到broker即認為成功,不等待broker的處理結果,這種情況下,下面的retries的配置也是無效的。這種方式的吞吐最高,但也是最容易丟失消息的
- acks=1:生產者會在該分區的群首(leader)寫入消息并返回成功后,認為消息發送成功。如果群首寫入消息失敗,生產者會收到錯誤響應并進行重試。這種方式能夠一定程度避免消息丟失,但如果群首宕機時該消息沒有復制到其他副本,那么該消息還是會丟失。
- acks=all:生產者會等待所有副本成功寫入該消息,這種方式是最安全的,能夠保證消息不丟失,但是延遲也是最大的,這種一般是用在對數據的持久化和一致性比較高的場景,但是對數據吞吐量要求不是特別高。
retries
- 當生產者發送消息收到一個可恢復異常時,會進行重試,這個參數指定了重試的次數。在實際情況中,這個參數需要結合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比集群重新選舉群首的時間長,這樣可以避免生產者過早結束重試導致失敗
batch.size
- 當多條消息發送到一個分區時,生產者會進行批量發送,這個參數指定了批量消息的大小上限(以字節為單位)。當批量消息達到這個大小時,生產者會一起發送到broker;但即使沒有達到這個大小,生產者也會有定時機制來發送消息,避免消息延遲過大。
linger.ms
- 這個參數指定生產者在發送批量消息前等待的時間,當設置此參數后,即便沒有達到批量消息的指定大小,到達時間后生產者也會發送批量消息到broker。默認情況下,生產者的發送消息線程只要空閑了就會發送消息,即便只有一條消息。設置這個參數后,發送線程會等待一定的時間,這樣可以批量發送消息增加吞吐量,但同時也會增加延遲。
buffer.memory
- 這個參數設置生產者緩沖發送的消息的內存大小
client.id
- 這個參數可以是任意字符串,它是broker用來識別消息是來自哪個客戶端的。在broker進行打印日志、衡量指標或者配額限制時會用到。
max.in.flight.requests.per.connection
- 這個參數指定生產者可以發送多少消息到broker并且等待響應,設置此參數較高的值可以提高吞吐量,但同時也會增加內存消耗。如果想要保證消息的有序性,只能設置為1。2.0中默認為5,kafka保證了單個producer的嚴格exactly-once,也保證了有序性,比較牛叉
max.request.size
- 這個參數限制生產者發送數據包的大小,數據包的大小與消息的大小、消息數相關。如果我們指定了最大數據包大小為1M,那么最大的消息大小為1M,或者能夠最多批量發送1000條消息大小為1K的消息。另外,broker也有message.max.bytes參數來控制接收的數據包大小。在實際中,建議這些參數值是匹配的,避免生產者發送了超過broker限定的數據大小。
5 consumer
??kafka的消費者,消費者,顧名思義,就是從kafka broker 上拉取生產者producer產生的數據。消費的數據粒度可以到達topic.partition,同時,也可以指定的起始位置offset值,或者是按照時間查找offset,然后進行消費。
??每個consumer會有一個group-id,多個consumer可以屬于一個group-id,進而分享一個topic的多個partition(提高并行能力)
??consumer可以使用自動提交消費點位offset,也可以使用手動提交的方式,他其實沒有ack機制,每次提交offset,就是往broker的__consumer_offset__ 這個topic生產消息而已。然后下次啟動的時候又默認會從這個topic中取到相關的offset信息,使用這個offset從broker中拉取數據。
1. consumer的rebalance過程
1. 發生rebalance的時機
1.正常情況
組成員個數發生變化。例如有新的 consumer 實例加入該消費組或者離開組。 訂閱的 Topic 個數發生變化。 訂閱 Topic 的分區數發生變化。2.消費者意外情況
session 過期 max.poll.interval 到期,在這個時間值達到時,心跳線程會自動停止發送heartbeats 然后 發送leave-group request這個時候會觸發rebalance,
2. rebalance過程
下面以新增一個consumer來闡述
Consumer Client 發送 join-group 請求,如果 Group 不存在,創建該 Group,Group 的狀態為 Empty;
由于 Group 的 member 為空,將該 member 加入到 Group 中,并將當前 member (client)設置為 Group 的 leader,進行 rebalance 操作,Group 的狀態變為 preparingRebalance,等待 rebalance.timeout.ms 之后(為了等待其他 member 重新發送 join-group,如果 Group 的狀態變為 preparingRebalance,Consumer Client 在進行 poll 操作時,needRejoin() 方法結果就會返回 true,也就意味著當前 Consumer Client 需要重新加入 Group),Group 的 member 更新已經完成,此時 Group 的狀態變為 AwaitingSync,并向 Group 的所有 member 返回 join-group 響應;
client 在收到 join-group 結果之后,如果發現自己的角色是 Group 的 leader,就進行 assignment,該 leader 將 assignment 的結果通過 sync-group 請求發送給 GroupCoordinator,而 follower 也會向 GroupCoordinator 發送一個 sync-group 請求(只不過對應的字段為空);
當 GroupCoordinator 收到這個 Group leader 的請求之后,獲取 assignment 的結果,將各個 member 對應的 assignment 發送給各個 member,而如果該 Client 是 follower 的話就不做任何處理,此時 group 的狀態變為 Stable(也就是說,只有當收到的 Leader 的請求之后,才會向所有 member 返回 sync-group 的結果,這個是只發送一次的,由 leader 請求來觸發)。
2. 消費者的一些配置
fetch.min.bytes
- 這個參數允許消費者指定從broker讀取消息時最小的數據量。當消費者從broker讀取消息時,如果數據量小于這個閾值,broker會等待直到有足夠的數據,然后才返回給消費者。對于寫入量不高的主題來說,這個參數可以減少broker和消費者的壓力,因為減少了往返的時間。而對于有大量消費者的主題來說,則可以明顯減輕broker壓力。
fetch.max.wait.ms
- 上面的fetch.min.bytes參數指定了消費者讀取的最小數據量,而這個參數則指定了消費者讀取時最長等待時間,從而避免長時間阻塞。這個參數默認為500ms。
max.partition.fetch.bytes
-
這個參數指定了每個分區返回的最多字節數,默認為1M。也就是說,KafkaConsumer.poll()返回記錄列表時,每個分區的記錄字節數最多為1M。如果一個主題有20個分區,同時有5個消費者,那么每個消費者需要4M的空間來處理消息。實際情況中,我們需要設置更多的空間,這樣當存在消費者宕機時,其他消費者可以承擔更多的分區。
-
需要注意的是,max.partition.fetch.bytes必須要比broker能夠接收的最大的消息(由max.message.size設置)大,否則會導致消費者消費不了消息。另外,在上面的樣例可以看到,我們通常循環調用poll方法來讀取消息,如果max.partition.fetch.bytes設置過大,那么消費者需要更長的時間來處理,可能會導致沒有及時poll而會話過期。對于這種情況,要么減小max.partition.fetch.bytes,要么加長會話時間。
session.timeout.ms
- 這個參數設置消費者會話過期時間,默認為3秒。也就是說,如果消費者在這段時間內沒有發送心跳,那么broker將會認為會話過期而進行分區重平衡。這個參數與heartbeat.interval.ms有關,heartbeat.interval.ms控制KafkaConsumer的poll()方法多長時間發送一次心跳,這個值需要比session.timeout.ms小,一般為1/3,也就是1秒。更小的session.timeout.ms可以讓Kafka快速發現故障進行重平衡,但也加大了誤判的概率(比如消費者可能只是處理消息慢了而不是宕機)。
auto.offset.reset
- 這個參數指定了當消費者第一次讀取分區或者上一次的位置太老(比如消費者下線時間太久)時的行為,可以取值為latest(從最新的消息開始消費)或者earliest(從最老的消息開始消費)。
enable.auto.commit
- 這個參數指定了消費者是否自動提交消費位移,默認為true。如果需要減少重復消費或者數據丟失,你可以設置為false。如果為true,你可能需要關注自動提交的時間間隔,該間隔由auto.commit.interval.ms設置。
partition.assignment.strategy
-
我們已經知道當消費組存在多個消費者時,主題的分區需要按照一定策略分配給消費者。這個策略由PartitionAssignor類決定,默認有兩種策略:
- 范圍(Range):對于每個主題,每個消費者負責一定的連續范圍分區。假如消費者C1和消費者C2訂閱了兩個主題,這兩個主題都有3個分區,那么使用這個策略會導致消費者C1負責每個主題的分區0和分區1(下標基于0開始),消費者C2負責分區2。可以看到,如果消費者數量不能整除分區數,那么第一個消費者會多出幾個分區(由主題數決定)。
- 輪詢(RoundRobin):對于所有訂閱的主題分區,按順序一一的分配給消費者。用上面的例子來說,消費者C1負責第一個主題的分區0、分區2,以及第二個主題的分區1;其他分區則由消費者C2負責。可以看到,這種策略更加均衡,所有消費者之間的分區數的差值最多為1。
-
partition.assignment.strategy設置了分配策略,默認為org.apache.kafka.clients.consumer.RangeAssignor(使用范圍策略),你可以設置為org.apache.kafka.clients.consumer.RoundRobinAssignor(使用輪詢策略),或者自己實現一個分配策略然后將partition.assignment.strategy指向該實現類。
client.id
- 這個參數可以為任意值,用來指明消息從哪個客戶端發出,一般會在打印日志、衡量指標、分配配額時使用。
max.poll.records
- 這個參數控制一個poll()調用返回的記錄數,這個可以用來控制應用在拉取循環中的處理數據量。
max.poll.interval.ms
- 兩次 poll 之間的最大時間間隔,設置大一點可以處理消息的時間,在到達這個時間沒有進行poll()操作的話會自動停止發送心跳,并且發送一個leave-group的請求,
假如兩次poll()之間處理請求比較大的話應該放到異步去做,因為服務器同時會使用這個參數作為等待其他consumer相應rejion的最大時長,假如其他consumer也把max.poll.interval.ms設置的比較長的話,那么整個rebalance可能耗時會很長。
receive.buffer.bytes、send.buffer.bytes
- 這兩個參數控制讀寫數據時的TCP緩沖區,設置為-1則使用系統的默認值。如果消費者與broker在不同的數據中心,可以一定程度加大緩沖區,因為數據中心間一般的延遲都比較大。
partition.assignment.strategy
- 這個設置了consumer的partition在consumer中的分配機制
kafka的壓縮和解壓縮,理論上broker是不會再處理的,除非單獨配置了,這樣有可能會導致cpu升高
http://zhongmingmao.me/2019/08/02/kafka-compression/
6. 高性能讀寫的秘密
參考
https://juejin.im/post/5d9944e9f265da5b6a169271
https://juejin.im/post/5bf6b0acf265da612d18e931#heading-5
https://juejin.im/post/5c0683b1f265da614f701441
https://juejin.im/post/5c46e729e51d452c8e6d5679
總結
- 上一篇: 搜索的一般过程
- 下一篇: 算法训练营12-动态规划