Apache Pulsar:实时数据处理中消息,计算和存储的统一
本文轉載自”AI前線“,整理自翟佳在 QCon2018 北京站的演講,在本次演講中,翟佳介紹了 Apache Pulsar 的架構、特性和其生態系統的組成,并展示了 Apache Pulsar 在消息、計算和存儲三個方面進行的協調、抽象和統一。
- Messaging:Pulsar 對 pub/sub 和 queue 兩種模式提供統一的支持,同時保證了一致性,高性能和易擴展性。
- Computing:Pulsar 內部的 Pulsar-Functions 提供了 Stream-native 的輕量級計算框架,保證了數據的即時流式處理。
- Storage:Pulsar 借助 Apache BookKeeper 提供了以 segment 為中心的存儲架構,保證了存儲的性能,持久性和彈性。
實時計算系統的發展
實時數據處理在剛剛興起的時候,一般企業會采用λ架構,維護兩套系統:一套用來處理實時的數據;另一套用 batch 的方式處理歷史數據。兩套系統帶來了資源的冗余占用和維護的不便。
為了消除冗余,逐漸演化出κ架構,使用一套系統來滿足實時數據處理和歷史數據處理的需求。
不管是λ架構還是κ架構,在實時處理的系統中,系統的核心由消息、計算和存儲三個子系統組成,比如消息系統有 Kafka、RabbitMQ、Flume 等;計算系統有 Spark Streaming、Flink、Heron 等;存儲系統有各種分布式的文件系統,DB、K/V store 等。 由于三個部分中,每個部分都有相應的不同產品,三個部分之間也相互分隔和獨立很少關聯,這帶來了一些問題,比如需要更多人力維護,部署復雜,調優難度大,監管難,數據丟失風險大等等。
為什么要選擇 Apache Pulsar?
面對消息,存儲和計算三個部分分隔的現狀,Apache Pulsar 在這三個方面進行了很好的協調、抽象和統一。 具體到 Apache Pulsar 內部,消息部分由 Pulsar Broker 來負責;存儲部分使用了 Apache BookKeeper,計算部分由 Pulsar Functions 來負責。
Apache Pulsar 是 2016 年 yahoo 開源的下一代大規模分布式消息系統,目前在 Apache 基金會下孵化。在 Yahoo 的生產環境中大規模部署并使用了近 4 年,服務于 Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa 以及 Yahoo 的 KV 存儲等,在 Yahoo 全球 8 個數據中心之間維護了全聯通的復制,并包含了 200 多萬個 Topics。
Apache Pulsar 有幾個明顯區別于其他消息系統的特點:
- 優秀的數據持久性和順序性。每一條消息都提供了全局唯一的 ID,多副本,并都是在實時刷盤后再返回給用戶。
- 統一的消費模型: 支持 Stream(如 Kafka)和 Queue(如 RabbitMQ)兩種消費模型, 支持 exclusive、failover 和 shared 三種消費模式。
- 靈活的擴展性: 節點擴展的線性和瞬時完成,在擴展中不會有數據的拷貝和遷移。
- 高吞吐低延遲,在實時刷盤的前提下,依然提供了高帶寬(180 萬 messages/ 秒)和低延遲(5ms at 99%)。
除了這些特性,Apache Pulsar 也具備了優秀的企業級特性,比如多機房互聯互備(Geo-replication),多租戶等。
Apache Pulsar 在架構上最明顯的優勢是采用了消息服務和消息存儲分層的策略。它包括了無狀態的消息服務層(broker 節點)和消息存儲層(BookKeeper 中 Bookie 是基本的存儲節點)。這為系統帶來了極好的擴展性和健壯性。
在消息服務層和存儲層,系統所關注的內容是不一樣的: 在服務層更多的是對 Producer 和 Consumer 的支持,更關注用戶接口和消息的服務質量,需要更好的 CPU 和網絡帶寬來支持消息的扇入扇出。存儲層更關注磁盤 IOPS 和存儲容量,負責數據的持久化等。
分層的架構帶為服務和存儲兩層都帶來了線性、瞬時的擴展性。如果需要增加和支持更多的 Producer 和 Consumer,只用對 broker 進行 Scale。如果存儲空間緊張,或者想要消息的時間保持的時間更長,可以單獨增加存儲節點 Bookie。
在服務層中,broker 不會有相關的數據被持久化保存,是無狀態的。對 Topic 的服務可以很容易地遷移。如果 broker 失效,可以很容易地將 topic 遷移到健康的 broker。
在存儲層(Bookie)也是一樣。每個 topic 的數據被打散并均勻 partition 到多個 segment,每個 segment 的數據又被分散存儲在 Bookie 集群中。當想增加容量的時候,只需要添加新的 Bookie,數據會優先選擇剛加入的 Bookie。
同樣當 broker 被 overloaded,添加新的 broker 之后,負載會被均衡地分配到新添加的 broker 之上。
介紹完 Apache Pulsar 的總體架構和特性,下面會從消息、存儲和計算三個方面分別介紹 Apache Pulsar 的設計理念,各層內部以及各層之間的協調、抽象和統一。
Apache Pulsar 的消息層
Apache Pulsar 面向用戶的也是最簡單的三個概念: 主題 Topic、生產者 Producer 和消費者 Consumer。 Topic 是消息的一個通道和載體; Producer 產生數據并向 Topic 這個通道中發送數據; Consumer 從 Topic 中獲取并消費數據。
在 Apache Pulsar 中提供了對 Namespace 的支持。Namespace 是 ApachePulsar 的多租戶機制中重要的組成部分。在一個 Topic 的名字中,包含了:租戶 (Tenant) ,命名空間(namespace)和 Topic 名字,這樣就可以對所有的 topic 提供層級化的管理。
Tenant 代表系統里的租戶。假設有一個 Pulsar 集群被多個組織共享,集群里的每個 Tenant 可以代表一個組織的團隊、一個核心的功能或一個產品線。一個 Tenant 可以包含多個 namespace,一個 namespace 可以包含多個主題。
Tenant 是資源的隔離的單位。namespace 是資源使用和權限設置的單位,我們可以設置權限、調整復制選項、管理跨集群的數據復制、控制消息的過期時間等。namespace 下的 Topic 會繼承 namespace 的配置。如果用戶獲取了 namespace 的寫入權限就可以往 namespace 寫入數據,如果要寫入的 topic 不存在,就會創建該 topic。
為了支持異地多備,namespace 又分為兩種,一種是本地的,只在集群內可見;一種是全局的,對多個集群可見。可以在不同的數據中心之間進行數據的交互和互備。
Apache Pulsar 的每個 namespace 可以包含多個 topic,而每個 topic 可以有多個生產者和訂閱者。每個訂閱者可以接受 topic 的所有的消息。為了給應用程序提供更大的靈活性,Apache Pulsar 通過增加一層 subscription 的抽象,提供了統一的消費模式。 消息的傳遞路徑是 producer-topic-subscription-consumer。subscription 類似 Kafka 中 consumer group 的概念。
Apache Pulsar 支持 exclusive、failover 和 shared 三種訂閱類型,它們可以共存在同一個 topic 上。數據雖然只寫了一次,但是可以通過三種的消費方式被多次消費。
前兩種 exclusive 和 failover,都是 Streaming 的模型,只有一個 consumer 來消費一個 topic partition 中的所有數據,都能保證嚴格的順序。Kafka 和 Kinesis 也是這種消費模型(一個 consumer 消費一個 partition)。
Exclusive 是只能有一個 consumer 來消費一個 topic 中的數據,不允許其他的 consumer 加入;failover 是允許多個 consumer 和一個 subscription 關聯,當 master consumer 失效后,可以有另外的 consumer 來接管成為新的 master。
第三種是 shared 的消費模式,它屬于 Queue 的模式,常見的 RabbitMQ、ActiveMQ 均屬于這種模式。如果三個 consumer 共同訂閱同一個 subscription,每個 consumer 大概會消費這個 topic 中的三分之一的數據,如果想?增加消費的帶寬,只用單獨增加 consumer 的數量而不需要改變 topic 和 partition,非常實用于一些 consumer 處理復雜度比較高的場景,比如視頻,圖片處理等。
除了這三種消費模式,Apache Pulsar 還提供了 reader 的 API 來讀取消息,讓用戶可以更加靈活的控制和消費消息。
Apache Pulsar 提供了兩種 ack 的機制: 累積(cumulative)模式和單條(individual)模式。
Ack 機制在在消息系統中是非常重要的。消息系統中的 broker 和 consumer 可能會出錯或宕機,當有錯誤發生的時候,如果能夠獲取上次消費者消費的位置,然后從這個消費的位置再接著消費,這是非常有用的,這樣可以避免丟失數據,避免把所有的處理過的數據再處理一遍。
一般通過 message acknowledgement、committing offset 來標記消息的消費情況。
Kafka 中通過 offset 來簡單的管理 ack,記錄一個 partition 的消費位置。
Pulsar 通過維護一個專門的數據結構 ManagedCursor 來管理 ack 的信息,每次 ack 的改變都會被持久化到硬盤中。
對于 cumulative 的 ack,在標記的消息之前,所有的數據都被消費過了;遇到出錯的情況會從標記的位置再開始消費。
對于 individual 的消費模式,會單獨標記已經被消費過的消息;遇到出錯的情況,所有的未被標記 ack 的消息都會被重新發送。Individual 的 ack 模式主要支持 share 的消費模式。它是很有必要的,因為對一般的 share 的消費模式,都是單個的消息消費處理比較慢,所以才增加 consumer。單獨的標記,能在出錯的時候減少不必要的昂貴的處理。
消息的 retention 策略,管理著消息什么時候被刪除。 其他的系統大多是通過時間來控制。有可能時間到了,但消息沒有被消費,也被刪除了。
Apache Pulsar 中,提供了比較全面的 retention 策略。一般情況下,借助 ack 的信息,當所有 subscription 都消費了消息之后,消息才會刪除。數據還可以額外的設置 retention period,即使都消費了也能再將消息保存一段時間。另外也支持 TTL 的模式。
對于留在 backlog 中的消息,Apache Pulsar 也提供了多種策略,包括 producer-request-hold、producer-exception、consumer-backlog-eviction 等。在 backlog 的 quota 達到時,供用戶選擇怎么處理新的消息和在 backlog 中的消息。
Apache Pulsar 的存儲層
接下來我們來看一下 Apache Pulsar 的存儲層,也就是 Apache BookKeeper。Apache BookKeeper 在 2011 年開源,并隨后加入 Apache,成為 Apache 的頂級項目。BookKeeper 是分布式的是一個可擴展的、高可用、低延遲的專門為實時系統優化過的存儲系統。更多系統可以參考 BookKeeper 的網站 https://bookkeeper.apache.org/ 和 github:https://github.com/apache/bookkeeper。
Apache BookKeeper 為 Pulsar 系統提供了一個以 Segment(BookKeeper ledger)為存儲單元的存儲服務。BookKeeper 的存儲節點稱作一個 Bookie。
?
- BookKeeper 為 append-only 的寫入模式提供了優化,通過獨特的設計提供了高帶寬和低延遲。
- BookKeeper 提供了強一致性和順序性。通過實時刷盤和多備份保證數據的持久性。順序性通過記錄本身攜帶的全局唯一順序 ID 來保證的。這樣對很多對順序要求比較高的應用場景。
- 高可用是說數據會同時寫入多個 bookie 上,如果 bookie 發生錯誤,即使只有一臺包含數據的 bookie 可用,仍能為應用提供服務,在其他 bookie 恢復或有新的 bookie 加入后,會自動檢查并補全所需要的數據備份。
- IO 隔離,對于 Bookie 的讀和寫是分別發生在不同的磁盤上的。這樣不依賴于文件系統和 pagecache 的設計,能保證即使有大量的讀的同時,也能保證寫的高帶寬和低延遲;在大量的寫入的同時,讀請求的服務質量也能得到保證。這也是能保證多租戶的一個關鍵。 ?
一個 BookKeeper 的集群由多個 Bookie 節點構成。每個 Bookie 負責具體的數據存儲。當用戶的 application 要使用 bk 的時候,會設定三個參數,ensemble size(用戶要使用幾臺 bookie)、write quorum(寫入的數據要保留幾個備份)和 ack quorum(每次的寫入操作,有幾個成功后就返回)。Bookie 采用 quorum-vote 的模式,當寫一條數據時,數據同時并發的寫到所有的 write quorum 的 bookie 中,當指定的 ack quorum 返回后,bookie 認為寫成功,返回。
當 ensemble 中有 bookie 出錯,會從 cluster 中尋找其他可用的 bookie,進行替換。然后后臺有 autorecovery 做數據的自動恢復,對用戶透明。
BookKeeper 的一個特性是存儲是以 Segment(在 BookKeeper 內部被稱作 ledger)為存儲的基本單元。每個 Segment 甚至到每個消息的粒度,都會被均勻分散到 BookKeeper 的集群中。保證了數據和服務在多個 Bookie 上的均勻性。通過這張圖,我們通過簡單對比 Pulsar 和 Kafka 中的 partition 的存儲過程,對 Pulsar 有一個更好的理解。
Pulsar 和 Kafka 都是基于 partition 的邏輯概念來做做 topic 的存儲。最根本的不同是,Kafka 的物理存儲也是以 partition 為單位的,每個 partition 必須作為一個整體(一個目錄)被存儲在某一個 broker 上。 而 Pulsar 的每個 partition 是以 segment 作為物理存儲的單位,Pulsar 中的每個 partition 會再被打散并均勻分散到多個 bookie 節點中。
這樣的一個直接的影響是,Kafka 的 partition 的大小,受制于單臺 broker 的存儲;而 Pulsar 的一個 partition 則可以利用整個集群的存儲容量。
當 partition 的容量上限達到后,需要擴容的時候,如果現有的單臺機器不能滿足,Kafka 可能需要添加新的存儲節點,將 partition 的數據搬移到更大的節點上。但是 Pulsar 只用添加新的 Bookie 存儲節點,新加入的節點由于剩余的空間大,會被優先使用,更多的接收新的數據;而且其中不會涉及到任何的老的數據的拷貝和搬移。
Pulsar 在單個節點失敗時也會體現同樣的優勢。如果 Pulsar 的服務節點 broker 失效,由于 broker 是無狀態的,其他的 broker 可以很快的接管 topic,不會涉及 topic 數據的拷貝;如果存儲節點 Bookie 失效,集群中其他的 Bookie 會從多個 Bookie 節點中并發讀取數據,并對失效節點的數據自動進行數據的恢復,不會對前端的服務有影響。
Apache BookKeeper 內部除了基礎的的 Segment(ledger), 還提供了 Stream 和 Table 兩種服務。 Segment 可以簡單理解為一段復制日志。Stream 服務是通過一定的方式,將一組 Segment 按照順序共同管理起來,這樣就可以組成一個源源不斷的流。進而,如果我們用 Stream 來作為一個 Table 的 change log,實現了一個簡單的 K/V Store,也就是這里說的 Table 的服務。在實時處理的過程中,比如 Pulsar Functions 的處理過程中,需要使用 K/V 的 Table 來存取計算的中間狀態。
通過在 BookKeeper 內部提供 Stream 和 Table 兩種服務,可以很方便的滿足在實時數據處理中的絕大部分的存儲需求。
Apache Pulsar 的計算層
介紹完 Pulsar 中的消息和存儲,下面我們來了解一下 Pulsar 中的計算部分 – Pulsar Functions。介紹一下 Pulsar Functions 的設計和實現。看看 Pulsar Functions 和其他的計算引擎不同的地方。
首先我們看一個計算引擎最本質的是要解決什么問題。 首先用戶定了了一個計算的需求,也就是處理過程: f(x),一組輸入數據通過 f(x)的計算,得到一組輸出的結果。
基于本質問題,計算引擎經過了長期的發展。第一代的計算引擎,以 Storm 為代表的通過一個有向無環圖(DAG)來完成一組計算,通常需要大量的代碼編寫工作。現在大部分的計算引擎都提供第二代的 API,即通過 DSL 的方式。第二代的 API 相比第一代更加的緊湊和方便,但是還是有些復雜,比如包含著大量的 map、flatmap 等。
我們發現,在實時數據的處理中,有大部分(60%——80%)的計算過程,本質上都是一些很簡單的數據轉換,比如 ETL/Reactive Services/Classification/Real-time Aggregation/Event Routing/Microservices 等等。
另外,云的興起,帶動了 serverless 的出現和興盛,Serverless 為我們提供了一個很好的思路。serverless 提供的是 function 的 API,每一個事件觸發一次 function,多個 function 可以通過組合的方式,完成比較復雜的邏輯。
基于這些原因,我們決定設計基于 Serverless 的,由消息來驅動的“Stream-native”的 Pulsar Functions。Pulsar Function 的一個特點是簡單:給用戶的接口簡單;每個 Function 的實現也十分容易理解;提供多語言的接口(目前支持 Java 和 Python)。
另一個特點是 Stream-native: Pulsar Functions 的輸入,輸出和中間的 log 都以 Topic 和消息為中心。
Pulsar Functions 提供兩種 API,第一種是 SDK less 的 API,用戶不用依賴 Pulsar 的 sdk,只用實現 java.util.function.Function 的接口。第二種借助 Pulsar SDK 的 API,通過 Context 來和 Pulsar 交互和定制。
和 Pulsar 的管理一樣,Pulsar Functions 也提供命令行和 Rest 兩種方式。執行的參數包括輸入的 topic,輸出的 topic 和要執行的 Function 的名字。
我們可以舉例說明一下 Pulsar Functions 適用的典型應用場景。
在邊緣計算(Edge Computing)中,傳感器會產生大量數據,而且數據會在邊緣的本地節點上進行很多簡單的處理,比如 Simple filtering, threshold detection, regex matching 等,另外邊緣節點的計算資源有限。 Pulsar Functions 對這樣的場景十分匹配。另外是在機器學習中。最開始的基礎模型通過離線進行計算和訓練。當訓練完,上線后,每一個輸入,都會匹配和應用模型,并對模型進行調整。這十分匹配 Pulsar Functions 的消息驅動的模式。另外模型本身也可以使用 BookKeeper 做存儲,簡化系統的部署。
這里 Pulsar Functions 的特性做一個總結。
首先,Pulsar Function 可以簡單運行在 Pulsar 的 broker 里面,簡化系統的部署。輸入的 Topic 中的每一個消息都會觸發對 Function 的執行。可以支持多個 Topic 作為輸入。用戶可以控制 Function 執行的各種語義:AtMostOnce 是當 Function 收到消息后就進行 ACK;AtLeastOnce 是在 Function 對消息處理完成后才進行 ACK;ExactlyOnce 是通過 Pulsar 內部實現的 deDup 的策略來實現。 Pulsar Functions 可以使用 BookKeeper 提供的 Stream 服務來做 Topic 的存儲,使用提供的 Table 服務來做中間狀態的存儲,實現存儲的統一,不需要部署其他的系統。這為系統的開發、測試、集成和運維帶來了更多的便利。
通過介紹 Pulsar 的消息,存儲和計算三個部分,希望能讓大家對 Pulsar 有更進一步的了解。在 Pulsar 的消息系統中,提供了基于 Stream 和 Queue 的統一的消費模式,提供了無狀態的 Broker 來提升系統的擴展性和容錯性。在存儲系統 BookKeeper 中,提供了對 Stream 的存儲和對 K/V Table 的存儲的統一,滿足了實時處理系統中對 topic 和狀態的存儲需求。 在計算部分,Pulsar Functions 中基于消息驅動(stream-native),可以計算和消息一種統一。
另外對于 Pulsar 系統和外部系統的互聯(connector),可以看作是一種特殊的 Pulsar Functions。
Pulsar 及 Kafka 基準測試對比
這里的 Benchmark(https://github.com/openmessaging/openmessaging-benchmark)是我們和阿里一起起草的 openMessaging 項目的一部分。如果有時間和機器,歡迎大家自己驗證一下。
這個 Benchmark 通過相同的配置,對 Apache Pulsar 和 Kafka 的帶寬和延遲進行了簡單的測試。
?最大吞吐量測試
這個結果是分別測試了 Pulsar 和 Kafka 在一般模式和 Exactly-once 模式下的 Publish 帶寬。
在 1KB 消息大小下,Pulsar 的一般模式和 Exactly-once 模式下的帶寬都在 21 萬條 / 秒左右;Kafka 在一般模式和 Exactly-once 模式下的帶寬分別是 7 萬多條 / 秒和 5 萬多條 / 秒。
除了帶寬數值的區別,另一方面是對 ExactlyOnce 的處理,Pulsar 通過自身的機制,幾乎相對于一般的 模式在性能上沒有區別。但是 Kafka 的兩種模式會有較大的差別。
?時延測試
這個結果是 Pulsar 和 Kafka 在固定的 Public 帶寬(50K/ 秒)下,各個百分位消息的發布時延。可以看出 Kafka 在不到 99% 的百分位,時延就開始大幅上升,但是 Pulsar 在 99.9% 的百分位以后,時延才開始上升。
這個結果是從時間軸的角度來看 Pulsar 和 Kafka 的時延。先不關注時延的絕對數值,直觀的感覺是 Pulsar 的時延更加穩定;Kafka 的時延會有很大的波動。 這和 Pulsar 中的內存和對 GC 的優化有直接的關系。Apache Pulsar 是一個新興的下一代的消息系統,由于 Pulsar Functions 的加入,和底層 Apache BookKeeper 提供的 Table 服務的完善,現在可以認為 Apache Pulsar 是一個在消息、存儲和計算三方面的統一的實時數據處理平臺。
Apache Pulsar 有很多先進的理念、設計和抽象在里面。由于時間關系有很多的部分沒能展開細講。
Apache Pulsar 和 Apache BookKeeper 中也有越來越多的有意思的 feature 和功能正在進行,公司和社區也都期待大家的關注和加入。如果大家有更多的關于 Meetup 和 POC 等需求,或者在使用其他消息系統中遇到問題,可以通過 Slack Channel 和微信聯系我們。
作者介紹
翟佳,Streamlio核心創始成員之一,畢業于中科院計算所,目前就職于一家下一代實時處理初創公司 Streamlio。在此之前任職于 EMC,是北京 EMC實時處理平臺的技術負責人。主要從事實時計算和分布式存儲系統的相關開發,是開源項目 Apache BookKeeper的PMC Member和 Committer,也是 Apache Pulsar的PMC Member和 Committer。
原文鏈接:https://mp.weixin.qq.com/s/B9zo0zThARAi11hRuJ-AqA
轉載于:https://my.oschina.net/apachepulsar/blog/1930480
總結
以上是生活随笔為你收集整理的Apache Pulsar:实时数据处理中消息,计算和存储的统一的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java蛮力法背包问题_蛮力法、动态规划
- 下一篇: 什么是四种七和弦 和三种转位