深度揭秘Twitter的新一代流处理引擎Heron
流計算又稱實時計算,是繼以Map-Reduce為代表的批處理之后的又一重要計算模型。隨著互聯(lián)網(wǎng)業(yè)務(wù)的發(fā)展以及數(shù)據(jù)規(guī)模的持續(xù)擴大,傳統(tǒng)的批處理計算難以有效地對數(shù)據(jù)進行快速低延遲處理并返回結(jié)果。由于數(shù)據(jù)幾乎處于不斷增長的狀態(tài)中,及時處理計算大批量數(shù)據(jù)成為了批處理計算的一大難題。在此背景之下,流計算應(yīng)運而生。相比于傳統(tǒng)的批處理計算,流計算具有低延遲、高響應(yīng)、持續(xù)處理的特點。在數(shù)據(jù)產(chǎn)生的同時,就可以進行計算并獲得結(jié)果。更可以通過Lambda架構(gòu)將即時的流計算處理結(jié)果與延后的批處理計算結(jié)果結(jié)合,從而較好地滿足低延遲、高正確性的業(yè)務(wù)需求。
Twitter由于本身的業(yè)務(wù)特性,對實時性有著強烈的需求。因此在流計算上投入了大量的資源進行開發(fā)。第一代流處理系統(tǒng)Storm發(fā)布以后得到了廣泛的關(guān)注和應(yīng)用。根據(jù)Storm在實踐中遇到的性能、規(guī)模、可用性等方面的問題,Twitter又開發(fā)了第二代流處理系統(tǒng)——Heron[1],并在2016年將它開源。
重要概念定義
在開始了解Heron的具體架構(gòu)和設(shè)計之前,我們首先定義一些流計算以及在Heron設(shè)計中用到的基本概念:
-
Tuple:流計算任務(wù)中處理的最小單元數(shù)據(jù)的抽象。
-
Stream:由無限個Tuple組成的連續(xù)序列。
-
Spout:從外界數(shù)據(jù)源獲得數(shù)據(jù)并生成Tuple的計算任務(wù)。
-
Bolt:處理上游Spout或者Bolt生成的Tuple的計算任務(wù)。
-
Topology:一個通過Stream將Spout和Bolt相連的處理Tuple的邏輯計算任務(wù)。
-
Grouping:流計算中的Tuple分發(fā)策略。在Tuple通過Stream傳遞到下游Bolt的過程中,Grouping策略決定了如何將一個Tuple路由給一個具體的Bolt實例。典型的Grouping策略有:隨機分配、基于Tuple內(nèi)容的分配等。
-
Physical Plan:基于Topology定義的邏輯計算任務(wù)以及所擁有的計算資源,生成的實際運行時信息的集合。
在以上流處理基本概念的基礎(chǔ)上,我們可以構(gòu)建出流處理的三種不同處理語義:
-
至多一次(At-Most-Once): 盡可能處理數(shù)據(jù),但不保證數(shù)據(jù)一定會被處理。吞吐量大,計算快但是計算結(jié)果存在一定的誤差。
-
至少一次(At-Least-Once):在外部數(shù)據(jù)源允許Replay(重演)的情況下,保證數(shù)據(jù)至少被處理一次。在出現(xiàn)錯誤的情況下會重新處理該數(shù)據(jù),可能會出現(xiàn)重復(fù)處理多次同一數(shù)據(jù)的情況。保證數(shù)據(jù)的處理但是延遲升高。
-
僅有一次(Exactly-Once):每一個數(shù)據(jù)確保被處理且僅被處理一次。結(jié)果精確但是所需要的計算資源增多并且還會導(dǎo)致計算效率降低。
從上可知,三種不同的處理模式有各自的優(yōu)缺點,因此在選擇處理模式的時候需要綜合考量一個Topology對于吞吐量、延遲、結(jié)果誤差、計算資源的要求,從而做出最優(yōu)的選擇。目前的Heron已經(jīng)實現(xiàn)支持至多一次和至少一次語義,并且正在開發(fā)對于僅有一次語義的支持。
Heron系統(tǒng)概覽
保持與Storm接口(API)兼容是Heron的設(shè)計目標(biāo)之一。因此,Heron的數(shù)據(jù)模型與Storm的數(shù)據(jù)模型基本保持一致。每個提交給Heron的Topology都是一個由Spout和Bolt這兩類結(jié)點(Vertex)組成的,以Stream為邊(Edge)的有向無環(huán)圖(Directed acyclic graph)。其中Spout結(jié)點是Topology的數(shù)據(jù)源,它從外部讀取Topology所需要處理的數(shù)據(jù),常見的如kafka-spout,然后發(fā)送給后續(xù)的Bolt結(jié)點進行處理。Bolt節(jié)點進行實際的數(shù)據(jù)計算,常見的運算如Filter、Map以及FlatMap等。
我們可以把Heron的Topology類比為數(shù)據(jù)庫的邏輯查詢計劃。這種邏輯上的計劃最后都要變成實質(zhì)上的處理計劃才能執(zhí)行。用戶在編寫Topology時指定每個Spout和Bolt任務(wù)的并行度和Tuple在Topology中結(jié)點間的分發(fā)策略(Grouping)。所有用戶提供的信息經(jīng)過打包算法(Pakcing)的計算,這些Spout和Bolt任務(wù)(task)被分配到一批抽象容器中。最后再把這些抽象容器映射到真實的容器中,就可以生成一個物理上可執(zhí)行的計劃(Physical plan),它是所有邏輯信息(拓?fù)鋱D、并行度、計算任務(wù))和運行時信息(計算任務(wù)和容器的對應(yīng)關(guān)系、實際運行地址)的集合。
整體結(jié)構(gòu)
總體上,Heron的整體架構(gòu)如圖1所示。用戶通過命令行工具(Heron-CLI)將Topology提交給Heron Scheduler。再由Scheduler對提交的Topology進行資源分配以及運行調(diào)度。在同一時間,同一個資源平臺上可以運行多個相互獨立Topology。
圖1 Heron架構(gòu)
與Storm的Service架構(gòu)不同,Heron是Library架構(gòu)。Storm在架構(gòu)設(shè)計上是基于服務(wù)的,因此需要設(shè)立專有的Storm集群來運行用戶提交的Topology。在開發(fā)、運維以及成本上,都有諸多的不足。而Heron則是基于庫的,可以運行在任意的共享資源調(diào)度平臺上。最大化地降低了運維負(fù)擔(dān)以及成本開銷。
目前的Heron支持Aurora、YARN、Mesos以及EC2,而Kubernetes和Docker等目前正在開發(fā)中。通過可擴展插件Heron Scheduler,用戶可以根據(jù)不同的需求及實際情況選擇相應(yīng)的運行平臺,從而達到多平臺資源管理器的支持[2]。
而被提交運行Topology的內(nèi)部結(jié)構(gòu)如圖2所示,不同的計算任務(wù)被封裝在多個容器中運行。這些由調(diào)度器調(diào)度的容器可以在同一個物理主機上,也可分布在多個主機上。其中每一個Topology的第一個容器(容器0)負(fù)責(zé)整個Topology的管理工作,主要運行一個Topology Master進程;其余各個容器負(fù)責(zé)用戶提交的計算邏輯的實現(xiàn),每個容器中主要運行一個Stream Manager進程,一個Metrics Manager進程,以及多個Instance進程。每個Instance都負(fù)責(zé)運行一個Spout或者Bolt任務(wù)(task)。對于Topology Master、Stream Manager以及Instance進程的結(jié)構(gòu)及重要功能,我們會在本文的后面章節(jié)進行詳細的分析。
圖2 Topology結(jié)構(gòu)
狀態(tài)(State)存儲和監(jiān)控
Heron的State Manager是一個抽象的模塊,它在具體實現(xiàn)中可以是ZooKeeper或者是文件系統(tǒng)。它的主要作用是保存各個Topology的各種元信息:Topology的提交者、提交時間、運行時生成的Physical Plan以及Topology Master的地址等,從而為Topology的自我恢復(fù)提供幫助。
每個容器中的Metrics Manager負(fù)責(zé)收集所在容器的運行時狀態(tài)指標(biāo)(Metrics),并上傳給監(jiān)控系統(tǒng)。當(dāng)前Heron版本中,簡化的監(jiān)控系統(tǒng)集成在Topology Master中。將來這一監(jiān)控模塊將會成為容器0中的一個獨立進程。Heron還提供Heron-Tracker和Heron-UI 這兩個工具來查看和監(jiān)測一個數(shù)據(jù)中心中運行的所有Topology。
啟動過程
在一個Topology中,Topology Master是整個Topology的元信息管理者,它維護著完整的Topology元信息。而Stream Manager是每個容器的網(wǎng)關(guān),它負(fù)責(zé)各個Instance之間的數(shù)據(jù)通信,以及和Topology Master之間的控制信令。
當(dāng)用戶提交Topology之后,Scheduler便會開始分配資源并運行容器。每個容器中啟動一個Heron Executor的進程,它區(qū)分容器0和其他容器,分別啟動Topology Master或者Stream Manager等進程。在一個普通容器中,Instance進程啟動后會主動向本地容器的Stream Manager進行注冊。當(dāng)Stream Manager收到所有Instance的注冊請求后,會向Topology Master發(fā)送包含了自己的所負(fù)責(zé)的Instance的注冊信息。當(dāng)Topology Master收到所有Stream Manager的注冊信息以后,會生成一個各個Instance,Stream Manager的實際運行地址的Physical Plan并進行廣播分發(fā)。收到了Physical Plan的各個Stream Manager之間就可以根據(jù)這一Physical Plan互相建立連接形成一個完全圖,然后開始處理數(shù)據(jù)。
Instance進行具體的Tuple數(shù)據(jù)計算處理。Stream Manager則不執(zhí)行具體的計算處理任務(wù),只負(fù)責(zé)中繼轉(zhuǎn)發(fā)Tuple。從數(shù)據(jù)流網(wǎng)絡(luò)的角度,可以把Stream Manager理解為每個容器的路由器。所有Instance之間的Tuple傳遞都是通過Stream Manager中繼。因此容器內(nèi)的Instance之間通信是一跳(hop)的星形網(wǎng)絡(luò)。所有的Stream Manager都互相連接,形成Mesh網(wǎng)絡(luò)。容器之間的通信也是通過Stream Manager中繼的,是通過兩跳的中繼完成的。
核心組件分析
TMaster
TMaster是Topology Master的簡寫。與很多Master-Slave模式分布式系統(tǒng)中的Master單點處理控制邏輯的作用相同,TMaster作為Master角色提供了一個全局的接口來了解Topology的運行狀態(tài)。同時,通過將重要的狀態(tài)信息(Physical Plan)等記錄到ZooKeeper中,保證了TMaster在崩潰恢復(fù)之后能繼續(xù)運行。
實際產(chǎn)品中的TMaster在啟動的時候,會在ZooKeeper的某一約定目錄中創(chuàng)建一個Ephemeral Node來存儲自己的IP地址以及端口,讓Stream Manager能發(fā)現(xiàn)自己。Heron使用Ephemeral Node的原因包括:
-
避免了一個Topology出現(xiàn)多個TMaster的情況。這樣就使得這個Topology的所有進程都能認(rèn)定同一個TMaster;
-
同一Topology內(nèi)部的進程能夠通過ZooKeeper來發(fā)現(xiàn)TMaster所在的位置,從而與其建立連接。
TMaster主要有以下三個功能:
-
構(gòu)建、分發(fā)并維護Topology的Physical Plan;
-
收集各個Stream Manager的心跳,確認(rèn)Stream Manager的存活;
-
收集和分發(fā)Topology部分重要的運行時狀態(tài)指標(biāo)(Metrics)。
由于Topology的Physical Plan只有在運行時才能確定,因此TMaster就成為了構(gòu)建、分發(fā)以及維護Physical Plan的最佳選擇。在TMaster完成啟動和向ZooKeeper注冊之后,會等待所有的Stream Manager與自己建立連接。在Stream Manager與TMaster建立連接之后,Stream Manager會報告自己的實際IP地址、端口以及自己所負(fù)責(zé)的Instance地址與端口。TMaster在收到所有Stream Manager報告的地址信息之后就能構(gòu)建出Physical Plan并進行廣播分發(fā)。所有的Stream Manager都會收到由TMaster構(gòu)建的Physical Plan,并且根據(jù)其中的信息與其余的Stream Manager建立兩兩連接。只有當(dāng)所有的連接都建立完成之后,Topology才會真正開始進行數(shù)據(jù)的運算和處理。當(dāng)某一個Stream Manager丟失并重連之后,TMaster會檢測其運行地址及端口是否發(fā)生了改變;若改變,則會及時地更新Physical Plan并廣播分發(fā),使Stream Manager能夠建立正確的連接,從而保證整個Topology的正確運行。
TMaster會接受Stream Manager定時發(fā)送的心跳信息并且維護各個Stream Manager的最近一次心跳時間戳。心跳首先能夠幫助TMaster確認(rèn)Stream Manager的存活,其次可以幫助其決定是否更新一個Stream Manager的連接并且更新Physical Plan。
TMaster還會接受由Metrics Manager發(fā)送的一部分重要Metrics并且向Heron-Tracker提供這些Metrics。Heron-Tracker可以通過這些Metrics來確定Topology的運行情況并使得Heron-UI能夠基于這些重要的Metrics來進行監(jiān)控檢測。典型的Metrics有:分發(fā)Tuple的次數(shù),計算Tuple的次數(shù)以及處于backpressure狀態(tài)的時間等。
非常值得注意的一點是,TMaster本身并不參與任何實際的數(shù)據(jù)處理。因此它也不會接受和分發(fā)任何的Tuple。這一設(shè)計使得TMaster本身邏輯清晰,也非常輕量,同時也為以后功能的拓展留下了巨大的空間。
Stream Manager 和反壓(Back pressure)機制
Stmgr是Stream Manager的簡寫。Stmgr管理著Tuple的路由,并負(fù)責(zé)中繼Tuple。當(dāng)Stmgr拿到Physical Plan以后就能根據(jù)其中的信息知道與其余的Stmgr建立連接形成Mesh網(wǎng)絡(luò),從而進行數(shù)據(jù)中繼以及Backpressure控制。Tuple傳遞路徑可以通過圖3來說明,圖3中容器1的Instance D(1D)要發(fā)送一個Tuple給容器4中的Instance C(4C),這個Tuple經(jīng)過的路徑為:容器1的1D,容器1的Stmgr,容器4的Stmgr,容器4的4C。又比如從3A到3B的Tuple經(jīng)過的路徑為:3A,容器3的Stmgr,3B。與Internet的路由機制對比,Heron的路由非常簡單,這得益于Stmgr之間兩兩相連,使得所有的Instance之間的距離不超過2跳。
圖3 Tuple發(fā)送路徑示例
Acking
Stmgr除了路由中繼Tuple的功能以外,它還負(fù)責(zé)確認(rèn)(Acking)Tuple已經(jīng)被處理。Acking的概念在Heron的前身Storm中已經(jīng)存在。Acking機制的目的是為了實現(xiàn)At-Least-Once的語義。原理上,當(dāng)一個Bolt實例處理完一個Tuple以后,這個Bolt實例發(fā)送一個特殊的Acking Tuple給這個bolt的上游Bolt實例或者Spout實例,向上游結(jié)點確認(rèn)Tuple已經(jīng)處理完成。這個過程層層向上游結(jié)點推進,直到Spout結(jié)點。實現(xiàn)上,當(dāng)Acking Tuple經(jīng)過Stmgr時候由異或(xor)操作標(biāo)記Tuple,由異或操作的特性得知是否處理完成。當(dāng)一個Spout實例在一定時間內(nèi)還沒有收集到Acking Tuple,那么它將重發(fā)對應(yīng)的數(shù)據(jù)Tuple。Heron的Acking機制的實現(xiàn)與它的前任Storm一致。
Back Pressure
Heron引入了反壓(Back Pressure)機制,來動態(tài)調(diào)整Tuple的處理速度以避免系統(tǒng)過載。一般來說,解決系統(tǒng)過載問題有三種策略:1. 放任不管;2. 丟棄過載數(shù)據(jù);3. 請求減少負(fù)載。Heron采用了第三種策略,通過Backpressure機制來進行過載恢復(fù),保證系統(tǒng)不會在過載的情況下崩潰。
Backpressure機制觸發(fā)過程如下:當(dāng)某一個Bolt Instance處理速度跟不上Tuple的輸入速度時,會造成負(fù)責(zé)向該Instance轉(zhuǎn)發(fā)Tuple的Stmgr緩存不斷堆積。當(dāng)緩存大小超過一個上限值(Hight Water Mark)時,該Stmgr會停止從本地的Spout中讀取Tuple并向Topology中的其他所有Stmgr發(fā)送一個“開始Backpressure”的信息。而其余的Stmgr在接收到這一消息時也會停止從他們所負(fù)責(zé)的Spout Instance處讀取并轉(zhuǎn)發(fā)Tuple。至此,整個Topology就不再從外界讀入Tuple而只處理堆積在內(nèi)部的未處理Tuple。而處理的速度則由最慢的Instance來決定。在經(jīng)過一定時間的處理以后,當(dāng)緩存的大小減低到一個下限值(Low Water Mark)時,最開始發(fā)送“開始Backpressure”的Stmgr會再次發(fā)送“停止Backpressure”的信息,從而使得所有的Stmgr重新開始從Spout Instance讀取分發(fā)數(shù)據(jù)。而由于Spout通常是從具有允許重演(Replay)的消息隊列中讀取數(shù)據(jù),因此即使凍結(jié)了也不會導(dǎo)致數(shù)據(jù)的丟失。
注意在Backpressure的過程中兩個重要的數(shù)值:上限值(High Water Mark)和下限值(Low Water Mark)。只有當(dāng)緩存區(qū)的大小超過上限值時才會觸發(fā)Backpressure,然后一直持續(xù)到緩存區(qū)的大小減低到下限值時。這一設(shè)計有效地避免了一個Topology不停地在Backpressure狀態(tài)和正常狀態(tài)之間震蕩變化的情況發(fā)展,一定程度上保證了Topology的穩(wěn)定。
Instance
Instance是整個Heron處理引擎的核心部分之一。Topology中不論是Spout類型結(jié)點還是Bolt類型結(jié)點,都是由Instance來實現(xiàn)的。不同于Storm的Worker設(shè)計,在當(dāng)前的Heron中每一個Instance都是一個獨立的JVM進程,通過Stmgr進行數(shù)據(jù)的分發(fā)接受,完成用戶定義的計算任務(wù)。獨立進程的設(shè)計帶來了一系列的優(yōu)點:便于調(diào)試、調(diào)優(yōu)、資源隔離以及容錯恢復(fù)等。同時,由于數(shù)據(jù)的分發(fā)傳送任務(wù)已經(jīng)交由Stmgr來處理,Instance可以用任何編程語言來進行實現(xiàn),從而支持各種語言平臺。
Instance采用雙線程的設(shè)計,如圖4所示。一個Instance的進程包含Gateway以及Task Execution這兩個線程。Gateway線程主要控制著Instance與本地Stmgr和Metrics Manager之間的數(shù)據(jù)交換。通過TCP連接,Gateway線程:1. 接受由Stmgr分發(fā)的待處理Tuple;2. 發(fā)送經(jīng)Task Execution處理的Tuple給Stmgr;3. 轉(zhuǎn)發(fā)由Task Execution線程產(chǎn)生的Metrics給Metrics Manager。不論是Spout還是Bolt,Gateway線程完成的任務(wù)都相同。
Task Execution線程的職責(zé)是執(zhí)行用戶定義的計算任務(wù)。對于Spout和Bolt,Task Execution線程會相應(yīng)地去執(zhí)行open()和prepare()方法來初始化其狀態(tài)。如果運行的Instance是一個Bolt實例,那么Task Execution線程會執(zhí)行execute()方法來處理接收到的Tuple;如果是Spout,則會重復(fù)執(zhí)行nextTuple()方法來從外部數(shù)據(jù)源不停地獲取數(shù)據(jù),生成Tuple,并發(fā)送給下游的Instance進行處理。經(jīng)過處理的Tuple會被發(fā)送至Gateway線程進行下一步的分發(fā)。同時在執(zhí)行的過程中,Task Execution線程會生成各種Metrics(tuple處理數(shù)量,tuple處理延遲等)并發(fā)送給Metrics Manager進行狀態(tài)監(jiān)控。
圖4 Instance結(jié)構(gòu)
Gateway線程和Task Execution線程之間通過三個單向的隊列來進行通信,分別是數(shù)據(jù)進入隊列、數(shù)據(jù)發(fā)送隊列以及Metrics發(fā)送隊列。Gateway線程通過數(shù)據(jù)進入隊列向Task Execution線程傳入Tuple;Task Execution通過數(shù)據(jù)發(fā)送隊列將處理完的Tuple發(fā)送給Gateway線程;Task Execution線程通過Metrics發(fā)送隊列將收集的Metric發(fā)送給Gateway線程。
總結(jié)
在本文中,我們介紹了流計算的背景和重要概念,并且詳細分析了Twitter目前的流計算引擎—— Heron的結(jié)構(gòu)及重要組件。希望能借此為大家提供一些在設(shè)計和構(gòu)建流計算系統(tǒng)時的經(jīng)驗,也歡迎大家向我們提供建議和幫助。如果大家對Heron的開發(fā)和改進感興趣,可以在Github上進行查看。
【1】Kulkarni, Sanjeev, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, and Siddarth Taneja. "Twitter heron: Stream processing at scale." In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, pp. 239-250. ACM, 2015.
【2】Maosong Fu, Ashvin Agrawal, Avrilia Floratou, Bill Graham, Andrew Jorgensen, Mark Li, Neng Lu, Karthik Ramasamy, Sriram Rao and Cong Wang. "Twitter Heron: Towards Extensible Streaming Engines." In 2017 International Conference on Data Engineering (ICDE). IEEE, 2017.
呂能,Twitter實時計算平臺團隊成員。專注于分布式系統(tǒng),曾參與過Twitter的Manhattan鍵值存儲系統(tǒng),Obs監(jiān)控警報系統(tǒng)的開發(fā),目前負(fù)責(zé)Heron的開發(fā)研究。曾在國際頂級期刊和會議發(fā)表多篇學(xué)術(shù)論文。?
吳惠君,Twitter軟件工程師,致力于實時流處理引擎Heron的研究和開發(fā)。他畢業(yè)于Arizona State University,專攻大數(shù)據(jù)處理和移動云計算,曾在國際頂級期刊和會議發(fā)表多篇學(xué)術(shù)論文,并有多項專利。?
符茂松,Twitter實時計算平臺團隊主管,負(fù)責(zé)Heron, Presto等服務(wù)。Heron的原作者之一。專注于分布式系統(tǒng),在SIGMOD、ICDE等會議期刊發(fā)表多篇論文。本科畢業(yè)于華中科技大學(xué),研究生畢業(yè)于Carnegie Mellon University。?
本文為《程序員》原創(chuàng)文章,未經(jīng)允許禁止轉(zhuǎn)載
總結(jié)
以上是生活随笔為你收集整理的深度揭秘Twitter的新一代流处理引擎Heron的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深圳人口为什么负增长
- 下一篇: 体育彩票前区中4位号码中4个号码多少奖金