日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing

發(fā)布時間:2025/4/16 67 豆豆
生活随笔 收集整理的這篇文章主要介紹了 一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Abstract

彈性非常適用于流系統(tǒng),以保證針對工作負(fù)載動態(tài)的低延遲,例如到達(dá)率的激增和數(shù)據(jù)分布的波動。現(xiàn)有系統(tǒng)使用以resource-centric的方法實現(xiàn)彈性,該方法在并行實例(即執(zhí)行程序)之間重新分配Key,以平衡工作負(fù)載和擴(kuò)展Operator。然而,這種Operator級別的重新分區(qū)需要全局同步并且禁止快速彈性。我們提出了一種以executor-centric的方法,它避免了Operator級別的Key數(shù)據(jù)重新分區(qū),并將執(zhí)行程序作為彈性的構(gòu)建塊。通過這種新方法,我們設(shè)計了具有兩級優(yōu)化的Elasticutor框架:i)執(zhí)行器的新穎實現(xiàn),即elastic executors,通過有效的executror內(nèi)負(fù)載均衡和executor擴(kuò)展來執(zhí)行彈性多核執(zhí)行,以及ii)a基于全局模型的調(diào)度程序,可根據(jù)瞬時工作負(fù)載為執(zhí)行程序動態(tài)分配CPU內(nèi)核。我們實施了一個原型Elasticutor并進(jìn)行了大量實驗。我們表明,與實際應(yīng)用程序的動態(tài)工作負(fù)載方法相比,Elasticutor的吞吐量增加了一倍,延遲降低了兩個數(shù)量級。

1. Introduction

?????? 分布式流系統(tǒng)[8,12,40,43,45,50,51]實現(xiàn)了對連續(xù)流的實時數(shù)據(jù)處理,并已廣泛用于欺詐檢測,監(jiān)控分析和定量融資等應(yīng)用。 在這樣的系統(tǒng)中,應(yīng)用程序邏輯被建模為計算圖,其中每個頂點表示與用戶定義的處理邏輯相關(guān)聯(lián)的Operator,并且每個邊指定運(yùn)算符之間的數(shù)據(jù)流的輸入-輸出關(guān)系。 為了實現(xiàn)大規(guī)模數(shù)據(jù)處理,輸入數(shù)據(jù)流通常被定義為通過key 空間分區(qū)到下游子分區(qū)中。并行執(zhí)行實例(即執(zhí)行程序)以將每個key的子空間靜態(tài)綁定到一定量的計算資源,通常是CPU核心。 結(jié)果,每個執(zhí)行器可以獨立地進(jìn)行與其key SubSpace相關(guān)聯(lián)的計算。

?????? 然而,在股票交易和視頻分析等實際應(yīng)用中,工作量隨時間波動很大,導(dǎo)致嚴(yán)重的性能下降[15,39]。 從時間角度來看,發(fā)送到Operator的總工作量可能會在短時間內(nèi)顯著激增,例如10秒,這使得Operator成為整個處理流程的瓶頸。 從空間角度來看,Key空間上的工作負(fù)載分布可能不穩(wěn)定,導(dǎo)致執(zhí)行程序中的工作負(fù)載偏差,其中一些CPU利用率較低,而另一些則過載。 為了適應(yīng)工作負(fù)載波動,先前的工作[14,15,39,41]提出了實現(xiàn)彈性的解決方案,即Operator擴(kuò)展和負(fù)載平衡。 所有這些解決方案都以資源為中心,因為執(zhí)行程序受特定資源的約束,并且通過跨執(zhí)行程序動態(tài)地重新分配Key來實現(xiàn)彈性。

?????? 圖1(a)說明了由于工作負(fù)載分配不平衡而導(dǎo)致執(zhí)行程序過載的情況。為了減輕性能瓶頸,重新分區(qū)Key空間,以便將重載執(zhí)行程序中的一定數(shù)量的工作負(fù)載與相應(yīng)的Key一起遷移到負(fù)載較輕的執(zhí)行程序。但是,這個過程需要一個耗時的協(xié)議[15,39]來維持狀態(tài)一致性。特別是,系統(tǒng)需要執(zhí)行以下操作:(a)阻止上游執(zhí)行程序向下游發(fā)送元組; (b)等待所有飛行中的元組進(jìn)行處理; (c)根據(jù)新的Key空間劃分,將狀態(tài)遷移到新的key space分區(qū)匯總; (d)更新上游執(zhí)行者的路由表;最后(e)恢復(fù)向下游發(fā)送元組的上游執(zhí)行者。由于Operator間路由更新和執(zhí)行器間狀態(tài)遷移都需要昂貴的全局同步,因此Key Space重新分配可能持續(xù)數(shù)秒,在此期間無法處理新的傳入元組并導(dǎo)致嚴(yán)重延遲。

?????? 為了實現(xiàn)快速彈性,我們提出了一種以執(zhí)行者為中心的范式。 核心思想是在執(zhí)行程序之間靜態(tài)劃分Operator的Key Space,但根據(jù)其瞬時工作負(fù)載動態(tài)地為每個執(zhí)行程序分配CPU核心。 圖1(b)說明了新方法不是對Key Space進(jìn)行重新分區(qū),而是通過將CPU內(nèi)核從較輕負(fù)載的執(zhí)行程序重新分配給過載的執(zhí)行程序來平衡工作負(fù)載。 由于每個執(zhí)行器擁有固定Key SubSpace,新方法實現(xiàn)了Operator間獨立性,即上游Operator不需要與下游Operator同步,并且執(zhí)行器間獨立性,即與Key SubSpace相關(guān)聯(lián)的狀態(tài)不需要遷移跨執(zhí)行者。 換句話說,這種新方法優(yōu)雅地解耦了Operator Key Space重新分區(qū)和計算資源的動態(tài)供應(yīng)之間的綁定。

?????? 基于以執(zhí)行器為中心的方法,我們設(shè)計了具有兩個優(yōu)化級別的Elasticutor框架。 在執(zhí)行程序級別,作為輕量級分布式子系統(tǒng)實現(xiàn),每個彈性執(zhí)行程序在其分配的CPU核心上均勻分配其工作負(fù)載,并在調(diào)度程序分配/取消分配CPU核心時快速擴(kuò)展。 在整體層面,基于模型的動態(tài)調(diào)度程序設(shè)計用于根據(jù)測量的性能指標(biāo)Metrics優(yōu)化核心到執(zhí)行程序的分配,以便以最小的狀態(tài)遷移開銷和最大的計算局部性來適應(yīng)工作負(fù)載動態(tài)。 我們實現(xiàn)了Elasticutor的原型,并使用合成和真實數(shù)據(jù)集進(jìn)行了大量的實驗。 結(jié)果表明,Elasticutor使吞吐量翻倍,并且比現(xiàn)有方法實現(xiàn)了更低的延遲。

?????? 本文的其余部分安排如下。 第2節(jié)介紹了以執(zhí)行者為中心的范例,并概述了Elasticutor框架。 第3節(jié)和第4節(jié)分別介紹了彈性執(zhí)行器和動態(tài)調(diào)度器的設(shè)計。 第5節(jié)討論了實驗結(jié)果。 第6節(jié)回顧了相關(guān)工作。 第7節(jié)總結(jié)了論文。

2. PARADIGM AND FRAMEWORK

2.1 Basic Concepts

?????? 我們考慮在由快速網(wǎng)絡(luò)設(shè)備連接的稱為節(jié)點的機(jī)器群集上的實時有狀態(tài)流處理系統(tǒng)。流是一個無限的序列。來自輸入流的元組連續(xù)到達(dá)系統(tǒng)并立即處理。用戶應(yīng)用程序被建模為計算的有向圖,稱為Topology,其中頂點是具有用戶定義的處理邏輯的運(yùn)算符,并且邊表示運(yùn)算符之間的處理序列。對于每對相鄰Operator,流的元組由上游Operator生成并由下游Operator消費。在有狀態(tài)計算中,Operator維護(hù)內(nèi)部狀態(tài),該狀態(tài)用于計算并將在輸入元組的處理期間更新。為了分配和并行化計算,操作符的狀態(tài)被實現(xiàn)為在key space上定義的可分割數(shù)據(jù)結(jié)構(gòu)。系統(tǒng)將key space劃分為sub space,并創(chuàng)建一個稱為執(zhí)行程序的并行實例,每個實例具有相同的數(shù)據(jù)處理邏輯。為了保證在這樣的分布式系統(tǒng)上維護(hù)的狀態(tài)的一致性,需要將元組正確路由到下游執(zhí)行器。因為以不同順序處理相同的輸入元組序列可能導(dǎo)致不同的輸出元組和狀態(tài),所以有狀態(tài)計算中的另一個基本要求是按到達(dá)順序處理相同key的元組。

?????? 流處理工作負(fù)載通常是動態(tài)的,因為對Operator的輸入速率和元組的key分配隨時間波動。為了保證動態(tài)工作負(fù)載下的性能,應(yīng)該向Operator適當(dāng)?shù)靥峁┯嬎阗Y源,即CPU核心,以便確保1)Operator擴(kuò)展,即CPU核心根據(jù)其工作負(fù)載動態(tài)地分配給Operator; 2)負(fù)載平衡,即每個Operator的工作負(fù)載均勻分布在分配的CPU核心上。如果不實現(xiàn)前者,一些Operator可能會過載或過度配置,分別成為性能瓶頸或浪費計算資源。如果不實現(xiàn)后者,一些CPU內(nèi)核將會過載,而其他CPU內(nèi)核將未得到充分利用,從而導(dǎo)致性能下降。我們將Operator縮放和負(fù)載平衡的機(jī)制稱為彈性。為了在動態(tài)工作負(fù)載下保持高性能,快速彈性是一項至關(guān)重要的要求。

2.2 The Executor-Centric Paradigm

?????? 表1總結(jié)了現(xiàn)有兩種彈性范式的主要特征:靜態(tài)和以資源為中心的方法。 靜態(tài)方法使用固定數(shù)量的執(zhí)行程序?qū)崿F(xiàn)每個Operator,并使用靜態(tài)Operator Key Space在執(zhí)行程序之間分配工作負(fù)載。 每個執(zhí)行程序由綁定到指定CPU核心的單個數(shù)據(jù)處理線程組成。 由于靜態(tài)Key分區(qū)和CPU內(nèi)核與執(zhí)行器的一對一綁定,靜態(tài)方法簡化了系統(tǒng)實現(xiàn),并在大多數(shù)最先進(jìn)的系統(tǒng)中采用[30,43]。 但是,由于既不能平衡分配的CPU內(nèi)核的工作負(fù)載,也不能調(diào)整分配給特定Operator的CPU內(nèi)核數(shù)量,因此這種方法對分區(qū)模式非常敏感,并且由于缺乏彈性而在動態(tài)工作負(fù)載下效率低下。

?????? 以資源為中心的方法通過支持動態(tài)Operator級Key分區(qū)來解決靜態(tài)方法的限制,同時遵循與靜態(tài)方法相同的執(zhí)行程序?qū)崿F(xiàn)。 憑借Operator級Key重新分區(qū)的功能,以資源為中心的方法實現(xiàn)了彈性,因為它可以將一些Key及其相應(yīng)的工作負(fù)載從重載執(zhí)行程序遷移到負(fù)載較輕的執(zhí)行程序,以平衡工作負(fù)載,或從現(xiàn)有執(zhí)行程序遷移到新創(chuàng)建的執(zhí)行者以擴(kuò)展Operator。 但是,如引言部分所述,此Operator級Key重新分區(qū)是一個耗時的過程,在此過程中需要昂貴的全局同步來遷移狀態(tài)并更新所有上游執(zhí)行程序的路由表。 因此,以資源為中心的方法不能實現(xiàn)快速彈性,只能解決非常有限的工作量動態(tài)。

?????? 為了實現(xiàn)快速彈性,我們提出了一種新的執(zhí)行范式:以executor-centric的方法。我們的想法來自觀察到Operator級別的Key重新分區(qū)太昂貴而無法實現(xiàn)快速彈性。因此,以執(zhí)行器為中心的方法使用靜態(tài)Operator級別的Key分區(qū),但將每個執(zhí)行程序?qū)崿F(xiàn)為彈性的構(gòu)建塊以處理工作負(fù)載波動。特別是,每個執(zhí)行程序都旨在通過動態(tài)創(chuàng)建或刪除數(shù)據(jù)處理線程來利用各種計算資源。因此,為了實現(xiàn)負(fù)載平衡和Operator擴(kuò)展,系統(tǒng)可以為每個彈性執(zhí)行器動態(tài)分配適當(dāng)數(shù)量的CPU內(nèi)核,而不是執(zhí)行昂貴的Operator Key重新分區(qū)。與Operator Key重新分區(qū)相比,可以有效地實現(xiàn)CPU內(nèi)核的重新分配和內(nèi)部執(zhí)行器負(fù)載平衡,因為它們不需要任何Operator間或執(zhí)行器間同步。有趣的是,我們的新方法通過避免全局同步實現(xiàn)了快速彈性。

2.3 Overview of Elasticutor Framework

?????? 遵循以執(zhí)行者為中心的方法,我們設(shè)計了專注于支持有狀態(tài)流處理的Elasticutor。 為了處理流系統(tǒng)上的大規(guī)模數(shù)據(jù),我們假設(shè)數(shù)據(jù)和狀態(tài)是在Key Space下定義的,基于哪些分區(qū)數(shù)據(jù)流和狀態(tài)可以由分布式計算單元并行處理和維護(hù)。 我們假設(shè)Key Space足夠細(xì)粒度,以便甚至可以在越來越多的計算資源(即CPU核心)上分配和平衡扭曲的工作負(fù)載。 對于像Heron,Flink和Samza這樣的其他最先進(jìn)的流處理系統(tǒng),還需要這種假設(shè)來實現(xiàn)高度并行化的有狀態(tài)流處理。

?????? 我們的設(shè)計目標(biāo)是實現(xiàn)實時響應(yīng),歸結(jié)為保證低延遲。 然而,過度延遲可能是由于較高的數(shù)據(jù)到達(dá)率導(dǎo)致的系統(tǒng)資源不足,或?qū)е鹿ぷ髫?fù)載不平衡的低效資源分配和調(diào)度。 前者需要資源縮放,而后者則不需要。 基于關(guān)注點分離原則,我們將Elasticutor設(shè)計為兩級架構(gòu),如圖2所示。

?????? 高級調(diào)度程序(在第4節(jié)中描述)處理可能在一段時間內(nèi)激增的動態(tài)工作負(fù)載,在此期間現(xiàn)有系統(tǒng)容量不足并需要擴(kuò)展。不需要過度配置,但我們假設(shè)可以從基于云的平臺按需獲取資源。我們假設(shè)工作負(fù)載的總體激增不會太頻繁發(fā)生,例如,在幾分鐘到幾小時的時間范圍內(nèi)。動態(tài)調(diào)度程序確定每個彈性執(zhí)行程序應(yīng)在瞬時工作負(fù)載下提供的所需CPU核心數(shù)。它采用基于排隊網(wǎng)絡(luò)的性能模型,并使用彈性執(zhí)行器的收集性能Metrics指標(biāo)作為輸入來生成資源分配決策。根據(jù)現(xiàn)有的核心到執(zhí)行器分配和集群中CPU核心的可用性,調(diào)度程序改進(jìn)了分配以適應(yīng)新的資源分配計劃,同時考慮了CPU重新分配開銷和計算資源的位置。

?????? 每個low-level執(zhí)行程序(在第3節(jié)中描述)被設(shè)計為一個輕量級,自包含的分布式子系統(tǒng),稱為彈性執(zhí)行程序,負(fù)責(zé)處理固定key subspace下的輸入。 為適應(yīng)工作負(fù)載波動,彈性執(zhí)行器可以使用動態(tài)數(shù)量的CPU核心,可能來自多個節(jié)點,由動態(tài)調(diào)度程序決定。 為了在工作負(fù)載波動的情況下充分利用其分配的CPU內(nèi)核,彈性執(zhí)行器具有高效的內(nèi)部負(fù)載平衡機(jī)制,可以在更短的時間范圍內(nèi)將分配的CPU內(nèi)核的輸入流的計算均勻分布。

?????? 以有狀態(tài)處理為目標(biāo)的流處理系統(tǒng)的設(shè)計空間還包括諸如狀態(tài)大小和數(shù)據(jù)流特征之類的維度,即每元組的計算和大小,以及key space下數(shù)據(jù)流的傾斜程度和動態(tài)性。我們將討論權(quán)衡Elasticutor與第5節(jié)中的變更方法進(jìn)行比較。

3. ELASTIC EXECUTOR

?????? 為了有效地利用CPU資源,彈性執(zhí)行器被設(shè)計為適應(yīng)兩種動態(tài):1)key分配的變化和2)CPU core重新分配如圖3所示。前者來自輸入流的波動,而后者由調(diào)度程序確定全局優(yōu)化。 為了在其計算資源上分配工作負(fù)載,彈性執(zhí)行程序為每個分配的CPU核心創(chuàng)建任務(wù),并在其上分配輸入數(shù)據(jù)元組。 在CPU重新分配時,將創(chuàng)建新任務(wù)或刪除現(xiàn)有任務(wù)。 這兩種動態(tài)都會在任務(wù)之間引入不平衡的工作負(fù)載,從而導(dǎo)致資源利用不足或性能下降。 因此,一個中心設(shè)計問題是如何在存在這種動態(tài)的情況下在任務(wù)之間保持平衡的工作負(fù)載分配。

3.1 Components and Working Mechanism

?????? 如圖4所示,彈性執(zhí)行器被實現(xiàn)為輕量級,自包含的分布式子系統(tǒng),其可以利用多個物理節(jié)點上的計算資源。 每個彈性執(zhí)行程序主要駐留在一個稱為其本地節(jié)點的物理節(jié)點中,在該節(jié)點中,它運(yùn)行本地主進(jìn)程以接收輸入元組并發(fā)送輸出元組。 對于每個分配的CPU核心,在該過程中創(chuàng)建作為數(shù)據(jù)處理線程實現(xiàn)的任務(wù)。 要在遠(yuǎn)程節(jié)點上使用CPU核心,可以創(chuàng)建遠(yuǎn)程進(jìn)程來托管遠(yuǎn)程數(shù)據(jù)處理的遠(yuǎn)程任務(wù)。

Intra-Executor Routing: 我們采用雙層設(shè)計,在圖4中央結(jié)構(gòu)中顯示的路由表中實現(xiàn),根據(jù)瞬時工作負(fù)載分配動態(tài)地將輸入元組映射到任務(wù)。第一層使用靜態(tài)散列函數(shù)將key Sub Space靜態(tài)劃分為Shard;第二層顯式維護(hù)動態(tài)Shard到任務(wù)映射,該映射在Shard重新分配時更新。我們在粗粒度而非每個Key的基礎(chǔ)上平衡工作負(fù)載,主要是因為細(xì)粒度方法需要維護(hù)每個Key的工作負(fù)載,因此遭受高內(nèi)存消耗。Shard數(shù)量的選擇提供了負(fù)載平衡質(zhì)量和維護(hù)開銷之間的權(quán)衡。然而,在實踐中,合理數(shù)量的Shard(例如,任務(wù)數(shù)量的4或8倍)實現(xiàn)了良好的平衡質(zhì)量,同時保持低維護(hù)開銷。我們將在5.3節(jié)討論Shard數(shù)如何影響某些極端設(shè)置中的系統(tǒng)性能。

Executor-Level Fault Tolerance在流處理系統(tǒng)中已經(jīng)廣泛研究了容錯[8,11,14,35,49],因此既不是本文的重點也不是本文的貢獻(xiàn)。 在這里,我們只討論如何從故障中恢復(fù)每個彈性執(zhí)行器的遠(yuǎn)程任務(wù),以便Elasticutor可以利用最先進(jìn)的狀態(tài)檢查點技術(shù),如流水線快照協(xié)議[11],實現(xiàn)容錯。

?????? 彈性執(zhí)行程序的主要過程在邏輯上維護(hù)其任務(wù)狀態(tài)的主副本。默認(rèn)情況下,狀態(tài)在主進(jìn)程的內(nèi)存中維護(hù)以進(jìn)行高效訪問,但是當(dāng)狀態(tài)太大而無法容納在內(nèi)存中時,也可以將狀態(tài)存儲在外部存儲中。主進(jìn)程還用增加的元組ID標(biāo)記每個輸入數(shù)據(jù)元組。對于每個遠(yuǎn)程任務(wù)T,彈性執(zhí)行程序維護(hù)一個待處理的元組隊列以備份發(fā)送到T的數(shù)據(jù)元組,并且值ts表示已經(jīng)處理了ID小于ts的所有數(shù)據(jù)元組,并且處理這些元組產(chǎn)生的狀態(tài)更新已被沖回主副本。每個遠(yuǎn)程任務(wù)周期性地,例如每10秒,將其本地狀態(tài)的更新與tmax(即,已經(jīng)處理的元組的最大ID)一起發(fā)送到彈性執(zhí)行器的主進(jìn)程。在從遠(yuǎn)程任務(wù)T接收到狀態(tài)更新時,彈性執(zhí)行器更新狀態(tài)的主副本,在T的待處理隊列中移除ID不大于tmax的元組,并更新ts = tmax。當(dāng)遠(yuǎn)程任務(wù)T失敗時,彈性執(zhí)行程序使用狀態(tài)的主副本創(chuàng)建新任務(wù),并通過在待處理隊列T中重放大于ts的ID的元組來開始執(zhí)行新任務(wù)。

3.2 Consistent Workload Redistribution

?????? 雖然狀態(tài)共享提高了Shard重新分配的效率,但需要注意保證一致性。 一般而言,盡管使用與以資源為中心的方法的Key重新分區(qū)類似的過程,我們通過利用以執(zhí)行器為中心的方法啟用的Operator間和執(zhí)行者間的獨立性來實現(xiàn)具有狀態(tài)一致性的有效分片重新分配。

?????? 考慮圖4中的情況,其中元組t1處于任務(wù)T2的待處理隊列中,元組t2剛剛到達(dá)執(zhí)行器的主進(jìn)程,并且元組t3將由上游執(zhí)行器發(fā)出。 假設(shè)所有三個元組都屬于shard r4。 如果在處理t1之前或在更新t2和t3的路由之前將shard r4從源任務(wù)T2重新分配給新的目標(biāo)任務(wù),則狀態(tài)將變得不一致。 特別地,如果目的地任務(wù)是本地的,例如T1,那么t2可能在t1之前被處理,違反了順序處理的要求。 如果目的地任務(wù)是遠(yuǎn)程的,例如T0,則由t1對狀態(tài)的修改將丟失。

Inter-Operator Consistent Routing為了保證從上游Operator到分配的任務(wù)所在的正確進(jìn)程的一致路由(例如t3),彈性執(zhí)行器在其本地主進(jìn)程中實現(xiàn)接收器守護(hù)進(jìn)程,作為來自上游Operator的所有元組的單一入口。接收器根據(jù)內(nèi)部路由表將元組路由到適當(dāng)?shù)谋镜鼗蜻h(yuǎn)程任務(wù)。類似地,發(fā)射器守護(hù)程序在主進(jìn)程中實現(xiàn)為執(zhí)行程序的單個退出,以將任務(wù)生成的輸出元組轉(zhuǎn)發(fā)給下游Operator。遠(yuǎn)程過程僅與彈性執(zhí)行器的主過程上的接收器和發(fā)射器通信。因此,無論在彈性執(zhí)行器中的任務(wù)之間如何動態(tài)重新分配分片,上游和下游Operator總是通過其接收器和發(fā)送器向執(zhí)行器發(fā)送元組或從接收器接收元組,從而避免由分片重新分配引起的任何Operator間同步。相反,以資源為中心的方法通過Operator級key space重新分區(qū)來重新分配工作負(fù)載,從而導(dǎo)致與所有上游執(zhí)行程序同步。

?????? 請注意,與以資源為中心的方法相比,來自上游執(zhí)行器的元組直接路由到下游Operator,Elasticutor可能涉及接收器/發(fā)射器和遠(yuǎn)程任務(wù)之間的額外遠(yuǎn)程數(shù)據(jù)傳輸。 這是我們?yōu)閷崿F(xiàn)快速彈性而做出的權(quán)衡。 在典型的工作負(fù)載中,遠(yuǎn)程數(shù)據(jù)傳輸不是性能瓶頸,如圖13所示。在5.3節(jié)中,我們通過正確配置Operator的執(zhí)行器數(shù)量,討論如何避免/減少某些極端工作負(fù)載中的遠(yuǎn)程數(shù)據(jù)傳輸。

Intra-Executor State Consistency: 為了保證在重新分區(qū)Shard期間的狀態(tài)一致性,彈性檢測器采用類似于以資源為中心的方法中使用的Operator級重新分區(qū)的Key重新分區(qū)過程,但不涉及任何全局同步。關(guān)鍵是要確保a)在將shard狀態(tài)遷移到目標(biāo)任務(wù)之前必須處理掛起的元組,即SOurce任務(wù)中排隊的分片的未處理元組。 b)具有相同Key的元組不會同時在任何兩個任務(wù)中處理。在圖4中Shard r4的重新分配期間,暫停了元組r4的路由,并將標(biāo)簽元組發(fā)送到其Source taskT2。由于任務(wù)以先來先服務(wù)的方式處理其輸入元組,因此當(dāng)T2從其待處理隊列中拉出標(biāo)簽元組時,保證已經(jīng)發(fā)送到T2的任何待處理元組唄處理。之后,r4的狀態(tài)將遷移到目標(biāo)任務(wù)。如果將分片重新分配給其源任務(wù)的本地任務(wù),則省略狀態(tài)遷移。在狀態(tài)遷移之后,在恢復(fù)r4的元組的路由之前,在路由表中更新Shard到任務(wù)映射。

Discussions值得注意的是,我們提出的以執(zhí)行為中心的范例適用于其他現(xiàn)有的分布式流系統(tǒng),例如Apache Flink,Apache Heron和Apache Samza,其中有狀態(tài)處理可以通過在key space下劃分狀態(tài)和數(shù)據(jù)來并行化。 對于無狀態(tài)應(yīng)用程序,我們的方法仍然可以應(yīng)用,但可能不一定是最佳選擇,因為通過簡單地以循環(huán)方式發(fā)送元組或者向負(fù)載最少的執(zhí)行程序發(fā)送元組可以輕松實現(xiàn)負(fù)載平衡。

雖然我們的方法不適用于基于批處理的系統(tǒng),但我們的雙層負(fù)載平衡設(shè)計與小批量定向的Spark Streaming [50]采用的方法有一些相似之處。 兩個主要的區(qū)別是:1)我們設(shè)計一個額外的中間層碎片提供了維護(hù)成本和平衡負(fù)載之間的權(quán)衡,2)我們的測量和平衡設(shè)計對流量系統(tǒng)更自然,其中操作過程輸入元組 而不是基于小批量。

4. DYNAMIC SCHEDULER

?????? 動態(tài)調(diào)度程序的目標(biāo)是通過在不斷變化的工作負(fù)載下自適應(yīng)地將CPU核心分配給彈性執(zhí)行程序來滿足用戶定義的延遲要求。 通過使用由系統(tǒng)測量的瞬時性能指標(biāo),調(diào)度程序首先根據(jù)排隊網(wǎng)絡(luò)模型估計每個執(zhí)行程序所需的核心數(shù)量,并進(jìn)一步(重新)將物理核心分配給執(zhí)行者,以便最小化重新分配開銷 并最大化執(zhí)行程序內(nèi)的計算位置。

4.1 Model-Based Resource Allocation

?????? 我們將m個彈性執(zhí)行器的拓?fù)銭 = {1,··,m}建模為Jackson網(wǎng)絡(luò),其中每個執(zhí)行器j∈E被視為M / M / kj系統(tǒng)[42],其中kj表示分配給j的CPU核心數(shù)。 輸入流的平均處理等待時間,表示為E [T],可以作為資源分配決策k的函數(shù)來計算。

?????? 其中λ0表示輸入流的到達(dá)率,Tj和λj分別表示執(zhí)行器j的平均處理時間和到達(dá)率。 當(dāng)kj>λj/μj時,每個E [Tj](kj)是有界的,其中μj表示彈性執(zhí)行器j的處理速率,并且可以作為由系統(tǒng)測量的參數(shù)λ0,{λj}和{μj}的函數(shù)來計算。基于等式(1),調(diào)度器嘗試找到分配k以確保E [T]不大于用戶指定的等待時間目標(biāo)Tmax,同時最小化CPU核心的總數(shù)。 特別地,每個kj被初始化為λj/μj+ 1,這是使系統(tǒng)穩(wěn)定的最低要求。 我們重復(fù)向向量k中的值加1,導(dǎo)致E [T]的最顯著減少,直到E [T]≤Tmax或íkj超過可用CPU資源的數(shù)量。 這個貪心算法在找到解k時已經(jīng)證明是最優(yōu)的[22]。

4.2 CPU-to-Executor Assignment

?????? 性能模型僅建議新的分配,即每個執(zhí)行者需要的CPU核心數(shù)量,這是由工作負(fù)載波動引起的; 調(diào)度程序仍然需要通過更新現(xiàn)有的核心到執(zhí)行程序分配來適應(yīng)新的分配計劃。 新分配計劃的CPU重新分配是系統(tǒng)性能的關(guān)鍵,因為它可能會引入1)轉(zhuǎn)換期間的狀態(tài)遷移成本,以及2)之后的遠(yuǎn)程數(shù)據(jù)傳輸成本。 例如,在重新分配CPU內(nèi)核時,彈性檢測器會創(chuàng)建一個新任務(wù),如果CPU內(nèi)核遠(yuǎn)離彈性執(zhí)行程序,則涉及狀態(tài)遷移和將來的遠(yuǎn)程數(shù)據(jù)傳輸。 為了優(yōu)化執(zhí)行效率,我們搜索最小化遷移成本的CPU到執(zhí)行器分配,同時限制計算局部性以限制未來的遠(yuǎn)程數(shù)據(jù)傳輸成本。

?????? 為了模擬遷移成本,我們考慮一個n個節(jié)點的集群,其中每個節(jié)點都有ci CPU核心。對于任何執(zhí)行器j∈E,我們用I(j)表示其主進(jìn)程所在的節(jié)點,并且通過列向量xj =(x1j,...,xnj)T表示在所有節(jié)點上分配給它的核的數(shù)量。我們將jj = i = 1 xi jas定義為j的指定核心總數(shù),并用矩陣X =(x1,···,xm)表示CPU到執(zhí)行器的分配。給定任何新的分配k,從現(xiàn)有分配X到新分配X的轉(zhuǎn)換需要執(zhí)行一組CPU分配/解除分配。核心重新分配的開銷由狀態(tài)遷移成本決定,狀態(tài)遷移成本與跨網(wǎng)絡(luò)移動的狀態(tài)大小成比例。我們用sj表示任何執(zhí)行者j的聚合狀態(tài)大小。為簡單起見,我們假設(shè)彈性執(zhí)行器的分片均勻分布在分配的CPU核心上;因此,與每個CPU核心相關(guān)的狀態(tài)數(shù)據(jù)量大約是sj / Xj。給定任何分配k,可用核心c和現(xiàn)有分配X~,我們按如下方式制定CPU分配問題。

上述優(yōu)化問題最小化了從現(xiàn)有賦值X到新賦值X的遷移成本C(X | X?),其中求和中的每個項都測量執(zhí)行者j將其狀態(tài)遷移出節(jié)點i的成本。約束包括(a)CPU核心的數(shù)量,(b)分配要求和(c)計算局部性,即,要求分配給執(zhí)行器的集合E(φ)的所有核心都在其本地節(jié)點上。系統(tǒng)通過其總輸入和輸出數(shù)據(jù)速率除以核心數(shù)kj來測量任何執(zhí)行器j的瞬時每核數(shù)據(jù)強(qiáng)度,并且E(φ)表示數(shù)據(jù)強(qiáng)度高于閾值的執(zhí)行器集合。 。因為如果分配的核心是遠(yuǎn)程的,數(shù)據(jù)密集型執(zhí)行程序?qū)a(chǎn)生更高的網(wǎng)絡(luò)成本,我們通過避免將遠(yuǎn)程核心分配給E(φ)的成員來強(qiáng)制執(zhí)行計算局部性。這個整數(shù)規(guī)劃問題可以簡化為NP-hard多處理器調(diào)度問題[23]。因此,我們設(shè)計了一個有效的貪婪算法1來找到近似解。對于任何賦值X,我們將E + = {j∈E| Xj <kj}定義為欠配置執(zhí)行器的集合,E +Δ= {j∈E+∩E(φ)}作為數(shù)據(jù)密集型執(zhí)行器的子集,并且E- = {j∈E| Xj> kj}是一組過度供應(yīng)的執(zhí)行者。我們使用C + i j(X)和

C-ij(X)分別表示在節(jié)點i上分配/取消分配CPU核心到執(zhí)行器j的開銷,可以導(dǎo)出為C + ij(X)= sj(Xj-xi j)/(Xj( Xj + 1))和C-ij(X)= sj(Xj-xi j)/(Xj(Xj-1))。

算法1按數(shù)據(jù)強(qiáng)度按降序?qū) +中的執(zhí)行程序進(jìn)行排序,并嘗試通過從其他執(zhí)行程序解除分配核心,逐個為每個執(zhí)行程序j分配目標(biāo)CPU核心數(shù)。具體來說,如果彈性執(zhí)行器j是數(shù)據(jù)密集型的,即j∈E(φ),它只接受節(jié)點i = I(j)上的CPU核心,以避免創(chuàng)建遠(yuǎn)程任務(wù)。因此,在所有非數(shù)據(jù)密集型執(zhí)行程序中,算法在節(jié)點I(j)上找到一個CPU核心,可以用最小的釋放開銷重新分配給j(第7行)。相反,如果j不是數(shù)據(jù)密集型,則它接受任何節(jié)點上的CPU核心。該算法搜索E-中的所有執(zhí)行程序,以獲得具有CPU內(nèi)核的執(zhí)行程序,該內(nèi)核可以通過最小的釋放和分配開銷重新分配給j(第9行)。在任何一種情況下,如果找到這樣的有效核心重新分配,則算法將其添加到新的賦值X中;否則,它返回FAIL,這表示沒有找到可行的解決方案,并且暗示需要更高的數(shù)據(jù)不敏感閾值φ來獲得可行的解決方案。

?????? 算法1按數(shù)據(jù)強(qiáng)度按降序?qū) +中的執(zhí)行程序進(jìn)行排序,并嘗試通過從其他執(zhí)行程序解除分配核心,逐個為每個執(zhí)行程序j分配目標(biāo)CPU核心數(shù)。具體來說,如果彈性執(zhí)行器j是數(shù)據(jù)密集型的,即j∈E(φ),它只接受節(jié)點i = I(j)上的CPU核心,以避免創(chuàng)建遠(yuǎn)程任務(wù)。因此,在所有非數(shù)據(jù)密集型執(zhí)行程序中,算法在節(jié)點I(j)上找到一個CPU核心,可以用最小的釋放開銷重新分配給j(第7行)。相反,如果j不是數(shù)據(jù)密集型,則它接受任何節(jié)點上的CPU核心。該算法搜索E-中的所有執(zhí)行程序,以獲得具有CPU內(nèi)核的執(zhí)行程序,該內(nèi)核可以通過最小的釋放和分配開銷重新分配給j(第9行)。在任何一種情況下,如果找到這樣的有效核心重新分配,則算法將其添加到新的賦值X中;否則,它返回FAIL,這表示沒有找到可行的解決方案,并且暗示需要更高的數(shù)據(jù)不敏感閾值φ來獲得可行的解決方案。

?????? φ的選擇提供了公式4.2的可行性與彈性執(zhí)行器的計算局部性之間的權(quán)衡。由于動態(tài)分配算法非常有效,我們使用低默認(rèn)值φ=?φ運(yùn)行算法。如果沒有找到可行的解決方案,我們將φ加倍并重新運(yùn)行算法,直到我們找到一個。在我們的實驗中,我們將φ~設(shè)置為512 KB / s,低于該值時,計算局部性的好處可以忽略不計。

Discussions我們的動態(tài)調(diào)度程序設(shè)計適用于使用連續(xù)運(yùn)算符的流處理,并遵循數(shù)據(jù)流模型[5]。調(diào)度程序確定每個執(zhí)行程序滿足延遲要求所需的資源,并計算資源分配以最大限度地降低狀態(tài)遷移成本。在這個級別工作的其他調(diào)度程序包括Flink的DS2 [28],Heron的Dhalion [21],Storm的RAS [36]等等。相比之下,基于云的資源管理系統(tǒng)(如YARN [44]和Mesos [25])更加以集群為中心[26,27],即它們主要旨在管理不同應(yīng)用程序之間的集群資源。他們通常會收到應(yīng)用程序經(jīng)理的資源需求,并根據(jù)效率和公平等標(biāo)準(zhǔn)來決定如何配置資源。通常開發(fā)協(xié)商器/協(xié)調(diào)器模塊用于協(xié)助不同級別的調(diào)度器之間的交互。典型的例子包括Storm-on-Yarn [3]和Flink-on-Yarn [2]。

5. PERFORMANCE EVALUATION

?????? 我們在Apache Storm上大約10,000行Java中實現(xiàn)了Elasticutor的原型[43]。 Elasticutor的源代碼可在[4]獲得。 Storm是一種流行的分布式流處理系統(tǒng),它暴露了低級API,例如Bolt API。 這對于原型研究思想來說相對容易一些。 Storm遵循靜態(tài)方法,其操作符由用戶通過抽象類Bolt實現(xiàn)。 我們添加了一個新的抽象類ElasticBolt,它提供了與Bolt相同的編程接口,但是向用戶空間公開了一個新的狀態(tài)訪問接口。 對于任何定義為ElasticBolt的運(yùn)算符,Elasticutor創(chuàng)建了許多具有內(nèi)置狀態(tài)管理,度量標(biāo)準(zhǔn)Metrics測量和彈性功能的彈性執(zhí)行器。 動態(tài)調(diào)度程序?qū)崿F(xiàn)為在Storm的主節(jié)點(nimbus)上運(yùn)行的守護(hù)程序進(jìn)程。

?????? 我們的實驗在Amazon EC2上進(jìn)行,具有32個t2.2x大型實例(節(jié)點),每個實例具有8個CPU核心和32GB RAM,運(yùn)行Ubuntu 16.04。網(wǎng)絡(luò)是1Gbps以太網(wǎng)。在所有方法下,執(zhí)行器以循環(huán)方式分配給節(jié)點。除非另有說明,否則Elasticutor每個運(yùn)算符使用32個彈性執(zhí)行程序,每個執(zhí)行程序使用256個分片(每個運(yùn)算符8192個分片)。為了公平比較,我們?yōu)殪o態(tài)方法中的Operator創(chuàng)建了足夠的執(zhí)行程序,以充分利用集群中的所有CPU核心;并將RC方法中的key spcae分區(qū)的粒度設(shè)置為每個運(yùn)算符8192個分片,與Elasticutor中的相同。為確保系統(tǒng)穩(wěn)定性,Storm,Heron和Flink等現(xiàn)有流系統(tǒng)實施反壓機(jī)制,以控制Operator的輸入速率。為了關(guān)注系統(tǒng)性能,我們評估壓力情況,其中足夠高的到達(dá)率使輸入隊列保持非空并且可能觸發(fā)Storm的背壓機(jī)制。

?????? 我們在2.3節(jié)中詳細(xì)討論了有狀態(tài)流處理設(shè)計空間中的Elasticutor。在本節(jié)中,我們將Elasticutor的性能與靜態(tài)方法(默認(rèn)Storm)和以資源為中心(RC)方法的性能進(jìn)行比較。第2.2節(jié)總結(jié)了這三種方法的主要區(qū)別。我們通過啟用創(chuàng)建/刪除執(zhí)行程序和Operator級Key重新分區(qū)來實現(xiàn)基于Storm的RC。為了公平比較,RC使用與Elasticutor相同的性能模型,負(fù)載平衡算法和進(jìn)程內(nèi)狀態(tài)共享機(jī)制。我們將評估Elasticutor在設(shè)計空間中的不同維度所做出的性能和權(quán)衡,包括狀態(tài)大小,每元組計算和大小,以及數(shù)據(jù)流的偏度和動態(tài)性。通常,只要有足夠的計算資源可用于系統(tǒng)中的擴(kuò)展,Elasticutor amis的設(shè)計就可以容納計算密集型工作負(fù)載。但是,由于遠(yuǎn)程任務(wù)的引入可能會導(dǎo)致數(shù)據(jù)傳輸和狀態(tài)遷移開銷和延遲,我們的設(shè)計假定工作負(fù)載在元組大小和狀態(tài)大小方面不會過于數(shù)據(jù)密集,并且網(wǎng)絡(luò)帶寬容量確實如此不成為瓶頸。我們假設(shè)在關(guān)鍵領(lǐng)域的數(shù)據(jù)分布中表現(xiàn)出的傾斜度是一種規(guī)范,我們關(guān)注的是更具挑戰(zhàn)性的情況,其中數(shù)據(jù)傾斜度也會突然變化,這可以在股票交易數(shù)據(jù)集和評估中顯示出來。

5.1 Micro-Benchmarking

?????? 在本小節(jié)中,我們使用一個簡單而有代表性的拓?fù)浣Y(jié)構(gòu),如圖5所示,它允許輕松控制工作負(fù)載特性,例如輸入速率,計算成本和數(shù)據(jù)分布。拓?fù)溆缮善骱陀嬎闫鹘M成,輸入數(shù)據(jù)流由生成器饋送到計算器進(jìn)行處理。我們確保數(shù)據(jù)生成速率使計算器的輸入隊列飽和。計算器運(yùn)算符中每個元組的處理時間遵循正態(tài)分布N(μ,δ2=0.5μ)。通過在執(zhí)行時間內(nèi)循環(huán)運(yùn)行數(shù)據(jù)加密來實現(xiàn)計算,以耗盡CPU周期并模擬計算密集型工作負(fù)載。除非另有說明,否則每個元組由一個整數(shù)鍵和一個128字節(jié)的有效負(fù)載組成,并且處理的平均CPU成本為1 ms。密鑰空間包含10K個不同的值,其頻率遵循zipf分布[37],偏差因子為0.5。默認(rèn)狀態(tài)大小為256MB,每個分片為32KB。為了模擬工作負(fù)載動態(tài),我們通過應(yīng)用每分鐘ω次隨機(jī)排列來改變元組Key的頻率。

工作負(fù)載動態(tài)的穩(wěn)健性:圖6描繪了隨著ω沿x軸變化的三種方法下的吞吐量和平均處理延遲。我們觀察到,當(dāng)工作負(fù)載是動態(tài)的時,Elasicutor在兩個指標(biāo)方面始終優(yōu)于其他方法,即ω > 0。特別是,由于密鑰分配偏差導(dǎo)致工作負(fù)載不平衡,靜態(tài)方法的性能較差,但由于沒有執(zhí)行彈性操作,因此在所有情況下都相對穩(wěn)定。 由于RC和Elasticutor都能夠適應(yīng)偏斜的鍵分布,因此當(dāng)ω很小時,它們會大大優(yōu)于靜態(tài)。 然而,隨著ω的增加,盡管由于彈性操作成本較高而導(dǎo)致RC和Elasticutor的性能下降,但Elasticutor的性能下降是微不足道的,而RC的性能下降變大了2-3個數(shù)量級,使RC無用為ω 達(dá)到16。

?????? 為了更好地解釋ω變化時三種方法的性能,我們關(guān)注ω= 2的情景,即每30秒進(jìn)行一次混洗,并繪制在圖7中1秒的滑動時間窗口內(nèi)測量的瞬時吞吐量。我們觀察到 靜態(tài)方法的吞吐量始終遠(yuǎn)低于RC和Elasticutor的吞吐量,盡管變化不大。 由于關(guān)鍵混洗觸發(fā)彈性操作的執(zhí)行,RC和Elasticutor每30秒就會出現(xiàn)一次瞬態(tài)吞吐量降低。 然而,RC的退化要差得多,其瞬態(tài)持續(xù)時間為10到20秒,而Elasticutor的衰減僅持續(xù)1到3秒。 這解釋了隨著工作負(fù)載變得更加動態(tài),兩種方法中性能差距擴(kuò)大的原因。

?????? 在不同數(shù)據(jù)強(qiáng)度下的性能:為了評估工作負(fù)載的數(shù)據(jù)強(qiáng)度如何影響三種方法的性能,我們改變元組大小(表示為s)和每個元組的計算成本(表示為c),并將其性能進(jìn)行比較。結(jié)果表明,在數(shù)據(jù)強(qiáng)度較高的情況下,例如,元組大小較大或每個元組的計算成本較低,由于數(shù)據(jù)傳輸開銷較高,三種方法的吞吐量會下降。例如,當(dāng)c = 0.01ms且s = 2KB時,一個CPU內(nèi)核上的全速元組處理的數(shù)據(jù)傳輸要求是2Gbps,超過了網(wǎng)絡(luò)帶寬,即1Gbps,因此導(dǎo)致性能顯著下降。所有的方法。但是,Elasticutor通常對元組大小比競爭對手更敏感,特別是當(dāng)計算成本極低時,例如,每個元組c = 0.01ms,因為它具有獨特的兩級元組路由機(jī)制,從而帶來更高的數(shù)據(jù)傳輸開銷數(shù)據(jù)強(qiáng)度。

?????? 不同狀態(tài)大小下的性能:圖9比較了三種方法在吞吐量和延遲方面的性能,因為狀態(tài)大小沿x軸變化。請注意,由于每個運(yùn)算符有8192個分片,因此當(dāng)每個分片的狀態(tài)大小為32MB時,運(yùn)算符的狀態(tài)大小將為256GB,這相當(dāng)大。結(jié)果表明,隨著狀態(tài)大小的增加,RC和Elasticutor的性能下降,這是由于較大的狀態(tài)大小導(dǎo)致的狀態(tài)遷移開銷增加。當(dāng)狀態(tài)大小接近32MB時,作為一種極端情況,由于執(zhí)行彈性的巨大運(yùn)營成本,Elasticutor和RC都比靜態(tài)方法表現(xiàn)更差。我們還觀察到,在相同的州規(guī)模下,Elasicutor的表現(xiàn)優(yōu)于RC方法。這表明Elasticutor中使用的技術(shù),如狀態(tài)共享機(jī)制和動態(tài)調(diào)度,可以有效地減少彈性操作中的狀態(tài)遷移開銷。

對執(zhí)行工作負(fù)載分配傾斜的魯棒性:實際上,由于Key分配偏差或者由于Operator級Key分區(qū)功能的不正確配置,工作負(fù)載可能無法在執(zhí)行程序之間平均分配。為了評估三種方法對傾斜執(zhí)行器工作負(fù)載分布的穩(wěn)健性,我們使用由圖10中的偏度因子α控制的變化的密鑰分布偏度來評估它們的性能。注意,α越大,密鑰分布具有越大的偏斜。例如,當(dāng)α= 0時,密鑰遵循均勻分布,而當(dāng)α≥0.8時,大多數(shù)工作負(fù)荷落入幾個密鑰。結(jié)果表明,靜態(tài)方法受到負(fù)載不平衡的影響很大,而α<0.8時,Elasticutor和RC對執(zhí)行器負(fù)載不平衡的抵抗力更強(qiáng)。主要觀察結(jié)果是,當(dāng)α≤0.6時,Elasticutor始終優(yōu)于RC,但其性能急劇下降,并且在極度偏斜的工作負(fù)荷分布下比RC差,例如α≥0.7。這表明盡管依賴于創(chuàng)建更多遠(yuǎn)程任務(wù)來處理傾斜的執(zhí)行程序工作負(fù)載分配,但Elasticutor中的執(zhí)行程序能夠處理高達(dá)α= 0.5的工作負(fù)載不平衡,而不會在運(yùn)行遠(yuǎn)程任務(wù)時引入明顯的延遲增加和吞吐量降低。但是,當(dāng)0.6≤α≤0.8時,大多數(shù)過載執(zhí)行器無法通過有效利用更多遠(yuǎn)程任務(wù)來進(jìn)一步卸載其工作負(fù)載,這主要是由于擁塞的網(wǎng)絡(luò)帶寬,因此成為性能瓶頸,導(dǎo)致系統(tǒng)吞吐量和延遲較差。

Shard reassignment cost因為RC方法和Elasticutor都使用碎片重新分配來平衡工作負(fù)載,我們會比較它們的成本以更好地理解產(chǎn)生的不同延遲。 圖11顯示了每個分片的平均節(jié)點內(nèi)和節(jié)點間重新分配時間,分為同步時間和狀態(tài)遷移時間。 我們觀察到RC中的碎片重新分配時間遠(yuǎn)遠(yuǎn)長于Elasticutor,這主要是由于RC方法中的同步時間極長。 我們還可以看到Elasticutor在狀態(tài)遷移中花費的時間比RC短,但與同步時間相比,狀態(tài)遷移中兩種方法之間的差異較小。

?????? 為了深入了解兩種方法之間的同步時間差異,我們改變了上游執(zhí)行器的數(shù)量,并發(fā)現(xiàn)RC比Elasticutor需要2-3個更大的時間來同步,并且它們的差異隨著更多的上游執(zhí)行器而變寬, 如圖12(a)所示。 Elasticutor遵循以執(zhí)行者為中心的標(biāo)準(zhǔn),從而避免在分片重新分配期間與上游檢查員同步。 因此,無論上游執(zhí)行器的數(shù)量如何,其同步時間約為2 ms。 相反,在RC方法中,需要更新上游執(zhí)行器的路由表,并且需要全局同步來清除執(zhí)行器和上游執(zhí)行器之間的飛行中元組。 因此,RC中的同步時間要高得多,并且隨著上游執(zhí)行器的數(shù)量而大大增加。

圖12(b)描繪了狀態(tài)大小變化時的狀態(tài)遷移時間。 我們觀察到,由于進(jìn)程內(nèi)狀態(tài)共享機(jī)制,兩種方法中的節(jié)點內(nèi)狀態(tài)遷移的延遲可忽略不計。 當(dāng)狀態(tài)大小達(dá)到32 MB時,節(jié)點間狀態(tài)遷移的時間顯著增加,其中狀態(tài)的網(wǎng)絡(luò)數(shù)據(jù)傳輸是狀態(tài)遷移過程中的主要開銷。 該圖還顯示,在給定相同狀態(tài)大小的情況下,由于執(zhí)行器為中心的范例啟用了執(zhí)行器間獨立性,因此Elasticutor遷移狀態(tài)所需的時間比RC短一些。

5.2 Scalability of a Single Elastic Executor

?????? Elasticutor的主要優(yōu)點是它通過分配更多CPU內(nèi)核而不是通過Operator級Key Space重新分區(qū)來處理工作負(fù)載動態(tài)。 盡管在一個合理的設(shè)置中,Operator通常有足夠的執(zhí)行器來分?jǐn)倖蝹€執(zhí)行器上的工作負(fù)載,但由于Key分配偏差,操作員不正確,執(zhí)行人員可能負(fù)載過重而需要許多遠(yuǎn)程任務(wù)。 級別分區(qū)或不必要的執(zhí)行程序。 因此,為了Elasticutor的健壯性,彈性執(zhí)行器具有良好的可伸縮性是至關(guān)重要的,即能夠有效地擴(kuò)展到許多CPU核心,并且在運(yùn)行遠(yuǎn)程任務(wù)時不會引入明顯的延遲。

?????? 為了評估彈性執(zhí)行器可以有效擴(kuò)展的范圍,我們只為計算Operaotr設(shè)置了一個彈性執(zhí)行器,但逐漸分配更多的CPU核心并測量其吞吐量和處理延遲。 由于每個節(jié)點有8個CPU核心,因此分配的前8個核心是本地核心,后續(xù)核心是遠(yuǎn)程核心。 在我們的評估中,我們改變了彈性的數(shù)據(jù)強(qiáng)度和運(yùn)營成本,這是影響可擴(kuò)展性的主要因素。 前者決定了遠(yuǎn)程數(shù)據(jù)傳輸在運(yùn)行遠(yuǎn)程任務(wù)時的長期成本,并且與元組大小成正比,與每個元組的計算成本成反比。 后者影響執(zhí)行彈性操作的短期運(yùn)輸開銷,并與規(guī)模和工作量動態(tài)(ω)呈正相關(guān)。

?????? 圖13描繪了執(zhí)行器在不同計算成本(左)和元組大小(右)下的可伸縮性。我們觀察到單個彈性執(zhí)行器通常可以有效地擴(kuò)展到整個集群(256個CPU核心),這表明遠(yuǎn)程數(shù)據(jù)傳輸?shù)某杀究梢院雎圆挥嫛N覀冞€觀察到彈性執(zhí)行器無法有效地利用超過16個具有非常大的元組大小的CPU核心,例如8KB,或者非常低的計算成本,例如每個元組0.01ms,這表明巨大的遠(yuǎn)程數(shù)據(jù)傳輸鏈接到高數(shù)據(jù)強(qiáng)度可防止執(zhí)行程序擴(kuò)展。圖14顯示了彈性執(zhí)行器向外擴(kuò)展時的99%延遲。我們可以看到,在大多數(shù)情況下,由于Netty [1]啟用了有效的網(wǎng)絡(luò)數(shù)據(jù)傳輸,處理延遲不會隨著彈性執(zhí)行器的擴(kuò)展而顯著增加。然而,在數(shù)據(jù)密集型工作負(fù)載中,例如,計算成本≤0.1ms或元組大小≥2KB,隨著分配的CPU核心數(shù)超過遠(yuǎn)程數(shù)據(jù)傳輸成為性能瓶頸的點,等待時間大大增加。請注意,由于我們在任何一對輸入輸出執(zhí)行器之間實現(xiàn)了反壓機(jī)制,因此延遲不會無限增長。

?????? 圖15顯示了彈性執(zhí)行器在各種碎片狀態(tài)大小下的可擴(kuò)展性,ω= 2(左)和16(右)。 結(jié)果表明,彈性執(zhí)行器在所有狀態(tài)尺寸下均可有效擴(kuò)展,但是32MB。 狀態(tài)較大時,狀態(tài)遷移會成為性能瓶頸,從而阻止執(zhí)行程序有效地使用遠(yuǎn)程CPU核心。 通過比較兩個子圖,我們觀察到當(dāng)ω增加到16時,由于與更高工作負(fù)載動態(tài)相關(guān)的狀態(tài)遷移需求增加,大狀態(tài)下的可擴(kuò)展性顯著降低。

5.3 Choosing Appropriate Parameters

?????? Elasticutor中有兩個重要參數(shù):每個執(zhí)行程序的硬數(shù),表示為z,每個運(yùn)算符的執(zhí)行數(shù),表示為asy。作為一個規(guī)則,將z設(shè)置在256和1024之間可以實現(xiàn)良好的內(nèi)部執(zhí)行器負(fù)載平衡,并且將y設(shè)置為計算密集型運(yùn)營商的節(jié)點數(shù)可以為那些運(yùn)營商提供足夠的潛力來擴(kuò)展工作負(fù)載陣陣。然而,在下文中,我們在各種工作負(fù)載下評估系統(tǒng)性能的大范圍(y,z),以便了解這兩個參數(shù)影響系統(tǒng)性能的原因和方式以及如何在極端情況下選擇合適的參數(shù)工作負(fù)載。為了進(jìn)行全面觀察,我們使用三種代表性工作負(fù)載,即默認(rèn)工作負(fù)載,數(shù)據(jù)密集型工作負(fù)載和高度動態(tài)工作負(fù)載。設(shè)s和ω分別表示以字節(jié)為單位的元組大小和每分鐘的密鑰重組。在默認(rèn)工作負(fù)載中,(s,ω)=(128B,2)。我們分別通過將s增加到8K和ω增加到16來獲得數(shù)據(jù)密集型工作負(fù)載和高動態(tài)工作負(fù)載。因此,(s,ω)=(8K,2)用于數(shù)據(jù)密集型工作負(fù)載,(s,ω)=(128B,16)用于高動態(tài)工作負(fù)載。圖16顯示了在三個工作負(fù)載下具有各種y和z的系統(tǒng)吞吐量。為了比較,我們還在圖中顯示了靜態(tài)和RC方法的吞吐量。

?????? Number of shards從圖16中,我們觀察到隨著z增加,吞吐量通常會增加,盡管邊際增長正在減少。 這表明當(dāng)使用太少的分片時,例如,z≤64,執(zhí)行器內(nèi)負(fù)載平衡質(zhì)量差,妨礙彈性執(zhí)行器有效地利用多個核心; 然而,太精細(xì)的分片(例如,z≥1024)不會進(jìn)一步提高吞吐量,因為執(zhí)行器內(nèi)的負(fù)載平衡已經(jīng)有效。 基于那些z = 16個觀測值,我們驗證每個執(zhí)行器256到1024個分片實現(xiàn)了良好的性能。

?????? Number of executors如圖16(a)所示,對于一個足夠大的z,除了y = 256之外,Elasticutor實現(xiàn)了有希望的性能。當(dāng)y = 256時,即集群中的CPU核心數(shù)量,每個彈性執(zhí)行器只能分配一個CPU核心。因此,執(zhí)行者失去彈性,Elasticutor被降級為靜態(tài)方法。通過比較圖16(a)和圖16(b),我們可以看到,當(dāng)元組大小增加到8K時,靜態(tài)和RC的性能變化不大,而在y = 1的情況下Elasticutor的性能嚴(yán)重下降。與默認(rèn)工作負(fù)載相比,在數(shù)據(jù)密集型工作負(fù)載中運(yùn)行遠(yuǎn)程任務(wù)時遠(yuǎn)程數(shù)據(jù)傳輸?shù)某杀靖叱?4倍。這限制了單個執(zhí)行程序的可伸縮性,因此導(dǎo)致單個執(zhí)行程序需要擴(kuò)展到許多遠(yuǎn)程CPU核心的小y的性能較差。通過比較圖16(a)和圖16(c),我們觀察到隨著洗牌頻率從2增加到16,雖然吞吐量一般會減少,但當(dāng)y很小時,減少幅度要大得多,即1或者8.在頻繁混洗的動態(tài)工作負(fù)載下,例如ω= 16,需要重新分配更多分片以進(jìn)行負(fù)載平衡,從而導(dǎo)致高遷移成本。相反,當(dāng)y足夠大時,大多數(shù)執(zhí)行器可以使用本地CPU內(nèi)核進(jìn)行擴(kuò)展,從而避免由于內(nèi)部處理狀態(tài)共享機(jī)制導(dǎo)致的狀態(tài)遷移;因此,吞吐量不會降低太多。總之,為每個節(jié)點設(shè)置一個或兩個執(zhí)行程序?qū)Ω鞣N工作負(fù)載都很穩(wěn)健。

5.4 Evaluation of Realtime Application

?????? 為了評估Elasticutor在實際應(yīng)用中的表現(xiàn),我們使用上海證券交易所(SSE)交易的股票的匿名訂單數(shù)據(jù)集,收集時間超過三個月,每個交易時間約有800萬條記錄。該應(yīng)用程序執(zhí)行證券交易所的市場清算機(jī)制,并提供實時分析。應(yīng)用程序的拓?fù)浣Y(jié)構(gòu)如圖17所示。輸入流由買方和賣方的限價訂單組成,這些限價訂單指定了特定庫存的特定交易量的出價和要價。順序元組的大小為96字節(jié)。在新訂單到達(dá)時,交易員操作員針對未完成的訂單執(zhí)行它,并確定交易數(shù)量和現(xiàn)金轉(zhuǎn)移。一旦進(jìn)行了這樣的交易,就會向下游運(yùn)營商發(fā)送160字節(jié)的交易記錄,包括時間,股票數(shù)量和交易價格以及賣方,買方和股票的ID,包括6個運(yùn)營商的統(tǒng)計數(shù)據(jù)和5個事件處理的運(yùn)營商。分析運(yùn)算符生成統(tǒng)計數(shù)據(jù),例如移動平均值和綜合指數(shù),并觸發(fā)用戶定義的事件,例如當(dāng)特定股票的交易價格超過預(yù)定義閾值時的警報。每個統(tǒng)計運(yùn)營商的狀態(tài)大小約為200MB到400MB,而事件處理運(yùn)營商的狀態(tài)相對較小,低于10MB。由于交易和分析涉及個股,我們將庫存ID的空間劃分為并行處理。由于股票交易具有不可預(yù)測的性質(zhì),股票的到達(dá)率和分布均隨著時間的推移而波動很大,從而導(dǎo)致高度動態(tài)的工作量。為了說明工作負(fù)荷動態(tài),圖18顯示了5種最受歡迎??的股票的即時到達(dá)率。

?????? 除了靜態(tài),RC和Elasticutor之外,我們還測試了一個天真的以執(zhí)行器為中心(naive-EC)的實現(xiàn),它與Elasticutor相同,只是在調(diào)度程序中禁用了遷移成本和計算局部性的優(yōu)化。 圖19繪制了在32個節(jié)點上運(yùn)行的四種方法下的瞬時吞吐量和第99百分位處理延遲。 我們觀察到,naive-EC和Elasicutor都優(yōu)于靜態(tài)和RC方法,大約使吞吐量翻倍,并將延遲降低1-2個數(shù)量級。 盡管naive-EC和Elasticutor之間的性能差距是可識別的,但與執(zhí)行者為中心的方法和其他兩種方法之間的差距相比,它們之間的差距很小。 這一觀察結(jié)果表明,盡管動態(tài)調(diào)度程序中的優(yōu)化能夠顯著提高性能,但Elasticutor的更好性能主要歸功于采用的以執(zhí)行器為中心的有利范式。

?????? 為了進(jìn)一步說明naive-EC和Elasticutor之間性能差距背后的原因,我們在表2中顯示了它們的狀態(tài)遷移率和遠(yuǎn)程數(shù)據(jù)傳輸率。前者的速率是整個系統(tǒng)在網(wǎng)絡(luò)中遷移的狀態(tài)的聚合大小。 單位時間。 后一種速率是在所有彈性執(zhí)行器和它們的遠(yuǎn)程任務(wù)之間以單位時間傳輸?shù)臄?shù)據(jù)的總量。 我們觀察到,naive-EC下的狀態(tài)遷移率和遠(yuǎn)程數(shù)據(jù)傳輸率分別比Elasticutor下的5倍和10倍高。 通過較少的狀態(tài)遷移,彈性執(zhí)行器轉(zhuǎn)換到新的資源分配計劃將更有效,從而實現(xiàn)更高的性能。 同樣,通過較少的遠(yuǎn)程數(shù)據(jù)傳輸,Operator間數(shù)據(jù)傳輸可以使用更多的網(wǎng)絡(luò)帶寬,從而進(jìn)一步提高性能。

?????? 最后,我們在SSE工作負(fù)載下評估Elasticutor的可伸縮性。 我們改變計算集群的大小,即節(jié)點的數(shù)量,并測量Elasticutor的吞吐量和調(diào)度成本,即動態(tài)調(diào)度器計算新的CPU到執(zhí)行器分配所需的平均時間。 保持較低的調(diào)度成本對于系統(tǒng)適應(yīng)動態(tài)工作負(fù)載非常重要。 表3顯示了隨著規(guī)模增加的吞吐量和調(diào)度成本。 我們觀察到隨著集群的增長,吞吐量幾乎呈線性增長; 并且調(diào)度成本大約是幾毫秒,并且隨著節(jié)點數(shù)量的增加而略有增長.

6. RELATEDWORK

?????? Stream Processing System 早期的流處理系統(tǒng),如Aurora [7],Borealis [6],TelegraphCQ [17]和STREAM [10],旨在通過利用分布式但靜態(tài)的計算資源來處理海量數(shù)據(jù)更新。 借助云計算技術(shù),出現(xiàn)了新一代流系統(tǒng),重點是并行數(shù)據(jù)處理,可用性和容錯,以充分利用基于云的平臺上的靈活資源管理方案。 Spark Streaming [50],Storm [43],Samza [35],Heron [31],Flink [12]和Waterwheel [46]是最流行的開源系統(tǒng),提供分布式流處理和分析。 大型工業(yè)企業(yè)也在開發(fā)內(nèi)部分布式流系統(tǒng),如Muppet [32],MillWheel [8],Trill [16],Dataflow [9]和StreamScope [33]。

Elasticity.大量的工作探索了實現(xiàn)彈性的可能性。卡斯特羅等人。 [15]將資源重新擴(kuò)展操作與分布式流系統(tǒng)中的容錯功能相結(jié)合,以便在遷移到新計算節(jié)點之前將與處理邏輯綁定的中間狀態(tài)寫入持久性存儲。王等人。 [47]提出了彈性流水線技術(shù),以便為分布式SQL查詢啟用動態(tài),工作負(fù)載感知的運(yùn)行時重新配置。在Flux [39]中提出了一種自適應(yīng)分區(qū)算子,以實現(xiàn)節(jié)點之間的分區(qū)移動以實現(xiàn)負(fù)載平衡。但是,由于其工作負(fù)載遷移基于每個分區(qū),因此當(dāng)單個分區(qū)超出群集中任何節(jié)點的處理能力時,此方法將面臨困難。 ChronoStream [48]將計算狀態(tài)劃分為一個集合的粒度切片單元,并在節(jié)點之間動態(tài)分配它們以支持彈性。 Gedik等人。 [24]提出了在不違反狀態(tài)一致性的情況下擴(kuò)展有狀態(tài)運(yùn)算符的機(jī)制。 Chi [34]是一個具有監(jiān)控和動態(tài)重新配置功能的控制面板。然而,這些方法在以資源為中心的標(biāo)準(zhǔn)之后實現(xiàn)了彈性,這導(dǎo)致了昂貴的同步并且妨礙了快速彈性。它們適用于以粗糙時間粒度使用彈性功能的情況,即每5分鐘;實現(xiàn)的彈性太慢,無法應(yīng)用于具有高動態(tài)工作負(fù)載的應(yīng)用中。 Elasticutor采用新的以執(zhí)行器為中心的方法來避免這個問題。此方法大大降低了執(zhí)行工作負(fù)載重新同步的同步開銷,因此可在幾毫秒內(nèi)實現(xiàn)工作負(fù)載重新分配。

Workload Distribution。分布式流系統(tǒng)的通用工作負(fù)載分配是一個具有挑戰(zhàn)性的問題,因為隨著時間的推移,輸入數(shù)據(jù)流的偏差很大并且差異很大。 Shah [39]等。為傳統(tǒng)流處理框架中的單個操作設(shè)計了動態(tài)工作負(fù)載再分配機(jī)制,例如Borealis [6]。 [24]和[20]研究了混合路由策略,通過其密鑰對工作負(fù)載進(jìn)行分組,以便根據(jù)CPU,內(nèi)存和帶寬資源動態(tài)平衡負(fù)載。 TimeStream [38]采用圖形重組策略,直接用全新的處理拓?fù)涮鎿Q原始處理拓?fù)洹H欢?#xff0c;系統(tǒng)在所有可應(yīng)用的圖形結(jié)構(gòu)的巨大搜索空間中監(jiān)視和優(yōu)化拓?fù)浣Y(jié)構(gòu)具有挑戰(zhàn)性。 Cardellini等。 [13]研究了Storm之上的有狀態(tài)任務(wù)遷移。丁等人。 [18]討論了基于馬爾可夫決策過程(MDP)制定任務(wù)遷移計劃的長期優(yōu)化,以提高分布式流引擎的資源利用率。但是,Elasticutor不僅可以實現(xiàn)工作負(fù)載分配中的負(fù)載平衡,還可以考慮遷移成本最小化和計算局部性。

7. Conclusion

我們設(shè)計并實現(xiàn)了Elasticutor,它為流處理系統(tǒng)提供了快速彈性。 彈性器遵循一種新的以執(zhí)行器為中心的方法,該方法將執(zhí)行程序靜態(tài)綁定到運(yùn)算符,但允許執(zhí)行程序獨立擴(kuò)展。 這種方法將操作員的擴(kuò)展與有狀態(tài)處理所需的全局同步分離開來。 Elasticutor框架有兩個構(gòu)建塊:彈性執(zhí)行器,執(zhí)行動態(tài)負(fù)載平衡,以及優(yōu)化計算資源使用的調(diào)度程序。 實驗表明,與傳統(tǒng)的以資源為中心的提供彈性的方法相比,彈性器使吞吐量增加一倍,平均延遲降低了幾個數(shù)量級。

總結(jié)

以上是生活随笔為你收集整理的一周一论文(翻译)——[SIGMOD 19] Elasticutor:Rapid Elasticity for Realtime Stateful Stream Processing的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。