新一代消息队列 Pulsar
作者:joylei,騰訊 PCG 后臺開發工程師
在信息流場景,內容的請求處理、原子模塊調度、結果的分發等至關重要,直接影響到內容的外顯、推薦、排序等。基于消息 100% 成功的要求,我們團隊對 Pulsar 進行了調研,并采用騰訊云的 TDMQ(Pulsar 版)實現消息的可靠處理。本文主要參考 Pulsar 的官方文檔和技術文章,對 Pulsar 的特性、機制、原理等進行整理總結。后續我們團隊計劃產出多篇文章,重點聚焦分析 Pulsar 與其他消息隊列(Kafka、RocketMQ 等) 的調度和寫盤等,以及 Pulsar 在信息流內容鏈路場景的使用實踐。
1. Pulsar 概述
Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性,被看作是云原生時代實時消息流傳輸、存儲和計算最佳解決方案。
Pulsar 是一個 pub-sub (發布-訂閱)模型的消息隊列系統。
1.1. Pulsar 架構
Pulsar 由 Producer、Consumer、多個 Broker 、一個 BookKeeper 集群、一個 Zookeeper 集群構成,具體如下圖所示。
Producer:數據生成者,即發送消息的一方。生產者負責創建消息,將其投遞到 Pulsar 中。
Consumer:數據消費者,即接收消息的一方。消費者連接到 Pulsar 并接收消息,進行相應的業務處理。
Broker:無狀態的服務層,負責接收消息、傳遞消息、集群負載均衡等操作,Broker 不會持久化保存元數據。
BookKeeper:有狀態的持久層,負責持久化地存儲消息。
ZooKeeper:存儲 Pulsar 、 BookKeeper 的元數據,集群配置等信息,負責集群間的協調(例如:Topic 與 Broker 的關系)、服務發現等。
從 Pulsar 的架構圖上可以看出, Pulsar 在架構設計上采用了計算與存儲分離的模式,發布/訂閱相關的計算邏輯在 Broker 上完成,而數據的持久化存儲交由 BookKeeper 去實現。
1.1.1. Broker 擴展
在 Pulsar 中 Broker 是無狀態的,當需要支持更多的消費者或生產者時,可以簡單地添加更多的 Broker 節點來滿足業務需求。Pulsar 支持自動的分區負載均衡,在 Broker 節點的資源使用率達到閾值時,會將負載遷移到負載較低的 Broker 節點,這個過程中分區也將在多個 Broker 節點中做平衡遷移,一些分區的所有權會轉移到新的 Broker 節點。在后面 Bundle 小節會具體介紹這部分的實現。
1.1.2. Bookie 擴展
存儲層的擴容,通過增加 Bookie 節點來實現。在 BooKie 擴容的階段,由于分片機制,整個過程不會涉及到不必要的數據搬遷,即不需要將舊數據從現有存儲節點重新復制到新存儲節點。在后續的 Bookkeeper 小節中會具體介紹。
1.2. Topic
和其他消息隊列類似,Pulsar 中也有 Topic。Topic 即在生產者與消費者中傳輸消息的通道。消息可以以 Topic 為單位進行歸類,生產者負責將消息發送到特定的 Topic,而消費者指定特定的 Topic 進行消費。
1.2.1. 分區 Topic(Topic-Partition)
Pulsar 的 Topic 可以分為非分區 Topic 和分區 Topic 。
普通的 Topic 僅僅被保存在單個 Broker 中,這限制了 Topic 的最大吞吐量。分區 Topic 是一種特殊類型的主題,支持被多個 Broker 處理,從而實現更高的吞吐量。
針對一個 Topic ,可以設置多個 Topic 分區來提高 Topic 的吞吐量。每個 Topic Partition 由 Pulsar 分配給某個 Broker ,該 Broker 稱為該 Topic Partition 的所有者。生成者和消費者會與每個 Topic 分區的 Broker 創建鏈接,發送消息并消費消息。
如下圖所示, Topic1 有 Partition1、 Partition2、 Partition3、 Partition4 、 Partition5 五個分區, Partition1 和 Partition4 由 Broker1 處理, Partition2 和 Partition5 由 Broker2 處理, Partition3 由 Broker3 處理。
從 Pulsar 社區版的 golang-sdk 可以看出,客戶端的 Producer 和 Consumer 在初始化的時候,都會與每一個 Topic-Partition 創建鏈接,并且會監聽是否有新的 Partition,以創建新的連接。
1.2.2. 非持久 topic
默認情況下, Pulsar 會保存所有沒確認的消息到 BookKeeper 中。持久 Topic 的消息在 Broker 重啟或者 Consumer 出現問題時保存下來。
除了持久 Topic , Pulsar 也支持非持久 Topic 。這些 Topic 的消息只存在于內存中,不會存儲到磁盤。
因為 Broker 不會對消息進行持久化存儲,當 Producer 將消息發送到 Broker 時, Broker 可以立即將 ack 返回給 Producer ,所以非持久 Topic 的消息傳遞會比持久 Topic 的消息傳遞更快一些。相對的,當 Broker 因為一些原因宕機、重啟后,非持久 Topic 的消息都會消失,訂閱者將無法收到這些消息。
1.2.3. 重試 topic
由于業務邏輯處理出現異常,消息一般需要被重新消費。Pulsar 支持生產者同時將消息發送到普通的 Topic 和重試 Topic ,并指定允許延時和最大重試次數。當配置了允許消費者自動重試時,如果消息沒有被消費成功,會被保存到重試 Topic 中,并在指定延時時間后,重新被消費。
1.2.4. 死信 topic
當 Consumer 消費消息出錯時,可以通過配置重試 Topic 對消息進行重試,但是,如果當消息超過了最大的重試次數仍處理失敗時,該怎么辦呢?Pulsar 提供了死信 Topic ,通過配置 deadLetterTopic,當消息達到最大重試次數的時候, Pulsar 會將消息推送到死信 Topic 中進行保存。
1.3. 訂閱(subscription)
通過訂閱的方式,我們可以指定消息如何投遞給消費者。
1.3.1. 訂閱類型(Subscription type)
Pulsar 支持獨占(Exclusive)、災備(Failover)、共享(Shared)、Key_Shared 這四種訂閱類型。
獨占(Exclusive)SinglePartition
Exclusive 下,只允許 Subscription 存在一個消費者,如果多個消費者使用同一個訂閱名稱去訂閱同一個 Topic ,則會報錯。
如下圖,只有 Consumer A-0 可以消費數據。
災備(Failover)
Failover 下,一個 Subscription 中可以有多個消費者,但只有 Master Consumer 可以消費數據。當 Master Consumer 斷開連接時,消息會由下一個被選中的 Consumer 進行消費。
如下圖,Consumer-B-0 是 Master Consumer 。當 Consumer -B-0 發生問題與 Broker 斷開連接時, Consumer-B-1 將成為下一個 Master Consumer 來消費數據。
分區 Topic:Broker 會按照消費者的優先級和消費名的順序對消費者進行排序,將 Topic 均勻地分配給優先級最高的消費者。
非分區 Topic:Broker 會根據消費者訂閱的非分區 Topic 的時間順序選擇消費者。
共享(Shared)
Shared 中,多個消費者可以綁定到同一個 Subscription 上。消息通過 round robin 即輪詢機制分發給不同的消費者,并且每個消息僅會被分發給一個消費者。當消費者斷開連接,所有被發送給消費者但沒有被確認的消息將被重新處理,分發給其它存活的消費者。
如下圖, Consumer-C-1 、 Consumer-C-2 、 Consumer-C-3 都可以訂閱 Topic 消費數據。
Key_Shared
Key_Shared 中,多個 Consumer 可以綁定到同一個 Subscription 。消息在傳遞給 Consumer 時,具有相同鍵的消息只會傳遞給同一個 Consumer 。
1.3.2. 訂閱模式(Subscription modes)
訂閱模式有持久化和非持久化兩種。訂閱模式取決于游標(cursor)的類型。
創建訂閱時,將創建一個相關的游標來記錄最后使用的位置。當訂閱的 consumer 重新啟動時,它可以從它所消費的最后一條消息繼續消費。
Durable (持久訂閱):游標是持久性的,會保留消息并保持游標記錄的位置。當 Broker 重新啟動時,可以從 BookKeeper 中恢復游標,消息可以從游標上次記錄的位置繼續消費。默認情況下,都是持久化訂閱。
NonDurable(非持久訂閱):游標不是持久性的,當 Broker 宕機時,游標會丟失并無法恢復,所以消息無法繼續從上次消費的位置開始繼續消費。
一個訂閱可以有一個或多個消費者。當使用者訂閱主題時,它必須指定訂閱名稱。持久訂閱和非持久訂閱可以具有相同的名稱,它們彼此獨立。如果使用者指定了以前不存在的訂閱,則會自動創建訂閱。
默認情況下,沒有任何持久訂閱的 Topic 的消息將被標記為已刪除。如果要防止消息被標記為已刪除,可以為此 Topic 創建持久訂閱。在這種情況下,只有被確認的消息才會被標記為已刪除。
1.3.3. 多主題訂閱
當 Consumer 訂閱 Topic 時,默認指定訂閱一個主題。從 Pulsar 的 1.23.0-incubating 的版本開始, Pulsar 消費者可以同時訂閱多個 Topic ??梢酝ㄟ^兩種方式進行訂閱:
正則表達式,例如:persistent://public/default/finance-.*
明確指定 Topic 列表
2. Pulsar 生產者 (Producer)
Producer 是連接 topic 的程序,它將消息發布到一個 Pulsar broker 上。
2.1. 訪問模式
消息生成者有多種模式訪問 Topic ,可以使用以下幾種方式將消息發送到 Topic 。
Shared:默認情況下,多個生成者可以將消息發送到同一個 Topic。
Exclusive:在這種模式下,只有一個生產者可以將消息發送到 Topic ,當其他生產者嘗試發送消息到這個 Topic 時,會發生錯誤。只有獨占 Topic 的生產者發生宕機時(Network Partition )該生產者會被驅逐,新的生產者才能產生并向 Topic 發送消息。
WaitForExclusive:在這種模式下,只有一個生產者可以將消息發送到 Topic 。當已有生成者和 Topic 建立連接時,其他生產者的創建會被掛起而不會產生錯誤。如果想要采用領導者選舉機制來選擇消費者的話,可以采用這種模式。
2.2. 路由模式
當將消息發送到分區 Topic 時,需要指定消息的路由模式,這決定了消息將會被發送到哪個分區 Topic 。Pulsar 有以下三種消息路由模式,RoundRobinPartition 為默認路由模式。
RoundRobinPartition:如果消息沒有指定 key,為了達到最大吞吐量,生產者會以 round-robin (輪詢)方式將消息發布到所有分區。請注意 round-robin 并不是作用于每條單獨的消息,而是作用于延遲處理的批次邊界,以確保批處理有效。如果消息指定了 key,分區生產者會根據 key 的 hash 值將該消息分配到對應的分區。這是默認的模式。
SinglePartition:如果消息沒有指定 key,生產者將會隨機選擇一個分區,并發布所有消息到這個分區。如果消息指定了 key,分區生產者會根據 key 的 hash 值將該消息分配到對應的分區。
CustomPartition:自定義模式,用戶可以創建自定義路由模式,通過實現 MessageRouter 接口實現自定義路由規則。
2.3. 批量處理
Pulsar 支持對消息進行批量處理。批量處理啟用后, Producer 會在一次請求中累積并發送一批消息。批量處理時的消息數量取決于最大消息數(單次批量處理請求可以發送的最大消息數)和最大發布延遲(單個請求的最大發布延遲時間)決定。開啟批量處理后,積壓的數量是批量處理的請求總數,而不是消息總數。
2.3.1. 索引確認機制
通常情況下,只有 Consumer 確認了批量請求中的所有消息,這個批量請求才會被認定為已處理。當這批消息沒有全部被確認的情況下,發生故障時,會導致一些已確認的消息被重復確認。
為了避免 Consumer 重復消費已確認的消息, Pulsar 從 Pulsar 2.6.0 開始采用批量索引確認機制。如果啟用批量索引確認機制, Consumer 將篩選出已被確認的批量索引,并將批量索引確認請求發送給 Broker 。Broker 維護批量索引的確認狀態并跟蹤每批索引的確認狀態,以避免向 Consumer 發送已確認的消息。當該批信息的所有索引都被確認后,該批信息將被刪除。
默認情況下,索引確認機制處于關閉狀態。開啟索引確認機制將產生導致更多內存開銷。
2.3.2. key-based batching
key_shared 模式下,Broker 會根據消息的 key 來分發消息,但默認的批量處理模式,無法保證將所有的相同的 key 都打包到同一批中,而且 Consumer 在接收到批數據時,會默認把第一個消息的 key 當作這批消息的 key ,這會導致消息的錯亂。因此 key_shared 模式下,不支持默認的批量處理。
key-based batching 能夠確保 Producer 在打包消息時,將相同 key 的消息打包到同一批中,從而 consumer 在消費的時候,也能夠消費到指定 key 的批數據。
沒有指定 key 的消息在打包成批后,這一批數據也是沒有 key 的, Broker 在分發這批消息時,會使用 NON_KEY 作為這批消息的 key 。
2.4. 消息分塊
啟用分塊后,如果消息大小超過允許發送的最大消息大小時, Producer 會將原始消息分割成多個分塊消息,并將分塊消息與消息的元數據按順序發送到 Broker。
在 Broker 中,分塊消息會和普通消息以相同的方式存儲在 Ledger 中。唯一的區別是, Consumer 需要緩存分塊消息,并在接收到所有的分塊消息后將其合并成真正的消息。如果 Producer 不能及時發布消息的所有分塊, Consumer 不能在消息的過期時間內接收到所有的分塊,那么 Consumer 已接收到的分塊消息就會過期。
Consumer 會將分塊的消息拼接在一起,并將它們放入接收器隊列中。客戶端從接收器隊列中消費消息。當 Consumer 消費到原始的大消息并確認后, Consumer 就會發送與該大消息關聯的所有分塊消息的確認。
2.4.1. 處理一個 producer 和一個訂閱 consumer 的分塊消息
如下圖所示,當生產者向主題發送一批大的分塊消息和普通的非分塊消息時。假設生產者發送的消息為 M1,M1 有三個分塊 M1-C1,M1-C2 和 M1-C3。這個 Broker 在其管理的 Ledger 里面保存所有的三個塊消息,然后以相同的順序分發給消費者(獨占/災備模式)。消費者將在內存緩存所有的塊消息,直到收到所有的消息塊。將這些消息合并成為原始的消息 M1,發送給處理進程。
2.4.2. 多個生產者和一個生產者處理塊消息
當多個生產者發布塊消息到單個主題,這個 Broker 在同一個 Ledger 里面保存來自不同生產者的所有塊消息。如下所示,生產者 1 發布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三個塊組成。生產者 2 發布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三個塊組成。這些特定消息的所有分塊是順序排列的,但是其在 Ledger 里面可能不是連續的。這種方式會給消費者帶來一定的內存負擔。因為消費者會為每個大消息在內存開辟一塊緩沖區,以便將所有的塊消息合并為原始的大消息。
3. Pulsar 消費者 (Consumer)
Consumer 是通過訂閱關系連接 Topic ,接收消息的程序。
Consumer 向 Broker 發送 flow permit request 以獲取消息。在 Consumer 端有一個隊列,用于接收從 Broker 推送來的消息。
3.1. 消息確認
Pulsar 提供兩種確認模式:
累積確認:消費者只需要確認最后一條收到的消息,在此之前的消息,都不會被再次發送給消費者。
單條確認:消費者需要確認每條消息并發送 ack 給 Broker 。
如圖,上方為累積確認模式,當消費者發送 M12 的確認消息給 Broker 后, Broker 會把 M12 之前的消息和 M12 一樣都標記為已確認。下方為單條確認模式,當消費者發送 M7 的確認消息給 Broker 后, Broker 會把 M7 這條消息標記為已確認。當消費者發送 M12 的確認消息給 Broker 后, Broker 會把 M12 這條消息標記為已確認。
需要注意的是,訂閱模式中的 shared 模式是不支持累積確認的。因為該訂閱模式下的每個消費者都能消費數據,無法保證單個消費者的消費消息的時序和順序。
3.1.1. AcknowledgmentsGroupingTracker
消息的單條確認和累積確認并不是直接發送確認請求給 Broker,而是把請求轉交給 AcknowledgmentsGroupingTracker 處理。
為了保證消息確認的性能,并避免 Broker 接收到非常高并發的 ack 請求,Tracker 默認支持批量確認,即使是單條消息的確認,也會先進入隊列,然后再一批發往 Broker。在創建 consumer 的時候,可以設置 acknowledgementGroupTimeMicros,默認情況下,每 100ms 或者堆積超過 1000 時,AcknowledgmentsGroupingTracker 會發送一批確認請求。如果設置為 0,則每次確認消息后,Consumer 都會立即發送確認請求。
3.2. 取消確認
當 Consumer 無法處理一條消息并想重新消費時, Consumer 可以發送一個取消確認的消息給 Broker , Broker 會重新將這條消息發送給 Consumer 。如果啟用了批量處理,那這一批中的所有消息都會重新發送給消費者。
消息取消確認也有單條取消模式和累積取消模式 ,取決于消費者使用的訂閱模式。
在 Exclusive 模式和 Failover 訂閱模式中,消費者僅僅只能對收到的最后一條消息進行取消確認。
在 Shared 和 Key_Shared 的訂閱類型中,消費者可以單獨否定確認消息。
如果啟用了批量處理,那這一批中的所有消息都會重新發送給消費者。
3.2.1. NegativeAcksTracker
取消確認和其他消息確認一樣,不會立即請求 Broker,而是把請求轉交給 NegativeAcksTracker 進行處理。Tracker 中記錄著每條消息以及需要延遲的時間。Tracker 默認是 33ms 左右一個時間刻度進行檢查,默認延遲時間是 1 分鐘,抽取出已經到期的消息并觸發重新投遞。Tracker 存在的意義是為了合并請求。另外如果延遲時間還沒到,消息會暫存在內存,如果業務側有大量的消息需要延遲消費,還是建議使用 reconsumeLater 接口。NegativeAck 唯一的好處是不需要每條消息都指定時間,可以全局設置延遲時間。
3.3. redelivery backoff 機制
通常情況下可以使用取消確認來達到處理消息失敗后重新處理消息的目的,但通過 redelivery backoff 可以更好的實現這種目的??梢酝ㄟ^指定消息重試的次數、消息重發的延遲來重新消費處理失敗的消息。
3.4. 確認超時
除了取消確認和 redelivery backoff 機制外,還可以通過開啟自動重傳遞機制來處理未確認的消息。啟用自動重傳遞后,client 會在 ackTimeout 時間內跟蹤未確認的消息,并在消息確認超時后自動向代理重新發送未確認的消息請求。
如果開啟了批量處理,那這批消息都會重新發送給 Consumer 。
與確認超時相比,取消確認會更合適。因為取消確認能更精確地控制單個消息的再交付,并避免在消息處理時引起的超過確認超時而導致無效的再重傳。
3.5. 消息預拉取
Consumer 客戶端 SDK 會默認預先拉取消息到 Consumer 本地,Broker 側會把這些已經推送到 Consumer 本地的消息記錄為 pendingAck,這些消息既不會再投遞給別的消費者,也不會 ack 超時,除非當前 Consumer 被關閉,消息才會被重新投遞。Broker 側有一個 RedeliveryTracker 接口,這個 Tracker 會記錄消息到底被重新投遞了多少次,每條消息推送給消費者時,會先從 Tracker 的哈希表中查詢一下重投遞的次數,和消息一并推送給消費者。
3.6. 未確認的消息處理
如果消息被消費者消費后一直沒有確認怎么辦?
unAckedMessageTracker 中維護了一個時間輪,時間輪的刻度根據 ackTimeout 、tickDurationInMs 這兩個參數生成,每個刻度時間= ackTimeout / tickDurationInMs。新追蹤的消息會放入最后一個刻度,每次調度都會移除隊列頭第一個刻度,并新增一個刻度放入隊列尾,保證刻度總數不變。每次調度,隊列頭刻度里的消息將會被清理,unAckedMessageTracker 會自動把這些消息做重投遞。
重投遞就是客戶端發送一個 redeliverUnacknowledgedMessages 命令給 Broker。每一條推送給消費者但是未 ack 的消息,在 Broker 側都會有一個集合來記錄(pengdingAck),這是用來避免重復投遞的。觸發重投遞后,Broker 會把對應的消息從這個集合里移除,然后這些消息就可以再次被消費了。
4. Pulsar 服務端
Broker 是 Pulsar 的一個無狀態組件,主要負責運行以下兩個組件:
http 服務:提供為生產者和消費者管理任務和 Topic 查找的 REST API。Producer 通過連接到 Broker 來發送消息, Consumer 通過連接到 Broker 來接收消息。
調度器;提供異步 http 服務,用于二進制數據的傳輸。
4.1. 消息確認與留存
Pulsar Broker 會默認刪除已經被所有 Consumer 確認的消息,并以 backlog 的方式持久化存儲所有未被確認的內消息。
Pulsar 的 message retention(消息留存) 和 message expiry (消息過期)這兩個特性可以調整 Broker 的默認設置。
Message retention: 保留 Consumer 已確認的消息。
通過留存規則的設定,可以保證已經被確認且符合留存規則的消息持久地保存在 Pulsar 中,而沒有被留存規則覆蓋、已經被確認的消息會被刪除。
Message expire(消息過期):設置未確認消息的存活時長(TTL)。
通過設置消息的 TTL,有些即使還沒有被確認,但已經超過 TTL 的消息,也會被刪除。
4.2. 消息去重
實現消息去重的一種方式是確保消息僅生成一次,即生產者冪等。這種方式的缺點是把消息去重的工作交由應用去做。
在 Pulsar 中, Broker 支持配置開啟消息去重,用戶不需要為了消息去重去調整 Producer 的代碼。啟用消息去重后,即使一條消息被多次發送到 Topic 上,這條消息也只會被持久化到磁盤一次。
如下圖,未開啟消息去重時, Producer 發送消息 1 到 Topic 后, Broker 會把消息 1 持久化到 BookKeeper ,當 Producer 又發送消息 1 時, Broker 會把消息 1 再一次持久化到 BookKeeper 。開啟消息去重后,當 Producer 再次發送消息 1 時, Broker 不會把消息 1 再一次持久化到磁盤。
4.2.1. 去重原理
Producer 對每一個發送的消息,都會采用遞增的方式生成一個唯一的 sequenceID,這個消息會放在 message 的元數據中傳遞給 Broker 。同時, Broker 也會維護一個 PendingMessage 隊列,當 Broker 返回發送成功 ack 后, Producer 會將 PendingMessage 隊列中的對于的 Sequence ID 刪除,表示 Producer 任務這個消息生產成功。Broker 會記錄針對每個 Producer 接收到的最大 Sequence ID 和已經處理完的最大 Sequence ID。
當 Broker 開啟消息去重后, Broker 會對每個消息請求進行是否去重的判斷。收到的最新的 Sequence ID 是否大于 Broker 端記錄的兩個維度的最大 Sequence ID,如果大于則不重復,如果小于或等于則消息重復。消息重復時, Broker 端會直接返回 ack,不會繼續走后續的存儲處理流程。
4.3. 消息延遲傳遞
延時消息功能允許 Consumer 能夠在消息發送到 Topic 后過一段時間才能消費到這條消息。在這種機制中,消息在發布到 Broker 后,會被存儲在 BookKeeper 中,當到消息特定的延遲時間時,消息就會傳遞給 Consumer 。
下圖為消息延遲傳遞的機制。Broker 在存儲延遲消息的時候不會進行特殊的處理。當 Consumer 消費消息的時候,如果這條消息設置了延遲時間,則會把這條消息加入 DelayedDeliveryTracker 中,當到了指定的發送時間時,DelayedDeliveryTracker 才會把這條消息推送給消費者。
4.3.1. 延遲投遞原理
在 Pulsar 中,可以通過兩種方式實現延遲投遞。分別為 deliverAfter 和 deliverAt。
deliverAfter 可以指定具體的延遲時間戳,deliverAt 可以指定消息在多長時間后消費。兩種方式本質時一樣的,deliverAt 方式下,客戶端會計算出具體的延遲時間戳發送給 Broker 。
DelayedDeliveryTracker 會記錄所有需要延遲投遞的消息的 index 。index 由 Timestamp、 Ledger ID、 Entry ID 三部分組成,其中 Ledger ID 和 Entry ID 用于定位該消息,Timestamp 除了記錄需要投遞的時間,還用于延遲優先級隊列排序。DelayedDeliveryTracker 會根據延遲時間對消息進行排序,延遲時間最短的放在前面。當 Consumer 在消費時,如果有到期的消息需要消費,則根據 DelayedDeliveryTracker index 的 Ledger ID、 Entry ID 找到對應的消息進行消費。如下圖, Producer 依次投遞 m1、m2、m3、m4、m5 這五條消息,m2 沒有設置延遲時間,所以會被 Consumer 直接消費。m1、m3、m4、m5 在 DelayedDeliveryTracker 會根據延遲時間進行排序,并在到達延遲時間時,依次被 Consumer 進行消費。
4.4. Bundle
我們知道, Topic 分區會散落在不同的 Broker 中,那 Topic 分區和 Broker 的關系是如何維護的呢?當某個 Broker 負載過高時, Pulsar 怎么處理呢?
Topic 分區與 Broker 的關聯是通過 Bundle 機制進行管理的。
每個 namespace 存在一個 Bundle 列表,在 namesapce 創建時可以指定 Bundle 的數量。Bundle 其實是一個分片機制,每個 Bundle 擁有 namespace 整個 hash 范圍的一部分。每個 Topic (分區) 通過 hash 運算落到相應的 Bundle 區間,進而找到當前區間關聯的 Broker 。每個 Bundle 綁定唯一的一個 Broker ,但一個 Broker 可以有多個 Bundle 。
如下圖,T1、T2 這兩個 Topic 的 hash 結果落在[0x0000000L——0x4000000L]中,這個 hash 范圍的 Bundle 對應 Broker 2, Broker 2 會對 T1、T2 進行處理。
同理,T4 的 hash 結果落在[0x4000000L——0x8000000L]中,這個 hash 范圍的 Bundle 對應 Broker 1, Broker 1 會對 T4 進行處理;
T5 的 hash 結果落在[0x8000000L——0xC000000L]中,這個 hash 范圍的 Bundle 對應 Broker 3, Broker 3 會對 T5 進行處理;
T3 的 hash 結果落在[0xC000000L——0x0000000L]中,這個 hash 范圍的 Bundle 對應 Broker 3, Broker 3 會對 T3 進行處理。
Bundle 可以根據綁定的 Broker 的負載進行動態的調整、綁定。當 Bundle 綁定的 Broker 的 Topic 數過多、負載過高時,都會觸發 Bundle 拆分,將原有的 Bundle 拆分成 2 個 Bundle ,并將其中一個 Bundle 重新分配給不同的 Broker ,以降低原 Broker 的 Topic 數或負載。
5. Pulsar 存儲層(Bookkeeper)
BookKeeper 是 Pulsar 的存儲組件。
對于 Pulsar 的每個 Topic(分區),其數據并不會固定的分配在某個 Bookie 上,具體的邏輯實現我們在 Bundle 一節已經討論過,而 Topic 的物理存儲,實際上是通過 BookKeeper 組件來實現的。
5.1. 分片存儲
概念:
Bookie:BookKeeper 的一部分,處理需要持久化的數據。
Ledger:BookKeeper 的存儲邏輯單元,可用于追加寫數據。
Entry:寫入 BookKeeper 的數據實體。當批量生產時,Entry 為多條消息,當非批量生產時,Entry 為單條數據。
Pulsar 在物理上采用分片存儲的模式,存儲粒度比分區更細化、存儲負載更均衡。如圖,一個分區 Topic-Partition 2 的數據由多個分片組成。每個分片作為 BookKeeper 中的一個 Ledger ,均勻的分布并存儲在 BookKeeper 的多個 Bookie 節點中。
基于分配存儲的機制,使得 Bookie 的擴容可以即時完成,無需任何數據復制或者遷移。當 Bookie 擴容時,Broker 可以立刻發現并感知新的 Bookie ,并嘗試將新的分片 Segment 寫入新增加的 Bookie 中。
如上圖,在 Broker 中,消息以 Entry 的形式追加的形式寫入 Ledger 中,每個 Topic 分區都有多個非連續 ID 的 Ledger,Topic 分區的 Ledger 同一時刻只有一個處于可寫狀態。
Topic 分區在存儲消息時,會先找到當前使用的 Ledger ,生成 Entry ID(每個 Entry ID 在同一個 Ledger 內是遞增的)。當 Ledger 的長度或 Entry 個數超過閾值時,新消息會存儲到新 Ledger 中。每個 messageID 由[Ledger ID, Entry ID, Partition 編號,batch-index]組成。( Partition :消息所屬的 Topic 分區,batch-index:是否為批量消息)
一個 Ledger 會根據 Topic 指定的副本數量存儲到多個 Bookie 中。一個 Bookie 可以存放多個不連續的 Ledger。
5.2. 讀寫數據的流程
Journals :Journals 文件包含 BookKeeper 的事務日志信息。在對 Ledger 更新之前, Bookie 會保證更新的事務信息已經寫入 Journals 。當 Bookie 啟動或者舊的 Journals 大小達到閾值時,就會創建一個新的 Journals 。
Entry Logs:Entry Logs 管理從 Bookie 收到的 Entry 數據。來自不同 Ledger 的 Entry 會按順序聚合并寫入 Entry Logs ,這些 Entry 在 Entry Logs 中的偏移量會作為指針保存在 Ledger Cache 中,以便快速查找。當 Bookie 啟動或者舊的 Entry Logs 大小達到閾值時,就會創建一個新的 Entry Logs 。當舊的 Entry Logs 沒有與任何活躍的 Ledger 關聯時,就會被垃圾回收器刪除。
Index Files:每個 Ledger 都會創建一個 Index file,它包括一個頭和幾個固定長度的 Index page,這些 Index page 記錄存儲在 Entry Logs 中的 Entry 的偏移量。由于更新索引文件會引入隨機的磁盤 I/O,所以索引文件由后臺運行的同步線程延遲更新。這確保了更新的快速性能。在索引頁持久化到磁盤之前,將它們聚集在 Ledger Cache 中以方便查找。
Ledger Cache:Ledger Cache 存放在內存池中,這樣可以更高效地管理磁盤頭調度。
5.2.1. 消息的寫入
1.將 Entry 追加寫入 Ledger 中。
將這次 Entry 的更新操作寫入 Journal 日志中,當由多個數據寫入時,可以批量提交,將數據刷到 Journal 磁盤中。
將 Entry 數據寫入寫緩存中。
返回寫入成功響應。
到這里,消息寫入的同步流程已經完成。
3-A. 內存中的 Entry 數據會根據 Ledger 和寫入 Ledger 的時間順序進行排序,批量寫入 Entry Log 中。
3-B. Entry 在 Entry log 中的偏移量以 Index Page 的方式寫入 Ledger Cache 中,即 iIdex Files。
Entry Log 和 Ledger Cache 中的 Index File 會 Flush 到磁盤中。
5.2.2. 消息的讀取
A.先從寫緩存中以尾部讀的方式讀取。
B.如果寫緩存未命中,則從讀緩存中讀取。
C.如果讀緩存未命中,則從磁盤中讀取。磁盤讀取有三步:
C-1.讀取 Index Disk,獲取 Entry 的偏移量。
C-2.根據 Entry 的偏移量,在 Entry Disk 中快速找到 Entry 。
C-3.將 Entry 數據寫入讀緩存中。
6. 參考文獻
Pulsar 官方文檔
BookKeeper 官方文檔
【MQ Oteam】Apache Pulsar 技術系列 - 客戶端消息確認
【MQ Oteam】Apache Pulsar 技術系列 - Message deduplication 這里的去重與你想的可能不一樣
【MQ Oteam】Apache Pulsar 技術系列 - Pulsar 延遲消息投遞解析
Apache 系列—Pulsar 核心特性解析
總結
以上是生活随笔為你收集整理的新一代消息队列 Pulsar的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 腾讯自主研发动画组件PAG开源
- 下一篇: 腾讯 PB 级大数据计算如何做到秒级?