消息中间件—Kafka 的设计思想
1.動機
設計 kafka 初衷,作為統(tǒng)一平臺處理大公司的實時數(shù)據(jù)。所以 必須具有如下特性:
- 支持海量數(shù)據(jù)
- 高吞吐量
- 低延遲(實時性)
- 支持分區(qū),分布式
- 容錯
2.持久化
kafka 高度依賴 文件系統(tǒng) 存儲和緩存消息。通過對磁盤的順序讀寫,并借助 OS 層面的 頁緩存(page cache),保證優(yōu)于緩存在內(nèi)存中或其他結構中。
為何使用磁盤效率仍然很高:
利用磁盤的順序讀寫,操作一個文件,將數(shù)據(jù)追加到文件的末尾。相比于隨機讀寫,效率很高。 利用 OS 層面的頁緩存(page cache),順序讀文件可以預讀數(shù)據(jù)到 page cache。通過自動訪問所有可用內(nèi)存 以及 存儲緊湊型字節(jié)結構而非單個對象提高內(nèi)存使用率。OS緩存相對于進程內(nèi)的緩存,重啟后仍然可用,不需要重建。 所有的操作時間復雜度都是 常量時間O(1),與數(shù)據(jù)大小無關,讀 和 寫 不會互相阻塞。
3.效率
使用磁盤效率低下主要有兩個原因:
過多的小 I/O 操作:發(fā)生在客戶端和服務端之間,以及 服務端自己的持久化操作中 過多的字節(jié)復制 針對 小 I/O 操作,kafka 根據(jù) "message set" 抽象構建了一個協(xié)議,該 抽象 自然地將消息分組在一起。該協(xié)議允許網(wǎng)絡請求將消息分組在一起,并分攤網(wǎng)絡往返的開銷,而不是一次發(fā)送一條消息。服務器依次將消息塊一次附加到其日志中,而消費者一次獲取大型線性塊。
針對過多的字節(jié)復制,使用了由生產(chǎn)者、代理 和 消費者共享的標準化二進制消息格式(這樣,數(shù)據(jù)塊就可以在它們之間不進行修改的情況下進行傳輸)。服務器所持有的消息日志 本身是一個文件目錄,每個文件都由一系列 "message set" 填充。這些消息集以生產(chǎn)者和消費者使用的相同格式寫入磁盤。維護這種通用格式可以優(yōu)化 持久化日志塊的 網(wǎng)絡傳輸。
存儲在文件中的信息通過網(wǎng)絡發(fā)送給客戶,經(jīng)歷的幾個路徑:
- 操作系統(tǒng)在內(nèi)核空間將數(shù)據(jù)從磁盤讀取到 page cache 中。
- 應用程序從內(nèi)核空間讀取到 用戶空間緩沖區(qū)。
- 應用程序?qū)?shù)據(jù)寫回到內(nèi)核空間的套接字緩沖區(qū)。
- 操作系統(tǒng)將數(shù)據(jù)從套接字緩沖區(qū)復制到 NIC 緩沖區(qū)(NIC:網(wǎng)絡接口控制器)。
- 以上產(chǎn)生了四個副本拷貝,2個系統(tǒng)調(diào)用開銷,效率低下。
大致流程
上下文切換開銷基于 零拷貝技術:消息數(shù)據(jù)直接從 page cache 發(fā)送到網(wǎng)絡。linux 中使用 sendfile 完成零拷貝技術。java 中 java.nio.channels.FileChannel 的 transferTo() 方法也使用了零拷貝技術。
減少開銷
kafka 通過 page cache 和 sendfile 的組合,將看不到磁盤上的任何讀取活動,因為它們將完全從緩存中提供數(shù)據(jù)。
端到端的批量壓縮 Kafka通過遞歸消息集來支持同時壓縮多個消息而減少相同消息的冗余。 一批消息可以一起壓縮并以此形式發(fā)送到服務器。 這批消息將以壓縮形式寫入,并將在日志中保持壓縮,并且只能由消費者解壓縮。Kafka支持GZIP和Snappy壓縮協(xié)議。
4.生產(chǎn)者
4.1負載均衡
生產(chǎn)者將數(shù)據(jù)直接發(fā)送給分區(qū)對應的 leader。為了實現(xiàn)這一點,所有的 kafka 節(jié)點要能夠在 任何時候應答 哪個服務器還活著以及 topic分區(qū)的leader在哪里的 元數(shù)據(jù)請求。
客戶端自己控制 消息發(fā)送到哪個分區(qū),這可以隨機完成,實現(xiàn)一種隨機的負載平衡,也可以通過一些語義分區(qū)函數(shù)完成。
4.2異步發(fā)送
啟用 kafka 生產(chǎn)者 的批處理,kafka 將在內(nèi)存中累積數(shù)據(jù)然后一次性批量發(fā)送。可以配置 累計不超過固定數(shù)量的消息(bach.size),等待不超過固定延遲時間(linger.ms)。
5.消費者
5.1拉 VS 推送
消費者主動拉取消息缺點:如果 broker 沒有數(shù)據(jù),消費者會輪詢,忙等待直到數(shù)據(jù)到達。kafka 可以在拉請求中設置一些參數(shù),允許使用者請求在“長輪詢”中阻塞,等待數(shù)據(jù)到達(也可以選擇等待,直到給定的字節(jié)數(shù)可用,以確保傳輸大小很大)
消費者被動推送消息缺點:很難適應消費速率不同的消費者,消息發(fā)送速率是由 broker 決定的,broker 是盡可能快的將消息發(fā)送出去,這樣會造成消費者來不及處理消息,典型的表現(xiàn)就是 網(wǎng)絡阻塞 和 拒絕服務。
5.2消費者的定位
topic 被分為一組有序的分區(qū),每個分區(qū)在任何給定的時間都由每個訂閱消費者組中的一個消費者消費。這意味著消費者在每個分區(qū)中的位置只是一個整數(shù),這個整數(shù)代表了即將要消費的消息的偏移量。這樣做的好處是可以返回到舊的偏移量進行消費。
5.3離線數(shù)據(jù)加載
可伸縮持久性允許消費者只定期使用,例如批量數(shù)據(jù)加載,定期將數(shù)據(jù)批量加載到離線系統(tǒng)(如Hadoop或關系數(shù)據(jù)倉庫)中。
6.消息傳遞語義
很明顯,消息傳遞保證能夠提供多種可能:
- 最多一次:消息可能丟失,但是絕不會重發(fā)
- 至少一次:消息絕不會丟失,但是可能會重發(fā)
- 正好一次:每條消息被傳遞一次 kafka 的消息傳遞語義:
一旦發(fā)布的消息已提交到日志,只要副本分區(qū)寫入了此消息的一個broker仍然"活著”,它就不會丟失。
0.11.0.0 版本之前,如果一個生產(chǎn)者沒有收到消息提交的響應,那么生產(chǎn)者只能重新發(fā)送該消息。這就保證了至少一次的傳遞語義。如果上一次的請求實際上是成功的,那么消息就會再次寫到日志中,造成重復消費。
0.11.0.0 版本之后,kafka 生產(chǎn)者支持冪等傳遞,保證重新發(fā)送不會導致日志中有重復記錄。為了實現(xiàn)這一點,broker 為 每一個生產(chǎn)者 分配一個 ID,使用生產(chǎn)者隨每條消息一起發(fā)送的序列號來消除重復的消息。
同時也是從 0.11.0.0 版本之后,生產(chǎn)者支持使用事務類語義將消息發(fā)送到多個 topic 分區(qū)的能力:即,要么所有消息都已成功寫入,要么都未成功寫入。這方面的主要用例是在Kafka topic 之間進行一次處理。
當然,不是所有的使用場景都需要如此嚴謹?shù)谋U?#xff0c;對于延遲敏感的,我們允許生產(chǎn)者指定它想要的耐用性水平。如生產(chǎn)者可以指定它獲取需等待10毫秒量級上的響應。生產(chǎn)者也可以指定異步發(fā)送,或只等待leader(不需要副本的響應)有響應。
從消費者的角度描述語義:
- 讀取到消息,在日志中保存位置,最后處理消息。這種順序 如果消費者在保存位置之后,處理消息之前崩潰,數(shù)據(jù)會丟失,屬于 最多一次的語義。
- 讀取消息,處理消息,在日志中保存位置。這種順序,如果消費者在處理消息之后,日志中保存位置之前崩潰,數(shù)據(jù)會被多次處理,屬于至少一次的語義。在多數(shù)情況下,消息都有一個主鍵,所以更新是冪等的(一次執(zhí)行和多次執(zhí)行的影響相同)。 kafka 默認是保證“至少一次”傳遞,并允許用戶通過禁止生產(chǎn)者重試和處理一批消息前提交它的偏移量來實現(xiàn) “最多一次”傳遞。而“正好一次”傳遞需要與目標存儲系統(tǒng)合作,但kafka提供了偏移量,所以實現(xiàn)這個很簡單。
7.副本
kafka 在各個服務器上備份 每個 topic 的 partition (通過 replication factor 設置副本數(shù))。當集群中的某個服務器發(fā)生故障時,自動轉(zhuǎn)移到這些副本,以便在故障時,消息仍然可用。
kafka 的默認 副本因子為 1,即不創(chuàng)建副本。副本因子是指副本的總數(shù),包括 leader 。
副本以 topic 的 partition 為單位。在非故障的情況下,kafka 中的每個 partition 都有一個 leader,0 個或者多個 follower。所有的讀 和寫都指向 分區(qū)的 leader。通常,分區(qū)數(shù) 多于 broker 的數(shù)量,leader 均勻的分布在 broker 上。follower 的日志與 leader 的日志相同,即相同的 偏移量 offset 和 消息順序 。(當然,有可能在某個時間點,leader 上比 follower 多幾條還未同步的消息)。
kafka 節(jié)點存活的2個條件:
- 一個節(jié)點必須能維持與 zookeeper 的會話(通過 zookeeper 的心跳機制)。
- 如果該節(jié)點是 slave,它必須復制 leader 的寫數(shù)據(jù),并且不能落后太多。 如果節(jié)點 死掉,卡主,或者落后太多,leader 將 從 同步副本 ISR (In Sync Replicas)中移除該節(jié)點。落后多少是由 replica.lag.max.messages 控制,卡主多久算卡主是由 replica.lag.time.max.ms 控制。
kafka 動態(tài)維護一組同步 leader 數(shù)據(jù)的副本(ISR),只有這個組中的成員才有資格當選 leader。在所有同步副本都收到寫操作之前,不會認為已提交對Kafka分區(qū)的寫操作。這組 ISR 保存在 zookeeper 中,正因為如此,在ISR中的任何副本都有資格當選leader。對于 f+1 個 副本的 kafka, topic 可以容忍f失敗而不會丟失已提交的消息。
如果所有的節(jié)點都死掉,有兩種可以實現(xiàn)的方式:
- 等待 ISR 列表中的節(jié)點活過來,并且選擇該節(jié)點作為 leader.
- 選擇第一個活過來的節(jié)點(不管它在 ISR 列表中)作為 leader. 從 0.11.0.0 開始 kafka 默認選擇第一種策略,等待一致性的副本;可以通過配置 unclean.leader.election.enable 為 true 來選用第二種策略。這兩種策略是 可用性 和一致性的權衡,需要根據(jù)實際業(yè)務來決定。
可用性 和 耐久性保證
當寫消息到 kafka 時,生產(chǎn)者可以 配置 需要 leader 收到的確認數(shù) 來確定是否完成請求,通過 配置 acks 滿足多種情況:
- acks = 0 :生產(chǎn)者不會等待服務器的任何確認,消息記錄將被立刻添加到 socket 緩沖區(qū)并視為已發(fā)送。這種情況無法確保服務器已經(jīng)接收到消息記錄,重試的配置也不會生效。每個記錄返回的偏移量始終被設置為 1.
- acks = 1 :服務器端的 leader 寫入消息到本地日志就立即響應生產(chǎn)者,而不等待 follower 應答。這種情況,如果在服務器響應生產(chǎn)者之后,復制到 follower 之前掛掉 就會丟失數(shù)據(jù)。
- acks = all(-1):服務器端的 leader 會等待 ISR 中所有副本同步響應來確認消息記錄。這保證了只要 ISR 中還有一個副本存活就不會丟失記錄,也可以設置為 -1; 提供兩種 topic 級別的配置 來確保 持久性 而非 可用性。
unclean.leader.election.enable 設為 false,(默認即為 false)即 所有的副本都不可用時,分區(qū)才不可用。只有當 ISR 中的節(jié)點 活過來 分區(qū)才能可用。 指定 一個最小的 ISR 數(shù)量值,通過 min.insync.replicas 來配置,只有當 ISR 中的數(shù)量 超過最小值,分區(qū)才會接受寫入操作,以此來防止僅寫入單個副本而后副本不可用而導致的消息的丟失。該設置僅在 acks = all 并保證至少有這么多同步副本確認消息時生效。 副本管理
上面關于復制日志的討論實際上只涉及了一個日志,例如 一個 topic 的partition,然而,kafka 集群管理著成百上千個這樣的分區(qū)。通過 round-robin 的方式平衡 集群中的分區(qū),避免 大部分的分區(qū)分布在少量的及誒單上,同樣,平衡 leader,使在分區(qū)份額上的每個節(jié)點都是 leader。
kafka 選擇 其中一個 broker 作為 controller(到 zookeeper 上注冊,先到先得)。該 controller 檢測 broker 級別的故障,并負責更改 故障 broker 上受影響的 分區(qū)的 leader。這樣就可以批量處理 leader 的變更。如果 controller 故障,其他存活的 broker 將會成為新的 controller(同樣需要到 zookeeper 上注冊)。
歡迎關注 編程那點事兒,隨時隨地想學就學~
轉(zhuǎn)載于:https://juejin.im/post/5c90538a5188252dac6d2881
總結
以上是生活随笔為你收集整理的消息中间件—Kafka 的设计思想的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 记录console的使用
- 下一篇: Reactor和Proactor对比以及