spark shuffle的写操作之准备工作
前言
在前三篇文章中,spark 源碼分析之十九 -- DAG的生成和Stage的劃分?剖析了DAG的構(gòu)建和Stage的劃分,spark 源碼分析之二十 -- Stage的提交?剖析了TaskSet任務(wù)的提交,以及spark 源碼分析之二十一 -- Task的執(zhí)行細(xì)節(jié)剖析了Task執(zhí)行的整個(gè)流程。在第三篇文章中側(cè)重剖析了Task的整個(gè)執(zhí)行的流程是如何的,對(duì)于Task本身是如何執(zhí)行的?ResultTask?和?ShuffleMapTask兩部分并沒(méi)有做過(guò)多詳細(xì)的剖析。本篇文章我們針對(duì)Task執(zhí)行的細(xì)節(jié)展開(kāi),包括Task、ResultTask、ShuffleMapTask的深入剖析以及Spark底層的shuffle的實(shí)現(xiàn)機(jī)制等等。
Spark的任務(wù)劃分為ResultTask和ShuffleMapTask兩種任務(wù)。
其中ResultTask相對(duì)來(lái)說(shuō)比較簡(jiǎn)單,只是讀取上一個(gè)Stage的執(zhí)行結(jié)果或者是從數(shù)據(jù)源讀取任務(wù),最終將結(jié)果返回給driver。
ShuffleMapTask相對(duì)復(fù)雜一些,中間涉及了shuffle過(guò)程。
緊接上篇
我們?cè)賮?lái)看一下,ResultTask和ShuffleMapTask的runTask方法?,F(xiàn)在只關(guān)注數(shù)據(jù)處理邏輯,下面的兩張圖都做了標(biāo)注。
ResultTask
類(lèi)名:org.apache.spark.scheduler.ResultTask
其runTask方法如下:
ShuffleMapTask
類(lèi)名:org.apache.spark.scheduler.ShuffleMapTask
其runTask方法如下:
兩種Task執(zhí)行的相同和差異
相同點(diǎn)
差異點(diǎn)
總結(jié)關(guān)注點(diǎn)
由兩種Task執(zhí)行的相同和差異點(diǎn)可以總結(jié)出,要想對(duì)這兩種類(lèi)型的任務(wù)執(zhí)行有非常深刻的理解,必須搞明白shuffle 數(shù)據(jù)的讀寫(xiě)。這也是spark 計(jì)算的核心的關(guān)注點(diǎn) -- Shuffle的寫(xiě)操作、Shuffle的讀操作。
shuffle數(shù)據(jù)分類(lèi)
shuffle過(guò)程中寫(xiě)入Spark存儲(chǔ)系統(tǒng)的數(shù)據(jù)分為兩種,一種是shuffle數(shù)據(jù),一種是shuffle索引數(shù)據(jù),如下:
shuffle數(shù)據(jù)的管理類(lèi)--IndexShuffleBlockResolver
下面說(shuō)一下?IndexShuffleBlockResolver 類(lèi)。這個(gè)類(lèi)負(fù)責(zé)shuffle數(shù)據(jù)的獲取和刪除,以及shuffle索引數(shù)據(jù)的更新和刪除。
IndexShuffleBlockResolver繼承關(guān)系如下:
我們先來(lái)看父類(lèi)ShuffleBlockResolver。
ShuffleBlockResolver
主要是負(fù)責(zé)根據(jù)邏輯的shuffle的標(biāo)識(shí)(比如mapId、reduceId或shuffleId)來(lái)獲取shuffle的block。shuffle數(shù)據(jù)一般都被File或FileSegment包裝。
其接口定義如下:
其中,getBlockData根據(jù)shuffleId獲取shuffle數(shù)據(jù)。
下面來(lái)看?IndexShuffleBlockResolver的實(shí)現(xiàn)。
IndexShuffleBlockResolver
這個(gè)類(lèi)負(fù)責(zé)shuffle數(shù)據(jù)的獲取和刪除,以及shuffle索引數(shù)據(jù)的更新和刪除。
類(lèi)結(jié)構(gòu)如下:
blockManager是executor上的BlockManager類(lèi)。
transportCpnf主要是包含了關(guān)于shuffle的一些參數(shù)配置。
NOOP_REDUCE_ID是0,因?yàn)榇藭r(shí)還不知道reduce的id。
核心方法如下:
1. 獲取shuffle數(shù)據(jù)文件,源碼如下,思路:根據(jù)blockManager的DiskBlockManager獲取shuffle的blockId對(duì)應(yīng)的物理文件。
2. 獲取shuffle索引文件,源碼如下,思路:根據(jù)blockManager的DiskBlockManager獲取shuffle索引的blockId對(duì)應(yīng)的物理文件。
3.根據(jù)mapId將shuffle數(shù)據(jù)移除,源碼如下,思路:根據(jù)shuffleId和mapId刪除shuffle數(shù)據(jù)和索引文件
4.校驗(yàn)shuffle索引和數(shù)據(jù),源碼如下。
從上面可以看出,文件里第一個(gè)long型數(shù)是占位符,必為0.
后面的保存的數(shù)據(jù)是每一個(gè)block的大小,可以看出來(lái),每次讀的long型數(shù),是前面所有block的大小總和。
所以,當(dāng)前block的大小=這次讀取到的offset - 上次讀取到的offset
這種索引的設(shè)計(jì)非常巧妙。每一個(gè)block大小合起來(lái)就是整個(gè)文件的大小。每一個(gè)block的在整個(gè)文件中的offset也都記錄在索引文件中。
?
5. 寫(xiě)索引文件,源碼如下。
思路:首先先獲取shuffle的數(shù)據(jù)文件并創(chuàng)建索引的臨時(shí)文件。
獲取索引文件的每一個(gè)block 的大小。如果索引存在,則更新新的索引數(shù)組,刪除臨時(shí)數(shù)據(jù)文件,返回。
若索引不存在,將新的數(shù)據(jù)的索引數(shù)據(jù)寫(xiě)入臨時(shí)索引文件,最終刪除歷史數(shù)據(jù)文件和歷史索引文件,然后臨時(shí)數(shù)據(jù)文件和臨時(shí)數(shù)據(jù)索引文件重命名為新的數(shù)據(jù)和索引文件。
這樣的設(shè)計(jì),確保了數(shù)據(jù)索引隨著數(shù)據(jù)的更新而更新。
?
6. 根據(jù)shuffleId獲取block數(shù)據(jù),源碼如下。
?
思路:
先獲取shuffle數(shù)據(jù)的索引數(shù)據(jù),然后調(diào)用position位上,獲取block 的大小,然后初始化FileSegmentManagedBuffer,讀取文件的對(duì)應(yīng)segment的數(shù)據(jù)。
可以看出?reduceId就是block物理文件中的小的block(segment)的索引。
7. 停止blockResolver,空實(shí)現(xiàn)。
總結(jié),在這個(gè)類(lèi)中,可以學(xué)習(xí)到spark shuffle索引的設(shè)計(jì)思路,在工作中需要設(shè)計(jì)File和FileSegment的索引文件,這也是一種參考思路。
Shuffle的寫(xiě)數(shù)據(jù)前的準(zhǔn)備工作
直接來(lái)看?org.apache.spark.scheduler.ShuffleMapTask 的runTask的關(guān)鍵代碼如下:
這里的manager是SortShuffleManager,是ShuffleManager的唯一實(shí)現(xiàn)。
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter 源碼如下:
其中,numMapsForShuffle 定義如下:
它保存了shuffleID和mapper數(shù)量的映射關(guān)系。
獲取ShuffleHandle
首先,先來(lái)了解一下ShuffleHandle類(lèi)。
ShuffleHandle
下面大致了解一下ShuffleHandle的相關(guān)內(nèi)容。
類(lèi)說(shuō)明:
這個(gè)類(lèi)是Spark內(nèi)部使用的一個(gè)類(lèi),包含了關(guān)于Shuffle的一些信息,主要給ShuffleManage 使用。本質(zhì)上來(lái)說(shuō),它是一個(gè)標(biāo)志位,除了包含一些用于shuffle的一些屬性之外,沒(méi)有其他額外的方法,用case class來(lái)實(shí)現(xiàn)更好一點(diǎn)。
類(lèi)源碼如下:
繼承關(guān)系如下:
BaseShuffleHandle
全稱(chēng):org.apache.spark.shuffle.BaseShuffleHandle
類(lèi)說(shuō)明:
它是ShuffleHandle的基礎(chǔ)實(shí)現(xiàn)。
類(lèi)源碼如下:
下面來(lái)看一下它的兩個(gè)子類(lèi)實(shí)現(xiàn)。
BypassMergeSortShuffleHandle
全稱(chēng):org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle
類(lèi)說(shuō)明:
如果想用于序列化的shuffle實(shí)現(xiàn),可以使用這個(gè)標(biāo)志類(lèi)。其源碼如下:
?
SerializedShuffleHandle
全稱(chēng):org.apache.spark.shuffle.sort.SerializedShuffleHandle
類(lèi)說(shuō)明:
used to identify when we've chosen to use the bypass merge sort shuffle path.
類(lèi)源碼如下:?
獲取ShuffleHandle
在org.apache.spark.ShuffleDependency中有如下定義:
shuffleId是SparkContext生成的唯一全局id。
org.apache.spark.shuffle.sort.SortShuffleManager#registerShuffle 源碼如下:
可以看出,mapper的數(shù)量等于父RDD的分區(qū)的數(shù)量。
下面,看一下使用bypassMergeSort的條件,即org.apache.spark.shuffle.sort.SortShuffleWriter#shouldBypassMergeSort 源碼如下:
思路:首先如果父RDD沒(méi)有啟用mapSideCombine并且父RDD的結(jié)果分區(qū)數(shù)量小于bypassMergeSort閥值,則使用?bypassMergeSort。其中bypassMergeSort閥值 默認(rèn)是200,可以通過(guò)?spark.shuffle.sort.bypassMergeThreshold 參數(shù)設(shè)定。
使用serializedShuffle的條件,即org.apache.spark.shuffle.sort.SortShuffleManager#canUseSerializedShuffle 源碼如下:
思路:序列化類(lèi)支持支持序列化對(duì)象的遷移,并且不使用mapSideCombine操作以及父RDD的分區(qū)數(shù)不大于?(1 << 24) 即可使用該模式的shuffle。
根據(jù)ShuffleHandle獲取ShuffleWriter
首先先對(duì)ShuffleWriter做一下簡(jiǎn)單說(shuō)明。
ShuffleWriter
類(lèi)說(shuō)明:它負(fù)責(zé)將map任務(wù)的輸出寫(xiě)入到shuffle系統(tǒng)。其繼承關(guān)系如下,對(duì)應(yīng)著ShuffleHandle的三種shuffle實(shí)現(xiàn)標(biāo)志。
獲取ShuffleWriter
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter源碼如下:
一個(gè)mapper對(duì)應(yīng)一個(gè)writer,一個(gè)writer往一個(gè)分區(qū)上的寫(xiě)數(shù)據(jù)。
總結(jié)
本篇文章主要從Task 的差異和相同點(diǎn)出發(fā),引出spark shuffle的重要性,接著對(duì)Spark shuffle數(shù)據(jù)的類(lèi)型以及spark shuffle的管理類(lèi)做了剖析。最后介紹了三種shuffle類(lèi)型的標(biāo)志位以及如何確定使用哪種類(lèi)型的數(shù)據(jù)的。
接下來(lái),正式進(jìn)入mapper寫(xiě)數(shù)據(jù)部分。spark內(nèi)部有三種實(shí)現(xiàn),每一種寫(xiě)方式會(huì)有一篇文章專(zhuān)門(mén)剖析,我們逐一來(lái)看其實(shí)現(xiàn)機(jī)制。
轉(zhuǎn)載于:https://www.cnblogs.com/johnny666888/p/11265502.html
總結(jié)
以上是生活随笔為你收集整理的spark shuffle的写操作之准备工作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: [算法]最小差值
- 下一篇: 混合图的欧拉路径和欧拉回路判断