Understanding Spark 2.1 Core in Depth (Part 2): Principles and Source Code Analysis of the DAG Scheduler


Overview

The previous post, "Understanding Spark in Depth (Part 1): RDD Implementation and Source Code Analysis", mentioned:

After defining an RDD, the programmer can use it in actions. Actions are the operations that return a value to the application or export data to a storage system, such as count (return the number of elements in the RDD), collect (return the elements themselves), and save (write the RDD to a storage system). In Spark, an RDD is only computed the first time it is used in an action (lazy evaluation), which lets the runtime pipeline multiple transformations while the RDD is being built.
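
A quick illustration of this lazy evaluation (a minimal sketch; the data and variable names are made up):

val nums = sc.parallelize(1 to 10)   // only builds the transformation graph; nothing runs yet
val doubled = nums.map(_ * 2)        // still lazy
val n = doubled.count()              // the action triggers the computation: this is a Job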

An action triggers the deferred computation of an RDD; we call one such computation a Job. We also introduced the concepts of narrow and wide dependencies:

A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD.
A wide dependency means each partition of the parent RDD is used by multiple partitions of the child RDD.

With narrow dependencies, the partitions of the child RDD can be generated in parallel; with wide dependencies, computation can only proceed after all parent partitions have been shuffled and their results are available.

In a chain of RDD transformations, consecutive narrow-dependency transformations can be pipelined and run in parallel, while wide dependencies cannot. Spark therefore uses wide dependencies as boundaries and splits a Job into multiple Stages. The transformations inside one Stage can be abstracted as a TaskSet. A TaskSet contains many Tasks that all perform the same transformation; they differ only in which subset of the data they operate on.
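
For example (a minimal sketch; the input path is hypothetical), a word count splits into two Stages because reduceByKey introduces a shuffle, i.e. a wide dependency:

val counts = sc.textFile("hdfs:///tmp/input.txt")  // hypothetical input path
  .flatMap(_.split(" "))                           // narrow: stays in the same Stage
  .map(word => (word, 1))                          // narrow: pipelined with flatMap
  .reduceByKey(_ + _)                              // wide: shuffle boundary, new Stage
counts.collect()                                   // one action -> one Job with two Stages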

At this point Spark can submit these tasks. But how are they scheduled and how are resources allocated? How are workers told to execute them? We will cover these questions one by one.

Corresponding to these two phases, we will look at two schedulers in detail: the DAGScheduler and the TaskScheduler.

Let's first see how they are created in SparkContext:

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)

As you can see, createTaskScheduler is called first to create the taskScheduler, and only then is a DAGScheduler instantiated. Can this order be reversed? No, and a look at the DAGScheduler class shows why:

class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  ***
}

The TaskScheduler created in SparkContext is passed into the DAGScheduler and stored as a member variable; once the DAG phase is done, the DAGScheduler uses it to carry out the next step of task scheduling.

Submitting a Job

The call stack is as follows:

  • rdd.count
    • SparkContext.runJob
      • DAGScheduler.runJob
        • DAGScheduler.submitJob
          • DAGSchedulerEventProcessLoop.doOnReceive
            • DAGScheduler.handleJobSubmitted

Let's dive into each step:

rdd.count

RDD actions trigger SparkContext's runJob function; count(), for example:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.runJob

SparkContext.runJob in turn calls DAGScheduler.runJob:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

Note that rdd.doCheckpoint() here does not simply checkpoint this RDD itself; it recursively walks back through the parent RDDs, checks whether checkpointData has been defined, and checkpoints each RDD for which it has:

private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // If we also want the parents of an RDD whose checkpointData is defined
          // to be checkpointed, we must checkpoint the parents first.
          // This is because once an RDD checkpoints itself,
          // it cuts its parents out of the lineage.
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}

For the details of the checkpoint implementation, see the previous post.
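
As a reminder of how checkpointing is driven from user code (a minimal sketch; the directory path is hypothetical):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // hypothetical directory
val data = sc.parallelize(1 to 1000).map(_ * 2)
data.checkpoint()                                // only marks the RDD; nothing is written yet
data.count()                                     // the action runs the Job; doCheckpoint() then
                                                 // materializes the marked RDD and truncates its lineage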

DAGScheduler.runJob

DAGScheduler.runJob calls DAGScheduler.submitJob:

/**
 * Parameters:
 * @param rdd: the target RDD the job runs on
 * @param func: the function executed on the RDD's partitions
 * @param partitions: the set of partitions to run on; some jobs do not compute
 *                    every partition of the RDD, e.g. first()
 * @param callSite: where in the user program this job was called
 * @param resultHandler: callback for the results
 * @param properties: scheduler properties for this job, e.g. the fair-scheduler
 *                    pool name, covered later
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ***
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

DAGScheduler.submitJob

Next, let's see what submitJob does:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Make sure we are not launching a task on a partition that does not exist
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }

  // Get a new jobId by incrementing the counter
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // If the job runs on no partitions at all,
    // return immediately
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

DAGSchedulerEventProcessLoop.doOnReceive

eventProcessLoop is a DAGSchedulerEventProcessLoop object, i.e. an event loop that handles DAG scheduling events. When an event is posted, the loop dispatches it through doOnReceive:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // When the event is JobSubmitted,
  // call DAGScheduler.handleJobSubmitted
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ***
}
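
Under the hood, DAGSchedulerEventProcessLoop extends Spark's generic EventLoop: post() enqueues the event, and a dedicated thread drains the queue and hands each event to onReceive, which delegates to doOnReceive. A stripped-down sketch of that pattern (an illustration of the mechanism, not the actual Spark class):

import java.util.concurrent.LinkedBlockingDeque

// Simplified illustration of the post-and-dispatch pattern used by DAGSchedulerEventProcessLoop.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        onReceive(eventQueue.take())      // blocks until an event is posted
      }
    }
  }
  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)  // called from the thread running submitJob
  protected def onReceive(event: E): Unit           // DAGScheduler's version pattern-matches the event
}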

DAGScheduler.handleJobSubmitted

At this point the Job submission is complete:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)

  submitWaitingStages()
}

Next, let's look at newResultStage inside handleJobSubmitted: the rather interesting process of splitting the job into Stages.

Splitting into Stages

As mentioned earlier, Spark uses wide dependencies as boundaries and splits a Job into multiple Stages. The call stack is:

  • DAGScheduler.newResultStage
    • DAGScheduler.getParentStagesAndId
      • DAGScheduler.getParentStages
        • DAGScheduler.getShuffleMapStage
          • DAGScheduler.getAncestorShuffleDependencies
          • DAGScheduler.newOrUsedShuffleStage
            • DAGScheduler.newShuffleMapStage

Let's dive into each step:

DAGScheduler.newResultStage

Stage splitting starts from the Stage containing the final RDD, the ResultStage; in the figure this is the Stage containing RDD G. Before that Stage is created, its parent Stages are created first, and this proceeds recursively until all parent Stages exist.

private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

DAGScheduler.getParentStagesAndId

getParentStagesAndId returns the parent Stages together with a new stage id:

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}

DAGScheduler.getParentStages

Let's take a closer look at what getParentStages does:

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // Will hold the parent Stages
  val parents = new HashSet[Stage]
  // RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // RDDs still waiting to be processed
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      // Mark as visited
      visited += r
      // Walk over all dependencies of this RDD
      for (dep <- r.dependencies) {
        dep match {
          // A wide (shuffle) dependency produces a new Stage
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          // A narrow dependency is pushed onto the stack for later processing
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  // Start from the final RDD
  waitingForVisit.push(rdd)
  // Breadth-first traversal
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  // Return the list of parent Stages
  parents.toList
}

getParentStages is essentially a breadth-first traversal; once you see that, it is easy to follow. Although the Stages themselves have not been created yet at this point, the splitting strategy is already visible: the parent RDDs are assigned to Stages while being traversed breadth-first.

If a parent RDD and child RDD are connected by a narrow dependency, the parent RDD is placed into the same Stage as the child. In the figure, B is placed into Stage 3.

If they are connected by a wide dependency, the parent RDD goes into a new Stage. In the figure, F is placed into Stage 2.
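
You can observe these boundaries directly from the driver. The sketch below builds a tiny DAG and prints the dependency types and lineage; toDebugString indents at each shuffle boundary, i.e. at each Stage split (variable names and data are made up):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val mapped = pairs.mapValues(_ + 1)       // OneToOneDependency -> narrow, same Stage
val grouped = mapped.reduceByKey(_ + _)   // ShuffleDependency  -> wide, new Stage

println(mapped.dependencies)              // List(org.apache.spark.OneToOneDependency@...)
println(grouped.dependencies)             // List(org.apache.spark.ShuffleDependency@...)
println(grouped.toDebugString)            // indentation marks the shuffle/Stage boundary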

DAGScheduler.getShuffleMapStage

Now let's see how getShuffleMapStage creates a new Stage.
First, shuffleToMapStage is a HashMap that keeps track of the ShuffleMapStages already created, keyed by shuffle id:

private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]

getShuffleMapStage first looks up shuffleToMapStage by shuffleId:

private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    // If found, return it directly
    case Some(stage) => stage
    case None =>
      // Check whether this Stage's parent Stages have been created;
      // if not, create them first
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleToMapStage.contains(dep.shuffleId)) {
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
        }
      }
      // Create the new Stage
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      // Add the new Stage to the HashMap
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      // Return the new Stage
      stage
  }
}

Notice that this code is very similar to newResultStage above, so the whole thing can be seen as a recursive procedure.

DAGScheduler.getAncestorShuffleDependencies

Next, getAncestorShuffleDependencies; as you might expect, it looks very much like getParentStages used by newResultStage:

private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val parents = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
              parents.push(shufDep)
            }
          case _ =>
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }

  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents
}

It is indeed very similar to getParentStages. The difference is that here each shuffle dependency is first checked against shuffleToMapStage; if its Stage does not exist yet, the dependency is pushed onto the parents stack, which is finally returned to getShuffleMapStage above, where newOrUsedShuffleStage is called to create the new Stages.

DAGScheduler.newOrUsedShuffleStage

Now let's look at how newOrUsedShuffleStage creates a new Stage.
The results of ShuffleMapTasks (really just metadata: the location and size of the output data) are reported to the driver's mapOutputTracker, so we first need to check whether the Stage has already been computed:

private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  // Create the new Stage
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  // Check whether the Stage has already been computed;
  // if so, copy the existing results into the new stage
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    // If it has not been computed, register the shuffle with mapOutputTracker,
    // reserving a slot for the metadata
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

DAGScheduler.newShuffleMapStage

The recursion happens in newShuffleMapStage. Its implementation mirrors the original newResultStage: it also calls getParentStagesAndId first and then creates a ShuffleMapStage:

private def newShuffleMapStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int,
    callSite: CallSite): ShuffleMapStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
  val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
    firstJobId, callSite, shuffleDep)

  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(firstJobId, stage)
  stage
}

Recap

With that, the Stage-splitting process is complete. Using the figure from the beginning, let's walk through an example once more:

  • First, we want to newResultStage Stage 3, the Stage that contains RDD_G.
  • But before that Stage is created, getParentStagesAndId is called.
  • getParentStagesAndId calls getParentStages, which traverses the RDDs that RDD_G depends on breadth-first. A narrow dependency is placed into G's Stage 3; for example, RDD_B joins Stage 3.
  • For a wide dependency, take RDD_F as an example (RDD_A is handled the same way): getShuffleMapStage is called to check whether Stage 2, the Stage containing RDD_F, has already been created. If it has, it is returned directly.
  • If it has not, getAncestorShuffleDependencies is called first. getAncestorShuffleDependencies is similar to getParentStages: it also traverses the RDDs that RDD_F depends on breadth-first. Narrow dependencies, such as RDD_C, RDD_D and RDD_E, are all placed into F's Stage 2. But suppose RDD_E had a parent RDD_H connected to it by a wide dependency; what then? We first check whether RDD_H's Stage has already been created. If not, the dependency is pushed onto a parents Stack, which is returned at the end.
  • For the returned Stages that have not been created yet, newOrUsedShuffleStage is called.
  • newOrUsedShuffleStage calls newShuffleMapStage to create the new Stage. Since newShuffleMapStage is implemented like newResultStage, the recursion continues: every Stage a Stage depends on is created before the Stage itself. Here, the Stage containing RDD_H is created first, and only then Stage 2.
  • After newOrUsedShuffleStage creates a new Stage, it checks whether that Stage has already been computed. If it has, the results are copied from mapOutputTracker; if not, the shuffle is registered with mapOutputTracker as a placeholder.
  • Finally, back in newResultStage, the ResultStage is created, i.e. Stage 3 here. At this point the Stage-splitting process is finished.

Creating Tasks

The call stack is as follows:

  • DAGScheduler.handleJobSubmitted
    • DAGScheduler.submitStage
      • DAGScheduler.getMissingParentStages
      • DAGScheduler.submitMissingTasks

DAGScheduler.handleJobSubmitted

Let's return to handleJobSubmitted, the last step of "Submitting a Job":

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  ***
}

We covered the creation of finalStage in depth in "Splitting into Stages":

finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

Now let's continue with the rest of handleJobSubmitted:

// Create the new job
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
  job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

// Record the job submission time
val jobSubmissionTime = clock.getTimeMillis()
// Register the job by its id
jobIdToActiveJob(jobId) = job
// Add it to the activeJobs HashSet
activeJobs += job
// Set this job as the ActiveJob of finalStage
finalStage.setActiveJob(job)
// Collect the stage ids and stage infos for this job
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
// Notify the listener bus
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// Submit the final Stage
submitStage(finalStage)
// Submit any waiting Stages
submitWaitingStages()

DAGScheduler.submitStage

Next, let's see how a Stage is submitted. We need to find which parent Stages are missing and run those first; this is a depth-first traversal:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Find the missing parent Stages
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // If no parent Stage is missing, this Stage is ready to run.
        // submitMissingTasks finishes the DAGScheduler's work and
        // submits the Tasks to the TaskScheduler.
        submitMissingTasks(stage, jobId.get)
      } else {
        // Depth-first traversal of the missing parents
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

DAGScheduler.getMissingParentStages

getMissingParentStages is similar to getParentStages and also uses a breadth-first traversal:

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            // A wide dependency whose Stage is not yet available
            // is added to the missing HashSet
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // A narrow dependency's RDD is pushed onto
            // the stack of RDDs waiting to be visited
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
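
The isAvailable check above is what decides whether a parent Stage counts as "missing": a ShuffleMapStage is considered available only once every one of its partitions has a registered map output. A rough sketch of that bookkeeping (an illustration of the idea, not the actual Spark class):

// Sketch of the availability bookkeeping a ShuffleMapStage performs (hypothetical class).
class ShuffleMapStageSketch(val numPartitions: Int) {
  // Map output locations recorded per partition as tasks complete
  private val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
  def addOutputLoc(partition: Int, loc: String): Unit =
    outputLocs(partition) = loc :: outputLocs(partition)
  def numAvailableOutputs: Int = outputLocs.count(_.nonEmpty)
  // The stage is "available" (i.e. not missing) once every partition has an output
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}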

DAGScheduler.submitMissingTasks

Finally, let's look at the DAGScheduler's last piece of work: submitting the Tasks:

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // pendingPartitions is a HashSet[Int]
  // holding the partitions still to be processed
  stage.pendingPartitions.clear()

  // Figure out which partitions have not been computed yet
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Pull the scheduling pool, job group description, etc.
  // from an ActiveJob associated with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  // runningStages is a HashSet[Stage];
  // add the current Stage to the set of running Stages
  runningStages += stage

  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  // Post a SparkListenerStageSubmitted event to the listenerBus
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For the tasks of the final Stage, serialize and broadcast (rdd, func).
    // For the tasks of any other Stage, serialize and broadcast (rdd, shuffleDep).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // If serialization fails, abort the stage
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  val tasks: Seq[Task[_]] = try {
    // For the final Stage, create ResultTasks.
    // For any other Stage, create ShuffleMapTasks.
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    // Create the TaskSet and submit it to the TaskScheduler
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)

    submitWaitingChildStages(stage)
  }
}

A TaskSet holds the group of identical Tasks that make up a Stage: every Task has exactly the same processing logic and differs only in the data it handles, with each Task responsible for one partition.
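
For reference, the shape of what the call above hands to the TaskScheduler can be read off from the call site; the sketch below is inferred from that call, and the field names may not match the Spark source exactly:

import java.util.Properties
import org.apache.spark.scheduler.Task

// Hypothetical sketch of the TaskSet handed to taskScheduler.submitTasks.
class TaskSetSketch(
    val tasks: Array[Task[_]],   // one Task per missing partition, identical logic
    val stageId: Int,            // stage.id
    val stageAttemptId: Int,     // stage.latestInfo.attemptId
    val priority: Int,           // jobId, used to order TaskSets across jobs
    val properties: Properties)  // scheduling pool name, job group, etc.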

With that, the DAGScheduler has finished its work. In the next post, we will start from the line above:

taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

and dig into how the TaskScheduler works.
