spark如何防止内存溢出_Spark 理论基石 —— RDD
概述
RDD,學名可伸縮的分布式數據集(Resilient Distributed Dataset)。是一種對數據集形態的抽象,基于此抽象,使用者可以在集群中執行一系列計算,而不用將中間結果落盤。而這正是之前 MR 抽象的一個重要痛點,每一個步驟都需要落盤,使得不必要的開銷很高。
對于分布式系統,容錯支持是必不可少的。為了支持容錯,RDD 只支持粗粒度的變換。即,輸入數據集是 immutable (或者說只讀)的,每次運算會產生新的輸出。不支持對一個數據集中細粒度的更新操作。這種約束,大大簡化了容錯支持,并且能滿足很大一類的計算需求。
初次接觸 RDD 的概念的時候,不大能夠理解為什么要以數據集為中心做抽象。后來隨著不斷深入的了解,對數據集的一致性抽象正是計算流水線(pipeline)得以存在和優化的精髓所在。在定義了數據集的基本屬性(不可變,分區,依賴關系,存放位置等)后,就可以在此基礎上施加各種高階算子,以構建 DAG 執行引擎,并做適當優化。從這個角度來說,RDD 實在是一種精妙設計。
例行總結一下 RDD 論文的主要設計點有:
作者:青藤木鳥 Muniao's blog 轉載請注明出處
小引
Dryad 和 MapReduce 是業已流行的大數據分析工具。它們給用戶提供了一些高階算子來使用,而不用去關心底層的分布式和容錯細節。但它們都缺少對分布式內存的抽象,不同計算過程之間只能夠通過外存來耦合:前驅任務將計算結果寫到外存上去,后繼任務再將其作為輸入加載到內存,然后才能接著執行后繼計算任務。這樣的設計有兩個很大的劣勢:復用性差、延遲較高。這對于像 Page-Rand,K-Means,LR 等要求迭代式計算的機器學習算法(需要數據復用)極其不友好;對于一些隨機的交互式查詢(要求延遲低)也是個災難。因為他們將大部分的時間都耗費在數據備份、硬盤 IO 和數據序列化之上。
在 RDD 之前,為了解決數據復用的問題,業界已有諸多嘗試。包括將中間結果放在內存中的迭代式圖計算系統——Pregel,以及將多個 MR 串在一塊,緩存循環不變量的 HaLoop。但這些系統只支持受限的計算模型(比如MR),而且只進行隱式[1]的數據復用。如何進行更通用的數據復用,以支持更復雜的查詢計算,仍是一個難題。
RDD 正是為解決這個問題而設計,高效地復用數據的一個數據結構抽象。RDD 支持數據容錯、數據并行;在此之上,能夠讓用戶利用多機內存、控制數據分區、構建一系列運算過程。從而解決很多應用中連續計算過程對于數據復用的需求。
其中比較難的一個設計是如何針對內存數據進行高效的容錯?,F有的一些基于集群內存的系統,比如分布式KV、共享內存、Piccolo 都提供一種可以細粒度的修改的可變數據集抽象。為了支持這種抽象之上的容錯,就需要進行數據多機冗余或者操作日志備份。這些操作都會導致多機間大量的數據傳輸,由于網絡帶寬遠慢于 RAM,使得分布式利用內存這件事失去其優勢。
與之相對,RDD 只提供粗粒度的、基于整個數據集的計算接口,即數據集中的所有條目都施加同一種操作。這樣一來,為了容錯,我們只需要備份每個操作而非數據本身(因為是整體更新的);在某個分區數據出現問題進行錯誤恢復時,只需要從原始數據集出發,按順序再算一遍即可。
初看起來,這種計算抽象很受限,但它其實能滿足現有的一大類的集群計算需求,包括 MR、 DryadLINQ、 SQL、Pregel 和 HaLoop。并且能滿足一些其他計算需求,比如說交互式計算。RDD 的實現系統 Spark,提供類似 DryadLINQ 的高階算子,應該是第一個提供交互式的集群運算接口。
RDD
本節首先給出 RDD 的詳細定義,然后介紹下 Spark 的中針對 RDD 的操作接口,繼而對比了 RDD 與提供細粒度更新接口的共享內存抽象優劣。最后就 RDD 的局限性討論一下。
RDD 抽象
RDD 是一個基于分區的、只讀的數據記錄集抽象。RDD 只可以通過對持久存儲或其他 RDD 進行確定性運算得來,這種運算被稱為變換。常用的變換算子包括:map,filter 和 join。
RDD 沒有選擇不斷的做檢查點以進行容錯,而是會記下 RDD 從最初的外存的數據集變化而來的變化路徑,也就是其譜系(lineage)。理論上所有的 RDD 都可以在出錯后從外存中依據譜系圖進行重建。一般來說,重建的粒度是分區(Partition)而非整個數據集,一來代價更小,二來不同分區可能在不同機器上。
用戶可以對 RDD 的兩個方面進行控制:持久化和分區控制。對于前者,如果某些 RDD 需要復用,那么用戶可以指示系統按照某種策略將其進行持久化。后者來說,用戶可以定制分區路由函數,將數據集合中的記錄按照某個鍵值路由到不同分區。比如進行 Join 操作的時候,可以講待 Join 數據集按照相同的策略進行分區,以并行 Join。
Spark 編程接口
Spark 通過暴露與編程語言集成的算子來提供操作 RDD 的接口。 其中 RDD 表現為編程語言中的類,而 RDD 的算子為作用于這些類上的函數。之前的系統如 DryadLINQ 和 FlumeJava 也使用了類似的形式。
用戶使用 RDD 時,首先將數據從持久化存儲中通過變換(Transformations,如 map 或者 filter)將其載入內存,然后可以對 RDD 施加任何系統支持的一系列變換,最后利用動作(Action)算子,將 RDD 重新持久化到外存中或者將控制權交還用戶。和 DryadLINQ 一樣,這個加載-變換-落盤的過程是聲明式(Declarative,或者說是惰式[2])的,Spark 在拿到整個拓撲后會利用執行引擎進行執行優化(比如將并行化、流水線化,之后會進一步討論)。
此外很重要的一個接口是 persist,可以由用戶來告訴系統哪些 RDD 需要持久化,如何持久化(本機硬盤還是跨機器存儲),如果有多個 RDD 需要持久化,那么優先級如何確定。Spark 默認將 RDD 保存在內存中,如果內存不夠用了會根據用戶配置將數據溢出(spill)到硬盤上。
舉個栗子
假設我們相對存在于 HDFS 上的日志文件,找出錯誤條目,針對出現 hdfs 關鍵字的具體條目進行分析。利用 Spark 接口,使用 Scala 語言實現,代碼如下:
lines = spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR")) errors.persist() ? // Return the time fields of errors mentioning // HDFS as an array (assuming time is field // number 3 in a tab-separated format): errors.filter(_.contains("HDFS")).map(_.split(’t’)(3)).collect()第一行基于某個 hdfs 上的文件定義一個 rdd(每一行作為集合中的一個條目)。第二行通過 filter 變換生成新的 rdd,第三行請求 spark 將其結果進行暫存。最后一行是鏈式操作,以一個 collect 的動作結尾,求出包含 HDFS 關鍵字的所有行數的各個字段。
其計算的譜系圖(lineage)如下:
有兩點需要注意:
由于第三行將結果在內存中進行了緩存,因此還可以基于此做其他動作。比如,計算包含 'MySQL' 關鍵字的錯誤條數:
// Count errors mentioning MySQL: errors.filter(_.contains("MySQL")).count()RDD 模型的優點
為了理解 RDD 帶來的好處,可以看下面一個表,將 RDD 與 DSM (Distributed Shared Memory)做了詳細對比。DSM 在這里是一個很寬泛的抽象,不僅包括一般的內存共享系統,還包括其他支持細粒度的狀態更新的框架,比如說 Piccolo、分布式數據庫等。
首先, DSM 和 RDD 最主要的區別在于,DSM 支持對數據集細粒度的更新。即,可以對任意內存位置進行更新。而 RDD 舍棄了這一點,只允許批量的寫入數據,從而提高了容錯效率:
其次,RDD 的不可變的特點允許系統叫較容易的對某些計算進行遷移。比如說 MR 中的某些 Stragger 任務就可以很方便的遷移到其他計算節點上去,因為其輸入數據一定不會被改變,因此不用考慮一致性的問題。
最后還有兩個好處值得一提:
RDD 不適用的場景
如前所述,RDD 適用于針對全數據集統一處理的粗粒度變換的抽象。相對的,就不適用于要求對數據進行細粒度的、異步更新的數據集。比如說 web 應用,再比如說爬蟲等等。對于這些引用類型,傳統的快照+操作日志的容錯方式可能更適合一些。如數據庫 RAMCloud , Percolator 和 Piccolo。 RDD 的目標在于批量分析型應用,而將這些異步應用的需求留給那些專有系統。
Spark 編程接口
Spark 利用 Scala 語言作為 RDD 抽象的接口,因為 Scala 兼顧了精確(其函數式語義適合交互式場景)與高效(使用靜態類型)。當然,對于 RDD 本身來說,不限定于任何特定的語言表達。下面從執行流程與代碼分發兩個方面來詳細說明下 Spark 是如何執行用戶代碼的。
開發者利用 Spark 提供的庫編寫驅動程序 (driver programe)以使用 Spark。驅動程序會定義一到多個 RDD,并對其進行各種變換。Spark 提供的庫會連接 Spark 集群,生成計算拓撲,并將拓撲分散到多個 workers 上去進行執行,同時記下變換的譜系(lineage)。這些 workers 是分散在 Spark 集群內各個機器上的常駐進程,它們在內存里保存計算過程中生成的 RDD 的各個分區。
像前面舉的例子一樣,開發者需要將函數作為參數傳給 map 等 Spark 算子。Spark 會將這些函數(或者說閉包)序列化為 Java 對象,然后分發給執行節點進行加載。閉包所涉及的變量會被當做上述生成對象的字段值。RDD 本身會被包裝成靜態類型的參數進行傳遞。由于 Scala 支持類型推斷,大部分例子都省掉了 RDD 數據類型。
盡管 Spark 暴露的 Scala 的 RDD 接口在概念上看起來很簡單,但實在實現上有一些很臟的角落,比如說 Scala 的閉包得使用反射, 比如說盡量避免修改 Scala 的解釋器。
Spark 中的 RDD 操作
下表列出了 Spark 中支持的 RDD 操作。如前面所說,變換(transformations)是生成新 RDD 的惰性算子,而動作(actions)是觸發調度的算子,它會返回一個結果或者將數據寫到外存中。
需要注意的是:
RDD 的表示
提供 RDD 抽象的一個難點在于,如何高效的跟蹤譜系并能提供豐富的變換支持。最后我們選用了基于圖的調度模型,將調度和算子進行了解耦。從而能夠在不改變調度模塊邏輯的前提下,很方便的增加算子支持。具體來說,RDD 抽象的核心組成主要有以下五個部分:
在 RDD 的接口設計中最有趣的一個點是如何對 RDD 間的依賴關系進行規約。最后發現可以將所有依賴歸納為兩種類型:
如此歸納的原因主要有兩點。
調度優化。對于窄依賴,可以對分區間進行并行流水化調度,先計完成某個窄依賴算子(比如說 map)的分區不用等待其他分區而直接進行下一個窄依賴算子(比如 filter )的運算。與之相對,寬依賴的要求父 RDD 的所有分區就緒,并進行跨節點的傳送后,才能進行計算。類似于 MapReduce 中的 shuffle。
數據恢復。在某個分區出現錯誤或者丟失時,窄依賴的恢復更為高效。因為涉及到的父分區相對較少,并且可以并行恢復。而對于寬依賴,由于依賴復雜(如上圖,子 RDD 的每個分區都會依賴父 RDD 的所有分區),一個分區的丟失可能就會引起全盤的重新計算。
這樣將調度和算子解耦的設計大大簡化了變換的實現,大部分變換都可以用20余行代碼來實現。由于不需要了解調度細節,任何人都可以很快的上手實現一個新的變換。試舉幾例:
HDFS 文件:partitions 函數返回 HDFS 文件的所有 block,每個 block 被當做一個 partition。 preferredLocations 返回每個 block 所在的位置,Iterator 會對每個 block 進行讀取。
map:在任意 RDD 上調用 map 會返回一個 MappedRDD 對象,該對象的 partitions 函數和 preferredLocations 與父 RDD 保持一致。對于 iterator,只需要將傳給 map 算子的函數以此作用到其父 RDD 的各個分區即可。
union: 在兩個 RDD 上調用 union 會返回一個新的 RDD,該 RDD 的每個分區由對應的兩個父 RDD 通過窄依賴計算而來。
sample:抽樣函數和 map 大體一致。但該函數會給每個分區保存一個隨機數種子來決定父 RDD 的每個記錄是否保留。
join:在兩個 RDD 上調用 join 操作可能會導致兩個窄依賴(比如其分區都是按待 join 的key 哈希的),兩個寬依賴,或者混合依賴。每種情況下,子 RDD 都會有一個 partitioner 函數,或繼承自父分區,或是默認的hash 分區函數。
實現
Spark 最初版本(論文里提到的),只有 1.4w 行 Scala 代碼,由 mesos 管理資源分配,可以和 Hadoop 生態共用資源,并從 Hadoop/Hbase 中加載數據。對于 Spark 的實現,有幾個值得一說的點: Job 調度,交互式解釋器,內存管理和檢查點機制(checkpointing)。
Job 調度
Spark 調度設計依賴于上一節提到的 RDD 的抽象。它的調度策略和 Dryad 有點像,但又不盡相同。在用戶在某個 RDD 上調用 Action 類型(count,save 等等)的算子時,調度器就會根據用戶代碼中調用算子的順序生成計算拓撲。我們把每一個變換前后的 RDD 當做點,算子產生的 RDD 間的依賴/父子關系當做邊,如此構成一個有向無環圖(DAG)。為了減小傳輸,調度器會將幾個連續的計算進行歸并,稱為階段(Stage),進行階段歸并的依據為是否需要 shuffle,也即是否為寬依賴。這樣,會形成一個新的由階段組成的更精簡的 DAG。
之后,調度器會從目標 RDD 出發,沿著 DAG 圖中的邊往前遍歷,對每個不在內存中的分區進行計算。如果需要計算的分區已經在內存中了,則直接利用結果即可,如上圖所示。
然后,調度器會將任務調度到離其依賴 RDD 的 Partition 近的地方去:
對于寬依賴,Spark 和 MR 一樣,會將其中間結果輸出持久化起來,以簡化容錯。如果某個 Stage 的父 RDD 不可用,調度器就會新提交一些并行運行的任務,來生成這些缺失的分區。不過現在 Spark 還不能對調度器本身故障進行恢復,雖然看起來對 RDD 的譜系圖進行冗余備份或許是一個簡單可行的方案。
最后,現在仍是由用戶 Driver 程序調用 Action 算子來觸發調度任務。但我們正在探索維持一些周期性的檢查性任務,對 RDD 中某些缺失的分區進行補足。
解釋器集成
像 Python 和 Ruby 一樣,Scala 提供交互式的 shell 環境。由于 Spark 將數據保存在內存中,我們希望可以借助 Scala 的這個交互式環境讓用戶對大數據集進行交互式實時的查詢。
Scala 的解釋器對用戶代碼進行解釋執行的通常做法是,將用戶鍵入的每一行 Scala 命令編譯成一個 Java Class 字節碼,然后將其加載到 JVM 中。該類包含一個初始化過的單例實例,實例中包含用戶定義的變量和函數。比如,用戶輸入:
var x = 5 println(x)Scala 解釋器會針對第一行生成一個叫做 Line1 的類,其中有一個 x 的字段,并且將第二行編譯為:println(Line1.getInstance().x)
為了讓 Scala 解釋器能在分布式環境運行,我們在 Spark 中對其進行了以下修改:
下圖反映了我們修改后的 Scala 解釋器生成 Java 對象的過程:
我們發現解釋器在對大型數據集進行交互式查詢時很有幫助,我們計劃對更高級的查詢語言進行支持,如 SQL。
內存管理
Spark 提供了三種存儲 RDD 的方式:
由于 Spark 跑在 JVM 上,因此第一種存儲方式訪問最快,第二種允許用戶犧牲一點性能以換取更高效的內存利用。當數據尺度太大以至于內存裝不下的時候,第三種方式很有用。
為了有效的利用有限的內存,我們在 RDD 分區級別上進行 LRU 式的驅逐策略。即,當我們新計算出一個 RDD 的分區時,如果發現內存不夠用,就會從內存中驅逐出去一個最久沒有使用過的 RDD 的分區。但是,如果這個最久沒有使用過的分區和新計算出的分區屬于同一個 RDD,我們會接著尋找,直到找到一個和當前分區不屬于一個 RDD 并且最久沒用過的分區。因為 Spark 的大部分計算會施加于整個 RDD 上,這樣做可以防止這些分區被反復的計算-驅逐。這個策略在論文成文時用的很好,不過,我們仍然提供給了用戶進行深度控制的接口——指定存儲優先級。
現在每個 Spark 實例擁有自己的分立的內存空間,我們計劃將來提供跨 Spark 實例的統一的內存管理。
檢查點機制
盡管所有失敗的 RDD 都可以通過譜系(lineage)來重新計算得出,但是對于某些譜系特別長的 RDD 來說,這將是一個很耗時間的操作,因此提供 RDD 級別的外存檢查點(checkpointing)可能會很有用。
對于具有很長譜系圖,并且譜系圖中存在很多寬依賴的 RDD,在外存上做檢查點會很有幫助,因為某一兩個的分區可能會引起全盤的重算,對于這種又臭又長的計算拓撲來說,依據譜系圖重算無疑非常浪費時間。而對于只有窄依賴的、不那么長譜系圖來說,在外存做檢查點可能有些得不償失,因為他們可以很簡單的并行計算出來。
Spark 現階段提供檢查點的 API (給 persist 函數傳 REPLICATE 標志),然后由用戶來決定是否對其持久化。但我們在思考,是否可以進行一些自動的檢查點計算。由于調度器知道每個數據集的內存占用以及計算使用時間,我們或許可以選擇性的對某些關鍵 RDD進行持久化以最小化宕機恢復時間。
最后,由于 RDD 的只讀特性,我們在做檢查點時不用像通用共享內存模型那樣過分考慮一致性的問題,因此可以用后臺線程默默地干這些事情而不用影響主要工作流,也不用使用復雜的分布式的快照算法來解決一致性問題。
注解
[1] 隱式與顯示顯式:在這里可以理解為,顯式是把數據集這個概念完整的構造出來,定義他的內涵和邊界,并基于其上做一些外延拓展。而隱式只是事實上復用了數據,但并沒有定義被復用的數據格式。
[2] 聲明式(Declarative)語言與命令式(Imperative)語言:前者例子有 SQL,HTML;后者例子最常見的有 Shell,其他的常見編程語言 C,Java,Python 也屬于此列。前者的好處在于將"干什么"和"怎么干"這兩件事解耦,這樣一來就可以開發不同的執行引擎,針對不同場景來優化"怎么干"這件事。而后者會告訴機器以特定的順序執行特定的操作,與直覺一致,是一般編程語言的路子。
掃一掃關注我的公眾號:分布式點滴
總結
以上是生活随笔為你收集整理的spark如何防止内存溢出_Spark 理论基石 —— RDD的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis setnx 分布式锁_Spr
- 下一篇: string 中的offset_Kafk