Spark Shuffle 解析
生活随笔
收集整理的這篇文章主要介紹了
Spark Shuffle 解析
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
5.Spark Shuffle 解析
5.1 Shuffle 的核心要點
5.1.1 ShuffleMapStage 與 FinalStage
在劃分 stage 時,最后一個 stage 稱為 FinalStage,它本質上是一個 ResultStage 對象,前面的所有 stage 被稱為 ShuffleMapStage。 ShuffleMapStage 的結束伴隨著 shuffle 文件的寫磁盤。 ResultStage 基本上對應代碼中的 action 算子,即將一個函數應用在 RDD 的各 個 partition 的數據集上,意味著一個 job 的運行結束。??
5.1.2 Shuffle 中的任務個數
我們知道,Spark Shuffle 分為 map 階段和 reduce 階段,或者稱之為 ShuffleRed 階段和 ShuffleWrite 階段,那么對于一次 Shuffle,map 過程和 reduce 過程都會由若 干個 task 來執行,那么 map task 和 reduce task 的數量是如何確定的呢? 假設 Spark 任務從 HDFS 中讀取數據,那么初始 RDD 分區個數由該文件的 split 個數決定,也就是一個 split 對應生成的 RDD 的一個 partition,我們假設初始 partition 個數為 N。 初始 RDD 經過一系列算子計算后(假設沒有執行 repartition 和 coalesce 算子進 行重分區,則分區個數不變,仍為 N,如果經過重分區算子,那么分區個數變為 M), 我們假設分區個數不變,當執行到 Shuffle 操作時,map 端的 task 個數和 partition 個數一致,即 map task 為 N 個。 reduce 端的 stage 默認取 spark.default.parallelism 這個配置項的值作為分區數, 如果沒有配置,則以 map 端的最后一個 RDD 的分區數作為其分區數(也就是 N), 那么分區數就決定了 reduce 端的 task 的個數。5.1.3 reduce 端數據的讀取
根據 stage 的劃分我們知道,map 端 task 和 reduce 端 task 不在相同的 stage 中, map task 位于 ShuffleMapStage,reduce task 位于 ResultStage,map task 會先執行, 那么后執行的 reduce task 如何知道從哪里去拉取 map task 落盤后的數據呢? reduce 端的數據拉取過程如下: 1. map task 執 行 完 畢 后 會 將 計 算 狀 態 以 及 磁 盤 小 文 件 位 置 等 信 息 封 裝 到 mapStatue 對象中,然后由本進程中的 MapOutPutTrackerWorker 對象將 mapStatus 對 象發送給 Driver 進程的 MapOutPutTrackerMaster 對象; 2. 在 reduce task 開始執行之前會先讓本進程中的 MapOutputTrackerWorker 向 Driver 進程中的 MapoutPutTrakcerMaster 發動請求,請求磁盤小文件位置信息; 3. 當所有的 Map task 執行完畢后,Driver 進程中的 MapOutPutTrackerMaster 就掌握了 所有的 磁盤小 文件的位 置信息 。此 時 MapOutPutTrackerMaster 會告訴 MapOutPutTrackerWorker 磁盤小文件的位置信息; 4. 完成之前的操作之后,由 BlockTransforService 去 Executor0 所在的節點拉 數據,默認會啟動五個子線程。每次拉取的數據量不能超過 48M(reduce task 每次 最多拉取 48M 數據,將拉來的數據存儲到 Executor 內存的 20%內存中)。?
?
5.2 HashShuffle 解析
以下的討論都假設每個 Executor 有 1 個 CPU core。 1. 未經優化的 HashShuffleManager shuffle write 階段,主要就是在一個 stage 結束計算之后,為了下一個 stage 可 以執行 shuffle 類的算子(比如 reduceByKey),而將每個 task 處理的數據按 key 進 行“劃分”。所謂“劃分”,就是對相同的 key 執行 hash 算法,從而將相同 key 都 寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游 stage 的一個 task。在將數 據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到 磁盤文件中去。 下一個 stage 的 task 有多少個,當前 stage 的每個 task 就要創建多少份磁盤文件。 比如下一個 stage 總共有 100 個 task,那么當前 stage 的每個 task 都要創建 100 份磁 盤文件。如果當前 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,那么每個 Executor 上總共就要創建 500 個磁盤文件,所有 Executor 上會創 建 5000 個磁盤文件。由此可見,未經優化的 shuffle write 操作所產生的磁盤文件的 數量是極其驚人的。 shuffle read 階段,通常就是一個 stage 剛開始時要做的事情。此時該 stage 的每 一個 task 就需要將上一個 stage 的計算結果中的所有相同 key,從各個節點上通過網 絡都拉取到自己所在的節點上,然后進行 key 的聚合或連接等操作。由于 shuffle write 的過程中,map task 給下游 stage 的每個 reduce task 都創建了一個磁盤文件,因此 shuffle read 的過程中,每個 reduce task 只要從上游 stage 的所有 map task 所在節點 上,拉取屬于自己的那一個磁盤文件即可。 shuffle read 的拉取過程是一邊拉取一邊進行聚合的。每個 shuffle read task 都會 有一個自己的 buffer 緩沖,每次都只能拉取與 buffer 緩沖相同大小的數據,然后通 過內存中的一個 Map 進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并 放到 buffer 緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,并得 到最終的結果。 未優化的 HashShuffleManager 工作原理如圖 1-7 所示: 2. 優化后的 HashShuffleManager 為了優化 HashShuffleManager 我 們 可 以 設 置 一 個 參 數 , spark.shuffle. consolidateFiles,該參數默認值為 false,將其設置為 true 即可開啟優化機制,通常 來說,如果我們使用 HashShuffleManager,那么都建議開啟這個選項。 開啟 consolidate 機制之后,在 shuffle write 過程中,task 就不是為下游 stage 的 每 個 task 創 建 一 個 磁 盤 文 件 了 , 此 時 會 出 現 shuffleFileGroup 的 概 念 , 每 個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量與下游 stage 的 task 數量是 相同的。一個 Executor 上有多少個 CPU core,就可以并行執行多少個 task。而第一 批并行執行的每個 task 都會創建一個 shuffleFileGroup,并將數據寫入對應的磁盤文 件內。 當 Executor 的 CPU core 執行完一批 task,接著執行下一批 task 時,下一批 task 就會復用之前已有的 shuffleFileGroup,包括其中的磁盤文件,也就是說,此時 task 會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate 機制允許不同的 task 復用同一批磁盤文件,這樣就可以有效將多個 task 的磁盤文件 進行一定程度上的合并,從而大幅度減少磁盤文件的數量,進而提升 shuffle write 的性能。 假設第二個 stage 有 100 個 task,第一個 stage 有 50 個 task,總共還是有 10 個 Executor(Executor CPU 個數為 1),每個 Executor 執行 5 個 task。那么原本使用未 經優化的 HashShuffleManager 時,每個 Executor 會產生 500 個磁盤文件,所有 Executor 會產生 5000 個磁盤文件的。但是此時經過優化之后,每個 Executor 創建的 磁盤文件的數量的計算公式為:CPU core 的數量 * 下一個 stage 的 task 數量,也就 是說,每個 Executor 此時只會創建 100 個磁盤文件,所有 Executor 只會創建 1000 個磁盤文件。 優化后的 HashShuffleManager 工作原理如圖 1-8 所示:5.3 SortShuffle 解析
SortShuffleManager 的運行機制主要分成兩種,一種是普通運行機制,另一種是 bypass 運 行 機 制 。 當 shuffle read task 的數量小于等于 spark.shuffle.sort. bypassMergeThreshold 參數的值時(默認為 200),就會啟用 bypass 機制。 1. 普通運行機制 在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的 shuffle 算子, 可能選用不同的數據結構。如果是 reduceByKey 這種聚合類的 shuffle 算子,那么會 選用 Map 數據結構,一邊通過 Map 進行聚合,一邊寫入內存;如果是 join 這種普 通的 shuffle 算子,那么會選用 Array 數據結構,直接寫入內存。接著,每寫一條數 據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨 界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數 據結構。 在溢寫到磁盤文件之前,會先根據 key 對內存數據結構中已有的數據進行排序。 排序過后,會分批將數據寫入磁盤文件。默認的 batch 數量是 10000 條,也就是說, 排序好的數據,會以每批 1 萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通 過 Java 的 BufferedOutputStream 實現的。BufferedOutputStream 是 Java 的緩沖輸出 流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這 樣可以減少磁盤 IO 次數,提升性能。 一個 task 將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作, 也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是 merge 過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最 終的磁盤文件之中。此外,由于一個 task 就只對應一個磁盤文件,也就意味著該 task 為下游 stage 的 task 準備的數據都在這一個文件中,因此還會單獨寫一份索引文件, 其中標識了下游各個 task 的數據在文件中的 start offset 與 end offset。 SortShuffleManager 由于有一個磁盤文件 merge 的過程,因此大大減少了文件數 量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,而第二個 stage 有 100 個 task。由于每個 task 最終只有一個磁盤文件,因此 此時每個 Executor 上只有 5 個磁盤文件,所有 Executor 只有 50 個磁盤文件。 普通運行機制的 SortShuffleManager 工作原理如圖 1-9 所示: 2. bypass 運行機制 bypass 運行機制的觸發條件如下: ? shuffle map task 數量小于 spark.shuffle.sort.bypassMergeThreshold 參數的值。 ? 不是聚合類的 shuffle 算子。 此時,每個 task 會為每個下游 task 都創建一個臨時磁盤文件,并將數據按 key 進行 hash 然后根據 key 的 hash 值,將 key 寫入對應的磁盤文件之中。當然,寫入磁 盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會 將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。 該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的,因 為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此 少量的最終磁盤文件,也讓該機制相對未經優化的 HashShuffleManager 來說,shuffle read 的性能會更好。 而該機制與普通 SortShuffleManager 運行機制的不同在于:第一,磁盤寫機制 不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write 過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。 普通運行機制的 SortShuffleManager 工作原理如圖 1-10 所示:?
?
轉載于:https://www.cnblogs.com/LXL616/p/11165941.html
總結
以上是生活随笔為你收集整理的Spark Shuffle 解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 海南省保亭县保亭七仙绿源生态农业开发有限
- 下一篇: zz 递归算法转换为非递归算法