震惊!原来这才是Kafka的“真面目”!
出處:https://www.jianshu.com/p/d3e963ff8b70
Kafka 是一個分布式消息隊(duì)列,具有高性能、持久化、多副本備份、橫向擴(kuò)展能力。生產(chǎn)者往隊(duì)列里寫消息,消費(fèi)者從隊(duì)列里取消息進(jìn)行業(yè)務(wù)邏輯。一般在架構(gòu)設(shè)計(jì)中起到解耦、削峰、異步處理的作用。
Kafka 對外使用 Topic 的概念,生產(chǎn)者往 Topic 里寫消息,消費(fèi)者從中讀消息。
為了做到水平擴(kuò)展,一個 Topic 實(shí)際是由多個 Partition 組成的,遇到瓶頸時,可以通過增加 Partition 的數(shù)量來進(jìn)行橫向擴(kuò)容。單個 Parition 內(nèi)是保證消息有序。
每新寫一條消息,Kafka 就是在對應(yīng)的文件 append 寫,所以性能非常高。
Kafka 的總體數(shù)據(jù)流是這樣的:
大概用法就是,Producers 往 Brokers 里面的指定 Topic 中寫消息,Consumers 從 Brokers 里面拉取指定 Topic 的消息,然后進(jìn)行業(yè)務(wù)處理。
圖中有兩個 Topic,Topic0 有兩個 Partition,Topic1 有一個 Partition,三副本備份。
可以看到 Consumer Gourp1 中的 Consumer2 沒有分到 Partition 處理,這是有可能出現(xiàn)的,下面會講到。
關(guān)于 Broker、Topics、Partitions 的一些元信息用 ZK 來存,監(jiān)控和路由啥的也都會用到 ZK。
生產(chǎn)
基本流程是這樣的:
創(chuàng)建一條記錄,記錄中一個要指定對應(yīng)的 Topic 和 Value,Key 和 Partition 可選。?
先序列化,然后按照 Topic 和 Partition,放進(jìn)對應(yīng)的發(fā)送隊(duì)列中。Kafka Produce 都是批量請求,會積攢一批,然后一起發(fā)送,不是調(diào) send() 就立刻進(jìn)行網(wǎng)絡(luò)發(fā)包。
如果 Partition 沒填,那么情況會是這樣的:
-
Key 有填。按照 Key 進(jìn)行哈希,相同 Key 去一個 Partition。(如果擴(kuò)展了 Partition 的數(shù)量那么就不能保證了)
-
Key 沒填。Round-Robin 來選 Partition。
這些要發(fā)往同一個 Partition 的請求按照配置,攢一波,然后由一個單獨(dú)的線程一次性發(fā)過去。
API
有 High Level API,替我們把很多事情都干了,Offset,路由啥都替我們干了,用起來很簡單。
還有 Simple API,Offset 啥的都是要我們自己記錄。(注:消息消費(fèi)的時候,首先要知道去哪消費(fèi),這就是路由,消費(fèi)完之后,要記錄消費(fèi)單哪,就是 Offset)
Partition
當(dāng)存在多副本的情況下,會盡量把多個副本,分配到不同的 Broker 上。
Kafka 會為 Partition 選出一個 Leader,之后所有該 Partition 的請求,實(shí)際操作的都是 Leader,然后再同步到其他的 Follower。
當(dāng)一個 Broker 歇菜后,所有 Leader 在該 Broker 上的 Partition 都會重新選舉,選出一個 Leader。(這里不像分布式文件存儲系統(tǒng)那樣會自動進(jìn)行復(fù)制保持副本數(shù))
然后這里就涉及兩個細(xì)節(jié):
-
怎么分配 Partition
-
怎么選 Leader
關(guān)于 Partition 的分配,還有 Leader 的選舉,總得有個執(zhí)行者。在 Kafka 中,這個執(zhí)行者就叫 Controller。
Kafka 使用 ZK 在 Broker 中選出一個 Controller,用于 Partition 分配和 Leader 選舉。
Partition 的分配:
-
將所有 Broker(假設(shè)共 n 個 Broker)和待分配的 Partition 排序。
-
將第 i 個 Partition 分配到第(i mod n)個 Broker 上 (這個就是 Leader)。
-
將第 i 個 Partition 的第 j 個 Replica 分配到第((i + j) mode n)個 Broker 上。
Leader 容災(zāi)
Controller 會在 ZK 的 /brokers/ids 節(jié)點(diǎn)上注冊 Watch,一旦有 Broker 宕機(jī),它就能知道。
當(dāng) Broker 宕機(jī)后,Controller 就會給受到影響的 Partition 選出新 Leader。
Controller 從?ZK?的 /brokers/topics/[topic]/partitions/[partition]/state 中,讀取對應(yīng) Partition 的 ISR(in-sync replica 已同步的副本)列表,選一個出來做 Leader。
選出 Leader 后,更新 ZK,然后發(fā)送 LeaderAndISRRequest 給受影響的 Broker,讓它們知道改變這事。
為什么這里不是使用 ZK 通知,而是直接給 Broker 發(fā)送 RPC 請求,我的理解可能是這樣做 ZK 有性能問題吧。
如果 ISR 列表是空,那么會根據(jù)配置,隨便選一個 Replica 做 Leader,或者干脆這個 Partition 就是歇菜。
如果 ISR 列表的有機(jī)器,但是也歇菜了,那么還可以等 ISR 的機(jī)器活過來。
多副本同步
這里的策略,服務(wù)端這邊的處理是 Follower 從 Leader 批量拉取數(shù)據(jù)來同步。但是具體的可靠性,是由生產(chǎn)者來決定的。
生產(chǎn)者生產(chǎn)消息的時候,通過 request.required.acks 參數(shù)來設(shè)置數(shù)據(jù)的可靠性。
在 Acks=-1 的時候,如果 ISR 少于 min.insync.replicas 指定的數(shù)目,那么就會返回不可用。
這里 ISR 列表中的機(jī)器是會變化的,根據(jù)配置 replica.lag.time.max.ms,多久沒同步,就會從 ISR 列表中剔除。
以前還有根據(jù)落后多少條消息就踢出 ISR,在 1.0 版本后就去掉了,因?yàn)檫@個值很難取,在高峰的時候很容易出現(xiàn)節(jié)點(diǎn)不斷的進(jìn)出 ISR 列表。
從 ISA 中選出 Leader 后,Follower 會把自己日志中上一個高水位后面的記錄去掉,然后去和 Leader 拿新的數(shù)據(jù)。
因?yàn)樾碌?Leader 選出來后,Follower 上面的數(shù)據(jù),可能比新 Leader 多,所以要截取。
這里高水位的意思,對于 Partition 和 Leader,就是所有 ISR 中都有的最新一條記錄。消費(fèi)者最多只能讀到高水位。
從 Leader 的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR 中的 Broker 都 Fetch 到了,但是 ISR 中的 Broker 只有在下一輪的 Fetch 中才能告訴 Leader。
也正是由于這個高水位延遲一輪,在一些情況下,Kafka 會出現(xiàn)丟數(shù)據(jù)和主備數(shù)據(jù)不一致的情況,0.11 開始,使用 Leader Epoch 來代替高水位。
思考:當(dāng) Acks=-1 時
-
是 Follwers 都來 Fetch 就返回成功,還是等 Follwers 第二輪 Fetch?
-
Leader 已經(jīng)寫入本地,但是 ISR 中有些機(jī)器失敗,那么怎么處理呢?
消費(fèi)
訂閱 Topic 是以一個消費(fèi)組來訂閱的,一個消費(fèi)組里面可以有多個消費(fèi)者。同一個消費(fèi)組中的兩個消費(fèi)者,不會同時消費(fèi)一個 Partition。
換句話來說,就是一個 Partition,只能被消費(fèi)組里的一個消費(fèi)者消費(fèi),但是可以同時被多個消費(fèi)組消費(fèi)。
因此,如果消費(fèi)組內(nèi)的消費(fèi)者如果比 Partition 多的話,那么就會有個別消費(fèi)者一直空閑。
API
?
訂閱 Topic 時,可以用正則表達(dá)式,如果有新 Topic 匹配上,那能自動訂閱上。
Offset 的保存
一個消費(fèi)組消費(fèi) Partition,需要保存 Offset 記錄消費(fèi)到哪,以前保存在 ZK 中,由于 ZK 的寫性能不好,以前的解決方法都是 Consumer 每隔一分鐘上報(bào)一次。
這里 ZK 的性能嚴(yán)重影響了消費(fèi)的速度,而且很容易出現(xiàn)重復(fù)消費(fèi)。在 0.10 版本后,Kafka 把這個 Offset 的保存,從 ZK 總剝離,保存在一個名叫 consumeroffsets topic 的 Topic 中。
寫進(jìn)消息的 Key 由 Groupid、Topic、Partition 組成,Value 是偏移量 Offset。Topic 配置的清理策略是 Compact。總是保留最新的 Key,其余刪掉。
一般情況下,每個 Key 的 Offset 都是緩存在內(nèi)存中,查詢的時候不用遍歷 Partition,如果沒有緩存,第一次就會遍歷 Partition 建立緩存,然后查詢返回。
確定 Consumer Group 位移信息寫入 consumers_offsets 的哪個 Partition,具體計(jì)算公式:
__consumers_offsets?partition?=Math.abs(groupId.hashCode()?%?groupMetadataTopicPartitionCount)??? //groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認(rèn)是50個分區(qū)。思考:如果正在跑的服務(wù),修改了 offsets.topic.num.partitions,那么 Offset 的保存是不是就亂套了?
分配 Partition—Reblance
生產(chǎn)過程中 Broker 要分配 Partition,消費(fèi)過程這里,也要分配 Partition 給消費(fèi)者。
類似 Broker 中選了一個 Controller 出來,消費(fèi)也要從 Broker 中選一個 Coordinator,用于分配 Partition。
下面從頂向下,分別闡述一下:
-
怎么選 Coordinator
-
交互流程
-
Reblance 的流程
①選 Coordinator:看 Offset 保存在那個 Partition;該 Partition Leader 所在的 Broker 就是被選定的 Coordinator。
這里我們可以看到,Consumer Group 的 Coordinator,和保存 Consumer Group Offset 的 Partition Leader 是同一臺機(jī)器。
②交互流程:把 Coordinator 選出來之后,就是要分配了。
整個流程是這樣的:
-
Consumer 啟動、或者 Coordinator 宕機(jī)了,Consumer 會任意請求一個 Broker,發(fā)送 ConsumerMetadataRequest 請求。
Broker 會按照上面說的方法,選出這個 Consumer 對應(yīng) Coordinator 的地址。
-
Consumer 發(fā)送 Heartbeat 請求給 Coordinator,返回 IllegalGeneration 的話,就說明 Consumer 的信息是舊的了,需要重新加入進(jìn)來,進(jìn)行 Reblance。
返回成功,那么 Consumer 就從上次分配的 Partition 中繼續(xù)執(zhí)行。
③Reblance 流程:
-
Consumer 給 Coordinator 發(fā)送 JoinGroupRequest 請求。
-
這時其他 Consumer 發(fā) Heartbeat 請求過來時,Coordinator 會告訴他們,要 Reblance 了。
-
其他 Consumer 發(fā)送 JoinGroupRequest 請求。
-
所有記錄在冊的 Consumer 都發(fā)了 JoinGroupRequest 請求之后,Coordinator 就會在這里 Consumer 中隨便選一個 Leader。
然后回 JoinGroupRespone,這會告訴 Consumer 你是 Follower 還是 Leader,對于 Leader,還會把 Follower 的信息帶給它,讓它根據(jù)這些信息去分配 Partition。
-
Consumer 向 Coordinator 發(fā)送 SyncGroupRequest,其中 Leader 的 SyncGroupRequest 會包含分配的情況。
-
Coordinator 回包,把分配的情況告訴 Consumer,包括 Leader。
當(dāng) Partition 或者消費(fèi)者的數(shù)量發(fā)生變化時,都得進(jìn)行 Reblance。
列舉一下會 Reblance 的情況:
-
增加 Partition
-
增加消費(fèi)者
-
消費(fèi)者主動關(guān)閉
-
消費(fèi)者宕機(jī)了
-
Coordinator 自己也宕機(jī)了
消息投遞語義
Kafka 支持 3 種消息投遞語義:
-
At most once:最多一次,消息可能會丟失,但不會重復(fù)。
-
At least once:最少一次,消息不會丟失,可能會重復(fù)。
-
Exactly once:只且一次,消息不丟失不重復(fù),只且消費(fèi)一次(0.11 中實(shí)現(xiàn),僅限于下游也是 Kafka)
在業(yè)務(wù)中,常常都是使用 At least once 的模型,如果需要可重入的話,往往是業(yè)務(wù)自己實(shí)現(xiàn)。
At least once
先獲取數(shù)據(jù),再進(jìn)行業(yè)務(wù)處理,業(yè)務(wù)處理成功后 Commit Offset:
-
生產(chǎn)者生產(chǎn)消息異常,消息是否成功寫入不確定,重做,可能寫入重復(fù)的消息。
-
消費(fèi)者處理消息,業(yè)務(wù)處理成功后,更新 Offset 失敗,消費(fèi)者重啟的話,會重復(fù)消費(fèi)。
At most once
先獲取數(shù)據(jù),再 Commit Offset,最后進(jìn)行業(yè)務(wù)處理:
-
生產(chǎn)者生產(chǎn)消息異常,不管,生產(chǎn)下一個消息,消息就丟了。
-
消費(fèi)者處理消息,先更新 Offset,再做業(yè)務(wù)處理,做業(yè)務(wù)處理失敗,消費(fèi)者重啟,消息就丟了。
Exactly once
思路是這樣的,首先要保證消息不丟,再去保證不重復(fù)。所以盯著 At least once 的原因來搞。
首先想出來的:
-
生產(chǎn)者重做導(dǎo)致重復(fù)寫入消息:生產(chǎn)保證冪等性。
-
消費(fèi)者重復(fù)消費(fèi):消滅重復(fù)消費(fèi),或者業(yè)務(wù)接口保證冪等性重復(fù)消費(fèi)也沒問題。
由于業(yè)務(wù)接口是否冪等,不是 Kafka 能保證的,所以 Kafka 這里提供的 Exactly once 是有限制的,消費(fèi)者的下游也必須是 Kafka。
所以以下討論的,沒特殊說明,消費(fèi)者的下游系統(tǒng)都是 Kafka(注:使用 Kafka Conector,它對部分系統(tǒng)做了適配,實(shí)現(xiàn)了 Exactly once)。生產(chǎn)者冪等性好做,沒啥問題。
解決重復(fù)消費(fèi)有兩個方法:
-
下游系統(tǒng)保證冪等性,重復(fù)消費(fèi)也不會導(dǎo)致多條記錄。
-
把 Commit Offset 和業(yè)務(wù)處理綁定成一個事務(wù)。
本來 Exactly once 實(shí)現(xiàn)第 1 點(diǎn)就 OK 了。但是在一些使用場景下,我們的數(shù)據(jù)源可能是多個 Topic,處理后輸出到多個 Topic,這時我們會希望輸出時要么全部成功,要么全部失敗。這就需要實(shí)現(xiàn)事務(wù)性。
既然要做事務(wù),那么干脆把重復(fù)消費(fèi)的問題從根源上解決,把 Commit Offset 和輸出到其他 Topic 綁定成一個事務(wù)。
生產(chǎn)冪等性
思路是這樣的,為每個 Producer 分配一個 Pid,作為該 Producer 的唯一標(biāo)識。
Producer 會為每一個維護(hù)一個單調(diào)遞增的 Seq。類似的,Broker 也會為每個記錄下最新的 Seq。
當(dāng) req_seq == broker_seq+1 時,Broker 才會接受該消息,因?yàn)?#xff1a;
-
消息的 Seq 比 Broker 的 Seq 大超過時,說明中間有數(shù)據(jù)還沒寫入,即亂序了。
-
消息的 Seq 不比 Broker 的 Seq 小,那么說明該消息已被保存。
事務(wù)性/原子性廣播
場景是這樣的:
-
先從多個源 Topic 中獲取數(shù)據(jù)。
-
做業(yè)務(wù)處理,寫到下游的多個目的 Topic。
-
更新多個源 Topic 的 Offset。
其中第 2、3 點(diǎn)作為一個事務(wù),要么全成功,要么全失敗。這里得益于?Offset 實(shí)際上是用特殊的 Topic 去保存,這兩點(diǎn)都?xì)w一為寫多個 Topic 的事務(wù)性處理。
基本思路是這樣的:
-
引入 Tid(transaction id),和 Pid 不同,這個 ID 是應(yīng)用程序提供的,用于標(biāo)識事務(wù),和 Producer 是誰并沒關(guān)系。
就是任何 Producer 都可以使用這個 Tid 去做事務(wù),這樣進(jìn)行到一半就死掉的事務(wù),可以由另一個 Producer 去恢復(fù)。
-
同時為了記錄事務(wù)的狀態(tài),類似對 Offset 的處理,引入 Transaction Coordinator 用于記錄 Transaction Log。
在集群中會有多個 Transaction Coordinator,每個 Tid 對應(yīng)唯一一個 Transaction Coordinator。
注:Transaction Log 刪除策略是 Compact,已完成的事務(wù)會標(biāo)記成 Null,Compact 后不保留。
做事務(wù)時,先標(biāo)記開啟事務(wù),寫入數(shù)據(jù),全部成功就在 Transaction Log 中記錄為 Prepare Commit 狀態(tài),否則寫入 Prepare Abort 的狀態(tài)。
之后再去給每個相關(guān)的 Partition 寫入一條 Marker(Commit 或者 Abort)消息,標(biāo)記這個事務(wù)的 Message 可以被讀取或已經(jīng)廢棄。成功后在 Transaction Log記錄下 Commit/Abort 狀態(tài),至此事務(wù)結(jié)束。
數(shù)據(jù)流:
-
首先使用 Tid 請求任意一個 Broker(代碼中寫的是負(fù)載最小的 Broker),找到對應(yīng)的 Transaction Coordinator。
-
請求 Transaction Coordinator 獲取到對應(yīng)的 Pid,和 Pid 對應(yīng)的 Epoch,這個 Epoch 用于防止僵死進(jìn)程復(fù)活導(dǎo)致消息錯亂。
當(dāng)消息的 Epoch 比當(dāng)前維護(hù)的 Epoch 小時,拒絕掉。Tid 和 Pid 有一一對應(yīng)的關(guān)系,這樣對于同一個 Tid 會返回相同的 Pid。
-
Client 先請求 Transaction Coordinator 記錄的事務(wù)狀態(tài),初始狀態(tài)是 Begin,如果是該事務(wù)中第一個到達(dá)的,同時會對事務(wù)進(jìn)行計(jì)時。
Client 輸出數(shù)據(jù)到相關(guān)的 Partition 中;Client 再請求 Transaction Coordinator 記錄 Offset 的事務(wù)狀態(tài);Client 發(fā)送 Offset Commit 到對應(yīng) Offset Partition。
-
Client 發(fā)送 Commit 請求,Transaction Coordinator 記錄 Prepare Commit/Abort,然后發(fā)送 Marker 給相關(guān)的 Partition。
全部成功后,記錄 Commit/Abort 的狀態(tài),最后這個記錄不需要等待其他 Replica 的 ACK,因?yàn)?Prepare 不丟就能保證最終的正確性了。
這里 Prepare 的狀態(tài)主要是用于事務(wù)恢復(fù),例如給相關(guān)的 Partition 發(fā)送控制消息,沒發(fā)完就宕機(jī)了,備機(jī)起來后,Producer 發(fā)送請求獲取 Pid 時,會把未完成的事務(wù)接著完成。
當(dāng) Partition 中寫入 Commit 的 Marker 后,相關(guān)的消息就可被讀取。所以 Kafka 事務(wù)在 Prepare Commit 到 Commit 這個時間段內(nèi),消息是逐漸可見的,而不是同一時刻可見。
消費(fèi)事務(wù)
前面都是從生產(chǎn)的角度看待事務(wù)。還需要從消費(fèi)的角度去考慮一些問題。
消費(fèi)時,Partition 中會存在一些消息處于未 Commit 狀態(tài),即業(yè)務(wù)方應(yīng)該看不到的消息,需要過濾這些消息不讓業(yè)務(wù)看到,Kafka 選擇在消費(fèi)者進(jìn)程中進(jìn)行過來,而不是在 Broker 中過濾,主要考慮的還是性能。
Kafka 高性能的一個關(guān)鍵點(diǎn)是 Zero Copy,如果需要在 Broker 中過濾,那么勢必需要讀取消息內(nèi)容到內(nèi)存,就會失去 Zero Copy 的特性。
文件組織
Kafka 的數(shù)據(jù),實(shí)際上是以文件的形式存儲在文件系統(tǒng)的。Topic 下有 Partition,Partition 下有 Segment,Segment 是實(shí)際的一個個文件,Topic 和 Partition 都是抽象概念。
在目錄 /partitionid}/ 下,存儲著實(shí)際的 Log 文件(即 Segment),還有對應(yīng)的索引文件。
每個 Segment 文件大小相等,文件名以這個 Segment 中最小的 Offset 命名,文件擴(kuò)展名是 .log。Segment 對應(yīng)的索引的文件名字一樣,擴(kuò)展名是 .index。
有兩個 Index 文件:
-
一個是 Offset Index 用于按 Offset 去查 Message。
-
一個是 Time Index 用于按照時間去查,其實(shí)這里可以優(yōu)化合到一起,下面只說 Offset Index。
總體的組織是這樣的:
為了減少索引文件的大小,降低空間使用,方便直接加載進(jìn)內(nèi)存中,這里的索引使用稀疏矩陣,不會每一個 Message 都記錄下具體位置,而是每隔一定的字節(jié)數(shù),再建立一條索引。?
索引包含兩部分:
-
BaseOffset:意思是這條索引對應(yīng) Segment 文件中的第幾條 Message。這樣做方便使用數(shù)值壓縮算法來節(jié)省空間。例如 Kafka 使用的是 Varint。
-
Position:在 Segment 中的絕對位置。
查找 Offset 對應(yīng)的記錄時,會先用二分法,找出對應(yīng)的 Offset 在哪個 Segment 中,然后使用索引,在定位出 Offset 在 Segment 中的大概位置,再遍歷查找 Message。
常用配置項(xiàng)
Broker 配置
Topic 配置
關(guān)于日志清理,默認(rèn)當(dāng)前正在寫的日志,是怎么也不會清理掉的。
還有 0.10 之前的版本,時間看的是日志文件的 Mtime,但這個值是不準(zhǔn)確的,有可能文件被 Touch 一下,Mtime 就變了。因此從?0.10 版本開始,改為使用該文件最新一條消息的時間來判斷。
按大小清理這里也要注意,Kafka 在定時任務(wù)中嘗試比較當(dāng)前日志量總大小是否超過閾值至少一個日志段的大小。如果超過但是沒超過一個日志段,那么就不會刪除。
總結(jié)
以上是生活随笔為你收集整理的震惊!原来这才是Kafka的“真面目”!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 来,聊聊程序员的爱情
- 下一篇: 拯救 Out Of Memory,8个案