RDD浅谈
RDD概念:Resilient Distributed Datasets
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可并行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。
同時,RDD還提供了一組豐富的操作來操作數據。RDD是只讀的記錄分區的集合,只能通過在其他RDD執行確定的轉換操作(transformation操作)而創建。RDD可看作一個spark的對象,它本身存在于內存中,如對文件計算是一個RDD,等。一個RDD可以包含多個分區,每個分區就是一個dataset片段。但是RDD抽象出來的東西里面實際的數據,是分散在各個節點上面的,RDD可分區,分區的個數是我們可以指定的。默認情況下,一個hdfs塊就是一個分區。
RDD的屬性
wordcount粗解RDD
懶惰調用
RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。這就是Spark的惰性調用機制。
正式這種執行機制為是spark的高效計算的基礎,正是因為懶惰執行,spark才能更有效的運行于于內存,使得高效的共享內存機制避免了大量中間結果,從而避免了磁盤寫入寫出帶來的性能消耗,同時內部的存儲對象可以是JAVA對象也避免了不必要的序列化和反序列化。
而在這基礎之上,spark又實現了高效的容錯機制,以及根據依賴關系進行工作優化。
寬依賴和窄依賴
由于RDD是粗粒度的操作數據集,每個Transformation操作都會生成一個新的RDD,所以RDD之間就會形成類似流水線的前后依賴關系;RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。如圖所示顯示了RDD之間的依賴關系。
從圖中可知:
窄依賴:是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操作都會產生窄依賴;(獨生子女)
寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產生寬依賴;(超生)
需要特別說明的是對join操作有兩種情況:
(1)圖中左半部分join:如果兩個RDD在進行join操作時,一個RDD的partition僅僅和另一個RDD中已知個數的Partition進行join,那么這種類型的join操作就是窄依賴,例如圖1中左半部分的join操作(join with inputs co-partitioned);
(2)圖中右半部分join:其它情況的join操作就是寬依賴,例如圖1中右半部分的join操作(join with inputs not co-partitioned),由于是需要父RDD的所有partition進行join的轉換,這就涉及到了shuffle,因此這種類型的join操作也是寬依賴。
同時從這兩種依賴關系我們也可以明確的看出了,窄依賴在運行上要優于寬依賴,但是在很多執行中寬依賴又是不可避免的。為了能夠更好更高效的運行數據,spark基于這兩種依賴關系對工作劃分進行可設計(具體內容請見文末)
RDD的高效容錯機制
一般來說,分布式數據集的容錯性有兩種方式:數據檢查點和記錄數據的更新。 面向大規模數據分析,數據檢查點操作成本很高,需要通過數據中心的網絡連接在機器之間復制龐大的數據集,而網絡帶寬往往比內存帶寬低得多,同時還需要消耗更多的存儲資源。
因此,Spark選擇記錄更新的方式,簡單點來說就是RDD運行之后保存RDD之間的依賴關系,當出現錯誤時,通過重新計算的方式來恢復數據。但是,如果更新粒度太細太多,那么記錄更新成本也不低。因此,RDD只支持粗粒度轉換,即只記錄單個塊上執行的單個操作,然后將創建RDD的一系列變換序列(每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統(Lineage)”容錯)記錄下來,以便恢復丟失的分區。
Lineage本質上很類似于數據庫中的重做日志(Redo Log),只不過這個重做日志粒度很大,是對全局數據做同樣的重做進而恢復數據。
除此之外RDD中有時候還需要設置檢查點來提高性能:
Spark 的任務劃分
RDD運行原理
?
那么 RDD在Spark架構中是如何運行的呢?總高層次來看,主要分為三步:
對于寬依賴與窄依賴進行的認任務分發優化
*在spark中,會根據RDD之間的依賴關系將DAG圖(有向無環圖)劃分為不同的階段,對于窄依賴,由于partition依賴關系的確定性,partition的轉換處理就可以在同一個線程里完成,窄依賴就被spark劃分到同一個stage中,而對于寬依賴,只能等父RDD shuffle處理完成后,下一個stage才能開始接下來的計算。
因此spark劃分stage的整體思路是:從后往前推,遇到寬依賴就斷開,劃分為一個stage;遇到窄依賴就將這個RDD加入該stage中。因此在圖2中RDD C,RDD D,RDD E,RDDF被構建在一個stage中,RDD A被構建在一個單獨的Stage中,而RDD B和RDD G又被構建在同一個stage中。
這種分發方式可以使不同分區實現不同的流水線操作,有利于高效的運行與容錯機制,想象一下,當運行時當C1-->D1運行結束時候,就可以直接運行D1-->F1的操作了,這樣進行的并行計算有效的提升了計算性能。同時當出現錯誤時候,良好的stage劃分也減少了重新計算所帶來的成本。
在spark中,Task的類型分為2種:ShuffleMapTask和ResultTask;
簡單來說,DAG的最后一個階段會為每個結果的partition生成一個ResultTask,即每個Stage里面的Task的數量是由該Stage中最后一個RDD的Partition的數量所決定的!而其余所有階段都會生成ShuffleMapTask;之所以稱之為ShuffleMapTask是因為它需要將自己的計算結果通過shuffle到下一個stage中;也就是說上圖中的stage1和stage2相當于mapreduce中的Mapper,而ResultTask所代表的stage3就相當于mapreduce中的reducer。
Hadoop中MapReduce操作中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過區別在于:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據Key進行reduce,但spark除了這兩個算子還有其他的算子;因此從這個意義上來說,Spark比Hadoop的計算算子更為豐富。
參考
本文原為個人筆記,參考了眾多資料,包括但不止于以下內容。
總結
- 上一篇: Jupyter notebook 编写s
- 下一篇: 2018 亚太数学建模大赛B题解题思路