Spark Source Code Analysis, Part 7: Task Execution (I)
In the two task-scheduling articles, "Spark Source Code Analysis, Part 5: Task Scheduling (I)" and "Spark Source Code Analysis, Part 6: Task Scheduling (II)", we walked through the main logic of task scheduling. At the very end of that logic, in the makeOffers() method of DriverEndpoint (an inner class of CoarseGrainedSchedulerBackend), we called TaskSchedulerImpl's resourceOffers() method and obtained a sequence of sequences of TaskDescription, Seq[Seq[TaskDescription]]. The relevant code is as follows:

```scala
// Call the scheduler's resourceOffers() method to allocate resources, then launch
// the tasks via launchTasks(); this scheduler is TaskSchedulerImpl
launchTasks(scheduler.resourceOffers(workOffers))

/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
```
TaskDescription itself is simple: it describes a task that will be shipped to an executor for execution, usually created by TaskSetManager's resourceOffer() method. The code is as follows:
```scala
/**
 * Description of a task that gets passed onto executors to be executed, usually created by
 * [[TaskSetManager.resourceOffer]].
 */
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer;
  // _serializedTask holds the serialized task as a ByteBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  // The serialized task, read from the buffer's value
  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
```
At this point we have the Seq[Seq[TaskDescription]]: each task has been scheduled onto an executor, but only logically; nothing has actually been dispatched for execution yet. What comes next is delivering each task to its assigned executor and running it, which is the subject of this article: task execution. It all starts with the launchTasks() method of CoarseGrainedSchedulerBackend's inner class DriverEndpoint, shown below:
```scala
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

  // Loop over every task
  for (task <- tasks.flatten) {

    // Serialize the task
    val serializedTask = ser.serialize(task)

    // The serialized task exceeds the allowed limit, i.e. its size is at least the
    // framework's maximum Akka frame size minus the bytes an Akka message must reserve
    // beyond the serialized task (or task result) itself
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {

      // Look up the task's TaskSetManager in TaskSchedulerImpl's taskIdToTaskSetManager
      // by its taskId
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)

          // Call TaskSetManager's abort() method to mark the TaskSetManager as failed
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else { // the serialized task is within the allowed size

      // Get the executor description executorData from executorDataMap by task.executorId
      val executorData = executorDataMap(task.executorId)

      // Decrease freeCores in executorData accordingly
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      // Use executorData's executorEndpoint to send a LaunchTask event carrying the
      // serialized task
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
```
The logic of launchTasks is straightforward. For each task in the incoming TaskDescription sequences, it does the following:

1. Serialize the task, obtaining serializedTask;
2. Check the size of serializedTask:
   2.1. If it reaches or exceeds the allowed limit (the framework's maximum Akka frame size, minus the extra bytes an Akka message must reserve beyond the serialized task or task result itself), look up the task's TaskSetManager in TaskSchedulerImpl's taskIdToTaskSetManager by its taskId and call the manager's abort() method to mark it as failed;
   2.2. If it is within the limit:
        2.2.1. Get the executor description executorData from executorDataMap by task.executorId;
        2.2.2. Decrease freeCores in executorData accordingly;
        2.2.3. Use executorData's executorEndpoint (the driver-side reference to the executor's communication endpoint) to send a LaunchTask event containing the serialized task, delivering the task to the executor for execution.
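The branching above (serialize, compare against the frame limit, then abort or send) can be sketched in isolation. This is a hypothetical, simplified model: `FRAME_SIZE` and `RESERVED_BYTES` are illustrative stand-ins for the values Spark reads from spark.akka.frameSize and AkkaUtils.reservedSizeBytes, and the returned string merely names the branch launchTasks would take.

```java
import java.nio.ByteBuffer;

class LaunchGate {
    // Illustrative stand-ins for spark.akka.frameSize and AkkaUtils.reservedSizeBytes
    static final int FRAME_SIZE = 128;
    static final int RESERVED_BYTES = 16;

    /** Returns "ABORT" when the serialized task cannot fit in one message, else "SEND". */
    static String dispatch(ByteBuffer serializedTask) {
        if (serializedTask.limit() >= FRAME_SIZE - RESERVED_BYTES) {
            return "ABORT"; // launchTasks would call taskSetMgr.abort(msg) here
        }
        return "SEND";      // launchTasks would send LaunchTask(serializedTask) here
    }
}
```

Note that the oversize branch does not just drop one message: it fails the whole TaskSet, because a task that can never fit in a frame would otherwise be retried forever.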
Next, let us examine this flow in detail.

We start with the failure path: when the serialized task exceeds the size limit, the TaskSet is marked as failed. The entry point is TaskSetManager's abort() method:
```scala
def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {

  // TODO: Kill running tasks if we were not terminated due to a Mesos error
  // Call DAGScheduler's taskSetFailed() method to mark the TaskSet as failed
  sched.dagScheduler.taskSetFailed(taskSet, message, exception)

  // Set the isZombie flag to true
  isZombie = true

  // If certain conditions hold, mark the TaskSet as finished
  maybeFinishTaskSet()
}
```
abort() does three things:

First, it calls DAGScheduler's taskSetFailed() method to mark the TaskSet as failed;
Second, it sets the isZombie flag to true;
Third, if certain conditions hold, it marks the TaskSet as finished.

Let us look at DAGScheduler's taskSetFailed() method first:
```scala
/**
 * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or
 * cancellation of the job itself.
 */
def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = {
  eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
}
```
Just like the job scheduling model described in the second article, "Spark Source Code Analysis, Part 2: Job Scheduling Model and Execution Feedback", dispatch relies on the event queue eventProcessLoop: here we post a TaskSetFailed event onto it. DAGScheduler's event dispatch method doOnReceive() then specifies how each event is handled, as shown below.
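The post-then-dispatch pattern can be sketched as a tiny, single-threaded model. This is hypothetical code, not Spark's: the real eventProcessLoop drains its queue on a dedicated thread, and events are case classes rather than strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class MiniEventLoop {
    private final Queue<String> queue = new ArrayDeque<>();
    final List<String> handled = new ArrayList<>();

    // Like eventProcessLoop.post(TaskSetFailed(...)): enqueue only, return immediately
    void post(String event) { queue.offer(event); }

    // Like the event thread calling doOnReceive() for each queued event
    void drain() {
        String e;
        while ((e = queue.poll()) != null) {
            if (e.startsWith("TaskSetFailed")) handled.add("handleTaskSetFailed");
            else handled.add("unhandled:" + e);
        }
    }
}
```

The point of the indirection is that abort() can be called from a scheduler thread while the actual stage bookkeeping happens serially on the DAGScheduler's own event thread.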
```scala
// For a TaskSetFailed event, call dagScheduler.handleTaskSetFailed() to handle it
case TaskSetFailed(taskSet, reason, exception) =>
  dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
```
Now let us look at the handleTaskSetFailed() method.
```scala
private[scheduler] def handleTaskSetFailed(
    taskSet: TaskSet,
    reason: String,
    exception: Option[Throwable]): Unit = {

  // Get the Stage for the taskSet's stageId and call abortStage() on it to abort the stage
  stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }

  // Submit the waiting stages
  submitWaitingStages()
}
```
Very simple: it looks up the Stage for the taskSet's stageId, calls abortStage() on it to abort the stage, and then calls submitWaitingStages() to submit the waiting stages. First, the abortStage() method:
```scala
/**
 * Aborts all jobs depending on a particular Stage. This is called in response to a task set
 * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
 */
private[scheduler] def abortStage(
    failedStage: Stage,
    reason: String,
    exception: Option[Throwable]): Unit = {

  // If stageIdToStage has no entry for this stage, it has already been removed; return directly
  if (!stageIdToStage.contains(failedStage.id)) {
    // Skip all the actions if the stage has been removed.
    return
  }

  // Walk the ActiveJobs in activeJobs and use stageDependsOn() to find the jobs that have
  // failedStage as an ancestor stage, i.e. dependentJobs
  val dependentJobs: Seq[ActiveJob] =
    activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq

  // Record failedStage's completion time, completionTime
  failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())

  // For each job in dependentJobs, call failJobAndIndependentStages()
  for (job <- dependentJobs) {
    failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", exception)
  }
  if (dependentJobs.isEmpty) {
    logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
  }
}
```
This method works in four steps:

1. If stageIdToStage has no entry for the stage, the stage has already been removed and the method returns directly; this is special handling for an abnormal case;
2. Walk the ActiveJobs in activeJobs and use stageDependsOn() to find the jobs that have failedStage as an ancestor stage, i.e. dependentJobs;
3. Record failedStage's completion time, completionTime;
4. For each job in dependentJobs, call failJobAndIndependentStages().

Most of this is self-explanatory; the two methods worth a closer look are stageDependsOn() and failJobAndIndependentStages(). First, stageDependsOn():
```scala
/** Return true if one of stage's ancestors is target. */
private def stageDependsOn(stage: Stage, target: Stage): Boolean = {

  // If stage is target itself, return true
  if (stage == target) {
    return true
  }

  // RDDs already visited
  val visitedRdds = new HashSet[RDD[_]]

  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  // RDDs waiting to be visited
  val waitingForVisit = new Stack[RDD[_]]

  // Define a visit() method
  def visit(rdd: RDD[_]) {
    // Only process the RDD if it has not been visited yet
    if (!visitedRdds(rdd)) {
      // Add the RDD to visitedRdds
      visitedRdds += rdd

      // Walk the RDD's dependencies
      for (dep <- rdd.dependencies) {
        dep match {
          // If it is a ShuffleDependency
          case shufDep: ShuffleDependency[_, _, _] =>

            // Get the mapStage; if the stage's isAvailable is false, push its RDD
            // onto waitingForVisit
            val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
            if (!mapStage.isAvailable) {
              waitingForVisit.push(mapStage.rdd)
            }  // Otherwise there's no need to follow the dependency back
          // If it is a NarrowDependency, push its RDD onto waitingForVisit directly
          case narrowDep: NarrowDependency[_] =>
            waitingForVisit.push(narrowDep.rdd)
        }
      }
    }
  }

  // Start from stage's rdd: push it onto waitingForVisit
  waitingForVisit.push(stage.rdd)

  // While waitingForVisit is non-empty, pop an RDD and visit() it
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }

  // stage has target as an ancestor iff visitedRdds contains target's rdd
  visitedRdds.contains(target.rdd)
}
```
This method checks whether target is an ancestor stage of the stage argument. Its coding style matches parts of the stage-splitting and stage-submission code covered in the previous two articles, so we will not repeat that discussion. Starting from stage's RDD, it walks the chain of upstream dependency RDDs, adding each visited RDD to visitedRdds; at the end, whether visitedRdds contains target's RDD tells us whether target is an ancestor of stage. One detail worth noting: if an RDD's dependency is a NarrowDependency, its RDD is pushed onto waitingForVisit directly; for a ShuffleDependency, the map stage's isAvailable is checked first, and its RDD is pushed only when isAvailable is false. isAvailable was explained in detail in "Spark Source Code Analysis, Part 4: Stage Submission", so we will not repeat it here.
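The traversal can be sketched with plain strings standing in for stages and RDDs. This is an illustrative model, not Spark code: the `parents` map plays the role of the RDD dependency chain, and the ShuffleDependency/isAvailable pruning is omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AncestryCheck {
    /** parents maps a node to the nodes it depends on (its lineage). */
    static boolean dependsOn(Map<String, List<String>> parents, String stage, String target) {
        if (stage.equals(target)) return true;
        Set<String> visited = new HashSet<>();        // like visitedRdds
        Deque<String> toVisit = new ArrayDeque<>();   // manual stack, like waitingForVisit
        toVisit.push(stage);
        while (!toVisit.isEmpty()) {
            String node = toVisit.pop();
            if (visited.add(node)) {                  // only expand unvisited nodes
                for (String dep : parents.getOrDefault(node, List.of())) {
                    toVisit.push(dep);
                }
            }
        }
        return visited.contains(target);              // target reached iff it is an ancestor
    }
}
```

The explicit stack is the important design choice: a recursive walk over a very long lineage could overflow the JVM call stack, while the heap-allocated stack cannot.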
Next, the failJobAndIndependentStages() method. Its job is to fail a job along with all stages used only by that job, and to clean up the relevant state. The code is as follows:
```scala
/** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
private def failJobAndIndependentStages(
    job: ActiveJob,
    failureReason: String,
    exception: Option[Throwable] = None): Unit = {

  // Build an exception whose message is failureReason
  val error = new SparkException(failureReason, exception.getOrElse(null))

  // Flag: can the stages be cancelled?
  var ableToCancelStages = true

  // Flag: should the task threads be interrupted?
  val shouldInterruptThread =
    if (job.properties == null) false
    else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean

  // Cancel all independent, running stages.

  // Get the job's stages by its jobId
  val stages = jobIdToStageIds(job.jobId)

  // If stages is empty, log an error
  if (stages.isEmpty) {
    logError("No stages registered for job " + job.jobId)
  }

  // Process each stage in turn
  stages.foreach { stageId =>

    // For this stageId, get jobsForStage: the ids of the jobs that use this stage
    val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)

    // Handle the abnormal cases first: jobsForStage is empty, or it does not contain this job
    if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
      logError(
        "Job %d not registered for stage %d even though that stage was registered for the job"
          .format(job.jobId, stageId))
    } else if (jobsForStage.get.size == 1) {
      // The stage for this stageId does not exist
      if (!stageIdToStage.contains(stageId)) {
        logError(s"Missing Stage for stage with id $stageId")
      } else {
        // This is the only job that uses this stage, so fail the stage if it is running.
        val stage = stageIdToStage(stageId)
        if (runningStages.contains(stage)) {
          try { // cancelTasks will fail if a SchedulerBackend does not implement killTask

            // Call taskScheduler's cancelTasks() method to cancel the stage's tasks
            taskScheduler.cancelTasks(stageId, shouldInterruptThread)

            // Mark the stage as finished
            markStageAsFinished(stage, Some(failureReason))
          } catch {
            case e: UnsupportedOperationException =>
              logInfo(s"Could not cancel tasks for stage $stageId", e)
              ableToCancelStages = false
          }
        }
      }
    }
  }

  if (ableToCancelStages) { // if the stages could be cancelled

    // Call the job listener's jobFailed() method
    job.listener.jobFailed(error)

    // Clean up state for the job and its independent stages, i.e. stages used only by this job
    cleanupStateForJobAndIndependentStages(job)

    // Post a SparkListenerJobEnd event
    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
  }
}
```
The flow is simple enough that the source and comments above speak for themselves, so we will not dwell on it here.

Now for the normal path: when the serialized task does not exceed the size limit, the LaunchTask event is sent and the executor responds. Back in the launchTasks() method of CoarseGrainedSchedulerBackend's inner class DriverEndpoint, the normal path has three main parts:

1. Get the executor description executorData from executorDataMap by task.executorId;
2. Decrease freeCores in executorData accordingly;
3. Use executorData's executorEndpoint (the driver-side reference to the executor's communication endpoint) to send a LaunchTask event containing the serialized task, delivering the task to the executor for execution.

Step 3 is the interesting one: via the executorEndpoint held in the driver-side executor description executorData, a LaunchTask event is sent to the executor, handing it the task to run. So how does the executor receive the LaunchTask event? The answer lies in CoarseGrainedExecutorBackend.

Let us start with CoarseGrainedExecutorBackend itself. The class is defined as follows:
```scala
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
```
From this definition we can see that it mixes in two traits, ThreadSafeRpcEndpoint and ExecutorBackend. ExecutorBackend is defined as follows:
```scala
/**
 * A pluggable interface used by the Executor to send updates to the cluster scheduler.
 */
private[spark] trait ExecutorBackend {

  // The single statusUpdate() method, taking three parameters:
  // a Long taskId, a TaskState state, and a ByteBuffer data
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}
```
So it naturally has two main duties: first, as an endpoint it provides communication between the driver and the executor; second, it lets the executor report task status during execution.

What exactly is CoarseGrainedExecutorBackend, then? We will not dig into it here; that is left for a later article. For now it is enough to know that it is a background helper process for the Executor, in a one-to-one relationship with it, providing the Executor with those two basic capabilities: communicating with the driver, and reporting task execution status.

Next, how does CoarseGrainedExecutorBackend handle a LaunchTask event? As an RpcEndpoint, it handles events and messages in its receive() method, which includes the following case:
```scala
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {

    // Deserialize the task description to get taskDesc
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)

    // Call the executor's launchTask() method to launch the task
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
```
First it checks whether the executor is null; if so, it logs an error and exits. Otherwise it proceeds as follows:

1. Deserialize the task description to get taskDesc;
2. Call the executor's launchTask() method to launch the task.

The focus thus shifts to Executor's launchTask() method:
```scala
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {

  // Create a new TaskRunner
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)

  // Record the taskId-to-TaskRunner mapping in runningTasks
  runningTasks.put(taskId, tr)

  // Execute the TaskRunner on the thread pool
  threadPool.execute(tr)
}
```
Very simple: create a TaskRunner, record the taskId-to-TaskRunner mapping in runningTasks, and hand the TaskRunner to the thread pool for execution.
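The register/submit/deregister pattern can be sketched as follows. All names here are hypothetical, not Spark's; the Executor is injected so that the sketch can run synchronously in a test via `Runnable::run`, whereas the real Executor uses a thread pool.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class MiniTaskLauncher {
    final ConcurrentHashMap<Long, Runnable> runningTasks = new ConcurrentHashMap<>();
    private final Executor threadPool;

    MiniTaskLauncher(Executor threadPool) { this.threadPool = threadPool; }

    void launchTask(long taskId, Runnable body) {
        Runnable tr = () -> {                 // stand-in for a TaskRunner
            try {
                body.run();                   // stand-in for TaskRunner.run()'s work
            } finally {
                runningTasks.remove(taskId);  // mirrors run()'s finally block
            }
        };
        runningTasks.put(taskId, tr);         // register before submitting
        threadPool.execute(tr);
    }
}
```

With `Executors.newCachedThreadPool()` this behaves like the real launchTask(); with `Runnable::run` it runs inline, which is what the test relies on.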
Let us look at the TaskRunner class, starting with its definition and member variables:
```scala
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable {

  // TaskRunner extends Runnable

  /** Whether this task has been killed. */
  @volatile private var killed = false

  /** How much the JVM process has spent in GC when the task starts to run. */
  @volatile var startGCTime: Long = _

  /**
   * The task to run. This will be set in run() by deserializing the task binary coming
   * from the driver. Once it is set, it will never be changed.
   */
  @volatile var task: Task[Any] = _
}
```
As the definition shows, TaskRunner extends Runnable, so it is a unit of work that can be scheduled on a thread pool. Its main member variables are:

1. execBackend: the Executor's background helper, providing the two basic capabilities of communicating with the driver and reporting status; in practice this is a CoarseGrainedExecutorBackend instance;
2. taskId: the task's unique identifier;
3. attemptNumber: the sequence number of this task attempt. Like MapReduce, Spark can launch backup attempts for straggler tasks (speculative execution), so a running task instance is uniquely identified by taskId plus attemptNumber;
4. serializedTask: the serialized task as a ByteBuffer; deserializing it yields the Task whose run() method is executed;
5. killed: flag indicating whether the task has been killed;
6. task: of type Task[Any], the task to run; it is set in run() by deserializing the task binary coming from the driver, and never changes once set;
7. startGCTime: how much time the JVM has spent in garbage collection when the task starts to run.
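The role of the volatile `killed` flag can be sketched with a hypothetical class (not Spark code): kill() may be called from another thread, and because the flag is volatile, the running task is guaranteed to see the write at its next checkpoint.

```java
class KillableWork implements Runnable {
    private volatile boolean killed = false;  // like TaskRunner.killed
    volatile String state = "PENDING";

    void kill() { killed = true; }            // called from another thread

    @Override public void run() {
        if (killed) {        // quick-exit check, as run() does right after deserialization
            state = "KILLED";
            return;
        }
        state = "FINISHED";  // the real work would happen before this
    }
}
```

This is cooperative cancellation: nothing forcibly stops the thread; the task must reach a point where it checks the flag.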
Since TaskRunner is a Runnable, it must provide a run() method; that is the method executed when the TaskRunner is scheduled on the thread pool. Here is its definition:
```scala
override def run(): Unit = {

  // Step 1: construct the task and the helper objects it needs at runtime

  // Create the task memory manager
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)

  // Deserialization start time
  val deserializeStartTime = System.currentTimeMillis()

  // Set the context class loader on the current thread
  Thread.currentThread.setContextClassLoader(replClassLoader)

  // Get a serializer from SparkEnv
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")

  // Report state TaskState.RUNNING through execBackend
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0

  // Record the GC time so far
  startGCTime = computeTotalGcTime()

  try {
    // Call Task.deserializeWithDependencies() to split the serialized task into the files
    // (taskFiles) and jars (taskJars) the task needs, plus the task binary (taskBytes)
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)

    // Deserialize the task binary taskBytes to get the task instance
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

    // Set the task's memory manager
    task.setTaskMemoryManager(taskMemoryManager)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException
    }

    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    // Update the mapOutputTracker's epoch
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.

    // Step 2: run the task

    // Task start time
    taskStart = System.currentTimeMillis()

    // threwException starts as true; it records whether the task threw while running
    var threwException = true

    // Call the task's run() method to actually execute it, obtaining the result value
    // and the accumulator updates accumUpdates
    val (value, accumUpdates) = try {

      // Call the task's run() method to actually execute the task
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)

      // No exception was thrown: set threwException to false
      threwException = false

      // Return res; inside Task.run(), res is defined as (T, AccumulatorUpdates),
      // i.e. the task result paired with the accumulator updates
      res
    } finally {

      // Release all memory allocated through the task memory manager
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
      if (freedMemory > 0) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
          throw new SparkException(errMsg)
        } else {
          logError(errMsg)
        }
      }
    }

    // Task finish time
    val taskFinish = System.currentTimeMillis()

    // If the task has been killed, let's fail it by throwing TaskKilledException.
    if (task.killed) {
      throw new TaskKilledException
    }

    // Step 3: handle the task result

    // Get a result serializer from SparkEnv
    val resultSer = env.serializer.newInstance()

    // Time just before result serialization
    val beforeSerialization = System.currentTimeMillis()

    // Serialize the task result with the result serializer, yielding valueBytes
    val valueBytes = resultSer.serialize(value)

    // Time just after result serialization
    val afterSerialization = System.currentTimeMillis()

    // Metrics bookkeeping; not covered here
    for (m <- task.metrics) {
      // Deserialization happens in two parts: first, we deserialize a Task object, which
      // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
      m.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      m.setJvmGCTime(computeTotalGcTime() - startGCTime)
      m.setResultSerializationTime(afterSerialization - beforeSerialization)
      m.updateAccumulators()
    }

    // Build a DirectTaskResult holding both the serialized result valueBytes and the
    // accumulator updates
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)

    // Serialize the DirectTaskResult, yielding serializedDirectResult
    val serializedDirectResult = ser.serialize(directResult)

    // Size of the serialized task result
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {

      // If the result is larger than the maximum allowed total result size, serialize an
      // IndirectTaskResult: a reference to a DirectTaskResult stored in the worker's
      // BlockManager (here the actual result bytes are dropped)
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      }
      // If the result exceeds the Akka frame size minus the reserved bytes, it cannot travel
      // in a message, so write it to the BlockManager and send a reference instead
      else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {

        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      }
      // The result is small enough: return it directly, carried in the message
      else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    // Report state TaskState.FINISHED through execBackend
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

  } catch { // handle the various failures

    case ffe: FetchFailedException =>
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case _: TaskKilledException | _: InterruptedException if task.killed =>
      logInfo(s"Executor killed $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

    case cDE: CommitDeniedException =>
      val reason = cDE.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case t: Throwable =>
      // Attempt to exit cleanly by informing the driver of our failure.
      // If anything goes wrong (or this was a fatal exception), we will delegate to
      // the default uncaught exception handler, which will terminate the Executor.
      logError(s"Exception in $taskName (TID $taskId)", t)

      val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
        task.metrics.map { m =>
          m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
          m.setJvmGCTime(computeTotalGcTime() - startGCTime)
          m.updateAccumulators()
          m
        }
      }
      val serializedTaskEndReason = {
        try {
          ser.serialize(new ExceptionFailure(t, metrics))
        } catch {
          case _: NotSerializableException =>
            // t is not serializable so just send the stacktrace
            ser.serialize(new ExceptionFailure(t, metrics, false))
        }
      }

      // Report state TaskState.FAILED through execBackend
      execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)

      // Don't forcibly exit unless the exception was inherently fatal, to avoid
      // stopping other tasks unnecessarily.
      if (Utils.isFatalError(t)) {
        SparkUncaughtExceptionHandler.uncaughtException(t)
      }

  } finally {

    // Finally, whether the task succeeded or failed, remove it from runningTasks
    runningTasks.remove(taskId)
  }
}
```
A long method indeed! But overall it breaks down into just three steps:

1. Step 1: construct the task and the helper objects it needs at runtime;
2. Step 2: run the task;
3. Step 3: handle the task result.

That simple! Given time and space constraints, we cover the main flow here and leave the details to the next article.

Let us take the steps one by one. Step 1, constructing the task and its runtime helpers, consists of:

1.1. Construct the task memory manager, taskMemoryManager;
1.2. Record the deserialization start time;
1.3. Set the context class loader on the current thread;
1.4. Get a serializer ser from SparkEnv;
1.5. Report state TaskState.RUNNING through execBackend;
1.6. Record the GC time so far;
1.7. Call Task.deserializeWithDependencies() to split the serialized task into the files (taskFiles) and jars (taskJars) the task needs, plus the task binary (taskBytes);
1.8. Deserialize the task binary taskBytes to get the task instance;
1.9. Set the task's memory manager;
1.10. If the task has been killed by this point, throw an exception and exit quickly.
Next comes Step 2, running the task:

2.1. Record the task start time;
2.2. Set the threwException flag to true; it records whether the task threw while running;
2.3. Call the task's run() method to actually execute it, obtaining the result value and the accumulator updates accumUpdates;
2.4. Set threwException to false;
2.5. Release all memory allocated through the task memory manager taskMemoryManager;
2.6. Record the task finish time;
2.7. If the task has been killed, throw a TaskKilledException.
? ? ? ? 最后一步,Step3:Task運行結果處理,大體流程如下:
? ? ? ? 3.1、通過SparkEnv獲取Task運行結果序列化器;
? ? ? ? 3.2、獲取結果序列化前的時間點;
? ? ? ? 3.3、利用Task運行結果序列化器序列化Task運行結果value,得到valueBytes;
? ? ? ??3.4、獲取結果序列化后的時間點;
? ? ? ??3.5、度量指標體系相關,暫不介紹;
? ? ? ??3.6、構造DirectTaskResult,同時包含Task運行結果valueBytes和累加器更新值accumulator updates;
? ? ? ? 3.7、序列化DirectTaskResult,得到serializedDirectResult;
? ? ? ? 3.8、獲取Task運行結果大小;
? ? ? ? 3.9、處理Task運行結果:
? ? ? ? ? ? ? ? ?3.9.1、如果Task運行結果大小大于所有Task運行結果的最大大小,序列化IndirectTaskResult,IndirectTaskResult為存儲在Worker上BlockManager中DirectTaskResult的一個引用;
? ? ? ? ? ? ? ? ?3.9.2、如果 Task運行結果大小超過Akka除去需要保留的字節外最大大小,則將結果寫入BlockManager,Task運行結果比較小的話,直接返回,通過消息傳遞;
? ? ? ? ? ? ? ? ?3.9.3、Task運行結果比較小的話,直接返回,通過消息傳遞
? ? ? ??3.10、execBackend更新狀態TaskState.FINISHED;
? ? ? ??最后,無論運行成功還是失敗,將task從runningTasks中移除。
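The three-way routing in step 3.9 can be sketched on its own. The thresholds below are illustrative stand-ins for maxResultSize and the Akka frame limit, and the returned string merely names the branch taken.

```java
class ResultRouter {
    // Illustrative stand-ins for maxResultSize and the Akka frame limit
    static final long MAX_RESULT_SIZE = 1024;
    static final long FRAME_SIZE = 128;
    static final long RESERVED = 16;

    /**
     * DROPPED: only an IndirectTaskResult stub (block id + size) is sent;
     * BLOCK_MANAGER: bytes stored locally, a reference travels in the message;
     * DIRECT: the serialized bytes travel in the message itself.
     */
    static String route(long resultSize) {
        if (MAX_RESULT_SIZE > 0 && resultSize > MAX_RESULT_SIZE) return "DROPPED";
        if (resultSize >= FRAME_SIZE - RESERVED) return "BLOCK_MANAGER";
        return "DIRECT";
    }
}
```

The ordering of the checks matters: the hard cap on total result size is tested first, so a result that violates both limits is dropped outright rather than stored in the BlockManager.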
That wraps up the main flow of task execution. The remaining details, including the actual execution inside Task's run() method, the task memory manager, serializers, accumulator updates, parts of the failure handling, status reporting, and other specifics, are left for the next article.

Work tomorrow; off to bed!
Original post: http://blog.csdn.net/lipeng_bigdata/article/details/50726216
Reposted from: https://www.cnblogs.com/jirimutu01/p/5274461.html