kafka记录及面试题
基本概念:
1、Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉取指定Topic的消息,然后進行業務處理。一個 Topic會分為多個 Partition分布在不同的broker上。
2、Producer 生產的數據會不斷append到partition文件末端,每條數據都有自己的 Offset,每個消費者會實時記錄自己消費到了哪個 Offset。
3、消費者組內每個消費者負責消費不同partition的數據,一個分區只能由同一組內一個消費者消費
1、kafka的介紹:
分布式發布-訂閱消息隊列,生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦(允許我們獨立的擴展或修改隊列兩邊的處理過程)、異步處理(允許用戶把消息放入隊列但不立即處理它)、緩沖削峰(其實就是解決生產消息和消費消息的處理速度不一致的情況)的作用。
2、基本概念:
消費者組CG:消費者組內每個消費者負責消費不同partition的數據,提高消費能力。一個分區只能由同一組內一個消費者消費,消費者組之間互不影響,消費者組是邏輯上的一個訂閱者。
Broker:一臺 Kafka 機器就是一個 Broker。
Partition:是為了實現擴展性,提高并發能力,一個 Topic 可以分為多個 Partition分布在不同的broker上,每個 Partition 是一個 有序的隊列。
Topic :是邏輯上的概念,而 Partition 是物理上的概念,每個 Partition 對應著一些log 文件,Producer 生產的數據會不斷append追加到?log 文件末端,且每條數據都有自己的 Offset,且消費者組中的每個消費者,都會實時記錄自己消費到了哪個 Offset,以便出錯恢復時,從上次的位置繼續消費。生產者的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 采取了partition分片和索引機制,將每個 Partition 分為多個 Segment,每個 Segment 對應一個“.index” 索引文件和 “.log” 數據文件,“.index” 文件存儲.log文件中 Message 的物理偏移量,這些文件位于同一文件下命名規則為topic 名-分區號,index 和 log 文件以當前 Segment 的第一條消息的 Offset 命名。?
Offset:Kafka中的每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中,partition中的每個消息都有一個連續的序號,也就是offset用于為partition唯一標識一條消息。Offset從語義上來看擁有兩種:Current Offset和Committed Offset。Current Offset是保存在Consumer客戶端中,是針對Consumer的poll過程的,它可以保證每次poll都返回不重復的消息;Committed Offset是用于Consumer Rebalance過程的,它能夠保證新的Consumer能夠從正確的位置開始消費一個partition,從而避免重復消費。在Kafka 0.9前,Committed Offset信息保存在zookeeper的(zookeeper其實并不適合進行大批量的讀寫操作,尤其是寫操作),因此在0.9之后,所有的offset信息都保存在了Broker上的一個名為__consumer_offsets的topic中。
Group Coordinator:消費過程中,每個?Broker都會在啟動時啟動一個Group Coordinator?服務,CG確定自己屬于哪一個Coordinator?,首先確定該CG的offset信息寫入到__consumers_offsets topic中的分區,該分區的leader所在的broker就是CG所屬的coordinator。
Group Coordinator都會存儲消費者組以及組內消費者的配置信息,主要包括:訂閱的topics列表;Consumer Group配置信息,包括session timeout等;組中每個Consumer的元數據。包括主機名,consumer id;每個Group正在消費的topic partition的當前offsets;Partition的ownership元數據,包括consumer消費的partitions映射關系。
3、分區策略
(1)msg->partition
將 Producer 發送的msg封裝成一個 ProducerRecord 對象,對該對象指定一些參數后,然后進行序列化,然后按照topic和partition放進對應的發送隊列中,這里有異步和同步兩種發送方式。具體參數包括:
topic:string 類型,NotNull;partition:int 類型,可選;timestamp:long 類型,可選;key:string 類型,可選;value:string 類型,可選;headers:array 類型,Nullable。
①指明 Partition 的情況下,直接將給定的值作為 Partition 的值。
②沒有指明 Partition 但有 Key 的情況下,將 Key 的 Hash 值與分區數取余得到 Partition 值。
③既沒有 Partition 也沒有 Key 的情況下,Round-Robin 輪詢算法,第一次調用時隨機生成一個整數(后面每次調用都在這個整數上自增),將這個值與可用的分區數取余,得到 Partition 值。
(2)patition->broker
首先將所有Broker和待分配的Partition排序;其次將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader);最后將第i個Partition的第j個Replica分配到第((i + j) mod?n)個Broker上。
kafka使用zookeeper在broker中選出一個controller,用于partition在broker的分配和partition leader選舉。
4、數據可靠性保證
(1)冗余策略
分布式架構都會設置冗余副本數,kafka中partition的冗余機制默認是一個leader和兩個follower,所有對該partition的操作,實際操作的都是leader,然后再同步到其他的follower,follower是從leader批量拉取數據來同步,并且會盡量把多個副本,分配到不同的broker上。
(2)ack應答機制
提供了三種可靠性級別,用戶根據可靠性和延遲的要求進行權衡,Ack 參數配置:
0:Producer 不等待 Broker 的 ACK,這提供了最低延遲,Broker 一收到數據還沒有寫入磁盤就已經返回,當 Broker 故障時有可能丟失數據。
1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盤成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么將會丟失數據。
-1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盤成功后才返回 ACK。但是在 Broker 發送 ACK 時,Leader 發生故障,則會造成數據重復。如果設置ack=-1,設想有一個 Follower 因為某種原因出現故障,那 Leader 就要一直等到它完成同步,Leader維護了一個動態的ISR,如果 Follower 長時間未向 Leader 同步數據,則該 Follower 將被踢出 ISR 集合,Leader 發生故障后,controller就會從 ISR 中選舉出新的 Leader。
(3)故障處理
LEO:每個partition最大的 Offset。HW:消費者能見到的最大的 Offset,即ISR 隊列中最小的 LEO。
Follower 故障:Follower 發生故障后會被臨時踢出 ISR 集合,待該 Follower 恢復后,Follower 會 讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 Leader 進行同步數據操作,等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了。
Leader 故障:Leader 發生故障后,controller會從 ISR 中選出一個新的 Leader,之后,為保證多個副本之間的數據一致性,其余的 Follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 Leader 同步數據,這只能保證副本之間的數據一致性,并不能保證數據不丟失或者不重復。
(4)Exactly Once 語義
將服務器的 ACK 級別設置為 -1,可以保證 Producer 到 Server 之間不會丟失數據,即 At Least Once 語義,但是不能保證數據不重復。
將服務器 ACK 級別設置為 0,可以保證生產者每條消息只會被發送一次可以保證數據不重復,即 At Most Once 語義,但是不能保證數據不丟失。
但是,對于一些非常重要的信息比如交易數據,下游數據消費者要求數據既不重復也不丟失,即 Exactly Once 語義。
0.11 版本的 Kafka,引入了冪等性:Producer 不論向 Server 發送多少重復數據,Server 端都只會持久化一條。
即:At?Least?Once?+?冪等性?=?Exactly?Once
開啟冪等性的 Producer 在初始化時會被分配一個 PID,發往同一 Partition 的消息會附帶 Sequence Number,而 Borker 端會對 <PID,Partition,SeqNumber> 做緩存,當具有相同值的消息提交時,Broker 只會持久化一條,但是 PID 重啟后就會變化,且不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區跨會話的 Exactly Once。
5、消費方式
Consumer 采用 Pull(拉取)模式從 Broker 中讀取數據,可以根據 Consumer 的消費能力以適當的速率消費消息。Pull 模式不足之處是,如果 Kafka 沒有數據,消費者可能會陷入循環一直返回空數據。針對這一點, Kafka 的消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可供消費,Consumer 會等待timeout時間之后再返回。
Consumer 采用 Push(推送)模式,Broker 給 Consumer 推送消息的速率是由 Broker 決定的,很難適應消費速率不同的消費者。目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。
6、reblance消費者組內分區分配策略
發生場景:消費者組成員個數發生變化;訂閱的?Topic?個數發生變化;訂閱?Topic?的分區數發生變化。
(1)?RoundRobin,輪詢方式將該CG訂閱的所有topic的所有分區作為一個整體進行 Hash 排序,消費者組內分配分區個數最大差別為 1,是按照組來分的,可以解決多個消費者消費數據不均衡的問題。但是當消費者組內訂閱不同主題時,可能造成消費混亂。
(2) Range,默認為Range,不會產生消費混亂問題,其實就是單個topic的輪詢分配,但是同時訂閱了主題 A 和 B,可能造成消息分配不對等問題,當消費者組內訂閱的主題越多,分區分配可能越不均衡。
Rebalance?發生時,Group?下所有?Consumer?實例都會協調在一起共同參與,Kafka?能夠保證盡量達到最公平的分配。但是?Rebalance?過程對?Consumer Group?會造成比較嚴重的影響。在?Rebalance?的過程中?Consumer Group?下的所有消費者實例都會停止工作,等待?Rebalance?過程完成。
常見面試題:
1、Kafka中的ISR、AR又代表什么?ISR的伸縮又指什么
ISR:In-Sync Replicas 副本同步隊列
AR:Assigned Replicas 所有副本
ISR是由leader維護,每個Partition都會有一個ISR,follower從leader同步數據有一些延遲,包括延遲時間和延遲條數兩個維度,任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR。
2、什么是broker
broker 是消息的代理,Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉取指定Topic的消息,然后進行業務處理,broker在中間起到一個代理保存消息的中轉站。
3、kafka follower如何與leader同步數據
Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。完全同步復制要求All Alive Follower都復制完,這條消息才會被認為commit,這種復制方式極大的影響了吞吐率。而異步復制方式下,Follower異步的從Leader復制數據,數據只要被Leader寫入log就被認為已經commit,這種情況下,如果leader掛掉,會丟失數據,kafka使用ISR的方式很好的均衡了確保數據不丟失以及吞吐率。Follower可以批量的從Leader復制數據,而且Leader充分利用磁盤順序讀以及send file(zero copy)機制,這樣極大的提高復制性能,內部批量寫磁盤,大幅減少了Follower與Leader的消息量差(順序寫、零拷貝、批量量處理也是kafka快的原因)。
4、什么情況下一個 partition或broker 會從 isr中踢出去
leader會維護一個副本同步列表ISR(in-sync Replica),每個Partition都會有一個ISR,而且是由leader動態維護 ,如果一個follower比一個leader落后太多,或者超過一定時間未發起數據復制請求,則leader將其從ISR中移除 。
5、kafka producer 打數據,ack ?為 0, 1, -1 的時候代表啥, 設置 -1 的時候,什么情況下,leader 會認為一條消息 commit了
1(默認)??數據發送到Kafka后,經過leader成功接收消息的的確認,就算是發送成功了。在這種情況下,如果leader宕機了,則會丟失數據。
0?生產者將數據發送出去就不管了,不去等待任何返回。這種情況下數據傳輸效率最高,但是數據可靠性確是最低的。
-1?producer需要等待ISR中的所有follower都確認接收到數據后才算一次發送完成,可靠性最高。當ISR中所有Replica都向Leader發送ACK時,leader才commit,這時候producer才能認為一個請求中的消息都commit了。
6、如果leader crash時,ISR為空怎么辦
kafka在Broker端提供了一個配置參數:unclean.leader.election,這個參數有兩個值:
true(默認):允許不同步副本成為leader,由于不同步副本的消息較為滯后,此時成為leader,可能會出現消息不一致的情況。
false:不允許不同步副本成為leader,此時如果發生ISR列表為空,會一直等待舊leader恢復,降低了可用性。
7、kafka的message格式是什么樣的
一個Kafka的Message由一個固定長度的header和一個變長的消息體body組成
header部分由一個字節的magic(文件格式)和四個字節的CRC32(用于判斷body消息體是否正常)構成。
當magic的值為1的時候,會在magic和crc32之間多一個字節的數據:attributes(保存一些相關屬性,
比如是否壓縮、壓縮格式等等);如果magic的值為0,那么不存在attributes屬性
body是由N個字節構成的一個消息體,包含了具體的key/value消息
8、kafka中consumer group 是什么概念
同一個topic的數據,會廣播給不同的消費者組;同一個group中只有一個worker能拿到這個數據進行消費。group內的worker可以使用多線程或多進程來實現,也可以將進程分散在多臺機器上,一個組內worker的數量通常不超過partition的數量,且二者最好保持整數倍關系。
9、冪等性
即f(x)=f(f(x)) 能夠成立的數學性質。用在編程領域,則意為對同一個系統,使用同樣的條件,一次請求和重復的多次請求對系統資源的影響是一致的
10、Kafka中的消息是否會丟失和重復消費?
要確定Kafka的消息是否丟失或重復,從兩個方面分析入手:消息發送和消息消費。
1、消息發送
? ? ? ? ?Kafka消息發送有兩種方式:同步(sync)和異步(async),默認是異步方式,可通過producer.type屬性進行配置。Kafka通過配置request.required.acks屬性來確認消息的生產:
綜上所述,有6種消息生產的情況,下面分情況來分析消息丟失的場景:
(1)acks=0,不和Kafka集群進行消息接收確認,則當網絡異常、緩沖區滿了等情況時,消息可能丟失;
(2)acks=1,同步模式下,只有Leader確認接收成功后但掛掉了,副本沒有同步,消息可能丟失;
2、消息消費
Kafka消息消費有兩個consumer接口,Low-level?API和High-level?API:
Low-level?API:消費者自己維護offset等值,可以實現對Kafka的完全控制;
High-level?API:封裝了對parition和offset的管理,使用簡單;
如果使用高級接口High-level API,可能存在一個問題就是當消息消費者從集群中把消息取出來、并提交了新的消息offset值后,還沒來得及消費就掛掉了,那么下次再消費時之前沒消費成功的消息就“詭異”的消失了;
解決辦法:
????????針對消息丟失:同步模式下,確認機制設置為-1,即讓消息寫入Leader和Follower之后再確認消息發送成功;異步模式下,為防止緩沖區滿,可以在配置文件設置不限制阻塞超時時間,當緩沖區滿時讓生產者一直處于阻塞狀態;
????????針對消息重復:將消息的唯一標識保存到外部介質中,每次消費時判斷是否處理過即可。
11、如何保證消息不被重復消費?或者說,如何保證消息消費時的冪等性?
Kafka 實際上有個 offset 的概念,就是每個消息寫進去,都有一個 offset,然后 consumer 消費了數據之后,每隔一段時間會把自己消費過的消息的 offset 提交一下,表示我已經消費過了,但是如果進程直接掛掉了,重啟后這會導致 consumer 有些消息處理了但是沒來得及提交 offset,少數消息會再次消費一次。
其實重復消費不可怕,可怕的是你沒考慮到重復消費之后,怎么保證冪等性。
舉個例子吧。消費一條往數據庫里插入一條,要是你一個消息重復兩次,就插入了兩條數據就出錯了,但是你要是消費到第二次的時候,自己判斷一下已經消費過了,直接扔了,不就保留了一條數據,一條數據重復出現兩次,數據庫里就只有一條數據,這就保證了系統的冪等性。
如何保證冪等性主要是要結合具體的業務:
如果是數據要寫庫,你先根據主鍵查一下,如果這數據都有了,你就別插入了,update 一下好吧。
如果是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
如果更復雜一點,你需要讓生產者發送每條數據的時候,里面加一個全局唯一的 id存在redis中,消費時判斷一下id是否消費過。
12、Kafka中是怎么體現消息順序性的?
kafka每個partition中的消息在寫入時都是有序的,消費時每個partition只能被每個group中的一個消費者消費,保證了消費時也是有序的。topic不保證有序。如果要保證topic整個有序,那么將partition調整為1.
總結
以上是生活随笔為你收集整理的kafka记录及面试题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: B程序员:讲述三年计算机学习辛酸史
- 下一篇: shell脚本常用命令