spark中的容错
一般來說,分布式數據集的容錯性有兩種方式:數據檢查點和記錄數據的更新。面向大規模數據分析,數據檢查點操作成本很高,需要通過數據中心的網絡連接在機器之間復制龐大的數據集,而網絡帶寬往往比內存帶寬低得多,同時還需要消耗更多的存儲資源。因此,Spark選擇記錄更新的方式。但是,如果更新粒度太
細太多,那么記錄更新成本也不低。因此,RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。Lineage本質上很類似于數據庫中的重做日志(Redo Log),只不過這個重做日志粒度很大,是對全局數據做同樣的重做進而恢復數據。
Lineage
相比其他系統的細顆粒度的內存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據Transformation操作(如filter、map、join等)行為。當這個RDD的部分分區數據丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數據分區。因為這種粗顆粒的數據模型,限制了Spark的運用場合,所以Spark并不適用于所有高性能要求的場景,但同時相比細顆粒度的數據模型,也帶來了性能的提升。
RDD在Lineage依賴方面分為兩種:Narrow Dependencies與Shuffle Dependencies,用來解決數據容錯的高效性。Narrow Dependencies是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。Shuffle Dependencies是指子RDD的分區依賴于父RDD的多個分區或所有分區,即存在一個父RDD的一個分區對應一個子RDD的多個分區。
本質理解:根據父RDD分區是對應1個還是多個子RDD分區來區分NarrowDependency(父分區對應一個子分區)和Shuffle Dependency(父分區對應多個子分區)。如果對應多個,則當容錯重算分區時,因為父分區數據只有一部分是需要重算子分區的,其余數據重算就造成了冗余計算。
如果一個節點死機了,而且運算Narrow Dependency,則只要把丟失的父RDD分區重算即可,不依賴于其他節點。而Shuffle Dependency需要父RDD的所有分區都存在,重算就很昂貴了。可以這樣理解開銷的經濟與否:在Narrow Dependency中,在子RDD的分區丟失、重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,并不存在冗余計算。在Shuffle Dependency情況下,丟失一個子RDD分區重算的每個父RDD的每個分區的所有數據并不是都給丟失的子RDD分區用的,會有一部分數據相當于對應的是
未丟失的子RDD分區中需要的數據,這樣就會產生冗余計算開銷,這也是ShuffleDependency開銷更大的原因。因此如果使用Checkpoint算子來做檢查點,不僅要考慮Lineage是否足夠長,也要考慮是否有寬依賴,對Shuffle Dependency加Checkpoint是最物有所值的
CheckPoint
以下兩種情況下,RDD需要加檢查點。
1)DAG中的Lineage過長,如果重算,則開銷太大。
2)在Shuffle Dependency上做Checkpoint(檢查點)獲得的收益更大
由于RDD是只讀的,所以Spark的RDD計算中一致性不是主要關心的內容,內存相對容易管理
傳統做檢查點有兩種方式:通過冗余數據和日志記錄更新操作。在RDD中的doCheckPoint方法相當于通過冗余數據來緩存數據,而之前介紹的血統就是通過相當粗粒度的記錄更新操作來實現容錯的
可以通過SparkContext.setCheckPointDir()設置檢查點數據的存儲路徑,進而將數據存儲備份,然后Spark刪除所有已經做檢查點的RDD的祖先RDD依賴。這個操作需要在所有需要對這個RDD所做的操作完成之后再做,因為數據會寫入持久化存儲造成I/O開銷。官方建議,做檢查點的RDD最好是在內存中已經緩存的RDD,否則保存這個RDD在持久化的文件中需要重新計算,產生I/O開銷
總結
- 上一篇: artTemplate的使用总结
- 下一篇: 深入理解Spark 2.1 Core (