日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark详解(五):Spark作业执行原理

發布時間:2025/4/16 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark详解(五):Spark作业执行原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark的作業和任務調度系統是其核心,它能夠有效地進行調度的根本原因是對任務的劃分DGG和容錯。下面我們介紹一下相關術語:

  • 作業(Job):RDD中由行動操作所生成的一個或者多個調度階段
  • 調度階段(Stage):每個作業會因為RDD之間的依賴關系拆分成多組任務集合,稱為調度階段,也叫做任務集合(TaskSet)。調度階段的劃分是由DAGScheduler倆劃分的,調度階段由ShuffleMapStage和ResultStage兩種
  • 任務(Task):分發到Executor的工作任務,是Spark實際執行應用的最小單元。在Spark中主要有兩種不同類型的Task,分別是ShuffleMapTask和ResultTask
  • DAGScheduler:DAGScheduler是面向調度階段的任務調度器,負責接收Spark應用提交的作業,根據RDD的依賴關系劃分調度階段,并提交調度階段給TaskScheduler
  • TaskScheduler:TaskScheduler是面向任務的調度器,它接收DAGScheduler提交過來的調度階段,然后把任務分發到Worker階段上運行,由Worker節點上的Executor進程來運行任務。

1. 概述

Spakr操作算子主要分為轉換操作和行動操作,對于轉換操作的計算是lazy級別的,也就是延遲加載,只有出現了行動操作才觸發了作業的提交。在Spark調度中最重要的是DAGScheduler和TaskScheduler兩個調度器,其中DAGScheduler負責任務的邏輯調度,將作業拆分為不同階段的具有依賴關系的任務集合,而TaskScheduler則負責具體任務的調度執行。

(1)Spark應用程序進程各種轉換操作,通過行動操作觸發作業的運行。提交之后根據RDD的依賴關系構建DAG圖,DAG圖提交給DAGScheduler進行解析。
(2)DAGScheduler是面向調度階段的高層次調度器,DAGScheduler把DAG拆分成相互依賴的調度階段,拆分調度階段是以RDD的依賴是否為寬依賴,當遇到寬依賴就劃分為新的調度階段。每個調度階段包含一個或者多個任務,這些任務形成任務集合,提交給底層TaskScheduler進行調度運行。另外DAGScheduler還記錄了哪些RDD被存入磁盤等物化動作,同時尋求任務的最優調度優化,列如數據本地型。DAGScheduler還監控運行調度過程,如果某個調度階段運行失敗,則需要重新提交該調度階段。
(3)每個TaskScheduler只為一個SparkContext實例服務,TaskScheduler接受來自DAGScheduler發送過阿里的任務集合,TaskScheduler收到任務集合之后就把該任務集合以任務的形式一個一個方法到Worker節點中運行,如果某個任務運行失敗,TaskScheduler要負責重試。如果TaskScheduler發現某個任務一直未運行完成,就可能啟動同樣的任務運行同樣的一個任務,哪個任務先運行完成就用哪個任務的結果。
(4)Worke中的Executor收到TaskScheduler發送過來的任務之后,以多線程的方式進行運行,每一個線程負責一個任務。任務結束之后要返回結果給TaskScheduler,不同的類型任務返回 的結果不同。ShuffleMapTask返回的是一個MapStatus對象,而不是結果本身;ResultTask根據結果大小的不同,返回的方式不同。

下面我們來看一下Spark 獨立運行模型下的執行類調用關系圖:

2. 提交作業

RDD的源碼的collect方法觸發了SparkContext的runJob方法來提交作業。SparkContext的runJob方法經過幾次調用后,
進入DAGScheduler的runJob方法,其中SparkContext中調用DAGScheduler類runJob方法代碼如下:

def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {if (stopped.get()) {throw new IllegalStateException("SparkContext has been shutdown")}val callSite = getCallSiteval cleanedFunc = clean(func)...// 調用DAGScheduler的runJob進行處理dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)progressBar.foreach(_.finishAll())// 做checkpoint,以后再說rdd.doCheckpoint()}

在DAGScheduler類內部會進行一列的方法調用,首先是在runJob方法里,調用submitJob方法來繼續提交作業,這里會發生阻塞,知道返回作業完成或失敗的結果;然后在submitJob方法里,創建了一個JobWaiter對象,
并借助內部消息處理進行把這個對象發送給DAGScheduler的內嵌類DAGSchedulerEventProcessLoop進行處理;最后在DAGSchedulerEventProcessLoop消息接收方法OnReceive中,接收到JobSubmitted樣例類完成模式匹配后,
繼續調用DAGScheduler的handleJobSubmitted方法來提交作業,在該方法中進行劃分階段。

def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = {// Check to make sure we are not launching a task on a partition that does not exist.// 判斷任務處理分區是否存在,如果不存在,則拋出異常val maxPartitions = rdd.partitions.lengthpartitions.find(p => p >= maxPartitions || p < 0).foreach { p =>throw new IllegalArgumentException("Attempting to access a non-existent partition: " + p + ". " +"Total number of partitions: " + maxPartitions)}// 如果作業只包含0個分區,則創建0個任務的JobWaiter,并立即返回val jobId = nextJobId.getAndIncrement()if (partitions.size == 0) {// Return immediately if the job is running 0 tasksreturn new JobWaiter[U](this, jobId, 0, resultHandler)}assert(partitions.size > 0)val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]// 創建JobWaiter對象,等待作業運行完畢,使用內部類提交作業val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))waiter}

在Spark應用程序中,會拆分成多個作業,然后對于這多個作業之間進行調度,Spark目前提供了兩種調度策略:一種是FIFO模式,也是目前默認的模式,還有一種是FAIR模式。

3. 劃分調度階段

Spark調度階段的劃分是由DAGScheduler實現的,DAGScheduler會從最后一個RDD出發使用廣度優先遍歷整個依賴樹,從而劃分調度階段,
調度階段的劃分是以是否為寬依賴進行的,即當某個RDD的操作是Shuffle時,以該Shuffle操作為界限劃分成前后兩個調度階段

代碼實現是在DAGScheduler的handleJobSubmitted方法中根據最后一個RDD生成ReusltStage開始的,具體方法從finalRDD使用getParentsStages找出其依賴的祖先RDD是否存在Shuffle操作

private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {// 第一步:使用最后一個RDD創建一個finalStagevar finalStage: ResultStage = nulltry {// 創建一個stage,并將它加入DAGScheduler內部的內存緩沖中, newResultStage的時候就已經得到了他所有的ParentStagefinalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)} catch {...}// 第二步:用finalStage,創建一個Jobval job = new ActiveJob(jobId, finalStage, callSite, listener, properties)clearCacheLocs()logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length))logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")logInfo("Parents of final stage: " + finalStage.parents)logInfo("Missing parents: " + getMissingParentStages(finalStage))// 第三步:將job加入內存緩存中val jobSubmissionTime = clock.getTimeMillis()jobIdToActiveJob(jobId) = jobactiveJobs += jobfinalStage.setActiveJob(job)val stageIds = jobIdToStageIds(jobId).toArrayval stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))// 第四步:使用submitStage方法來提交最后一個stage,// 最后的結果就是,第一個stage提交,其它stage都在等待隊列中submitStage(finalStage)// 提交等待的stagesubmitWaitingStages()}// newResultStage會調用getParentStagesAndId得到所有的父類stage以及它們的id private def newResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage = {// 得到所有的父stageval (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage}// 繼續調用getParentStages private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {val parentStages = getParentStages(rdd, firstJobId)val id = nextStageId.getAndIncrement()(parentStages, id)}private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {val parents = new HashSet[Stage]val visited = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new Stack[RDD[_]]def visit(r: RDD[_]) {if (!visited(r)) {visited += rfor (dep <- r.dependencies) {dep match {// 所依賴RDD操作類型是ShuffleDependency,需要劃分ShuffleMap調度階段,// 以getShuffleMapStage方法為入口,向前遍歷劃分調度階段case shufDep: ShuffleDependency[_, _, _] =>parents += getShuffleMapStage(shufDep, firstJobId)case _ =>waitingForVisit.push(dep.rdd)}}}}// 從最后一個RDD向前遍歷這個依賴樹,如果該RDD依賴樹存在ShuffleDependency的RDD,// 則存在父調度階段,反之,不存在waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}parents.toList}// private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],firstJobId: Int): ShuffleMapStage = {shuffleToMapStage.get(shuffleDep.shuffleId) match {case Some(stage) => stagecase None =>// We are going to register ancestor shuffle dependenciesgetAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>if (!shuffleToMapStage.contains(dep.shuffleId)) {// 如果shuffleToMapStage中沒有,那么就new一個shuffleMapStageshuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)}}// Then register current shuffleDepval stage = newOrUsedShuffleStage(shuffleDep, firstJobId)shuffleToMapStage(shuffleDep.shuffleId) = stagestage}}// 可以對比這個算法和getParentStages,該方法返回所有的寬依賴 private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {val parents = new Stack[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]def visit(r: RDD[_]) {if (!visited(r)) {visited += rfor (dep <- r.dependencies) {dep match {// 所依賴RDD操作類型是ShuffleDependency,作為劃分ShuffleMap調度階段界限case shufDep: ShuffleDependency[_, _, _] =>if (!shuffleToMapStage.contains(shufDep.shuffleId)) {parents.push(shufDep)}case _ =>}waitingForVisit.push(dep.rdd)}}}waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}parents}

我們結合一個圖來講解上面的代碼:如下圖,有7個RDD,分別是rddA~rddG,它們之間有5個操作,其劃分調度階段如下:

  • 在SparkContext中提交運行時,會調用DAGScheduler的handleJobSubmitted進行處理,在該方法中會先找到最后一個RDD(即rddG),并調用getParentStages方法
  • 在getParentStages方法判斷rddG的依賴RDD樹中是否存在shuffle操作,在該例子中發現join操作為shuffle操作,則獲取該操作的RDD為rddB和rddF
  • 使用getAncestorShuffleDependencies方法從rddB向前遍歷,發現該依賴分支上沒有其他的寬依賴,調用newOrUsedShuffleStage方法生成調度階段ShuffleMapStage0
  • 使用getAncestorShuffleDependencies方法從rddB向前遍歷,發現groupByKey寬依賴操作,以此為分界劃分rddC和rddD為ShuffleMapStage1, rddE和rddF為ShuffleMapStage2
  • 最后生成rddG的ResultStage3。
  • 總結,語句finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)在生成finalStage的同時,建立起所有調度階段的依賴關系,最后通過finalStage生成一個作業實例,在該作業實例中按照順序提交調度階段進行執行。

    4. 提交調度階段

    通過handleJobSubmitted方法中的submitStage(finalStage)來提交作業。在submitStage方法中調用getMissingParentStages方法獲取finalStage父調度階段,如果不存在父調度階段,則使用submitMissingTasks方法提交執行,如果存在父調度階段,則把該調度階段存放到waitingStages列表中,同時遞歸調用submitStage。

    /*** 提交stage的方法* stage劃分算法的入口,stage劃分算法是由submitStage()與getMissingParentStages()共同組成*/private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {// getMissingParentStages獲取當前stage的父stageval missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)// 直到最初的stage,它沒有父stage,那么此時,就會去首先提交第一個stage,stage0// 其余的stage,都在waitingStages里if (missing.isEmpty) {// 如果不存在父調度階段,直接把該調度階段提交運行logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {// 如果存在父調度階段,把該調度階段加入到等待運行調度階段列表中// 同時遞歸調用submitStage方法,直到找到開始的調度階段,即該調度階段沒有父調度階段for (parent <- missing) {// 遞歸調用submitStage方法去提交父stagesubmitStage(parent)}// 并且當前stage放入waitingStages等待執行的stage隊列中waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id, None)}}private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage]val visited = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new Stack[RDD[_]]def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rddval rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)if (rddHasUncachedPartitions) {// rdd的依賴for (dep <- rdd.dependencies) {dep match {// 如果是寬依賴,那么就創建一個shuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)// 判斷是否可用,也就是判斷父stage有沒有結果, 看源碼可以發現就是判斷_numAvailableOutputs == numPartitions// _numAvailableOutputs就是每個task成功后會+1if (!mapStage.isAvailable) {missing += mapStage}// 如果是窄依賴,那么將依賴的rdd放入棧中case narrowDep: NarrowDependency[_] =>waitingForVisit.push(narrowDep.rdd)}}}}}// 首先往棧中推入了stage最后一個rddwaitingForVisit.push(stage.rdd)while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}missing.toList}

    我們畫圖來講解提交調度階段,如下圖

  • 在handleJobSubmitted方法中獲取了該例子的最后一個調度階段ReusltStage3,通過submitStage方法提交該調度階段。
  • 在submitStage方法中,先創建作業實例,然后判斷該調度階段是否存在父調度階段,由于ReusltStage3有兩個父調度階段ShuffleMapStage0和ShuffleMapStage2,所以不能立即提交調度階段運行,把ReusltStage3放入到等待隊列中等待waitingStages中。
  • 遞歸調用submitStage方法可以知道ShuffleMapStage0不存在父調度階段,而ShuffleMapStage2存在父調度階段ShuffleMapStage1,這樣ShuffleMapStage2加入到等待執行調度階段列表waitingStages中,而ShuffleMapStage0和ShuffleMapStage1兩個調度階段作為第一次調度使用submitMissingTasks方法運行。
  • Executor任務執行完成時發送消息,DAGScheduler等調度器更新狀態時,檢查調度階段運行情況,如果存在執行失敗的任務,則重新提交調度階段;如果所有任務完成,則繼續提交調度階段運行。由于ReusltStage3的父調度階段沒有全部完成,第二次調度階段只提交ShuffleMapStage2運行。
  • 當ShuffleMapStage2運行完畢之后,此時ResultStage3的父調度階段全部完成,提交該調度運行完成。
  • 4. 提交任務

    在submitStage中會執行submitMissingTasks方法中,會根據調度階段partition個數生成對應個數task,這些任務組成一個任務集提交到TaskScheduler進行處理。
    對于ResultStage生成ResultTask,對于ShuffleMapStage生成ShuffleMapTask。

    private def submitMissingTasks(stage: Stage, jobId: Int) {...// 為stage創建指定數量的task// 這里有一個很關鍵,就是task最佳位置的計算val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage =>partitionsToCompute.map { id =>// 給每個partition創建一個task// 給每個task計算最佳位置val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)}case stage: ResultStage =>val job = stage.activeJob.getpartitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)}}} catch {...}if (tasks.size > 0) {logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")stage.pendingPartitions ++= tasks.map(_.partitionId)logDebug("New pending partitions: " + stage.pendingPartitions)// 提交taskSet,Standalone模式,使用的是TaskSchedulerImpltaskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())} else {// Because we posted SparkListenerStageSubmitted earlier, we should mark// the stage as completed here in case there are no tasks to run// 如果調度階段不存在任務標記,則標記調度階段已經完成markStageAsFinished(stage, None)...} }

    當TaskSchedulerImpl收到發送過來的任務集時,在submitTasks方法中構建一個TaskSetManager的實例,用于管理這個任務集的生命周期,而該TaskSetManager會放入系統的調度池中,根據系統設置的調度算法進行調度,
    TaskSchedulerImpl.submitTasks方法代碼如下:

    override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {// 為每一個taskSet創建一個taskSetManager// taskSetManager在后面負責,TaskSet的任務執行狀況的監視和管理val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])// 把manager加入內存緩存中stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}// 將該任務集的管理器加入到系統調度池中,由系統統一調配,該調度器屬于應用級別// 支持FIFO和FAIR兩種,默認FIFOschedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)...}// 在創建SparkContext,創建TaskScheduler的時候,創建了StandaloneSchedulerBackend,這個backend是負責// 創建AppClient,向Master注冊Application的, 詳見Spark運行時消息通信backend.reviveOffers()

    StandaloneSchedulerBackend的reviveOffers方法是繼承于父類CoarseGrainedSchedulerBackend,該方法會向DriverEndpoint發送消息,調用makeOffers方法。在該方法中先會獲取集群中可用的Executor,
    然后發送到TaskSchedulerImpl中進行對任務集的任務分配運行資源,最后提交到launchTasks方法中。CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers代碼如下:

    private def makeOffers() {// 獲取集群中可用的Executor列表val activeExecutors = executorDataMap.filterKeys(executorIsAlive)// workOffers是每個Executor可用的cpu資源數量val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq// 第一步:調用TaskSchedulerImpl的resourceOffers()方法,執行任務分配算法,將各個task分配到executor上去// 第二步:分配好task到executor之后,執行自己的launchTasks方法,將分配的task發送LaunchTask消息到對應executor上去,由executor啟動并執行launchTasks(scheduler.resourceOffers(workOffers)) }

    第一步:在TaskSchedulerImpl的resourceOffers()方法里進行非常重要的步驟--資源分配, 在分配過程中會根據調度策略對TaskSetMannager進行排序,
    然后依次對這些TaskSetManager按照就近原則分配資源,按照順序為PROCESS_LOCAL NODE_LOCAL NO_PREF RACK_LOCAL ANY

    def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {// Mark each slave as alive and remember its hostname// Also track if new executor is addedvar newExecAvail = falsefor (o <- offers) {executorIdToHost(o.executorId) = o.hostexecutorIdToTaskCount.getOrElseUpdate(o.executorId, 0)if (!executorsByHost.contains(o.host)) {executorsByHost(o.host) = new HashSet[String]()executorAdded(o.executorId, o.host)newExecAvail = true}for (rack <- getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host}}// 首先,將可用的executor進行shuffleval shuffledOffers = Random.shuffle(offers)// tasks,是一個序列,其中的每個元素又是一個ArrayBuffer// 并且每個子ArrayBuffer的數量是固定的,也就是這個executor可用的cpu數量val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))val availableCpus = shuffledOffers.map(o => o.cores).toArray// 從rootPool中取出排序了的TaskSetManager// 在創建完TaskScheduler StandaloneSchedulerBackend之后,會執行initialize()方法,其實會創建一個調度池// 這里就是所有提交的TaskSetManager,首先會放入這個調度池中,然后再執行task分配算法的時候,會從這個調度池中,取出排好隊的TaskSetManagerval sortedTaskSets = rootPool.getSortedTaskSetQueue// 如果有新加入的Executor,需要重新計算數據本地性for (taskSet <- sortedTaskSets) {logDebug("parentName: %s, name: %s, runningTasks: %s".format(taskSet.parent.name, taskSet.name, taskSet.runningTasks))if (newExecAvail) {taskSet.executorAdded()}}// Take each TaskSet in our scheduling order, and then offer it each node in increasing order// of locality levels so that it gets a chance to launch local tasks on all of them.// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYvar launchedTask = falsefor (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {do {// 對當前taskset嘗試使用最小本地化級別,將taskset的task,在executor上進行啟動// 如果啟動不了,就跳出這個do while,進入下一級本地化級別,一次類推launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, availableCpus, tasks)} while (launchedTask)}if (tasks.size > 0) {hasLaunchedTask = true}return tasks}

    第二步:分配好資源的任務提交到CoarseGrainedSchedulerBackend的launchTasks方法中,在該方法中會把任務一個個發送到Worker節點上的CoarseGrainedExecutorBacken,
    然后通過其內部的Executor來執行任務

    // 根據分配好的tasks,在executor上啟動相應的task private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {// 序列化val serializedTask = ser.serialize(task)if (serializedTask.limit >= maxRpcMessageSize) {scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>try {var msg = ...taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}} else {val executorData = executorDataMap(task.executorId)executorData.freeCores -= scheduler.CPUS_PER_TASK// 向Worker節點的CoarseGrainedExecutorBackend發送消息執行TaskexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}} }

  • 在提交stage中,第一次調用的是ShuffleMapStage0和ShuffleMapStage1,假設都只有兩個partition,ShuffleMapStage0是TaskSet0,
    ShuffleMapStage1是TaskSet1,每個TaskSet都有兩個任務在執行。
  • TaskScheduler收到發送過來的任務集TaskSet0和TaskSet1,在submitTasks方法中分別構建TaskSetManager0和TaskSetManager1,并把它們兩放到系統的調度池,
  • 根據系統設置的調度算法進行調度(FIFO或者FAIR)
    在TaskSchedulerImpl的resourceOffers方法中按照就近原則進行資源分配,使用CoarseGrainedSchedulerBackend的launchTasks方法把任務發送到Worker節點上的
  • CoarseGrainedExecutorBackend調用其Executor來執行任務
    當ShuffleMapStage2同理,ResultStage3生成的是ResultTask
  • 4. 執行任務

    當CoarseGrainedExecutorBackend接收到LaunchTask消息時,會調用Executor的launchTask方法進行處理。在Executor的launchTask方法中,初始化一個TaskRunner來封裝任務,
    它用于管理任務運行時的細節,再把TaskRunner對象放入到ThreadPool中去執行。TaskRunner.run的前半部分代碼如下:

    override def run(): Unit = {// 生成內存管理taskMemoryManager實例,用于任務運行期間的內存管理val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)val deserializeStartTime = System.currentTimeMillis()// 設置當前類加載器,使用類加載器的原因,用反射的方式來動態加載一個類,然后創建這個類的對象Thread.currentThread.setContextClassLoader(replClassLoader)val ser = env.closureSerializer.newInstance()logInfo(s"Running $taskName (TID $taskId)")// 向DriverEndpoint發送狀態更新信息execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)var taskStart: Long = 0startGCTime = computeTotalGcTime()try {// 對任務運行時需要的文件、Jar包、代碼等進行反序列化val (taskFiles, taskJars, taskProps, taskBytes) =Task.deserializeWithDependencies(serializedTask)// Must be set before updateDependencies() is called, in case fetching dependencies// requires access to properties contained within (e.g. for access control).Executor.taskDeserializationProps.set(taskProps)updateDependencies(taskFiles, taskJars)task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)task.localProperties = taskPropstask.setTaskMemoryManager(taskMemoryManager)// If this task has been killed before we deserialized it, let's quit now. Otherwise,// continue executing the task.// 任務在反序列化之前被殺死,則拋出異常并退出if (killed) {throw new TaskKilledException}logDebug("Task " + taskId + "'s epoch is " + task.epoch)env.mapOutputTracker.updateEpoch(task.epoch)// Run the actual task and measure its runtime.// 調用Task的runTask方法,由于Task本身是一個抽象類,具體的runTask方法是由它的兩個子類ShuffleMapTask和ResultTask來實現的taskStart = System.currentTimeMillis()var threwException = true// value對于shuffleMapTask來說,其實就是MapStatus,封裝了shuffleMapTask計算的數據,輸出的位置val value = try {val res = task.run( // 具體實現在ShuffleMapTask和ResultTask中taskAttemptId = taskId,attemptNumber = attemptNumber,metricsSystem = env.metricsSystem)threwException = falseres} finally {...}...

    對于ShuffleMapTask, 它的計算結果會寫到BlockManager之中,最終返回給DAGScheduler的是一個MapStatus。該對象管理了ShuffleMapTask的運算結果存儲到BlockManager里的相關存儲信息,而不是計算結果本身,
    這些存儲信息將會成為下一階段的任務需要獲得的輸入數據時的依據。ShuffleMapTask.runTask代碼如下:

    override def runTask(context: TaskContext): MapStatus = {// 反序列化獲取rdd和rdd的依賴// 通過broadcast variable拿到rdd的數據val deserializeStartTime = System.currentTimeMillis()val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTimevar writer: ShuffleWriter[Any, Any] = nulltry {// 從shuffleManager中獲取shuffleWriterval manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)// rdd.iterator傳入當前task要處理的哪個partition,執行我們自己定義的算子或者是函數// 如果rdd已經cache或者checkpoint,那么直接讀取,否則計算,計算結果保存在本地系統的blockmanager中writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])// 返回結果MapStatus,其實就是blockmanager相關信息writer.stop(success = true).get} catch {...}

    ResultTask的runTask方法如下,它返回的是func函數的計算結果

    override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val deserializeStartTime = System.currentTimeMillis()val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTimefunc(context, rdd.iterator(partition, context))}

    7. 獲取執行結果

    對于Executor的計算結果,會根據結果的大小有不同的策略

    生成結果大于1GB,直接丟棄,可以通過spark.driver.maxResultSize來配置
    生成結果大小在[128MB-200kB, 1GB], 會把結果以taskId為編號存入到BlockManager中,然后把編號通過Netty發送給DriverEndpoint,Netty傳輸框架最大值和預留空間的差值
    生成結果大小在[0, 128MB-200KB],通過Netty直接發送到DriverEndpoint。
    具體執行在TaskRunner的run方法后半部分:

    // 對生成的結果序列化,并將結果放入DirectTaskResult中 val resultSer = env.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis()// Deserialization happens in two parts: first, we deserialize a Task object, which // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. task.metrics.setExecutorDeserializeTime((taskStart - deserializeStartTime) + task.executorDeserializeTime) // We need to subtract Task.run()'s deserialization time to avoid double-counting task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)// Note: accumulator updates must be collected after TaskMetrics is updated // 對生成的結果序列化,并將結果放入DirectTaskResult中 val accumUpdates = task.collectAccumulatorUpdates() // TODO: do not serialize value twice val directResult = new DirectTaskResult(valueBytes, accumUpdates) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit// directSend = sending directly back to the driver val serializedResult: ByteBuffer = {if (maxResultSize > 0 && resultSize > maxResultSize) {logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +s"dropping it.")// 如果序列化結果大于最大值(默認為1GB)直接丟棄ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))} else if (resultSize > maxDirectResultSize) {val blockId = TaskResultBlockId(taskId)// 如果生成結果在[1GB,128MB-200KB]之間,放到BlockManager,然后把該編號通過Netty發送給Driver終端點env.blockManager.putBytes(blockId,new ChunkedByteBuffer(serializedDirectResult.duplicate()),StorageLevel.MEMORY_AND_DISK_SER)logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")// IndirectTaskResult間接結果ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))} else {logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")// 通過Nettty之間發送到Driver終端點serializedDirectResult} }execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)} catch {...}

    Spark任務運行類調用關系圖

    如果任務是ShuffleMapTask,那么它需要某種機制告訴下游的調度階段,以便其可以作為后續調度階段的輸入,對于ShuffleMapTask來說,其結果是MapStatus;其序列化后存入了DirectTaskResut或者IndirectTaskResult中。而DAGScheduler的handleTaskCompletion方法獲取這個結果,并把這個MapStatus注冊到MapOutputTrackerMaster中,從而完成了ShuffleMapTask的處理。

    如果任務是ReusltTask,判斷改作業是否完成,如果完成,則標記該作業已經完成,清除作業依賴的資源并發送消息給系統監聽總線告知作業執行完畢。

    總結

    以上是生活随笔為你收集整理的Spark详解(五):Spark作业执行原理的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 好姑娘在线观看高清完整版电影 | 成人亚洲一区 | 人人干狠狠干 | 一级毛片黄色 | 浪漫樱花在线观看高清动漫 | 95精品视频 | aaa毛片视频 | 亚洲av综合色区无码二区爱av | 色婷婷综合在线 | 污网站免费 | 99久久亚洲精品日本无码 | 躁躁躁日日躁 | 少妇情理伦片丰满午夜在线观看 | www伊人网 | 成年人看的黄色片 | 国产永久在线观看 | 国产91丝袜在线播放 | 日韩视频欧美视频 | 亚洲天堂性 | 日韩在线中文字幕 | 日本黄a| 在线视频www | 国产精品亚洲专区无码牛牛 | 欧美久久伊人 | 久久久久一 | 艳妇乳肉豪妇荡乳av | 亚洲18在线看污www麻豆 | av大帝在线观看 | 殴美毛片| 国产精品美女自拍视频 | 日韩国产在线一区 | 超碰av在线播放 | 黑人精品xxx一区一二区 | 男女免费观看视频 | 激情一区二区三区 | 亚欧在线| 在线观看麻豆 | 中文字幕日韩精品在线 | 一卡二卡国产 | 玖玖爱免费视频 | 久久国产精品电影 | 黄色在线一区 | 国产6区| 综合影院 | 天天av综合 | 亚洲熟女一区 | 友田真希一区二区 | 翔田千里一区二区三区av | 成人在线网址 | 成人av免费 | 国产精品啪啪啪视频 | av免费观看网址 | 亚洲国产精品999 | 国产精品xxx在线观看 | 人妻少妇精品一区二区三区 | 国产日韩一区二区三免费高清 | 久久久无码人妻精品一区 | 久久久久亚洲av片无码下载蜜桃 | 日韩av色| 亚洲欧美日韩综合一区二区 | 一本色道久久88亚洲精品综合 | 女人特黄大aaaaaa大片 | 99色影院 | youjizz亚洲 | 国产高清视频在线观看 | 精品人妻二区中文字幕 | 91人人爱| 日韩激情第一页 | 欧美亚洲色图视频 | 中文日韩字幕 | 成人国产在线 | 污污的视频网站在线观看 | 亚洲性天堂 | 免费一区二区视频 | 夜夜操夜夜 | 日韩欧美少妇 | 麻豆精品免费观看 | 久久久久国产精品人妻 | 国产免费观看视频 | 日韩精品aaa| 国产99久久久久 | 毛片基地站 | 永久免费,视频 | 亚洲精品乱码久久久久久久久久久久 | 草逼网站 | 亚洲图片视频小说 | 白丝少妇 | 亚洲国产一区二区在线 | 色多多视频在线 | 男同志毛片特黄毛片 | 中文字幕人妻色偷偷久久 | 涩视频在线观看 | 色呦呦网站| 男女拍拍拍网站 | 欧美亚洲一区二区在线观看 | 91秘密入口| 免费美女毛片 | 久久露脸国语精品国产91 | 亚洲欧美另类日韩 |