Spark RDD使用详解1--RDD原理
RDD簡介
????? 在集群背后,有一個非常重要的分布式數據架構,即彈性分布式數據集(Resilient Distributed Dataset,RDD)。RDD是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的并能夠被并行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到內存中,每次對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這對于迭代運算比較常見的機器學習算法, 交互式數據挖掘來說,效率提升比較大。
????? (1)RDD的特點
????? 1)創建:只能通過轉換 ( transformation ,如map/filter/groupBy/join 等,區別于動作 action) 從兩種數據源中創建 RDD 1 )穩定存儲中的數據; 2 )其他 RDD。
????? 2)只讀:狀態不可變,不能修改。
????? 3)分區:支持使 RDD 中的元素根據那個 key 來分區 ( partitioning ) ,保存到多個結點上。還原時只會重新計算丟失分區的數據,而不會影響整個系統。
????? 4)路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的信息關于它是如何從其他 RDD 產生而來的。
????? 5)持久化:支持將會被重用的 RDD 緩存 ( 如 in-memory 或溢出到磁盤 )。
????? 6)延遲計算: Spark 也會延遲計算 RDD ,使其能夠將轉換管道化 (pipeline transformation)。
????? 7)操作:豐富的轉換(transformation)和動作 ( action ) , count/reduce/collect/save 等。
????? 執行了多少次transformation操作,RDD都不會真正執行運算(記錄lineage),只有當action操作被執行時,運算才會觸發。
??????(2)RDD的好處
????? 1)RDD只能從持久存儲或通過Transformations操作產生,相比于分布式共享內存(DSM)可以更高效實現容錯,對于丟失部分數據分區只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
????? 2)RDD的不變性,可以實現類Hadoop MapReduce的推測式執行。
????? 3)RDD的數據分區特性,可以通過數據的本地性來提高性能,這不Hadoop MapReduce是一樣的。
????? 4)RDD都是可序列化的,在內存不足時可自動降級為磁盤存儲,把RDD存儲于磁盤上,這時性能會有大的下降但不會差于現在的MapReduce。
????? 5)批量操作:任務能夠根據數據本地性 (data locality) 被分配,從而提高性能。
??????(3)RDD的內部屬性
????? 通過RDD的內部屬性,用戶可以獲取相應的元數據信息。通過這些信息可以支持更復雜的算法或優化。
????? 1)分區列表:通過分區列表可以找到一個RDD中包含的所有分區及其所在地址。
????? 2)計算每個分片的函數:通過函數可以對每個數據塊進行RDD需要進行的用戶自定義函數運算。
????? 3)對父RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD都有依賴。
????? 4)可選:key-value型的RDD是根據哈希來分區的,類似于mapreduce當中的Paritioner接口,控制key分到哪個reduce。
????? 5)可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。(存儲的是一個表,可以將處理的分區“本地化”)???
?
?//只計算一次
protected def getPartitions: Array[Partition]
//對一個分片進行計算,得出一個可遍歷的結果
def compute(split: Partition, context: TaskContext): Iterator[T]
//只計算一次,計算RDD對父RDD的依賴
protected def getDependencies: Seq[Dependency[_]] = deps
//可選的,分區的方法,針對第4點,類似于mapreduce當中的Paritioner接口,控制key分到哪個reduce
@transient val partitioner: Option[Partitioner] = None
//可選的,指定優先位置,輸入參數是split分片,輸出結果是一組優先的節點位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
?
????? (4)RDD的存儲與分區
????? 1)用戶可以選擇不同的存儲級別存儲RDD以便重用。
????? 2)當前RDD默認是存儲于內存,但當內存不足時,RDD會spill到disk。
????? 3)RDD在需要進行分區把數據分布于集群中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。
????? RDD根據useDisk、useMemory、useOffHeap、deserialized、replication參數的組合定義了以下存儲級別:
?
?//存儲等級定義:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
?
????? (5)RDD的容錯機制
??????RDD的容錯機制實現分布式數據集容錯方法有兩種:數據檢查點和記錄更新,RDD采用記錄更新的方式:記錄所有更新點的成本很高。所以,RDD只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操作,然后創建某個RDD的變換序列(血統 lineage)存儲下來;變換序列指,每個RDD都包含了它是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統”容錯。 要實現這種“血統”容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關系。實際上依賴關系可以分兩種,窄依賴和寬依賴。窄依賴:子RDD中的每個數據塊只依賴于父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊可以依賴于父RDD中的所有數據塊。例如:map變換,子RDD中的數據塊只依賴于父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴于多塊父RDD中的數據塊,因為一個key可能分布于父RDD的任何一個數據塊中, 將依賴關系分類的兩個特性:第一,窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所有數據都計算完成之后,并且父RDD的計算結果進行hash并傳到對應節點上之后才能計算子RDD。第二,數據丟失時,對于窄依賴只需要重新計算丟失的那一塊數據來恢復;對于寬依賴則要將祖先RDD中的所有數據塊全部重新計算來恢復。所以在“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查點。也是這兩個特性要求對于不同依賴關系要采取不同的任務調度機制和容錯恢復機制。
????? (6)Spark計算工作流
????? 圖1-5中描述了Spark的輸入、運行轉換、輸出。在運行轉換中通過算子對RDD進行轉換。算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。
????? ·輸入:在Spark程序運行中,數據從外部數據空間(例如,HDFS、Scala集合或數據)輸入到Spark,數據就進入了Spark運行時數據空間,會轉化為Spark中的數據塊,通過BlockManager進行管理。
????? ·運行:在Spark數據輸入形成RDD后,便可以通過變換算子fliter等,對數據操作并將RDD轉化為新的RDD,通過行動(Action)算子,觸發Spark提交作業。如果數據需要復用,可以通過Cache算子,將數據緩存到內存。
????? ·輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS)或Scala數據或集合中(collect輸出到Scala集合,count返回Scala Int型數據)。
????? Spark的核心數據模型是RDD,但RDD是個抽象類,具體由各子類實現,如MappedRDD、ShuffledRDD等子類。Spark將常用的大數據操作都轉化成為RDD的子類。
?
RDD編程模型
? ? ? 來看一段代碼:textFile算子從HDFS讀取日志文件,返回“file”(RDD);filter算子篩出帶“ERROR”的行,賦給 “errors”(新RDD);cache算子把它緩存下來以備未來使用;count算子返回“errors”的行數。RDD看起來與Scala集合類型 沒有太大差別,但它們的數據和運行模型大相迥異。
? ? ? 上圖給出了RDD數據模型,并將上例中用到的四個算子映射到四種算子類型。Spark程序工作在兩個空間中:Spark RDD空間和Scala原生數據空間。在原生數據空間里,數據表現為標量(scalar,即Scala基本類型,用橘色小方塊表示)、集合類型(藍色虛線 框)和持久存儲(紅色圓柱)。
? ? ? 下圖描述了Spark運行過程中通過算子對RDD進行轉換, 算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。
圖1 兩個空間的切換,四類不同的RDD算子
? ? ? 輸入算子(橘色箭頭)將Scala集合類型或存儲中的數據吸入RDD空間,轉為RDD(藍色實線框)。輸入算子的輸入大致有兩類:一類針對 Scala集合類型,如parallelize;另一類針對存儲數據,如上例中的textFile。輸入算子的輸出就是Spark空間的RDD。
? ? ? 因為函數語義,RDD經過變換(transformation)算子(藍色箭頭)生成新的RDD。變換算子的輸入和輸出都是RDD。RDD會被劃分 成很多的分區 (partition)分布到集群的多個節點中,圖1用藍色小方塊代表分區。注意,分區是個邏輯概念,變換前后的新舊分區在物理上可能是同一塊內存或存 儲。這是很重要的優化,以防止函數式不變性導致的內存需求無限擴張。有些RDD是計算的中間結果,其分區并不一定有相應的內存或存儲與之對應,如果需要 (如以備未來使用),可以調用緩存算子(例子中的cache算子,灰色箭頭表示)將分區物化(materialize)存下來(灰色方塊)。
? ? ? 一部分變換算子視RDD的元素為簡單元素,分為如下幾類:
-
輸入輸出一對一(element-wise)的算子,且結果RDD的分區結構不變,主要是map、flatMap(map后展平為一維RDD);
-
輸入輸出一對一,但結果RDD的分區結構發生了變化,如union(兩個RDD合為一個)、coalesce(分區減少);
-
從輸入中選擇部分元素的算子,如filter、distinct(去除冗余元素)、subtract(本RDD有、它RDD無的元素留下來)和sample(采樣)。
? ? ? 另一部分變換算子針對Key-Value集合,又分為:
-
對單個RDD做element-wise運算,如mapValues(保持源RDD的分區方式,這與map不同);
-
對單個RDD重排,如sort、partitionBy(實現一致性的分區劃分,這個對數據本地性優化很重要,后面會講);
-
對單個RDD基于key進行重組和reduce,如groupByKey、reduceByKey;
-
對兩個RDD基于key進行join和重組,如join、cogroup。
? ? ? 后三類操作都涉及重排,稱為shuffle類操作。
? ? ?從RDD到RDD的變換算子序列,一直在RDD空間發生。這里很重要的設計是lazy evaluation:計算并不實際發生,只是不斷地記錄到元數據。元數據的結構是DAG(有向無環圖),其中每一個“頂點”是RDD(包括生產該RDD 的算子),從父RDD到子RDD有“邊”,表示RDD間的依賴性。Spark給元數據DAG取了個很酷的名字,Lineage(世系)。這個 Lineage也是前面容錯設計中所說的日志更新。
? ? ? Lineage一直增長,直到遇上行動(action)算子(圖1中的綠色箭頭),這時 就要evaluate了,把剛才累積的所有算子一次性執行。行動算子的輸入是RDD(以及該RDD在Lineage上依賴的所有RDD),輸出是執行后生 成的原生數據,可能是Scala標量、集合類型的數據或存儲。當一個算子的輸出是上述類型時,該算子必然是行動算子,其效果則是從RDD空間返回原生數據空間。
?
RDD運行邏輯
? ? ? 如圖所示,在Spark應用中,整個執行流程在邏輯上運算之間會形成有向無環圖。Action算子觸發之后會將所有累積的算子形成一個有向無環圖,然后由調度器調度該圖上的任務進行運算。Spark的調度方式與MapReduce有所不同。Spark根據RDD之間不同的依賴關系切分形成不同的階段(Stage),一個階段包含一系列函數進行流水線執行。圖中的A、B、C、D、E、F、G,分別代表不同的RDD,RDD內的一個方框代表一個數據塊。數據從HDFS輸入Spark,形成RDD A和RDD C,RDD C上執行map操作,轉換為RDD D,RDD B和RDD F進行join操作轉換為G,而在B到G的過程中又會進行Shuffle。最后RDD G通過函數saveAsSequenceFile輸出保存到HDFS中。
?
RDD依賴關系
? ? ? RDD的依賴關系如下圖所示:
? ? ? 窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每個分區都只被子 RDD 的一個分區所使用,例如map、filter。相應的,那么寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴,例如groupByKey、reduceByKey等操作。如果父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,否則的話就是寬依賴。
? ? ? 這種劃分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基于一對一的關系,可以在 filter 之后執行 map 。其次,窄依賴支持更高效的故障還原。因為對于窄依賴,只有丟失的父 RDD 的分區需要重新計算。而對于寬依賴,一個結點的故障可能導致來自所有父 RDD 的分區丟失,因此就需要完全重新執行。因此對于寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出一樣。
? ? ? 特別說明:對于join操作有兩種情況,如果join操作的使用每個partition僅僅和已知的Partition進行join,此時的join操作就是窄依賴;其他情況的join操作就是寬依賴;因為是確定的Partition數量的依賴關系,所以就是窄依賴,得出一個推論,窄依賴不僅包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨著RDD數據規模的改變而改變)
?
? ? ? 如何劃分Stage如下圖所示:
? ? ? Stage劃分的依據就是寬依賴,什么時候產生寬依賴呢?例如reduceByKey,groupByKey等Action。
? ? ? 1.從后往前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到Stage中;
? ? ? 2.每個Stage里面的Task的數量是由該Stage中最后一個RDD的Partition數量決定的;
? ? ? 3.最后一個Stage里面的任務的類型是ResultTask,前面所有其他Stage里面的任務類型都是ShuffleMapTask;
? ? ? 4.代表當前Stage的算子一定是該Stage的最后一個計算步驟;
? ? ? 補充:Hadoop中的MapReduce操作中的Mapper和Reducer在Spark中基本等量算子是:map、reduceByKey;在一個Stage內部,首先是算子合并,也就是所謂的函數式編程的執行的時候最終進行函數的展開從而把一個Stage內部的多個算子合并成為一個大算子(其內部包含了當前Stage中所有算子對數據的計算邏輯);其次是由于Transformation操作的Lazy特性!!在具體算子交給集群的Executor計算之前,首先會通過Spark Framework(DAGScheduler)進行算子的優化。
?
RDD如何操作
????? (1)RDD的創建方式
????? 1)從Hadoop文件系統(或與Hadoop兼容的其他持久化存儲系統,如Hive、Cassandra、HBase)輸入(例如HDFS)創建。
????? 2)從父RDD轉換得到新RDD。
????? 3)通過parallelize或makeRDD將單機數據創建為分布式RDD。
??????(2)RDD的兩種操作算子
????? 對于RDD可以有兩種操作算子:轉換(Transformation)與行動(Action)。
????? 1)轉換(Transformation):Transformation操作是延遲計算的,也就是說從一個RDD轉換生成另一個RDD的轉換操作不是馬上執行,需要等到有Action操作的時候才會真正觸發運算。
????? 2)行動(Action):Action算子會觸發Spark提交作業(Job),并將數據輸出Spark系統。
? ???1.Transformation具體內容:
? ???2.Action具體內容:
?
總結
? ? ? 相比MapReduce,Spark提供了更加優化和復雜的執行流。讀者還可以深入了解Spark的運行機制與Spark算子,這樣能更加直觀地了解API的使用。Spark提供了更加豐富的函數式算子,這樣就為Spark上層組件的開發奠定了堅實的基礎。后續文章將詳細介紹Spark算子源代碼及示例。
總結
以上是生活随笔為你收集整理的Spark RDD使用详解1--RDD原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hive 基础(2):库、表、字段、交互
- 下一篇: Spark RDD使用详解3--Valu