日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark技术内幕:Stage划分及提交源码分析

發布時間:2023/12/13 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark技术内幕:Stage划分及提交源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

當觸發一個RDD的action后,以count為例,調用關系如下:

  • org.apache.spark.rdd.RDD#count

  • org.apache.spark.SparkContext#runJob

  • org.apache.spark.scheduler.DAGScheduler#runJob

  • org.apache.spark.scheduler.DAGScheduler#submitJob

  • org.apache.spark.scheduler.DAGSchedulerEventProcessActor#receive(JobSubmitted)

  • org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

  • 其 中步驟五的DAGSchedulerEventProcessActor是DAGScheduler 的與外部交互的接口代理,DAGScheduler在創建時會創建名字為eventProcessActor的actor。這個actor的作用看它的實 現就一目了然了:

    ?

    [java] view plaincopy

    ?

  • /**?

  • ?*?The?main?event?loop?of?the?DAG?scheduler.?

  • ?*/??

  • def?receive?=?{??

  • ??case?JobSubmitted(jobId,?rdd,?func,?partitions,?allowLocal,?callSite,?listener,?properties)?=>??

  • ????dagScheduler.handleJobSubmitted(jobId,?rdd,?func,?partitions,?allowLocal,?callSite,??

  • ??????listener,?properties)?//?提交job,來自與RDD->SparkContext->DAGScheduler的消息。之所以在這需要在這里中轉一下,是為了模塊功能的一致性。??

  • ??

  • ??case?StageCancelled(stageId)?=>?//?消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上顯示一個SparkContext的Job的執行狀態。??

  • ????//?用戶可以cancel一個Stage,會通過SparkContext->DAGScheduler?傳遞到這里。??

  • ????dagScheduler.handleStageCancellation(stageId)??

  • ??

  • ??case?JobCancelled(jobId)?=>?//?來自于org.apache.spark.scheduler.JobWaiter的消息。取消一個Job??

  • ????dagScheduler.handleJobCancellation(jobId)??

  • ??

  • ??case?JobGroupCancelled(groupId)?=>?//?取消整個Job?Group??

  • ????dagScheduler.handleJobGroupCancelled(groupId)??

  • ??

  • ??case?AllJobsCancelled?=>?//取消所有Job??

  • ????dagScheduler.doCancelAllJobs()??

  • ??

  • ??case?ExecutorAdded(execId,?host)?=& gt;?//?TaskScheduler得到一個Executor被添加的消息。具體來自 org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers??

  • ????dagScheduler.handleExecutorAdded(execId,?host)??

  • ??

  • ??case?ExecutorLost(execId)?=>?//來自TaskScheduler??

  • ????dagScheduler.handleExecutorLost(execId)??

  • ??

  • ??case?BeginEvent(task,?taskInfo)?=>?//?來自TaskScheduler??

  • ????dagScheduler.handleBeginEvent(task,?taskInfo)??

  • ??

  • ??case?GettingResultEvent(taskInfo)?=>?//處理獲得TaskResult信息的消息??

  • ????dagScheduler.handleGetTaskResult(taskInfo)??

  • ??

  • ??case?completion?@?CompletionEvent(task,?reason,?_,?_,?taskInfo,?taskMetrics)?=>?//來自TaskScheduler,報告task是完成或者失敗??

  • ????dagScheduler.handleTaskCompletion(completion)??

  • ??

  • ??case?TaskSetFailed(taskSet,?reason)?=>?//來自TaskScheduler,要么TaskSet失敗次數超過閾值或者由于Job?Cancel。??

  • ????dagScheduler.handleTaskSetFailed(taskSet,?reason)??

  • ??

  • ??case?ResubmitFailedStages?=>?//當一個Stage處理失敗時,重試。來自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion??

  • ????dagScheduler.resubmitFailedStages()??

  • }??

  • ?

    總 結一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成 DAGScheduler的對外的功能接口。它對外隱藏了自己內部實現的細節,也更易于理解其邏輯;也降低了維護成本,將DAGScheduler的比較 復雜功能接口化。

    ?

    handleJobSubmitted

    org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted 首先會根據RDD創建finalStage。finalStage,顧名思義,就是最后的那個Stage。然后創建job,最后提交。提交的job如果滿 足一下條件,那么它將以本地模式運行:

    1)spark.localExecution.enabled設置為true ?并且 2)用戶程序顯式指定可以本地運行 并且 3)finalStage的沒有父Stage 并且 4)僅有一個partition

    3)和 4)的話主要為了任務可以快速執行;如果有多個stage或者多個partition的話,本地運行可能會因為本機的計算資源的問題而影響任務的計算速度。

    要 理解什么是Stage,首先要搞明白什么是Task。Task是在集群上運行的基本單位。一個Task負責處理RDD的一個partition。RDD的 多個patition會分別由不同的Task去處理。當然了這些Task的處理邏輯完全是一致的。這一組Task就組成了一個Stage。有兩種 Task:

  • ?org.apache.spark.scheduler.ShuffleMapTask

  • ?org.apache.spark.scheduler.ResultTask

  • ShuffleMapTask 根據Task的partitioner將計算結果放到不同的bucket中。而ResultTask將計算結果發送回Driver Application。一個Job包含了多個Stage,而Stage是由一組完全相同的Task組成的。最后的Stage包含了一組 ResultTask。

    在用戶觸發了一個action后, 比如count,collect,SparkContext會通過runJob的函數開始進行任務提交。最后會通過DAG的event processor 傳遞到DAGScheduler本身的handleJobSubmitted,它首先會劃分Stage,提交Stage,提交Task。至此,Task就 開始在運行在集群上了。

    一個Stage的開始就是從外部存儲或者shuffle結果中讀取數據;一個Stage的結束就是由于發生shuffle或者生成結果時。

    ?

    創建finalStage

    handleJobSubmitted 通過調用newStage來創建finalStage:

    ?

    [java] view plaincopy

    ?

  • finalStage?=?newStage(finalRDD,?partitions.size,?None,?jobId,?callSite)??


  • 創 建一個result stage,或者說finalStage,是通過調用 org.apache.spark.scheduler.DAGScheduler#newStage完成的;而創建一個shuffle stage,需要通過調用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。?

    [java] view plaincopy

    ?

  • private?def?newStage(??

  • ??????rdd:?RDD[_],??

  • ??????numTasks:?Int,??

  • ??????shuffleDep:?Option[ShuffleDependency[_,?_,?_]],??

  • ??????jobId:?Int,??

  • ??????callSite:?CallSite)??

  • ????:?Stage?=??

  • ??{??

  • ????val?id?=?nextStageId.getAndIncrement()??

  • ????val?stage?=??

  • ??????new?Stage(id,?rdd,?numTasks,?shuffleDep,?getParentStages(rdd,?jobId),?jobId,?callSite)??

  • ????stageIdToStage(id)?=?stage??

  • ????updateJobIdStageIdMaps(jobId,?stage)??

  • ????stage??

  • ??}??


  • 對于result 的final stage來說,傳入的shuffleDep是None。

    我們知道,RDD通過org.apache.spark.rdd.RDD#getDependencies可以獲得它依賴的parent RDD。而Stage也可能會有parent Stage。看一個RDD論文的Stage劃分吧:

    ?

    ?

    ?

    一個stage的邊界,輸入是外部的存儲或者一個stage shuffle的結果;輸入則是Job的結果(result task對應的stage)或者shuffle的結果。

    上圖的話stage3的輸入則是RDD A和RDD F shuffle的結果。而A和F由于到B和G需要shuffle,因此需要劃分到不同的stage。

    從源碼實現的角度來看,通過觸發action也就是最后一個RDD創建final stage(上圖的stage 3),我們注意到new Stage的第五個參數就是該Stage的parent Stage:通過rdd和job id獲取:

    ?

    [java] view plaincopy

    ?

  • //?生成rdd的parent?Stage。沒遇到一個ShuffleDependency,就會生成一個Stage??

  • ??private?def?getParentStages(rdd:?RDD[_],?jobId:?Int):?List[Stage]?=?{??

  • ????val?parents?=?new?HashSet[Stage]?//存儲parent?stage??

  • ????val?visited?=?new?HashSet[RDD[_]]?//存儲已經被訪問到得RDD??

  • ????//?We?are?manually?maintaining?a?stack?here?to?prevent?StackOverflowError??

  • ????//?caused?by?recursively?visiting?//?存儲需要被處理的RDD。Stack中得RDD都需要被處理。??

  • ????val?waitingForVisit?=?new?Stack[RDD[_]]??

  • ????def?visit(r:?RDD[_])?{??

  • ??????if?(!visited(r))?{??

  • ????????visited?+=?r??

  • ????????//?Kind?of?ugly:?need?to?register?RDDs?with?the?cache?here?since??

  • ????????//?we?can't?do?it?in?its?constructor?because?#?of?partitions?is?unknown??

  • ????????for?(dep?<-?r.dependencies)?{??

  • ??????????dep?match?{??

  • ????????????case?shufDep:?ShuffleDependency[_,?_,?_]?=>?//?在ShuffleDependency時需要生成新的stage??

  • ??????????????parents?+=?getShuffleMapStage(shufDep,?jobId)??

  • ????????????case?_?=>??

  • ??????????????waitingForVisit.push(dep.rdd)?//不是ShuffleDependency,那么就屬于同一個Stage??

  • ??????????}??

  • ????????}??

  • ??????}??

  • ????}??

  • ????waitingForVisit.push(rdd)?//?輸入的rdd作為第一個需要處理的RDD。然后從該rdd開始,順序訪問其parent?rdd??

  • ????while?(!waitingForVisit.isEmpty)?{?//只要stack不為空,則一直處理。??

  • ??????visit(waitingForVisit.pop())?//每次visit如果遇到了ShuffleDependency,那么就會形成一個Stage,否則這些RDD屬于同一個Stage??

  • ????}??

  • ????parents.toList??

  • ??}??

  • ?

    生成了finalStage后,就需要提交Stage了。

    ?

    [java] view plaincopy

    ?

  • //?提交Stage,如果有parent?Stage沒有提交,那么遞歸提交它。??

  • private?def?submitStage(stage:?Stage)?{??

  • ??val?jobId?=?activeJobForStage(stage)??

  • ??if?(jobId.isDefined)?{??

  • ????logDebug("submitStage("?+?stage?+?")")??

  • ????//?如果當前stage不在等待其parent?stage的返回,并且?不在運行的狀態,?并且?沒有已經失敗(失敗會有重試機制,不會通過這里再次提交)??

  • ????if?(!waitingStages(stage)?&&?!runningStages(stage)?&&?!failedStages(stage))?{??

  • ??????val?missing?=?getMissingParentStages(stage).sortBy(_.id)??

  • ??????logDebug("missing:?"?+?missing)??

  • ??????if?(missing?==?Nil)?{?//?如果所有的parent?stage都已經完成,那么提交該stage所包含的task??

  • ????????logInfo("Submitting?"?+?stage?+?"?("?+?stage.rdd?+?"),?which?has?no?missing?parents")??

  • ????????submitMissingTasks(stage,?jobId.get)??

  • ??????}?else?{??

  • ????????for?(parent?<-?missing)?{?//?有parent?stage為完成,則遞歸提交它??

  • ??????????submitStage(parent)??

  • ????????}??

  • ????????waitingStages?+=?stage??

  • ??????}??

  • ????}??

  • ??}?else?{??

  • ????abortStage(stage,?"No?active?job?for?stage?"?+?stage.id)??

  • ??}??

  • }??

  • ?

    DAGScheduler將Stage劃分完成后,提交實際上是通過把Stage轉換為TaskSet,然后通過TaskScheduler將計算任務最終提交到集群。其所在的位置如下圖所示。

    ?

    接下來,將分析Stage是如何轉換為TaskSet,并最終提交到Executor去運行的。

    ?

    轉載于:https://www.cnblogs.com/wzyxidian/p/4853618.html

    總結

    以上是生活随笔為你收集整理的Spark技术内幕:Stage划分及提交源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。