日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

spark shuffle的写操作之准备工作

發(fā)布時(shí)間:2024/4/15 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark shuffle的写操作之准备工作 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言

在前三篇文章中,spark 源碼分析之十九 -- DAG的生成和Stage的劃分?剖析了DAG的構(gòu)建和Stage的劃分,spark 源碼分析之二十 -- Stage的提交?剖析了TaskSet任務(wù)的提交,以及spark 源碼分析之二十一 -- Task的執(zhí)行細(xì)節(jié)剖析了Task執(zhí)行的整個(gè)流程。在第三篇文章中側(cè)重剖析了Task的整個(gè)執(zhí)行的流程是如何的,對(duì)于Task本身是如何執(zhí)行的?ResultTask?和?ShuffleMapTask兩部分并沒(méi)有做過(guò)多詳細(xì)的剖析。本篇文章我們針對(duì)Task執(zhí)行的細(xì)節(jié)展開(kāi),包括Task、ResultTask、ShuffleMapTask的深入剖析以及Spark底層的shuffle的實(shí)現(xiàn)機(jī)制等等。

Spark的任務(wù)劃分為ResultTask和ShuffleMapTask兩種任務(wù)。

其中ResultTask相對(duì)來(lái)說(shuō)比較簡(jiǎn)單,只是讀取上一個(gè)Stage的執(zhí)行結(jié)果或者是從數(shù)據(jù)源讀取任務(wù),最終將結(jié)果返回給driver。

ShuffleMapTask相對(duì)復(fù)雜一些,中間涉及了shuffle過(guò)程。

緊接上篇

我們?cè)賮?lái)看一下,ResultTask和ShuffleMapTask的runTask方法?,F(xiàn)在只關(guān)注數(shù)據(jù)處理邏輯,下面的兩張圖都做了標(biāo)注。

ResultTask

類(lèi)名:org.apache.spark.scheduler.ResultTask

其runTask方法如下:

ShuffleMapTask

類(lèi)名:org.apache.spark.scheduler.ShuffleMapTask

其runTask方法如下:

兩種Task執(zhí)行的相同和差異

相同點(diǎn)

  • 這兩種Task都是在RDD的分區(qū)上執(zhí)行的。
  • 兩種Task都需要調(diào)用父RDD的iterator方法來(lái)獲取父RDD對(duì)應(yīng)分區(qū)的數(shù)據(jù)。
  • 這些數(shù)據(jù)可以直接來(lái)自于數(shù)據(jù)源,也可以直接來(lái)自于上一個(gè)ShuffleMapTask執(zhí)行的結(jié)果。
  • 當(dāng)一個(gè)Stage中所有分區(qū)的Task都執(zhí)行完畢,這個(gè)Stage才算執(zhí)行完畢。
  • 差異點(diǎn)

  • ResultTask獲取父RDD分區(qū)數(shù)據(jù)之后,把分區(qū)數(shù)據(jù)作為參數(shù)輸入到action函數(shù)中,最終計(jì)算出特定的結(jié)果返回給driver。
  • ShuffleMapTask獲取父RDD分區(qū)數(shù)據(jù)之后,把分區(qū)數(shù)據(jù)作為參數(shù)傳入分區(qū)函數(shù),最終形成新的RDD中的分區(qū)數(shù)據(jù),保存在各個(gè)Executor節(jié)點(diǎn)中,并將分區(qū)數(shù)據(jù)信息MapStatus返回給driver。
  • 總結(jié)關(guān)注點(diǎn)

    由兩種Task執(zhí)行的相同和差異點(diǎn)可以總結(jié)出,要想對(duì)這兩種類(lèi)型的任務(wù)執(zhí)行有非常深刻的理解,必須搞明白shuffle 數(shù)據(jù)的讀寫(xiě)。這也是spark 計(jì)算的核心的關(guān)注點(diǎn) -- Shuffle的寫(xiě)操作、Shuffle的讀操作。

    shuffle數(shù)據(jù)分類(lèi)

    shuffle過(guò)程中寫(xiě)入Spark存儲(chǔ)系統(tǒng)的數(shù)據(jù)分為兩種,一種是shuffle數(shù)據(jù),一種是shuffle索引數(shù)據(jù),如下:

    shuffle數(shù)據(jù)的管理類(lèi)--IndexShuffleBlockResolver

    下面說(shuō)一下?IndexShuffleBlockResolver 類(lèi)。這個(gè)類(lèi)負(fù)責(zé)shuffle數(shù)據(jù)的獲取和刪除,以及shuffle索引數(shù)據(jù)的更新和刪除。

    IndexShuffleBlockResolver繼承關(guān)系如下:

    我們先來(lái)看父類(lèi)ShuffleBlockResolver。

    ShuffleBlockResolver

    主要是負(fù)責(zé)根據(jù)邏輯的shuffle的標(biāo)識(shí)(比如mapId、reduceId或shuffleId)來(lái)獲取shuffle的block。shuffle數(shù)據(jù)一般都被File或FileSegment包裝。

    其接口定義如下:

    其中,getBlockData根據(jù)shuffleId獲取shuffle數(shù)據(jù)。

    下面來(lái)看?IndexShuffleBlockResolver的實(shí)現(xiàn)。

    IndexShuffleBlockResolver

    這個(gè)類(lèi)負(fù)責(zé)shuffle數(shù)據(jù)的獲取和刪除,以及shuffle索引數(shù)據(jù)的更新和刪除。

    類(lèi)結(jié)構(gòu)如下:

    blockManager是executor上的BlockManager類(lèi)。

    transportCpnf主要是包含了關(guān)于shuffle的一些參數(shù)配置。

    NOOP_REDUCE_ID是0,因?yàn)榇藭r(shí)還不知道reduce的id。

    核心方法如下:

    1. 獲取shuffle數(shù)據(jù)文件,源碼如下,思路:根據(jù)blockManager的DiskBlockManager獲取shuffle的blockId對(duì)應(yīng)的物理文件。

    2. 獲取shuffle索引文件,源碼如下,思路:根據(jù)blockManager的DiskBlockManager獲取shuffle索引的blockId對(duì)應(yīng)的物理文件。

    3.根據(jù)mapId將shuffle數(shù)據(jù)移除,源碼如下,思路:根據(jù)shuffleId和mapId刪除shuffle數(shù)據(jù)和索引文件

    4.校驗(yàn)shuffle索引和數(shù)據(jù),源碼如下。

    從上面可以看出,文件里第一個(gè)long型數(shù)是占位符,必為0.

    后面的保存的數(shù)據(jù)是每一個(gè)block的大小,可以看出來(lái),每次讀的long型數(shù),是前面所有block的大小總和。

    所以,當(dāng)前block的大小=這次讀取到的offset - 上次讀取到的offset

    這種索引的設(shè)計(jì)非常巧妙。每一個(gè)block大小合起來(lái)就是整個(gè)文件的大小。每一個(gè)block的在整個(gè)文件中的offset也都記錄在索引文件中。

    ?

    5. 寫(xiě)索引文件,源碼如下。

    思路:首先先獲取shuffle的數(shù)據(jù)文件并創(chuàng)建索引的臨時(shí)文件。

    獲取索引文件的每一個(gè)block 的大小。如果索引存在,則更新新的索引數(shù)組,刪除臨時(shí)數(shù)據(jù)文件,返回。

    若索引不存在,將新的數(shù)據(jù)的索引數(shù)據(jù)寫(xiě)入臨時(shí)索引文件,最終刪除歷史數(shù)據(jù)文件和歷史索引文件,然后臨時(shí)數(shù)據(jù)文件和臨時(shí)數(shù)據(jù)索引文件重命名為新的數(shù)據(jù)和索引文件。

    這樣的設(shè)計(jì),確保了數(shù)據(jù)索引隨著數(shù)據(jù)的更新而更新。

    ?

    6. 根據(jù)shuffleId獲取block數(shù)據(jù),源碼如下。

    ?

    思路:

    先獲取shuffle數(shù)據(jù)的索引數(shù)據(jù),然后調(diào)用position位上,獲取block 的大小,然后初始化FileSegmentManagedBuffer,讀取文件的對(duì)應(yīng)segment的數(shù)據(jù)。

    可以看出?reduceId就是block物理文件中的小的block(segment)的索引。

    7. 停止blockResolver,空實(shí)現(xiàn)。

    總結(jié),在這個(gè)類(lèi)中,可以學(xué)習(xí)到spark shuffle索引的設(shè)計(jì)思路,在工作中需要設(shè)計(jì)File和FileSegment的索引文件,這也是一種參考思路。

    Shuffle的寫(xiě)數(shù)據(jù)前的準(zhǔn)備工作

    直接來(lái)看?org.apache.spark.scheduler.ShuffleMapTask 的runTask的關(guān)鍵代碼如下:

    這里的manager是SortShuffleManager,是ShuffleManager的唯一實(shí)現(xiàn)。

    org.apache.spark.shuffle.sort.SortShuffleManager#getWriter 源碼如下:

    其中,numMapsForShuffle 定義如下:

    它保存了shuffleID和mapper數(shù)量的映射關(guān)系。

    獲取ShuffleHandle

    首先,先來(lái)了解一下ShuffleHandle類(lèi)。

    ShuffleHandle

    下面大致了解一下ShuffleHandle的相關(guān)內(nèi)容。

    類(lèi)說(shuō)明:

    這個(gè)類(lèi)是Spark內(nèi)部使用的一個(gè)類(lèi),包含了關(guān)于Shuffle的一些信息,主要給ShuffleManage 使用。本質(zhì)上來(lái)說(shuō),它是一個(gè)標(biāo)志位,除了包含一些用于shuffle的一些屬性之外,沒(méi)有其他額外的方法,用case class來(lái)實(shí)現(xiàn)更好一點(diǎn)。

    類(lèi)源碼如下:

    繼承關(guān)系如下:

    BaseShuffleHandle

    全稱(chēng):org.apache.spark.shuffle.BaseShuffleHandle

    類(lèi)說(shuō)明:

    它是ShuffleHandle的基礎(chǔ)實(shí)現(xiàn)。

    類(lèi)源碼如下:

    下面來(lái)看一下它的兩個(gè)子類(lèi)實(shí)現(xiàn)。

    BypassMergeSortShuffleHandle

    全稱(chēng):org.apache.spark.shuffle.sort.BypassMergeSortShuffleHandle

    類(lèi)說(shuō)明:

    如果想用于序列化的shuffle實(shí)現(xiàn),可以使用這個(gè)標(biāo)志類(lèi)。其源碼如下:

    ?

    SerializedShuffleHandle

    全稱(chēng):org.apache.spark.shuffle.sort.SerializedShuffleHandle

    類(lèi)說(shuō)明:

    used to identify when we've chosen to use the bypass merge sort shuffle path.

    類(lèi)源碼如下:?

    獲取ShuffleHandle

    在org.apache.spark.ShuffleDependency中有如下定義:

    shuffleId是SparkContext生成的唯一全局id。

    org.apache.spark.shuffle.sort.SortShuffleManager#registerShuffle 源碼如下:

    可以看出,mapper的數(shù)量等于父RDD的分區(qū)的數(shù)量。

    下面,看一下使用bypassMergeSort的條件,即org.apache.spark.shuffle.sort.SortShuffleWriter#shouldBypassMergeSort 源碼如下:

    思路:首先如果父RDD沒(méi)有啟用mapSideCombine并且父RDD的結(jié)果分區(qū)數(shù)量小于bypassMergeSort閥值,則使用?bypassMergeSort。其中bypassMergeSort閥值 默認(rèn)是200,可以通過(guò)?spark.shuffle.sort.bypassMergeThreshold 參數(shù)設(shè)定。

    使用serializedShuffle的條件,即org.apache.spark.shuffle.sort.SortShuffleManager#canUseSerializedShuffle 源碼如下:

    思路:序列化類(lèi)支持支持序列化對(duì)象的遷移,并且不使用mapSideCombine操作以及父RDD的分區(qū)數(shù)不大于?(1 << 24) 即可使用該模式的shuffle。

    根據(jù)ShuffleHandle獲取ShuffleWriter

    首先先對(duì)ShuffleWriter做一下簡(jiǎn)單說(shuō)明。

    ShuffleWriter

    類(lèi)說(shuō)明:它負(fù)責(zé)將map任務(wù)的輸出寫(xiě)入到shuffle系統(tǒng)。其繼承關(guān)系如下,對(duì)應(yīng)著ShuffleHandle的三種shuffle實(shí)現(xiàn)標(biāo)志。

    獲取ShuffleWriter

    org.apache.spark.shuffle.sort.SortShuffleManager#getWriter源碼如下:

    一個(gè)mapper對(duì)應(yīng)一個(gè)writer,一個(gè)writer往一個(gè)分區(qū)上的寫(xiě)數(shù)據(jù)。

    總結(jié)

    本篇文章主要從Task 的差異和相同點(diǎn)出發(fā),引出spark shuffle的重要性,接著對(duì)Spark shuffle數(shù)據(jù)的類(lèi)型以及spark shuffle的管理類(lèi)做了剖析。最后介紹了三種shuffle類(lèi)型的標(biāo)志位以及如何確定使用哪種類(lèi)型的數(shù)據(jù)的。

    接下來(lái),正式進(jìn)入mapper寫(xiě)數(shù)據(jù)部分。spark內(nèi)部有三種實(shí)現(xiàn),每一種寫(xiě)方式會(huì)有一篇文章專(zhuān)門(mén)剖析,我們逐一來(lái)看其實(shí)現(xiàn)機(jī)制。

    轉(zhuǎn)載于:https://www.cnblogs.com/johnny666888/p/11265502.html

    總結(jié)

    以上是生活随笔為你收集整理的spark shuffle的写操作之准备工作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。