深入理解分布式消息队列
一、消息隊(duì)列的演進(jìn)
分布式消息隊(duì)列中間件是是大型分布式系統(tǒng)中常見的中間件。消息隊(duì)列主要解決應(yīng)用耦合、異步消息、流量削鋒等問題,具有高性能、高可用、可伸縮和最終一致性等特點(diǎn)。消息隊(duì)列已經(jīng)逐漸成為企業(yè)應(yīng)用系統(tǒng)內(nèi)部通信的核心手段,使用較多的消息隊(duì)列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等,此外,利用數(shù)據(jù)庫(如 Redis、MySQL 等)也可實(shí)現(xiàn)消息隊(duì)列的部分基本功能。
1.基于 OS 的 MQ
單機(jī)消息隊(duì)列可以通過操作系統(tǒng)原生的進(jìn)程間通信機(jī)制來實(shí)現(xiàn),如消息隊(duì)列、共享內(nèi)存等。比如我們可以在共享內(nèi)存中維護(hù)一個(gè)雙端隊(duì)列:
消息產(chǎn)出進(jìn)程不停地往隊(duì)列里添加消息,同時(shí)消息消費(fèi)進(jìn)程不斷地從隊(duì)尾有序地取出這些消息。添加消息的任務(wù)我們稱為 producer,而取出并使用消息的任務(wù),我們稱之為 consumer。這種模式在早期單機(jī)多進(jìn)程模式中比較常見, 比如 IO 進(jìn)程把收到的網(wǎng)絡(luò)請(qǐng)求存入本機(jī) MQ,任務(wù)處理進(jìn)程從本機(jī) MQ 中讀取任務(wù)并進(jìn)行處理。
單機(jī) MQ 易于實(shí)現(xiàn),但是缺點(diǎn)也很明顯:因?yàn)橐蕾囉趩螜C(jī) OS 的 IPC 機(jī)制,所以無法實(shí)現(xiàn)分布式的消息傳遞,并且消息隊(duì)列的容量也受限于單機(jī)資源。
2.基于 DB 的 MQ
即使用存儲(chǔ)組件(如 Mysql 、 Redis 等)存儲(chǔ)消息, 然后在消息的生產(chǎn)側(cè)和消費(fèi)側(cè)實(shí)現(xiàn)消息的生產(chǎn)消費(fèi)邏輯,從而實(shí)現(xiàn) MQ 功能。以 Redis 為例, 可以使用 Redis 自帶的 list 實(shí)現(xiàn)。Redis list 使用 lpush 命令,從隊(duì)列左邊插入數(shù)據(jù);使用 rpop 命令,從隊(duì)列右邊取出數(shù)據(jù)。與單機(jī) MQ 相比, 該方案至少滿足了分布式, 但是仍然帶有很多無法接受的缺陷。
熱 key 性能問題:不論是用 codis 還是 twemproxy 這種集群方案,對(duì)某個(gè)隊(duì)列的讀寫請(qǐng)求最終都會(huì)落到同一臺(tái) redis 實(shí)例上,并且無法通過擴(kuò)容來解決問題。如果對(duì)某個(gè) list 的并發(fā)讀寫非常高,就產(chǎn)生了無法解決的熱 key,嚴(yán)重可能導(dǎo)致系統(tǒng)崩潰
沒有消費(fèi)確認(rèn)機(jī)制:每當(dāng)執(zhí)行 rpop 消費(fèi)一條數(shù)據(jù),那條消息就被從 list 中永久刪除了。如果消費(fèi)者消費(fèi)失敗,這條消息也沒法找回了。
不支持多訂閱者:一條消息只能被一個(gè)消費(fèi)者消費(fèi),rpop 之后就沒了。如果隊(duì)列中存儲(chǔ)的是應(yīng)用的日志,對(duì)于同一條消息,監(jiān)控系統(tǒng)需要消費(fèi)它來進(jìn)行可能的報(bào)警,BI 系統(tǒng)需要消費(fèi)它來繪制報(bào)表,鏈路追蹤需要消費(fèi)它來繪制調(diào)用關(guān)系……這種場景 redis list 就沒辦法支持了
不支持二次消費(fèi):一條消息 rpop 之后就沒了。如果消費(fèi)者程序運(yùn)行到一半發(fā)現(xiàn)代碼有 bug,修復(fù)之后想從頭再消費(fèi)一次就不行了。
針對(duì)上述缺點(diǎn),redis 5.0 開始引入 stream 數(shù)據(jù)類型,它是專門設(shè)計(jì)成為消息隊(duì)列的數(shù)據(jù)結(jié)構(gòu),借鑒了很多 kafka 的設(shè)計(jì),但是隨著很多分布式 MQ 組件的出現(xiàn),仍然顯得不夠友好, 畢竟 Redis 天生就不是用來做消息轉(zhuǎn)發(fā)的。
3. 專用分布式 MQ 中間件
隨著時(shí)代的發(fā)展,一個(gè)真正的消息隊(duì)列,已經(jīng)不僅僅是一個(gè)隊(duì)列那么簡單了,業(yè)務(wù)對(duì) MQ 的吞吐量、擴(kuò)展性、穩(wěn)定性、可靠性等都提出了嚴(yán)苛的要求。因此,專用的分布式消息中間件開始大量出現(xiàn)。常見的有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、Pulsar 等等。
二、消息隊(duì)列設(shè)計(jì)要點(diǎn)
消息隊(duì)列本質(zhì)上是一個(gè)消息的轉(zhuǎn)發(fā)系統(tǒng), 把一次 RPC 就可以直接完成的消息投遞,轉(zhuǎn)換成多次 RPC 間接完成,這其中包含兩個(gè)關(guān)鍵環(huán)節(jié):
1.消息轉(zhuǎn)儲(chǔ);
2.消息投遞:時(shí)機(jī)和對(duì)象;
基于此,消息隊(duì)列的整體設(shè)計(jì)思路是:
確定整體的數(shù)據(jù)流向:如 producer 發(fā)送給 MQ,MQ 轉(zhuǎn)發(fā)給 consumer,consumer 回復(fù)消費(fèi)確認(rèn),消息刪除、消息備份等。
利用 RPC 將數(shù)據(jù)流串起來,最好基于現(xiàn)有的 RPC 框架,盡量做到無狀態(tài),方便水平擴(kuò)展。
存儲(chǔ)選型,綜合考慮性能、可靠性和開發(fā)維護(hù)成本等諸多因素。
消息投遞,消費(fèi)模式 push、pull。
消費(fèi)關(guān)系維護(hù),單播、多播等,可以利用 zk、config server 等保存消費(fèi)關(guān)系。
高級(jí)特性,如可靠投遞,重復(fù)消息,順序消息等, 很多高級(jí)特性之間是相互制約的關(guān)系,這里要充分結(jié)合應(yīng)用場景做出取舍。
1.MQ 基本特性
RPC 通信
MQ 組件要實(shí)現(xiàn)和生產(chǎn)者以及消費(fèi)者進(jìn)行通信功能, 這里涉及到 RPC 通信問題。消息隊(duì)列的 RPC,和普通的 RPC 沒有本質(zhì)區(qū)別。對(duì)于負(fù)載均衡、服務(wù)發(fā)現(xiàn)、序列化協(xié)議等等問題都可以借助現(xiàn)有 RPC 框架來實(shí)現(xiàn),避免重復(fù)造輪子。
存儲(chǔ)系統(tǒng)
存儲(chǔ)可以做成很多方式。比如存儲(chǔ)在內(nèi)存里,存儲(chǔ)在分布式 KV 里,存儲(chǔ)在磁盤里,存儲(chǔ)在數(shù)據(jù)庫里等等。但歸結(jié)起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),并且理論上能承載更大限度的消息堆積(外存的空間遠(yuǎn)大于內(nèi)存)。但并不是每種消息都需要持久化存儲(chǔ)。很多消息對(duì)于投遞性能的要求大于可靠性的要求,且數(shù)量極大(如日志)。這時(shí)候,消息不落地直接暫存內(nèi)存,嘗試幾次 failover,最終投遞出去也未嘗不可。常見的消息隊(duì)列普遍兩種形式都支持。
從速度來看,理論上,文件系統(tǒng)>分布式 KV(持久化)>分布式文件系統(tǒng)>數(shù)據(jù)庫,而可靠性卻相反。還是要從支持的業(yè)務(wù)場景出發(fā)作出最合理的選擇。
高可用
MQ 的高可用,依賴于 RPC 和存儲(chǔ)的高可用。通常 RPC 服務(wù)自身都具有服務(wù)自動(dòng)發(fā)現(xiàn),負(fù)載均衡等功能,保證了其高可用。存儲(chǔ)的高可用, 例如 Kafka,使用分區(qū)加主備模式,保證每一個(gè)分區(qū)內(nèi)的高可用性,也就是每一個(gè)分區(qū)至少要有一個(gè)備份且需要做數(shù)據(jù)的同步。
推拉模型
push 和 pull 模型各有利弊,兩種模式也都有被市面上成熟的消息中間件選用。
1.慢消費(fèi)
慢消費(fèi)是 push 模型最大的致命傷,如果消費(fèi)者的速度比發(fā)送者的速度慢很多,會(huì)出現(xiàn)兩種惡劣的情況:
1.消息在 broker 的堆積。假設(shè)這些消息都是有用的無法丟棄的,消息就要一直在 broker 端保存。
2.broker 推送給 consumer 的消息 consumer 無法處理,此時(shí) consumer 只能拒絕或者返回錯(cuò)誤。
而 pull 模式下,consumer 可以按需消費(fèi),不用擔(dān)心自己處理不了的消息來騷擾自己,而 broker 堆積消息也會(huì)相對(duì)簡單,無需記錄每一個(gè)要發(fā)送消息的狀態(tài),只需要維護(hù)所有消息的隊(duì)列和偏移量就可以了。所以對(duì)于慢消費(fèi),消息量有限且到來的速度不均勻的情況,pull 模式比較合適。
2.消息延遲與忙等
這是 pull 模式最大的短板。由于主動(dòng)權(quán)在消費(fèi)方,消費(fèi)方無法準(zhǔn)確地決定何時(shí)去拉取最新的消息。如果一次 pull 取到消息了還可以繼續(xù)去 pull,如果沒有 pull 取到則需要等待一段時(shí)間重新 pull。
消息投放時(shí)機(jī)
即消費(fèi)者應(yīng)該在什么時(shí)機(jī)消費(fèi)消息。一般有以下三種方式:
攢夠了一定數(shù)量才投放。
到達(dá)了一定時(shí)間就投放。
有新的數(shù)據(jù)到來就投放。
至于如何選擇,也要結(jié)合具體的業(yè)務(wù)場景來決定。比如,對(duì)及時(shí)性要求高的數(shù)據(jù),可用采用方式 3 來完成。
消息投放對(duì)象
不管是 JMS 規(guī)范中的 Topic/Queue,Kafka 里面的 Topic/Partition/ConsumerGroup,還是 AMQP(如 RabbitMQ)的 Exchange 等等, 都是為了維護(hù)消息的消費(fèi)關(guān)系而抽象出來的概念。本質(zhì)上,消息的消費(fèi)無外乎點(diǎn)到點(diǎn)的一對(duì)一單播,或一對(duì)多廣播。另外比較特殊的情況是組間廣播、組內(nèi)單播。比較通用的設(shè)計(jì)是,不同的組注冊(cè)不同的訂閱,支持組間廣播。組內(nèi)不同的機(jī)器,如果注冊(cè)一個(gè)相同的 ID,則單播;如果注冊(cè)不同的 ID(如 IP 地址+端口),則廣播。
例如 pulsar 支持的訂閱模型有:
Exclusive:獨(dú)占型,一個(gè)訂閱只能有一個(gè)消息者消費(fèi)消息。
Failover:災(zāi)備型,一個(gè)訂閱同時(shí)只有一個(gè)消費(fèi)者,可以有多個(gè)備份消費(fèi)者。一旦主消費(fèi)者故障則備份消費(fèi)者接管。不會(huì)出現(xiàn)同時(shí)有兩個(gè)活躍的消費(fèi)者。
Shared:共享型,一個(gè)訂閱中同時(shí)可以有多個(gè)消費(fèi)者,多個(gè)消費(fèi)者共享 Topic 中的消息。
Key_Shared:鍵共享型,多個(gè)消費(fèi)者各取一部分消息。
通常會(huì)在公共存儲(chǔ)上維護(hù)廣播關(guān)系,如 config server、zookeeper 等。
2.隊(duì)列高級(jí)特性
常見的高級(jí)特性有可靠投遞、消息丟失、消息重復(fù)、事務(wù)等等,他們并非是 MQ 必備的特性。由于這些特性可能是相互制約的,所以不可能完全兼顧。所以要依照業(yè)務(wù)的需求,來仔細(xì)衡量各種特性實(shí)現(xiàn)的成本、利弊,最終做出最為合理的設(shè)計(jì)。
可靠投遞
如何保證消息完全不丟失?
直觀的方案是,在任何不可靠操作之前,先將消息落地,然后操作。當(dāng)失敗或者不知道結(jié)果(比如超時(shí))時(shí),消息狀態(tài)是待發(fā)送,定時(shí)任務(wù)不停輪詢所有待發(fā)送消息,最終一定可以送達(dá)。但是,這樣必然導(dǎo)致消息可能會(huì)重復(fù),并且在異常情況下,消息延遲較大。
例如:
producer 往 broker 發(fā)送消息之前,需要做一次落地。
請(qǐng)求到 server 后,server 確保數(shù)據(jù)落地后再告訴客戶端發(fā)送成功。
支持廣播的消息隊(duì)列需要對(duì)每個(gè)接收者,持久化一個(gè)發(fā)送狀態(tài),直到所有接收者都確認(rèn)收到,才可刪除消息。
即對(duì)于任何不能確認(rèn)消息已送達(dá)的情況,都要重推消息。但是,隨著而來的問題就是消息重復(fù)。在消息重復(fù)和消息丟失之間,無法兼顧,要結(jié)合應(yīng)用場景做出取舍。
消費(fèi)確認(rèn)
當(dāng) broker 把消息投遞給消費(fèi)者后,消費(fèi)者可以立即確認(rèn)收到了消息。但是,有些情況消費(fèi)者可能需要再次接收該消息(比如收到消息、但是處理失敗),即消費(fèi)者主動(dòng)要求重發(fā)消息。所以,要允許消費(fèi)者主動(dòng)進(jìn)行消費(fèi)確認(rèn)。
順序消息
對(duì)于 push 模式,要求支持分區(qū)且單分區(qū)只支持一個(gè)消費(fèi)者消費(fèi),并且消費(fèi)者只有確認(rèn)一個(gè)消息消費(fèi)后才能 push 另外一個(gè)消息,還要發(fā)送者保證發(fā)送順序唯一。
對(duì)于 pull 模式,比如 kafka 的做法:
producer 對(duì)應(yīng) partition,并且單線程。
consumer 對(duì)應(yīng) partition,消費(fèi)確認(rèn)(或批量確認(rèn)),單線程消費(fèi)。
但是這樣也只是實(shí)現(xiàn)了消息的分區(qū)有序性,并不一定全局有序。總體而言,要求消息有序的 MQ 場景還是比較少的。
三、Kafka
Kafka 是一個(gè)分布式發(fā)布訂閱消息系統(tǒng)。它以高吞吐、可持久化、可水平擴(kuò)展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用(如 Storm、Spark、Flink)。在大數(shù)據(jù)系統(tǒng)中,數(shù)據(jù)需要在各個(gè)子系統(tǒng)中高性能、低延遲的不停流轉(zhuǎn)。傳統(tǒng)的企業(yè)消息系統(tǒng)并不是非常適合大規(guī)模的數(shù)據(jù)處理,但 Kafka 出現(xiàn)了,它可以高效的處理實(shí)時(shí)消息和離線消息,降低編程復(fù)雜度,使得各個(gè)子系統(tǒng)可以快速高效的進(jìn)行數(shù)據(jù)流轉(zhuǎn),Kafka 承擔(dān)高速數(shù)據(jù)總線的作用。
kafka 基礎(chǔ)概念
BrokerKafka 集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為 broker。
TopicTopic 在邏輯上可以被認(rèn)為是一個(gè) queue,每條消費(fèi)都必須指定它的 Topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個(gè) queue 里。為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個(gè)或多個(gè) Partition,每個(gè) Partition 在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè) Partition 的所有消息和索引文件。
PartitionParition 是物理上的概念,每個(gè) Topic 包含一個(gè)或多個(gè) Partition。
Producer負(fù)責(zé)發(fā)布消息到 Kafka broker。
Consumer消息消費(fèi)者,向 Kafka broker 讀取消息的客戶端。
Consumer Group每個(gè) Consumer 屬于一個(gè)特定的 Consumer Group(可為每個(gè) Consumer 指定 group name,若不指定 group name 則屬于默認(rèn)的 group)。
一個(gè)典型的 kafka 集群包含若干 Producer,若干個(gè) Broker(kafka 支持水平擴(kuò)展)、若干個(gè) Consumer Group,以及一個(gè) zookeeper 集群。Producer 使用 push 模式將消息發(fā)布到 broker。consumer 使用 pull 模式從 broker 訂閱并消費(fèi)消息。多個(gè) broker 協(xié)同工作,producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中。kafka 通過 zookeeper 管理集群配置及服務(wù)協(xié)同。
這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)。Kafka 有一個(gè)細(xì)節(jié)是和其他 mq 中間件不同的點(diǎn),producer 發(fā)送消息到 broker 的過程是 push,而 consumer 從 broker 消費(fèi)消息的過程是 pull,主動(dòng)去拉數(shù)據(jù)。而不是 broker 把數(shù)據(jù)主動(dòng)發(fā)送給 consumer。
Producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù) Paritition 機(jī)制選擇將其存儲(chǔ)到哪一個(gè) Partition。如果 Partition 機(jī)制設(shè)置合理,所有消息可以均勻分布到不同的 Partition 里,這樣就實(shí)現(xiàn)了負(fù)載均衡。如果一個(gè) Topic 對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器 I/O 將會(huì)成為這個(gè) Topic 的性能瓶頸,而有了 Partition 后,不同的消息可以并行寫入不同 broker 的不同 Partition 里,極大的提高了吞吐率。
Kafka 特點(diǎn)
優(yōu)點(diǎn):
高性能:單機(jī)測試能達(dá)到 100w tps
低延時(shí):生產(chǎn)和消費(fèi)的延時(shí)都很低,e2e 的延時(shí)在正常的 cluster 中也很低
可用性高:replicate+ isr + 選舉 機(jī)制保證
工具鏈成熟:監(jiān)控 運(yùn)維 管理 方案齊全
生態(tài)成熟:大數(shù)據(jù)場景必不可少 kafka stream
不足:
無法彈性擴(kuò)容:對(duì) partition 的讀寫都在 partition leader 所在的 broker,如果該 broker 壓力過大,也無法通過新增 broker 來解決問題
擴(kuò)容成本高:集群中新增的 broker 只會(huì)處理新 topic,如果要分擔(dān)老 topic-partition 的壓力,需要手動(dòng)遷移 partition,這時(shí)會(huì)占用大量集群帶寬
消費(fèi)者新加入和退出會(huì)造成整個(gè)消費(fèi)組 rebalance:導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi),影響消費(fèi)速度,增加延遲
partition 過多會(huì)使得性能顯著下降:ZK 壓力大,broker 上 partition 過多讓磁盤順序?qū)憥缀跬嘶呻S機(jī)寫
高吞吐機(jī)制
順序存取
如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對(duì)應(yīng)柱面、磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過程相對(duì)內(nèi)存來說會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來的時(shí)間消耗,kafka 采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)。
頁緩存
即使是順序存取,但是頻繁的 I/O 操作仍然會(huì)造成磁盤的性能瓶頸,所以 kafka 使用了頁緩存和零拷貝技術(shù)。當(dāng)進(jìn)程準(zhǔn)備讀取磁盤上的文件內(nèi)容時(shí), 操作系統(tǒng)會(huì)先查看待讀取的數(shù)據(jù)是否在頁緩存中,如果存在則直接返回?cái)?shù)據(jù), 從而避免了對(duì)物理磁盤的 I/O 操作;
如果沒有命中, 則操作系統(tǒng)會(huì)向磁盤發(fā)起讀取請(qǐng)求并將讀取的數(shù)據(jù)頁存入頁緩存, 之后再將數(shù)據(jù)返回給進(jìn)程。一個(gè)進(jìn)程需要將數(shù)據(jù)寫入磁盤, 那么操作系統(tǒng)也會(huì)檢測數(shù)據(jù)對(duì)應(yīng)的頁是否在頁緩存中,如果不存在, 則會(huì)先在頁緩存中添加相應(yīng)的頁, 最后將數(shù)據(jù)寫入對(duì)應(yīng)的頁。被修改過后的頁也就變成了臟頁, 操作系統(tǒng)會(huì)在合適的時(shí)間把臟頁中的數(shù)據(jù)寫入磁盤, 以保持?jǐn)?shù)據(jù)的 一 致性。
Kafka 中大量使用了頁緩存, 這是 Kafka 實(shí)現(xiàn)高吞吐的重要因素之 一 。雖然消息都是先被寫入頁緩存,然后由操作系統(tǒng)負(fù)責(zé)具體的刷盤任務(wù)的, 但在 Kafka 中同樣提供了同步刷盤及間斷性強(qiáng)制刷盤(fsync),可以通過參數(shù)來控制。
同步刷盤能夠保證消息的可靠性,避免因?yàn)殄礄C(jī)導(dǎo)致頁緩存數(shù)據(jù)還未完成同步時(shí)造成的數(shù)據(jù)丟失。但是實(shí)際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會(huì)帶來性能的影響。
頁緩存的好處:
I/O Scheduler 會(huì)將連續(xù)的小塊寫組裝成大塊的物理寫從而提高性能;
I/O Scheduler 會(huì)嘗試將一些寫操作重新按順序排好,從而減少磁頭移動(dòng)時(shí)間;
充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存);
讀操作可以直接在 Page Cache 內(nèi)進(jìn)行,如果消費(fèi)和生產(chǎn)速度相當(dāng),甚至不需要通過物理磁盤交換數(shù)據(jù);
如果進(jìn)程重啟,JVM 內(nèi)的 Cache 會(huì)失效,但 Page Cache 仍然可用。
零拷貝
零拷貝技術(shù)可以減少 CPU 的上下文切換和數(shù)據(jù)拷貝次數(shù)。
常規(guī)方式
應(yīng)用程序一次常規(guī)的數(shù)據(jù)請(qǐng)求過程,發(fā)生了 4 次拷貝,2 次 DMA 和 2 次 CPU,而 CPU 發(fā)生了 4 次的切換。(DMA 簡單理解就是,在進(jìn)行 I/O 設(shè)備和內(nèi)存的數(shù)據(jù)傳輸?shù)臅r(shí)候,數(shù)據(jù)搬運(yùn)的工作全部交給 DMA 控制器,而 CPU 不再參與任何與數(shù)據(jù)搬運(yùn)相關(guān)的事情)
零拷貝的方式
通過零拷貝優(yōu)化,CPU 只發(fā)生了 2 次的上下文切換和 3 次數(shù)據(jù)拷貝。
批量發(fā)送
Kafka 允許進(jìn)行批量發(fā)送消息,先將消息緩存在內(nèi)存中,然后一次請(qǐng)求批量發(fā)送出去,這種策略將大大減少服務(wù)端的 I/O 次數(shù)。
數(shù)據(jù)壓縮
Kafka 還支持對(duì)消息集合進(jìn)行壓縮,Producer 可以通過 GZIP 或 Snappy 格式對(duì)消息集合進(jìn)行壓縮,Producer 壓縮之后,在 Consumer 需進(jìn)行解壓,雖然增加了 CPU 的工作,但在對(duì)大數(shù)據(jù)處理上,瓶頸在網(wǎng)絡(luò)上而不是 CPU,所以這個(gè)成本很值得。
高可用機(jī)制
副本
Producer 在發(fā)布消息到某個(gè) Partition 時(shí),先通過 ZooKeeper 找到該 Partition 的 Leader,然后無論該 Topic 的 Replication Factor 為多少,Producer 只將該消息發(fā)送到該 Partition 的 Leader。Leader 會(huì)將該消息寫入其本地 Log。
每個(gè) Follower 都從 Leader pull 數(shù)據(jù)。這種方式上,Follower 存儲(chǔ)的數(shù)據(jù)順序與 Leader 保持一致。Follower 在收到該消息后,向 Leader 發(fā)送 ACK, 并把消息寫入其 Log。一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,該消息就被認(rèn)為已經(jīng) commit 了,Leader 將增加 HW 并且向 Producer 發(fā)送 ACK。
為了提高性能,每個(gè) Follower 在接收到數(shù)據(jù)后就立馬向 Leader 發(fā)送 ACK,而非等到數(shù)據(jù)寫入 Log 中。因此,對(duì)于已經(jīng) commit 的消息,Kafka 只能保證它被存于多個(gè) Replica 的內(nèi)存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發(fā)生后該條消息一定能被 Consumer 消費(fèi)。Consumer 讀消息也是從 Leader 讀取,只有被 commit 過的消息才會(huì)暴露給 Consumer。Kafka Replication 的數(shù)據(jù)流如下圖所示:
對(duì)于 Kafka 而言,定義一個(gè) Broker 是否“活著”包含兩個(gè)條件:
一是它必須維護(hù)與 ZooKeeper 的 session(這個(gè)通過 ZooKeeper 的 Heartbeat 機(jī)制來實(shí)現(xiàn))。
二是 Follower 必須能夠及時(shí)將 Leader 的消息復(fù)制過來,不能“落后太多”。
Leader 會(huì)跟蹤與其保持同步的 Replica 列表,該列表稱為 ISR(即 in-sync Replica)。如果一個(gè) Follower 宕機(jī),或者落后太多,Leader 將把它從 ISR 中移除。這里所描述的“落后太多”指 Follower 復(fù)制的消息落后于 Leader 后的條數(shù)超過預(yù)定值或者 Follower 超過一定時(shí)間未向 Leader 發(fā)送 fetch 請(qǐng)求。Kafka 的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。
完全同步復(fù)制要求所有能工作的 Follower 都復(fù)制完,這條消息才會(huì)被認(rèn)為 commit,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是 Kafka 非常重要的一個(gè)特性)。異步復(fù)制方式下,Follower 異步的從 Leader 復(fù)制數(shù)據(jù),數(shù)據(jù)只要被 Leader 寫入 log 就被認(rèn)為已經(jīng) commit,這種情況下如果 Follower 都復(fù)制完都落后于 Leader,而如果 Leader 突然宕機(jī),則會(huì)丟失數(shù)據(jù)。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower 可以批量的從 Leader 復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫磁盤),極大減少了 Follower 與 Leader 的差距。
一條消息只有被 ISR 里的所有 Follower 都從 Leader 復(fù)制過去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了 Leader,還沒來得及被任何 Follower 復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(Consumer 無法消費(fèi)這些數(shù)據(jù))。而對(duì)于 Producer 而言,它可以選擇是否等待消息 commit。這種機(jī)制確保了只要 ISR 有一個(gè)或以上的 Follower,一條被 commit 的消息就不會(huì)丟失。
故障恢復(fù)
Leader 故障
leader 發(fā)生故障后,會(huì)從 ISR 中選出一個(gè)新的 leader,之后,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 follower 會(huì)先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數(shù)據(jù)。注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
Kafka 在 ZooKeeper 中動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas),這個(gè) ISR 里的所有 Replica 都跟上了 leader,只有 ISR 里的成員才有被選為 Leader 的可能。在這種模式下,對(duì)于 f+1 個(gè) Replica,一個(gè) Partition 能在保證不丟失已經(jīng) commit 的消息的前提下容忍 f 個(gè) Replica 的失敗。
LEO:每個(gè)副本最大的 offset。
HW:消費(fèi)者能見到的最大的 offset,ISR 隊(duì)列中最小的 LEO。
Follower 故障
follower 發(fā)生故障后會(huì)被臨時(shí)踢出 ISR 集合,待該 follower 恢復(fù)后,follower 會(huì) 讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 leader 進(jìn)行同步數(shù)據(jù)操作。等該 follower 的 LEO 大于等于該 partition 的 HW,即 follower 追上 leader 后,就可以重新加入 ISR 了。
擴(kuò)展性
由于 Broker 存儲(chǔ)著特定分區(qū)的數(shù)據(jù), 因此,不管是 Broker 還是分區(qū)的擴(kuò)縮容,都是比較復(fù)雜的,屬于典型的“有狀態(tài)服務(wù)”擴(kuò)縮容問題。接下來,我們看一下 Pulsar 是怎么針對(duì) kafka 的不足進(jìn)行優(yōu)化的。
四、Pulsar
Apache Pulsar 是 Apache 軟件基金會(huì)頂級(jí)項(xiàng)目,是下一代云原生分布式消息流平臺(tái),集消息、存儲(chǔ)、輕量化函數(shù)式計(jì)算為一體。采用計(jì)算與存儲(chǔ)分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲(chǔ)、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時(shí)及高可擴(kuò)展性等流數(shù)據(jù)存儲(chǔ)特性。在消息領(lǐng)域,Pulsar 是第一個(gè)將存儲(chǔ)計(jì)算分離云原生架構(gòu)落地的開源項(xiàng)目。
服務(wù)和存儲(chǔ)分離
在 kafka 的基礎(chǔ)上,把數(shù)據(jù)存儲(chǔ)功能從 Broker 中分離出來,Broker 僅面向生產(chǎn)者、消費(fèi)者提供數(shù)據(jù)讀寫能力,但其自身并不存儲(chǔ)數(shù)據(jù)。而在 Broker 層下面使用 Bookie 作為存儲(chǔ)層,承擔(dān)具體的數(shù)據(jù)存儲(chǔ)職責(zé)。在 Pulsar 中,broker 的含義和 kafka 中的 broker 是一致的,就是一個(gè)運(yùn)行的 Pulsar 實(shí)例, 提供多個(gè)分區(qū)的讀寫服務(wù)。由于 broker 層不在承擔(dān)數(shù)據(jù)存儲(chǔ)職責(zé),使得 Broker 層成為無狀態(tài)服務(wù)。這樣一來,Broker 的擴(kuò)縮容就變得非常簡單。
相比之下,服務(wù)存儲(chǔ)集于一體的 Kafka 就非常難以擴(kuò)容。
Broker 和 Bookie 相互獨(dú)立,方便實(shí)現(xiàn)獨(dú)立的擴(kuò)展以及獨(dú)立的容錯(cuò)
Broker 無狀態(tài),便于快速上、下線,更加適合于云原生場景
分區(qū)存儲(chǔ)不受限于單個(gè)節(jié)點(diǎn)存儲(chǔ)容量
Bookie 數(shù)據(jù)分布均勻
分片存儲(chǔ)
1.在 Kafka 分區(qū)(Partition)概念的基礎(chǔ)上,按照時(shí)間或大小,把分區(qū)切分成分片(Segment)。
2.同一個(gè)分區(qū)的分片,分散存儲(chǔ)在集群中所有的 Bookie 節(jié)點(diǎn)上。
3.同一個(gè)分片,擁有多個(gè)副本,副本數(shù)量可以指定,存儲(chǔ)于不同的 Bookie 節(jié)點(diǎn)。
Pulsar 性能
和 Kafka 一樣,Pulsar 也使用了順序讀寫和零拷貝等技術(shù)來提高系統(tǒng)的性能。
此外,Pulsar 還設(shè)計(jì)了分層緩存機(jī)制,在服務(wù)層和存儲(chǔ)層都做了分層緩存,來提高性能。
生產(chǎn)者發(fā)送消息時(shí),調(diào)用 Bookie 層寫入消息時(shí),同時(shí)將消息寫入 broker 緩存中。
實(shí)時(shí)消費(fèi)時(shí)(追尾讀),首先從 broker 緩存中讀取數(shù)據(jù),避免從持久層 bookie 中讀取,從而降低投遞延遲。
讀取歷史消息(追趕讀)場景中,bookie 會(huì)將磁盤消息讀入 bookie 讀緩存中,從而避免每次都讀取磁盤數(shù)據(jù),降低讀取延時(shí)。
Pulsar 擴(kuò)展性
分片存儲(chǔ)解決了分區(qū)容量受單節(jié)點(diǎn)存儲(chǔ)空間限制的問題,當(dāng)容量不夠時(shí),可以通過擴(kuò)容 Bookie 節(jié)點(diǎn)的方式支撐更多的分區(qū)數(shù)據(jù),也解決了分區(qū)數(shù)據(jù)傾斜問題,數(shù)據(jù)可以均勻的分配在 Bookie 節(jié)點(diǎn)上。
Broker 和 Bookie 靈活的容錯(cuò)以及無縫的擴(kuò)容能力讓 Apache Pulsar 具備非常高的可用性,實(shí)現(xiàn)了無限制的分區(qū)存儲(chǔ)。
Broker 擴(kuò)展
在 Pulsar 中 Broker 是無狀態(tài)的,可以通過增加節(jié)點(diǎn)的方式實(shí)現(xiàn)快速擴(kuò)容。當(dāng)需要支持更多的消費(fèi)者或生產(chǎn)者時(shí),可以簡單地添加更多的 Broker 節(jié)點(diǎn)來滿足業(yè)務(wù)需求。Pulsar 支持自動(dòng)的分區(qū)負(fù)載均衡,在 Broker 節(jié)點(diǎn)的資源使用率達(dá)到閾值時(shí),會(huì)將負(fù)載遷移到負(fù)載較低的 Broker 節(jié)點(diǎn)。新增 Broker 節(jié)點(diǎn)時(shí),分區(qū)也將在 Brokers 中做平衡遷移,一些分區(qū)的所有權(quán)會(huì)轉(zhuǎn)移到新的 Broker 節(jié)點(diǎn)。
Bookie 擴(kuò)展
存儲(chǔ)層的擴(kuò)容,通過增加 Bookie 節(jié)點(diǎn)來實(shí)現(xiàn)。通過資源感知和數(shù)據(jù)放置策略,流量將自動(dòng)切換到新的 Apache Bookie 中,整個(gè)過程不會(huì)涉及到不必要的數(shù)據(jù)搬遷。即擴(kuò)容時(shí),不會(huì)將舊數(shù)據(jù)從現(xiàn)有存儲(chǔ)節(jié)點(diǎn)重新復(fù)制到新存儲(chǔ)節(jié)點(diǎn)。
如圖所示,起始狀態(tài)有四個(gè)存儲(chǔ)節(jié)點(diǎn),Bookie1, Bookie2, Bookie3, Bookie4,以 Topic1-Part2 為例,當(dāng)這個(gè)分區(qū)的最新的存儲(chǔ)分片是 SegmentX 時(shí),對(duì)存儲(chǔ)層擴(kuò)容,添加了新的 Bookie 節(jié)點(diǎn),BookieX,BookieY,那么當(dāng)存儲(chǔ)分片滾動(dòng)之后,新生成的存儲(chǔ)分片, SegmentX+1,SegmentX+2,會(huì)優(yōu)先選擇新的 Bookie 節(jié)點(diǎn)(BookieX,BookieY)來保存數(shù)據(jù)。
Pulsar 可用性
Broker 容錯(cuò)
如下圖,假設(shè)當(dāng)存儲(chǔ)分片滾動(dòng)到 SegmentX 時(shí),Broker2 節(jié)點(diǎn)失敗。此時(shí)生產(chǎn)者和消費(fèi)者向其他的 Broker 發(fā)起請(qǐng)求,這個(gè)過程會(huì)觸發(fā)分區(qū)的所有權(quán)轉(zhuǎn)移,即將 Broker2 擁有的分區(qū) Topic1-Part2 的所有權(quán)轉(zhuǎn)移到其他的 Broker(Broker3)。
由于數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)服務(wù)分離,所以新 Broker 接管分區(qū)的所有權(quán)時(shí),它不需要復(fù)制 Partiton 的數(shù)據(jù)。新的分區(qū) Owner(Broker3)會(huì)產(chǎn)生一個(gè)新的分片 SegmentX+1, 如果有新數(shù)據(jù)到來,會(huì)存儲(chǔ)在新的分片 Segment x+1 上,不會(huì)影響分區(qū)的可用性。
即當(dāng)某個(gè) Broker 實(shí)例故障時(shí),整個(gè)集群的消息存儲(chǔ)能力仍然完好。此時(shí),集群只是喪失了特定分區(qū)的消息服務(wù),只需要把這些分區(qū)的服務(wù)權(quán)限分配給其他 Broker 即可。
注意,和 Kafka 一樣, Pulsar 的一個(gè)分區(qū)仍然只能由一個(gè) Broker 提供服務(wù),否則無法保證消息的分區(qū)有序性。
Bookie 容錯(cuò)
如下圖,假設(shè) Bookie 2 上的 Segment 4 損壞。Bookie Auditor 會(huì)檢測到這個(gè)錯(cuò)誤并進(jìn)行復(fù)制修復(fù)。Bookie 中的副本修復(fù)是 Segment 級(jí)別的多對(duì)多快速修復(fù),BookKeeper 可以從 Bookie 3 和 Bookie 4 讀取 Segment 4 中的消息,并在 Bookie 1 處修復(fù) Segment 4。如果是 Bookie 節(jié)點(diǎn)故障,這個(gè) Bookie 節(jié)點(diǎn)上所有的 Segment 會(huì)按照上述方式復(fù)制到其他的 Bookie 節(jié)點(diǎn)。
所有的副本修復(fù)都在后臺(tái)進(jìn)行,對(duì) Broker 和應(yīng)用透明,Broker 會(huì)產(chǎn)生新的 Segment 來處理寫入請(qǐng)求,不會(huì)影響分區(qū)的可用性。
Pulsar 其他特性
基于上述的設(shè)計(jì)特點(diǎn),Pulsar 提供了很多特性。
讀寫分離
Pulsar 另外一個(gè)有吸引力的特性是提供了讀寫分離的能力,讀寫分離保證了在有大量滯后消費(fèi)(磁盤 IO 會(huì)增加)時(shí),不會(huì)影響服務(wù)的正常運(yùn)行,尤其是不會(huì)影響到數(shù)據(jù)的寫入。讀寫分離的能力由 Bookie 提供,簡單說一下 Bookie 存儲(chǔ)涉及到的概念:
Journals:Journal 文件包含了 Bookie 事務(wù)日志,在 Ledger (可以認(rèn)為是分片的一部分) 更新之前,Journal 保證描述更新的事務(wù)寫入到 Non-volatile 的存儲(chǔ)介質(zhì)上;
Entry logger:Entry 日志文件管理寫入的 Entry,來自不同 ledger 的 entry 會(huì)被聚合然后順序?qū)懭?#xff1b;
Index files:每個(gè) Ledger 都有一個(gè)對(duì)應(yīng)的索引文件,記錄數(shù)據(jù)在 Entry 日志文件中的 Offset 信息。
Entry 的讀寫入過程下圖所示,數(shù)據(jù)的寫入流程:
數(shù)據(jù)首先會(huì)寫入 Journal,寫入 Journal 的數(shù)據(jù)會(huì)實(shí)時(shí)落到磁盤;
然后,數(shù)據(jù)寫入到 Memtable ,Memtable 是讀寫緩存;
寫入 Memtable 之后,對(duì)寫入請(qǐng)求進(jìn)行響應(yīng);
Memtable 寫滿之后,會(huì) Flush 到 Entry Logger 和 Index cache,Entry Logger 中保存了數(shù)據(jù),Index cache 保存了數(shù)據(jù)的索引信息,然后由后臺(tái)線程將 Entry Logger 和 Index cache 數(shù)據(jù)落到磁盤。
數(shù)據(jù)的讀取流程:
如果是 Tailing read 請(qǐng)求,直接從 Memtable 中讀取 Entry;
如果是 Catch-up read(滯后消費(fèi))請(qǐng)求,先讀取 Index 信息,然后索引從 Entry Logger 文件讀取 Entry。
一般在進(jìn)行 Bookie 的配置時(shí),會(huì)將 Journal 和 Ledger 存儲(chǔ)磁盤進(jìn)行隔離,減少 Ledger 對(duì)于 Journal 寫入的影響,并且推薦 Journal 使用性能較好的 SSD 磁盤,讀寫分離主要體現(xiàn)在:
寫入 Entry 時(shí),Journal 中的數(shù)據(jù)需要實(shí)時(shí)寫到磁盤,Ledger 的數(shù)據(jù)不需要實(shí)時(shí)落盤,通過后臺(tái)線程批量落盤,因此寫入的性能主要受到 Journal 磁盤的影響;
讀取 Entry 時(shí),首先從 Memtable 讀取,命中則返回;如果不命中,再從 Ledger 磁盤中讀取,所以對(duì)于 Catch-up read 的場景,讀取數(shù)據(jù)會(huì)影響 Ledger 磁盤的 IO,對(duì) Journal 磁盤沒有影響,也就不會(huì)影響到數(shù)據(jù)的寫入。
所以,數(shù)據(jù)寫入是主要是受 Journal 磁盤的負(fù)載影響,不會(huì)受 Ledger 磁盤的影響。另外,Segment 存儲(chǔ)的多個(gè)副本都可以提供讀取服務(wù),相比于主從副本的設(shè)計(jì),Apache Pulsar 可以提供更好的數(shù)據(jù)讀取能力。
通過以上分析,Apache Pulsar 使用 Apache BookKeeper 作為數(shù)據(jù)存儲(chǔ),可以帶來下列的收益:
支持將多個(gè) Ledger 的數(shù)據(jù)寫入到同一個(gè) Entry logger 文件,可以避免分區(qū)膨脹帶來的性能下降問題
支持讀寫分離,可以在滯后消費(fèi)場景導(dǎo)致磁盤 IO 上升時(shí),保證數(shù)據(jù)寫入的不受影響
支持全副本讀取,可以充分利用存儲(chǔ)副本的數(shù)據(jù)讀取能力
多種消費(fèi)模型
Pulsar 提供了多種訂閱方式來消費(fèi)消息,分為三種類型:獨(dú)占(Exclusive),故障切換(Failover)或共享(Share)。
Exclusive 獨(dú)占訂閱 :在任何時(shí)間,一個(gè)消費(fèi)者組(訂閱)中有且只有一個(gè)消費(fèi)者來消費(fèi) Topic 中的消息。
Failover 故障切換:多個(gè)消費(fèi)者(Consumer)可以附加到同一訂閱。但是,一個(gè)訂閱中的所有消費(fèi)者,只會(huì)有一個(gè)消費(fèi)者被選為該訂閱的主消費(fèi)者。其他消費(fèi)者將被指定為故障轉(zhuǎn)移消費(fèi)者。當(dāng)主消費(fèi)者斷開連接時(shí),分區(qū)將被重新分配給其中一個(gè)故障轉(zhuǎn)移消費(fèi)者,而新分配的消費(fèi)者將成為新的主消費(fèi)者。發(fā)生這種情況時(shí),所有未確認(rèn)(ack)的消息都將傳遞給新的主消費(fèi)者。
Share 共享訂閱:使用共享訂閱,在同一個(gè)訂閱背后,用戶按照應(yīng)用的需求掛載任意多的消費(fèi)者。訂閱中的所有消息以循環(huán)分發(fā)形式發(fā)送給訂閱背后的多個(gè)消費(fèi)者,并且一個(gè)消息僅傳遞給一個(gè)消費(fèi)者。
當(dāng)消費(fèi)者斷開連接時(shí),所有傳遞給它但是未被確認(rèn)(ack)的消息將被重新分配和組織,以便發(fā)送給該訂閱上剩余的剩余消費(fèi)者。
多種 ACK 模型
消息確認(rèn)(ACK)的目的就是保證當(dāng)發(fā)生故障后,消費(fèi)者能夠從上一次停止的地方恢復(fù)消費(fèi),保證既不會(huì)丟失消息,也不會(huì)重復(fù)處理已經(jīng)確認(rèn)(ACK)的消息。在 Pulsar 中,每個(gè)訂閱中都使用一個(gè)專門的數(shù)據(jù)結(jié)構(gòu)–游標(biāo)(Cursor)來跟蹤訂閱中的每條消息的確認(rèn)(ACK)狀態(tài)。每當(dāng)消費(fèi)者在分區(qū)上確認(rèn)消息時(shí),游標(biāo)都會(huì)更新。
Pulsar 提供兩種消息確認(rèn)方法:
單條確認(rèn)(Individual Ack),單獨(dú)確認(rèn)一條消息。被確認(rèn)后的消息將不會(huì)被重新傳遞
累積確認(rèn)(Cumulative Ack),通過累積確認(rèn),消費(fèi)者只需要確認(rèn)它收到的最后一條消息
上圖說明了單條確認(rèn)和累積確認(rèn)的差異(灰色框中的消息被確認(rèn)并且不會(huì)被重新傳遞)。對(duì)于累計(jì)確認(rèn),M12 之前的消息被標(biāo)記為 Acked。對(duì)于單獨(dú)進(jìn)行 ACK,僅確認(rèn)消息 M7 和 M12, 在消費(fèi)者失敗的情況下,除了 M7 和 M12 之外,其他所有消息將被重新傳送。
直播精彩分享
- END -
看完一鍵三連在看,轉(zhuǎn)發(fā),點(diǎn)贊
是對(duì)文章最大的贊賞,極客重生感謝你
推薦閱讀
深入理解虛擬化
深入理解零拷貝技術(shù)
后端技術(shù)趨勢(shì)指南|如何選擇自己的技術(shù)方向
總結(jié)
以上是生活随笔為你收集整理的深入理解分布式消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解虚拟化
- 下一篇: 今年你参与开源了吗?