Spark详解(五):Spark作业执行原理
Spark的作業和任務調度系統是其核心,它能夠有效地進行調度的根本原因是對任務的劃分DGG和容錯。下面我們介紹一下相關術語:
- 作業(Job):RDD中由行動操作所生成的一個或者多個調度階段
- 調度階段(Stage):每個作業會因為RDD之間的依賴關系拆分成多組任務集合,稱為調度階段,也叫做任務集合(TaskSet)。調度階段的劃分是由DAGScheduler倆劃分的,調度階段由ShuffleMapStage和ResultStage兩種
- 任務(Task):分發到Executor的工作任務,是Spark實際執行應用的最小單元。在Spark中主要有兩種不同類型的Task,分別是ShuffleMapTask和ResultTask
- DAGScheduler:DAGScheduler是面向調度階段的任務調度器,負責接收Spark應用提交的作業,根據RDD的依賴關系劃分調度階段,并提交調度階段給TaskScheduler
- TaskScheduler:TaskScheduler是面向任務的調度器,它接收DAGScheduler提交過來的調度階段,然后把任務分發到Worker階段上運行,由Worker節點上的Executor進程來運行任務。
1. 概述
Spakr操作算子主要分為轉換操作和行動操作,對于轉換操作的計算是lazy級別的,也就是延遲加載,只有出現了行動操作才觸發了作業的提交。在Spark調度中最重要的是DAGScheduler和TaskScheduler兩個調度器,其中DAGScheduler負責任務的邏輯調度,將作業拆分為不同階段的具有依賴關系的任務集合,而TaskScheduler則負責具體任務的調度執行。
(1)Spark應用程序進程各種轉換操作,通過行動操作觸發作業的運行。提交之后根據RDD的依賴關系構建DAG圖,DAG圖提交給DAGScheduler進行解析。
(2)DAGScheduler是面向調度階段的高層次調度器,DAGScheduler把DAG拆分成相互依賴的調度階段,拆分調度階段是以RDD的依賴是否為寬依賴,當遇到寬依賴就劃分為新的調度階段。每個調度階段包含一個或者多個任務,這些任務形成任務集合,提交給底層TaskScheduler進行調度運行。另外DAGScheduler還記錄了哪些RDD被存入磁盤等物化動作,同時尋求任務的最優調度優化,列如數據本地型。DAGScheduler還監控運行調度過程,如果某個調度階段運行失敗,則需要重新提交該調度階段。
(3)每個TaskScheduler只為一個SparkContext實例服務,TaskScheduler接受來自DAGScheduler發送過阿里的任務集合,TaskScheduler收到任務集合之后就把該任務集合以任務的形式一個一個方法到Worker節點中運行,如果某個任務運行失敗,TaskScheduler要負責重試。如果TaskScheduler發現某個任務一直未運行完成,就可能啟動同樣的任務運行同樣的一個任務,哪個任務先運行完成就用哪個任務的結果。
(4)Worke中的Executor收到TaskScheduler發送過來的任務之后,以多線程的方式進行運行,每一個線程負責一個任務。任務結束之后要返回結果給TaskScheduler,不同的類型任務返回 的結果不同。ShuffleMapTask返回的是一個MapStatus對象,而不是結果本身;ResultTask根據結果大小的不同,返回的方式不同。
下面我們來看一下Spark 獨立運行模型下的執行類調用關系圖:
2. 提交作業
RDD的源碼的collect方法觸發了SparkContext的runJob方法來提交作業。SparkContext的runJob方法經過幾次調用后,
進入DAGScheduler的runJob方法,其中SparkContext中調用DAGScheduler類runJob方法代碼如下:
在DAGScheduler類內部會進行一列的方法調用,首先是在runJob方法里,調用submitJob方法來繼續提交作業,這里會發生阻塞,知道返回作業完成或失敗的結果;然后在submitJob方法里,創建了一個JobWaiter對象,
并借助內部消息處理進行把這個對象發送給DAGScheduler的內嵌類DAGSchedulerEventProcessLoop進行處理;最后在DAGSchedulerEventProcessLoop消息接收方法OnReceive中,接收到JobSubmitted樣例類完成模式匹配后,
繼續調用DAGScheduler的handleJobSubmitted方法來提交作業,在該方法中進行劃分階段。
在Spark應用程序中,會拆分成多個作業,然后對于這多個作業之間進行調度,Spark目前提供了兩種調度策略:一種是FIFO模式,也是目前默認的模式,還有一種是FAIR模式。
3. 劃分調度階段
Spark調度階段的劃分是由DAGScheduler實現的,DAGScheduler會從最后一個RDD出發使用廣度優先遍歷整個依賴樹,從而劃分調度階段,
調度階段的劃分是以是否為寬依賴進行的,即當某個RDD的操作是Shuffle時,以該Shuffle操作為界限劃分成前后兩個調度階段
代碼實現是在DAGScheduler的handleJobSubmitted方法中根據最后一個RDD生成ReusltStage開始的,具體方法從finalRDD使用getParentsStages找出其依賴的祖先RDD是否存在Shuffle操作
private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {// 第一步:使用最后一個RDD創建一個finalStagevar finalStage: ResultStage = nulltry {// 創建一個stage,并將它加入DAGScheduler內部的內存緩沖中, newResultStage的時候就已經得到了他所有的ParentStagefinalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)} catch {...}// 第二步:用finalStage,創建一個Jobval job = new ActiveJob(jobId, finalStage, callSite, listener, properties)clearCacheLocs()logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length))logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")logInfo("Parents of final stage: " + finalStage.parents)logInfo("Missing parents: " + getMissingParentStages(finalStage))// 第三步:將job加入內存緩存中val jobSubmissionTime = clock.getTimeMillis()jobIdToActiveJob(jobId) = jobactiveJobs += jobfinalStage.setActiveJob(job)val stageIds = jobIdToStageIds(jobId).toArrayval stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))// 第四步:使用submitStage方法來提交最后一個stage,// 最后的結果就是,第一個stage提交,其它stage都在等待隊列中submitStage(finalStage)// 提交等待的stagesubmitWaitingStages()}// newResultStage會調用getParentStagesAndId得到所有的父類stage以及它們的id private def newResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage = {// 得到所有的父stageval (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage}// 繼續調用getParentStages private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {val parentStages = getParentStages(rdd, firstJobId)val id = nextStageId.getAndIncrement()(parentStages, id)}private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {val parents = new HashSet[Stage]val visited = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new Stack[RDD[_]]def visit(r: RDD[_]) {if (!visited(r)) {visited += rfor (dep <- r.dependencies) {dep match {// 所依賴RDD操作類型是ShuffleDependency,需要劃分ShuffleMap調度階段,// 以getShuffleMapStage方法為入口,向前遍歷劃分調度階段case shufDep: ShuffleDependency[_, _, _] =>parents += getShuffleMapStage(shufDep, firstJobId)case _ =>waitingForVisit.push(dep.rdd)}}}}// 從最后一個RDD向前遍歷這個依賴樹,如果該RDD依賴樹存在ShuffleDependency的RDD,// 則存在父調度階段,反之,不存在waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}parents.toList}// private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _],firstJobId: Int): ShuffleMapStage = {shuffleToMapStage.get(shuffleDep.shuffleId) match {case Some(stage) => stagecase None =>// We are going to register ancestor shuffle dependenciesgetAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>if (!shuffleToMapStage.contains(dep.shuffleId)) {// 如果shuffleToMapStage中沒有,那么就new一個shuffleMapStageshuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)}}// Then register current shuffleDepval stage = newOrUsedShuffleStage(shuffleDep, firstJobId)shuffleToMapStage(shuffleDep.shuffleId) = stagestage}}// 可以對比這個算法和getParentStages,該方法返回所有的寬依賴 private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {val parents = new Stack[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]def visit(r: RDD[_]) {if (!visited(r)) {visited += rfor (dep <- r.dependencies) {dep match {// 所依賴RDD操作類型是ShuffleDependency,作為劃分ShuffleMap調度階段界限case shufDep: ShuffleDependency[_, _, _] =>if (!shuffleToMapStage.contains(shufDep.shuffleId)) {parents.push(shufDep)}case _ =>}waitingForVisit.push(dep.rdd)}}}waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}parents}我們結合一個圖來講解上面的代碼:如下圖,有7個RDD,分別是rddA~rddG,它們之間有5個操作,其劃分調度階段如下:
總結,語句finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)在生成finalStage的同時,建立起所有調度階段的依賴關系,最后通過finalStage生成一個作業實例,在該作業實例中按照順序提交調度階段進行執行。
4. 提交調度階段
通過handleJobSubmitted方法中的submitStage(finalStage)來提交作業。在submitStage方法中調用getMissingParentStages方法獲取finalStage父調度階段,如果不存在父調度階段,則使用submitMissingTasks方法提交執行,如果存在父調度階段,則把該調度階段存放到waitingStages列表中,同時遞歸調用submitStage。
/*** 提交stage的方法* stage劃分算法的入口,stage劃分算法是由submitStage()與getMissingParentStages()共同組成*/private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {// getMissingParentStages獲取當前stage的父stageval missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)// 直到最初的stage,它沒有父stage,那么此時,就會去首先提交第一個stage,stage0// 其余的stage,都在waitingStages里if (missing.isEmpty) {// 如果不存在父調度階段,直接把該調度階段提交運行logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {// 如果存在父調度階段,把該調度階段加入到等待運行調度階段列表中// 同時遞歸調用submitStage方法,直到找到開始的調度階段,即該調度階段沒有父調度階段for (parent <- missing) {// 遞歸調用submitStage方法去提交父stagesubmitStage(parent)}// 并且當前stage放入waitingStages等待執行的stage隊列中waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id, None)}}private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage]val visited = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new Stack[RDD[_]]def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rddval rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)if (rddHasUncachedPartitions) {// rdd的依賴for (dep <- rdd.dependencies) {dep match {// 如果是寬依賴,那么就創建一個shuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)// 判斷是否可用,也就是判斷父stage有沒有結果, 看源碼可以發現就是判斷_numAvailableOutputs == numPartitions// _numAvailableOutputs就是每個task成功后會+1if (!mapStage.isAvailable) {missing += mapStage}// 如果是窄依賴,那么將依賴的rdd放入棧中case narrowDep: NarrowDependency[_] =>waitingForVisit.push(narrowDep.rdd)}}}}}// 首先往棧中推入了stage最后一個rddwaitingForVisit.push(stage.rdd)while (waitingForVisit.nonEmpty) {visit(waitingForVisit.pop())}missing.toList}我們畫圖來講解提交調度階段,如下圖
4. 提交任務
在submitStage中會執行submitMissingTasks方法中,會根據調度階段partition個數生成對應個數task,這些任務組成一個任務集提交到TaskScheduler進行處理。
對于ResultStage生成ResultTask,對于ShuffleMapStage生成ShuffleMapTask。
當TaskSchedulerImpl收到發送過來的任務集時,在submitTasks方法中構建一個TaskSetManager的實例,用于管理這個任務集的生命周期,而該TaskSetManager會放入系統的調度池中,根據系統設置的調度算法進行調度,
TaskSchedulerImpl.submitTasks方法代碼如下:
StandaloneSchedulerBackend的reviveOffers方法是繼承于父類CoarseGrainedSchedulerBackend,該方法會向DriverEndpoint發送消息,調用makeOffers方法。在該方法中先會獲取集群中可用的Executor,
然后發送到TaskSchedulerImpl中進行對任務集的任務分配運行資源,最后提交到launchTasks方法中。CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers代碼如下:
第一步:在TaskSchedulerImpl的resourceOffers()方法里進行非常重要的步驟--資源分配, 在分配過程中會根據調度策略對TaskSetMannager進行排序,
然后依次對這些TaskSetManager按照就近原則分配資源,按照順序為PROCESS_LOCAL NODE_LOCAL NO_PREF RACK_LOCAL ANY
第二步:分配好資源的任務提交到CoarseGrainedSchedulerBackend的launchTasks方法中,在該方法中會把任務一個個發送到Worker節點上的CoarseGrainedExecutorBacken,
然后通過其內部的Executor來執行任務
ShuffleMapStage1是TaskSet1,每個TaskSet都有兩個任務在執行。
在TaskSchedulerImpl的resourceOffers方法中按照就近原則進行資源分配,使用CoarseGrainedSchedulerBackend的launchTasks方法把任務發送到Worker節點上的
當ShuffleMapStage2同理,ResultStage3生成的是ResultTask
4. 執行任務
當CoarseGrainedExecutorBackend接收到LaunchTask消息時,會調用Executor的launchTask方法進行處理。在Executor的launchTask方法中,初始化一個TaskRunner來封裝任務,
它用于管理任務運行時的細節,再把TaskRunner對象放入到ThreadPool中去執行。TaskRunner.run的前半部分代碼如下:
對于ShuffleMapTask, 它的計算結果會寫到BlockManager之中,最終返回給DAGScheduler的是一個MapStatus。該對象管理了ShuffleMapTask的運算結果存儲到BlockManager里的相關存儲信息,而不是計算結果本身,
這些存儲信息將會成為下一階段的任務需要獲得的輸入數據時的依據。ShuffleMapTask.runTask代碼如下:
ResultTask的runTask方法如下,它返回的是func函數的計算結果
override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val deserializeStartTime = System.currentTimeMillis()val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTimefunc(context, rdd.iterator(partition, context))}7. 獲取執行結果
對于Executor的計算結果,會根據結果的大小有不同的策略
生成結果大于1GB,直接丟棄,可以通過spark.driver.maxResultSize來配置
生成結果大小在[128MB-200kB, 1GB], 會把結果以taskId為編號存入到BlockManager中,然后把編號通過Netty發送給DriverEndpoint,Netty傳輸框架最大值和預留空間的差值
生成結果大小在[0, 128MB-200KB],通過Netty直接發送到DriverEndpoint。
具體執行在TaskRunner的run方法后半部分:
Spark任務運行類調用關系圖
如果任務是ShuffleMapTask,那么它需要某種機制告訴下游的調度階段,以便其可以作為后續調度階段的輸入,對于ShuffleMapTask來說,其結果是MapStatus;其序列化后存入了DirectTaskResut或者IndirectTaskResult中。而DAGScheduler的handleTaskCompletion方法獲取這個結果,并把這個MapStatus注冊到MapOutputTrackerMaster中,從而完成了ShuffleMapTask的處理。
如果任務是ReusltTask,判斷改作業是否完成,如果完成,則標記該作業已經完成,清除作業依賴的資源并發送消息給系統監聽總線告知作業執行完畢。
總結
以上是生活随笔為你收集整理的Spark详解(五):Spark作业执行原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark详解(四):Spark组件以及
- 下一篇: Spark详解(七):SparkCont