日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark如何防止内存溢出_Spark 理论基石 —— RDD

發布時間:2025/3/21 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark如何防止内存溢出_Spark 理论基石 —— RDD 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概述

RDD,學名可伸縮的分布式數據集(Resilient Distributed Dataset)。是一種對數據集形態的抽象,基于此抽象,使用者可以在集群中執行一系列計算,而不用將中間結果落盤。而這正是之前 MR 抽象的一個重要痛點,每一個步驟都需要落盤,使得不必要的開銷很高。

對于分布式系統,容錯支持是必不可少的。為了支持容錯,RDD 只支持粗粒度的變換。即,輸入數據集是 immutable (或者說只讀)的,每次運算會產生新的輸出。不支持對一個數據集中細粒度的更新操作。這種約束,大大簡化了容錯支持,并且能滿足很大一類的計算需求。

初次接觸 RDD 的概念的時候,不大能夠理解為什么要以數據集為中心做抽象。后來隨著不斷深入的了解,對數據集的一致性抽象正是計算流水線(pipeline)得以存在和優化的精髓所在。在定義了數據集的基本屬性(不可變,分區,依賴關系,存放位置等)后,就可以在此基礎上施加各種高階算子,以構建 DAG 執行引擎,并做適當優化。從這個角度來說,RDD 實在是一種精妙設計。

例行總結一下 RDD 論文的主要設計點有:

  • 顯式抽象。將運算中的數據集進行顯式抽象,定義了其接口和屬性。由于數據集抽象的統一,從而可以將不同的計算過程組合起來進行統一的 DAG 調度。
  • 基于內存。相較于 MapReduce 中間結果必須落盤,RDD 通過將結果保存在內存中,從而大大降低了單個算子計算延遲以及不同算子之間的加載延遲。
  • 寬窄依賴。在進行 DAG 調度時,定義了寬窄依賴的概念,并以此進行階段劃分,優化調度計算。
  • 譜系容錯。主要依賴譜系圖計算來進行錯誤恢復,而非進行冗余備份,因為內存實在是有限,只能以計算換存儲了。
  • 交互查詢。修改了 Scala 的解釋器,使得可以交互式的查詢基于多機內存的大型數據集。進而支持類 SQL 等高階查詢語言。
  • 作者:青藤木鳥 Muniao's blog 轉載請注明出處

    小引

    Dryad 和 MapReduce 是業已流行的大數據分析工具。它們給用戶提供了一些高階算子來使用,而不用去關心底層的分布式和容錯細節。但它們都缺少對分布式內存的抽象,不同計算過程之間只能夠通過外存來耦合:前驅任務將計算結果寫到外存上去,后繼任務再將其作為輸入加載到內存,然后才能接著執行后繼計算任務。這樣的設計有兩個很大的劣勢:復用性差、延遲較高。這對于像 Page-Rand,K-Means,LR 等要求迭代式計算的機器學習算法(需要數據復用)極其不友好;對于一些隨機的交互式查詢(要求延遲低)也是個災難。因為他們將大部分的時間都耗費在數據備份、硬盤 IO 和數據序列化之上。

    在 RDD 之前,為了解決數據復用的問題,業界已有諸多嘗試。包括將中間結果放在內存中的迭代式圖計算系統——Pregel,以及將多個 MR 串在一塊,緩存循環不變量的 HaLoop。但這些系統只支持受限的計算模型(比如MR),而且只進行隱式[1]的數據復用。如何進行更通用的數據復用,以支持更復雜的查詢計算,仍是一個難題。

    RDD 正是為解決這個問題而設計,高效地復用數據的一個數據結構抽象。RDD 支持數據容錯、數據并行;在此之上,能夠讓用戶利用多機內存、控制數據分區、構建一系列運算過程。從而解決很多應用中連續計算過程對于數據復用的需求。

    其中比較難的一個設計是如何針對內存數據進行高效的容錯?,F有的一些基于集群內存的系統,比如分布式KV、共享內存、Piccolo 都提供一種可以細粒度的修改的可變數據集抽象。為了支持這種抽象之上的容錯,就需要進行數據多機冗余或者操作日志備份。這些操作都會導致多機間大量的數據傳輸,由于網絡帶寬遠慢于 RAM,使得分布式利用內存這件事失去其優勢。

    與之相對,RDD 只提供粗粒度的、基于整個數據集的計算接口,即數據集中的所有條目都施加同一種操作。這樣一來,為了容錯,我們只需要備份每個操作而非數據本身(因為是整體更新的);在某個分區數據出現問題進行錯誤恢復時,只需要從原始數據集出發,按順序再算一遍即可。

    初看起來,這種計算抽象很受限,但它其實能滿足現有的一大類的集群計算需求,包括 MR、 DryadLINQ、 SQL、Pregel 和 HaLoop。并且能滿足一些其他計算需求,比如說交互式計算。RDD 的實現系統 Spark,提供類似 DryadLINQ 的高階算子,應該是第一個提供交互式的集群運算接口。

    RDD

    本節首先給出 RDD 的詳細定義,然后介紹下 Spark 的中針對 RDD 的操作接口,繼而對比了 RDD 與提供細粒度更新接口的共享內存抽象優劣。最后就 RDD 的局限性討論一下。

    RDD 抽象

    RDD 是一個基于分區的、只讀的數據記錄集抽象。RDD 只可以通過對持久存儲或其他 RDD 進行確定性運算得來,這種運算被稱為變換。常用的變換算子包括:map,filter 和 join。

    RDD 沒有選擇不斷的做檢查點以進行容錯,而是會記下 RDD 從最初的外存的數據集變化而來的變化路徑,也就是其譜系(lineage)。理論上所有的 RDD 都可以在出錯后從外存中依據譜系圖進行重建。一般來說,重建的粒度是分區(Partition)而非整個數據集,一來代價更小,二來不同分區可能在不同機器上。

    用戶可以對 RDD 的兩個方面進行控制:持久化和分區控制。對于前者,如果某些 RDD 需要復用,那么用戶可以指示系統按照某種策略將其進行持久化。后者來說,用戶可以定制分區路由函數,將數據集合中的記錄按照某個鍵值路由到不同分區。比如進行 Join 操作的時候,可以講待 Join 數據集按照相同的策略進行分區,以并行 Join。

    Spark 編程接口

    Spark 通過暴露與編程語言集成的算子來提供操作 RDD 的接口。 其中 RDD 表現為編程語言中的類,而 RDD 的算子為作用于這些類上的函數。之前的系統如 DryadLINQ 和 FlumeJava 也使用了類似的形式。

    用戶使用 RDD 時,首先將數據從持久化存儲中通過變換(Transformations,如 map 或者 filter)將其載入內存,然后可以對 RDD 施加任何系統支持的一系列變換,最后利用動作(Action)算子,將 RDD 重新持久化到外存中或者將控制權交還用戶。和 DryadLINQ 一樣,這個加載-變換-落盤的過程是聲明式(Declarative,或者說是惰式[2])的,Spark 在拿到整個拓撲后會利用執行引擎進行執行優化(比如將并行化、流水線化,之后會進一步討論)。

    此外很重要的一個接口是 persist,可以由用戶來告訴系統哪些 RDD 需要持久化,如何持久化(本機硬盤還是跨機器存儲),如果有多個 RDD 需要持久化,那么優先級如何確定。Spark 默認將 RDD 保存在內存中,如果內存不夠用了會根據用戶配置將數據溢出(spill)到硬盤上。

    舉個栗子

    假設我們相對存在于 HDFS 上的日志文件,找出錯誤條目,針對出現 hdfs 關鍵字的具體條目進行分析。利用 Spark 接口,使用 Scala 語言實現,代碼如下:

    lines = spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR")) errors.persist() ? // Return the time fields of errors mentioning // HDFS as an array (assuming time is field // number 3 in a tab-separated format): errors.filter(_.contains("HDFS")).map(_.split(’t’)(3)).collect()

    第一行基于某個 hdfs 上的文件定義一個 rdd(每一行作為集合中的一個條目)。第二行通過 filter 變換生成新的 rdd,第三行請求 spark 將其結果進行暫存。最后一行是鏈式操作,以一個 collect 的動作結尾,求出包含 HDFS 關鍵字的所有行數的各個字段。

    其計算的譜系圖(lineage)如下:

    有兩點需要注意:

  • 直到遇到 collect 這個動作(Action)之前都沒有發生實際的運算。
  • 鏈式操作時不保存中間結果;
  • 由于第三行將結果在內存中進行了緩存,因此還可以基于此做其他動作。比如,計算包含 'MySQL' 關鍵字的錯誤條數:

    // Count errors mentioning MySQL: errors.filter(_.contains("MySQL")).count()

    RDD 模型的優點

    為了理解 RDD 帶來的好處,可以看下面一個表,將 RDD 與 DSM (Distributed Shared Memory)做了詳細對比。DSM 在這里是一個很寬泛的抽象,不僅包括一般的內存共享系統,還包括其他支持細粒度的狀態更新的框架,比如說 Piccolo、分布式數據庫等。

    首先, DSM 和 RDD 最主要的區別在于,DSM 支持對數據集細粒度的更新。即,可以對任意內存位置進行更新。而 RDD 舍棄了這一點,只允許批量的寫入數據,從而提高了容錯效率:

  • 使用 lineage 來按需恢復數據,而不用定期 snapshot,減小了不必要開銷。
  • 每個 Partition 出錯后可以單獨進行恢復,而不用進行全數據集的重建。
  • 其次,RDD 的不可變的特點允許系統叫較容易的對某些計算進行遷移。比如說 MR 中的某些 Stragger 任務就可以很方便的遷移到其他計算節點上去,因為其輸入數據一定不會被改變,因此不用考慮一致性的問題。

    最后還有兩個好處值得一提:

  • 由于只支持批量計算,因此調度系統可以比較好的利用數據局部性的特點加快運算速度。
  • 如果集群內存不夠的話,只要數據支持迭代,就可以分批加載到內存進行運算,或者分批將結果 spill 到外存。如此一來,在內存不夠時能提供很優雅的退化操作,并不太損失性能。
  • RDD 不適用的場景

    如前所述,RDD 適用于針對全數據集統一處理的粗粒度變換的抽象。相對的,就不適用于要求對數據進行細粒度的、異步更新的數據集。比如說 web 應用,再比如說爬蟲等等。對于這些引用類型,傳統的快照+操作日志的容錯方式可能更適合一些。如數據庫 RAMCloud , Percolator 和 Piccolo。 RDD 的目標在于批量分析型應用,而將這些異步應用的需求留給那些專有系統。

    Spark 編程接口

    Spark 利用 Scala 語言作為 RDD 抽象的接口,因為 Scala 兼顧了精確(其函數式語義適合交互式場景)與高效(使用靜態類型)。當然,對于 RDD 本身來說,不限定于任何特定的語言表達。下面從執行流程與代碼分發兩個方面來詳細說明下 Spark 是如何執行用戶代碼的。

    開發者利用 Spark 提供的庫編寫驅動程序 (driver programe)以使用 Spark。驅動程序會定義一到多個 RDD,并對其進行各種變換。Spark 提供的庫會連接 Spark 集群,生成計算拓撲,并將拓撲分散到多個 workers 上去進行執行,同時記下變換的譜系(lineage)。這些 workers 是分散在 Spark 集群內各個機器上的常駐進程,它們在內存里保存計算過程中生成的 RDD 的各個分區。

    像前面舉的例子一樣,開發者需要將函數作為參數傳給 map 等 Spark 算子。Spark 會將這些函數(或者說閉包)序列化為 Java 對象,然后分發給執行節點進行加載。閉包所涉及的變量會被當做上述生成對象的字段值。RDD 本身會被包裝成靜態類型的參數進行傳遞。由于 Scala 支持類型推斷,大部分例子都省掉了 RDD 數據類型。

    盡管 Spark 暴露的 Scala 的 RDD 接口在概念上看起來很簡單,但實在實現上有一些很臟的角落,比如說 Scala 的閉包得使用反射, 比如說盡量避免修改 Scala 的解釋器。

    Spark 中的 RDD 操作

    下表列出了 Spark 中支持的 RDD 操作。如前面所說,變換(transformations)是生成新 RDD 的惰性算子,而動作(actions)是觸發調度的算子,它會返回一個結果或者將數據寫到外存中。

    需要注意的是:

  • 有些操作如 join,要求操作數 RDD 必須是鍵值對(key value pairs)。
  • map 是一對一的映射,而 flatMap 是類似于 MapReduce 中一對一或者一對多的映射。
  • save 會將 RDD 進行持久化。
  • groupByKey,reduceByKey 和 sort 都會導致 RDD 中不同分區進行再哈希或者重排。
  • RDD 的表示

    提供 RDD 抽象的一個難點在于,如何高效的跟蹤譜系并能提供豐富的變換支持。最后我們選用了基于圖的調度模型,將調度和算子進行了解耦。從而能夠在不改變調度模塊邏輯的前提下,很方便的增加算子支持。具體來說,RDD 抽象的核心組成主要有以下五個部分:

  • 分區集(partition set)。分區是每個 RDD 的最小構成單元。
  • 依賴集(dependencies set)。主要是 RDD 間的父子依賴關系。
  • 變換函數(compute function)。作用于分區上的變換函數,可以由幾個父分區計算得到一個子分區。
  • 分區模式(partition scheme)。該 RDD 的分區是基于哈希分片的還是直接切分的。
  • 數據放置(data placement)。知道分區的存放位置可以進行計算優化。
  • 在 RDD 的接口設計中最有趣的一個點是如何對 RDD 間的依賴關系進行規約。最后發現可以將所有依賴歸納為兩種類型:

  • 窄依賴(narrow dependencies):父 RDD 的分區最多被一個子 RDD 的分區所依賴,比如 map。
  • 寬依賴(wide dependencies):父 RDD 的分區可能被多個子 RDD 的分區所依賴,比如 join。
  • 如此歸納的原因主要有兩點。

    調度優化。對于窄依賴,可以對分區間進行并行流水化調度,先計完成某個窄依賴算子(比如說 map)的分區不用等待其他分區而直接進行下一個窄依賴算子(比如 filter )的運算。與之相對,寬依賴的要求父 RDD 的所有分區就緒,并進行跨節點的傳送后,才能進行計算。類似于 MapReduce 中的 shuffle。

    數據恢復。在某個分區出現錯誤或者丟失時,窄依賴的恢復更為高效。因為涉及到的父分區相對較少,并且可以并行恢復。而對于寬依賴,由于依賴復雜(如上圖,子 RDD 的每個分區都會依賴父 RDD 的所有分區),一個分區的丟失可能就會引起全盤的重新計算。

    這樣將調度和算子解耦的設計大大簡化了變換的實現,大部分變換都可以用20余行代碼來實現。由于不需要了解調度細節,任何人都可以很快的上手實現一個新的變換。試舉幾例:

    HDFS 文件:partitions 函數返回 HDFS 文件的所有 block,每個 block 被當做一個 partition。 preferredLocations 返回每個 block 所在的位置,Iterator 會對每個 block 進行讀取。

    map:在任意 RDD 上調用 map 會返回一個 MappedRDD 對象,該對象的 partitions 函數和 preferredLocations 與父 RDD 保持一致。對于 iterator,只需要將傳給 map 算子的函數以此作用到其父 RDD 的各個分區即可。

    union: 在兩個 RDD 上調用 union 會返回一個新的 RDD,該 RDD 的每個分區由對應的兩個父 RDD 通過窄依賴計算而來。

    sample:抽樣函數和 map 大體一致。但該函數會給每個分區保存一個隨機數種子來決定父 RDD 的每個記錄是否保留。

    join:在兩個 RDD 上調用 join 操作可能會導致兩個窄依賴(比如其分區都是按待 join 的key 哈希的),兩個寬依賴,或者混合依賴。每種情況下,子 RDD 都會有一個 partitioner 函數,或繼承自父分區,或是默認的hash 分區函數。

    實現

    Spark 最初版本(論文里提到的),只有 1.4w 行 Scala 代碼,由 mesos 管理資源分配,可以和 Hadoop 生態共用資源,并從 Hadoop/Hbase 中加載數據。對于 Spark 的實現,有幾個值得一說的點: Job 調度,交互式解釋器,內存管理和檢查點機制(checkpointing)。

    Job 調度

    Spark 調度設計依賴于上一節提到的 RDD 的抽象。它的調度策略和 Dryad 有點像,但又不盡相同。在用戶在某個 RDD 上調用 Action 類型(count,save 等等)的算子時,調度器就會根據用戶代碼中調用算子的順序生成計算拓撲。我們把每一個變換前后的 RDD 當做點,算子產生的 RDD 間的依賴/父子關系當做邊,如此構成一個有向無環圖(DAG)。為了減小傳輸,調度器會將幾個連續的計算進行歸并,稱為階段(Stage),進行階段歸并的依據為是否需要 shuffle,也即是否為寬依賴。這樣,會形成一個新的由階段組成的更精簡的 DAG。

    之后,調度器會從目標 RDD 出發,沿著 DAG 圖中的邊往前遍歷,對每個不在內存中的分區進行計算。如果需要計算的分區已經在內存中了,則直接利用結果即可,如上圖所示。

    然后,調度器會將任務調度到離其依賴 RDD 的 Partition 近的地方去:

  • 如果 Partition 在某節點的內存中,則將任務的調度到該節點上。
  • 如果 Partition 還在硬盤上,則將任務調度到 preferredLocations 函數返回的地方去(如 HDFS 文件)。
  • 對于寬依賴,Spark 和 MR 一樣,會將其中間結果輸出持久化起來,以簡化容錯。如果某個 Stage 的父 RDD 不可用,調度器就會新提交一些并行運行的任務,來生成這些缺失的分區。不過現在 Spark 還不能對調度器本身故障進行恢復,雖然看起來對 RDD 的譜系圖進行冗余備份或許是一個簡單可行的方案。

    最后,現在仍是由用戶 Driver 程序調用 Action 算子來觸發調度任務。但我們正在探索維持一些周期性的檢查性任務,對 RDD 中某些缺失的分區進行補足。

    解釋器集成

    像 Python 和 Ruby 一樣,Scala 提供交互式的 shell 環境。由于 Spark 將數據保存在內存中,我們希望可以借助 Scala 的這個交互式環境讓用戶對大數據集進行交互式實時的查詢。

    Scala 的解釋器對用戶代碼進行解釋執行的通常做法是,將用戶鍵入的每一行 Scala 命令編譯成一個 Java Class 字節碼,然后將其加載到 JVM 中。該類包含一個初始化過的單例實例,實例中包含用戶定義的變量和函數。比如,用戶輸入:

    var x = 5 println(x)

    Scala 解釋器會針對第一行生成一個叫做 Line1 的類,其中有一個 x 的字段,并且將第二行編譯為:println(Line1.getInstance().x)

    為了讓 Scala 解釋器能在分布式環境運行,我們在 Spark 中對其進行了以下修改:

  • 類代碼傳輸(Class shipping):為了讓工作節點(Worker Nodes)可以拉取驅動節點(Driver Node)上解釋器用戶輸入編譯成的字節碼,我們讓解釋器可以通過 HTTP 將每個類的訪問開放出來。
  • 代碼生成修改(Modified code generation):Scala 解釋器在處理不同行的訪問時,會通過一個靜態方法來獲取其初始化后單例,進而訪問上一行的變量 Line.x。但是我們只能通過 HTTP 傳字節碼而沒有將初始化后實例(即 x 已經被賦值)傳過來,因此工作節點不能訪問到 x。因此我們改變了代碼生成邏輯,使得不同行之間能夠直接引用實例。
  • 下圖反映了我們修改后的 Scala 解釋器生成 Java 對象的過程:

    我們發現解釋器在對大型數據集進行交互式查詢時很有幫助,我們計劃對更高級的查詢語言進行支持,如 SQL。

    內存管理

    Spark 提供了三種存儲 RDD 的方式:

  • 內存中沒有序列化過的 Java 對象
  • 內存中序列化過的數據
  • 磁盤
  • 由于 Spark 跑在 JVM 上,因此第一種存儲方式訪問最快,第二種允許用戶犧牲一點性能以換取更高效的內存利用。當數據尺度太大以至于內存裝不下的時候,第三種方式很有用。

    為了有效的利用有限的內存,我們在 RDD 分區級別上進行 LRU 式的驅逐策略。即,當我們新計算出一個 RDD 的分區時,如果發現內存不夠用,就會從內存中驅逐出去一個最久沒有使用過的 RDD 的分區。但是,如果這個最久沒有使用過的分區和新計算出的分區屬于同一個 RDD,我們會接著尋找,直到找到一個和當前分區不屬于一個 RDD 并且最久沒用過的分區。因為 Spark 的大部分計算會施加于整個 RDD 上,這樣做可以防止這些分區被反復的計算-驅逐。這個策略在論文成文時用的很好,不過,我們仍然提供給了用戶進行深度控制的接口——指定存儲優先級。

    現在每個 Spark 實例擁有自己的分立的內存空間,我們計劃將來提供跨 Spark 實例的統一的內存管理。

    檢查點機制

    盡管所有失敗的 RDD 都可以通過譜系(lineage)來重新計算得出,但是對于某些譜系特別長的 RDD 來說,這將是一個很耗時間的操作,因此提供 RDD 級別的外存檢查點(checkpointing)可能會很有用。

    對于具有很長譜系圖,并且譜系圖中存在很多寬依賴的 RDD,在外存上做檢查點會很有幫助,因為某一兩個的分區可能會引起全盤的重算,對于這種又臭又長的計算拓撲來說,依據譜系圖重算無疑非常浪費時間。而對于只有窄依賴的、不那么長譜系圖來說,在外存做檢查點可能有些得不償失,因為他們可以很簡單的并行計算出來。

    Spark 現階段提供檢查點的 API (給 persist 函數傳 REPLICATE 標志),然后由用戶來決定是否對其持久化。但我們在思考,是否可以進行一些自動的檢查點計算。由于調度器知道每個數據集的內存占用以及計算使用時間,我們或許可以選擇性的對某些關鍵 RDD進行持久化以最小化宕機恢復時間。

    最后,由于 RDD 的只讀特性,我們在做檢查點時不用像通用共享內存模型那樣過分考慮一致性的問題,因此可以用后臺線程默默地干這些事情而不用影響主要工作流,也不用使用復雜的分布式的快照算法來解決一致性問題。

    注解

    [1] 隱式與顯示顯式:在這里可以理解為,顯式是把數據集這個概念完整的構造出來,定義他的內涵和邊界,并基于其上做一些外延拓展。而隱式只是事實上復用了數據,但并沒有定義被復用的數據格式。

    [2] 聲明式(Declarative)語言與命令式(Imperative)語言:前者例子有 SQL,HTML;后者例子最常見的有 Shell,其他的常見編程語言 C,Java,Python 也屬于此列。前者的好處在于將"干什么"和"怎么干"這兩件事解耦,這樣一來就可以開發不同的執行引擎,針對不同場景來優化"怎么干"這件事。而后者會告訴機器以特定的順序執行特定的操作,與直覺一致,是一般編程語言的路子。


    掃一掃關注我的公眾號:分布式點滴

    總結

    以上是生活随笔為你收集整理的spark如何防止内存溢出_Spark 理论基石 —— RDD的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。