Understanding Spark 2.1 Core in Depth (Part 4): Principles and Source Code Analysis of Task Result Handling and Fault Tolerance
The call stack is as follows:

- TaskSchedulerImpl.statusUpdate
  - TaskResultGetter.enqueueSuccessfulTask
    - TaskSchedulerImpl.handleSuccessfulTask
      - TaskSetManager.handleSuccessfulTask
        - DAGScheduler.taskEnded
          - DAGSchedulerEventProcessLoop.doOnReceive
            - DAGScheduler.handleTaskCompletion
  - TaskResultGetter.enqueueFailedTask
    - TaskSchedulerImpl.handleFailedTask
      - TaskSetManager.handleFailedTask
        - DAGScheduler.taskEnded
          - DAGSchedulerEventProcessLoop.doOnReceive
            - DAGScheduler.handleTaskCompletion
TaskSchedulerImpl.statusUpdate
The TaskRunner sends the task's execution result to the DriverEndpoint, and the DriverEndpoint forwards it to TaskSchedulerImpl.statusUpdate:
```scala
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          // This only happens in the Mesos scheduling mode
          if (state == TaskState.LOST) {
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // FINISHED, FAILED, KILLED and LOST all count as isFinished
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            // For FINISHED, call taskResultGetter.enqueueSuccessfulTask;
            // otherwise call taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            if (state == TaskState.FINISHED) {
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.").format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
```
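Stripped of the Mesos-specific LOST handling and the error logging, statusUpdate follows a simple pattern: only cheap bookkeeping happens under the scheduler lock, while the actual deserialization of results is pushed onto TaskResultGetter's thread pool, with the terminal state deciding which path the result takes. The sketch below is a minimal, self-contained model of that pattern (StatusUpdateSketch, getTaskResultPool and the println placeholders are illustrative names, not Spark's API):

```scala
import java.util.concurrent.Executors

object StatusUpdateSketch {
  sealed trait TaskState
  case object FINISHED extends TaskState
  case object FAILED   extends TaskState
  case object KILLED   extends TaskState
  case object LOST     extends TaskState

  // A small pool playing the role of TaskResultGetter's getTaskResultExecutor
  private val getTaskResultPool = Executors.newFixedThreadPool(4)

  def statusUpdate(tid: Long, state: TaskState, payload: Array[Byte]): Unit = {
    // FINISHED, FAILED, KILLED and LOST are all terminal states.
    // Only cheap bookkeeping runs on the caller's thread; the deserialization
    // work is handed to the pool so the scheduler is never blocked on it.
    getTaskResultPool.execute(new Runnable {
      override def run(): Unit = state match {
        case FINISHED =>
          println(s"task $tid: deserialize ${payload.length} result bytes, then handleSuccessfulTask")
        case FAILED | KILLED | LOST =>
          println(s"task $tid: deserialize the failure reason, then handleFailedTask")
      }
    })
  }
}
```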
Handling a successful result

Let's first look at how a successful execution result is processed:
TaskResultGetter.enqueueSuccessfulTask
```scala
def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  // Fetch the result on the result-getter thread pool
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // The result is embedded in the status update and can be used directly
          case directResult: DirectTaskResult[_] =>
            // Check that the result size is within the allowed limit
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())
          // The result cannot be obtained directly; only a block id and size were sent
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // The size exceeds the allowed limit,
              // so remove the remote computation result
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the computation result from the remote block manager
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              // If the machine failed between the task finishing and our fetching the result,
              // or the block manager had to flush the result,
              // then the result will no longer be available
              scheduler.handleFailedTask(taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            // Deserialize the fetched result
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            deserializedResult.value(taskResultSerializer.get())
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }
        result.accumUpdates = result.accumUpdates.map { a =>
          if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
            val acc = a.asInstanceOf[LongAccumulator]
            assert(acc.sum == 0L, "task result size should not have been set on the executors")
            acc.setValue(size.toLong)
            acc
          } else {
            a
          }
        }
        // Hand the fetched result over to the scheduler
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
```
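Why two result shapes? On the executor side the finished task's serialized result is shipped back according to its size: small results ride along in the status update as a DirectTaskResult, larger ones are stored in the block manager and only an IndirectTaskResult(blockId, size) is sent back, and results above the driver's limit are not stored at all. In Spark the thresholds come from spark.driver.maxResultSize and spark.task.maxDirectResultSize. The following is a simplified, self-contained sketch of that decision; the type and helper names (SketchTaskResult, packageResult, storeBlock) and the constants are illustrative, not the executor's actual code:

```scala
import java.nio.ByteBuffer

// Simplified stand-ins for Spark's result types
sealed trait SketchTaskResult
case class SketchDirectResult(bytes: ByteBuffer) extends SketchTaskResult
case class SketchIndirectResult(blockId: String, size: Long) extends SketchTaskResult

object ResultShapeSketch {
  // Illustrative limits; in Spark these come from
  // spark.driver.maxResultSize and spark.task.maxDirectResultSize
  val maxResultSize: Long = 1L << 30        // 1 GB
  val maxDirectResultSize: Long = 1L << 20  // 1 MB

  /** Decide how a finished task ships its result back to the driver. */
  def packageResult(taskId: Long, resultBytes: ByteBuffer,
                    storeBlock: (String, ByteBuffer) => Unit): SketchTaskResult = {
    val size = resultBytes.limit().toLong
    if (size > maxResultSize) {
      // Too large for the driver to ever collect: send only the metadata,
      // so the driver's size check can fail the fetch with a clear reason
      SketchIndirectResult(s"taskresult_$taskId", size)
    } else if (size > maxDirectResultSize) {
      // Too large for the RPC message: park it in the block manager and send
      // only the block id; the driver fetches it via getRemoteBytes
      val blockId = s"taskresult_$taskId"
      storeBlock(blockId, resultBytes)
      SketchIndirectResult(blockId, size)
    } else {
      // Small enough to piggyback directly on the status update
      SketchDirectResult(resultBytes)
    }
  }
}
```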
TaskSchedulerImpl.handleSuccessfulTask
It simply delegates to taskSetManager.handleSuccessfulTask:
```scala
def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}
```
TaskSetManager.handleSuccessfulTask
```scala
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markFinished(TaskState.FINISHED)
  // Remove this task from the set of running tasks
  removeRunningTask(tid)
  // Notify the dagScheduler that this task has completed
  sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  // Kill all other running attempts of the same task
  for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
      s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
      s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
  }
  if (!successful(index)) {
    // Count the success
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark this task as successful; once all tasks have succeeded,
    // mark the task set as a zombie so it stops launching new tasks
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  maybeFinishTaskSet()
}
```
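The bookkeeping here encodes a "first attempt wins" rule: with speculative execution a partition may have several attempts running, so the remaining attempts are killed once one succeeds, only the first success is counted, and the TaskSetManager becomes a zombie (it stops launching new tasks) once every partition has succeeded. Below is a minimal, self-contained model of that rule, with illustrative names rather than Spark's API:

```scala
// Minimal model of the "first attempt wins" bookkeeping in handleSuccessfulTask.
// killAttempt stands in for sched.backend.killTask.
class TaskSetSketch(numPartitions: Int, killAttempt: Long => Unit) {
  private val successful = Array.fill(numPartitions)(false)
  private var tasksSuccessful = 0
  var isZombie = false

  /** Called when attempt `tid` of `partition` succeeds; `otherAttempts` are still running. */
  def onSuccess(partition: Int, tid: Long, otherAttempts: Seq[Long]): Unit = {
    otherAttempts.foreach(killAttempt)   // speculative copies are no longer needed
    if (!successful(partition)) {        // only the first success of a partition counts
      successful(partition) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numPartitions) isZombie = true
    }
  }
}
```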
DAGScheduler.taskEnded
Let's now look more closely at how the dagScheduler is notified that the task has completed:
```scala
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskInfo: TaskInfo): Unit = {
  // Post a CompletionEvent to the event loop
  eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo))
}
```
DAGSchedulerEventProcessLoop.doOnReceive
As covered in the previous post, DAGSchedulerEventProcessLoop.doOnReceive listens for these events:
```scala
case completion: CompletionEvent =>
  dagScheduler.handleTaskCompletion(completion)
```
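taskEnded therefore never blocks the caller: it just posts a CompletionEvent, and a dedicated event-loop thread later takes the event off a queue and dispatches it to handleTaskCompletion. The sketch below models that post/onReceive pattern with a plain queue and a daemon thread; it is an illustrative stand-in, not Spark's EventLoop or DAGSchedulerEventProcessLoop classes:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Minimal model of the event-loop pattern: callers post events and return immediately;
// a single daemon thread drains the queue and dispatches events one at a time, in order.
abstract class EventLoopSketch[E](name: String) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        onReceive(eventQueue.take())   // blocks until an event is available
      }
    }
  }
  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)   // non-blocking for the caller
  protected def onReceive(event: E): Unit
}

// Usage: posted completions are handled sequentially by the loop's own thread.
object EventLoopSketchDemo extends App {
  sealed trait Event
  case class Completion(taskId: Long) extends Event

  val loop = new EventLoopSketch[Event]("dag-scheduler-event-loop-sketch") {
    override protected def onReceive(event: Event): Unit = event match {
      case Completion(id) => println(s"handleTaskCompletion for task $id")
    }
  }
  loop.start()
  (1L to 3L).foreach(id => loop.post(Completion(id)))
  Thread.sleep(100)   // give the daemon thread time to drain the queue before the demo exits
}
```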
DAGScheduler.handleTaskCompletion
Let's look at the core part of DAGScheduler.handleTaskCompletion:
```scala
// Look up the stage by stageId
val stage = stageIdToStage(task.stageId)
// event here is the CompletionEvent
event.reason match {
  // Only the success path is shown here
  case Success =>
    // Remove this task from the stage's pending partitions
    stage.pendingPartitions -= task.partitionId
    task match {
      // A task of the final stage
      case rt: ResultTask[_, _] =>
        // Cast the stage to a ResultStage
        val resultStage = stage.asInstanceOf[ResultStage]
        resultStage.activeJob match {
          // Get the job this stage belongs to
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              // Mark this partition as finished and count it
              job.finished(rt.outputId) = true
              job.numFinished += 1
              // If all of the job's partitions have finished, remove the job
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(resultStage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(
                  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
              }
              // Notify the JobWaiter that a task succeeded.
              // taskSucceeded runs user-defined code and may therefore throw an exception.
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // Mark the job as failed
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
          case None =>
            logInfo("Ignoring result from " + rt + " because its job has finished")
        }
      // A task of an intermediate (non-final) stage
      case smt: ShuffleMapTask =>
        val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
        updateAccumulators(event)
        val status = event.result.asInstanceOf[MapStatus]
        val execId = status.location.executorId
        logDebug("ShuffleMapTask finished on " + execId)
        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
          logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
        } else {
          // Append the task's partitionId and status to the stage's output locations
          shuffleStage.addOutputLoc(smt.partitionId, status)
        }
        if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
          markStageAsFinished(shuffleStage)
          logInfo("looking for newly runnable stages")
          logInfo("running: " + runningStages)
          logInfo("waiting: " + waitingStages)
          logInfo("failed: " + failedStages)
          // Register the output locations with mapOutputTracker.
          // As mentioned in the previous post, a ShuffleMapTask's result (really the
          // location, size and other metadata of the result data) is passed to the
          // driver's mapOutputTracker. That is why DAGScheduler.newOrUsedShuffleStage
          // first checks whether the stage has already been computed:
          // if it has, the results are copied into the newly created stage;
          // if not, the stage is registered with mapOutputTracker as a placeholder
          // for the metadata.
          mapOutputTracker.registerMapOutputs(
            shuffleStage.shuffleDep.shuffleId,
            shuffleStage.outputLocInMapOutputTrackerFormat(),
            changeEpoch = true)
          clearCacheLocs()
          if (!shuffleStage.isAvailable) {
            // The stage is not available (some tasks failed), so resubmit it
            logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
              ") because some of its tasks had failed: " +
              shuffleStage.findMissingPartitions().mkString(", "))
            submitStage(shuffleStage)
          } else {
            // All of the stage's partitions have finished
            if (shuffleStage.mapStageJobs.nonEmpty) {
              val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
              // Mark the map-stage jobs as finished
              for (job <- shuffleStage.mapStageJobs) {
                markMapStageJobAsFinished(job, stats)
              }
            }
            // Submit this stage's waiting child stages
            submitWaitingChildStages(shuffleStage)
          }
        }
    }
```
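For a ResultTask, the logic above is essentially a completion counter: each partition's first result flips its finished flag and increments numFinished, and the job is declared succeeded exactly when numFinished reaches numPartitions; duplicate completions of the same partition are ignored. A tiny self-contained model of that accounting (illustrative names, not Spark's API):

```scala
// Minimal model of the ActiveJob bookkeeping in the ResultTask branch above.
class ActiveJobSketch(numPartitions: Int) {
  private val finished = Array.fill(numPartitions)(false)
  private var numFinished = 0

  /** Returns true when this result completes the job. */
  def onTaskSuccess(outputId: Int): Boolean = {
    if (!finished(outputId)) {   // ignore duplicate completions of the same partition
      finished(outputId) = true
      numFinished += 1
    }
    numFinished == numPartitions
  }
}
```

For a job over three partitions, onTaskSuccess returns true only on the third distinct outputId, no matter how many duplicate updates arrive.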
處理執(zhí)行失敗的結(jié)果
TaskResultGetter.enqueueFailedTask
Now let's turn back to how a failed result is handled.
```scala
def enqueueFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    serializedData: ByteBuffer) {
  var reason: TaskFailedReason = UnknownReason
  try {
    // Process the result on the result-getter thread pool
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        val loader = Utils.getContextOrSparkClassLoader
        try {
          // If the serialized data (the TaskFailedReason) exists and is non-empty,
          // deserialize it
          if (serializedData != null && serializedData.limit() > 0) {
            reason = serializer.get().deserialize[TaskFailedReason](serializedData, loader)
          }
        } catch {
          // On ClassNotFoundException, log an error
          case cnd: ClassNotFoundException =>
            logError("Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
          // Any other exception is ignored
          case ex: Exception =>
        }
        // Hand the failed task over to the scheduler
        scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
      }
    })
  } catch {
    case e: RejectedExecutionException if sparkEnv.isStopped =>
  }
}
```

TaskSchedulerImpl.handleFailedTask
```scala
def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: TaskFailedReason): Unit = synchronized {
  // Let the TaskSetManager handle the failed task
  taskSetManager.handleFailedTask(tid, taskState, reason)
  if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
    // handleFailedTask puts the failed task back into the pending queue to wait
    // for the next scheduling round, so start a new round of resource offers here
    backend.reviveOffers()
  }
}
```

TaskSetManager.handleFailedTask
Let's look at the core part of handleFailedTask:
```scala
// Notify the dagScheduler of the failed task
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
if (successful(index)) {
  logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
} else {
  // Put this task back into the pending queue
  addPendingTask(index)
}
if (!isZombie && reason.countTowardsTaskFailures) {
  taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
    info.host, info.executorId, index))
  assert(null != failureReason)
  // Count this task's failures
  numFailures(index) += 1
  // If the failure count reaches the maximum number of retries (4 by default),
  // abort the task set
  if (numFailures(index) >= maxTaskFailures) {
    logError("Task %d in stage %s failed %d times; aborting job".format(
      index, taskSet.id, maxTaskFailures))
    abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
      .format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
    return
  }
}
maybeFinishTaskSet()
```
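Task-level fault tolerance is thus retry-with-a-budget: a failed partition goes back into the pending queue and, if the failure counts towards task failures (reason.countTowardsTaskFailures), its failure counter is incremented; once any partition reaches maxTaskFailures (spark.task.maxFailures, 4 by default) the whole task set is aborted. A minimal model of that accounting, with illustrative names rather than Spark's API:

```scala
// Minimal model of the retry accounting in TaskSetManager.handleFailedTask.
class RetryAccountingSketch(numPartitions: Int, maxTaskFailures: Int = 4) {
  private val numFailures = Array.fill(numPartitions)(0)
  private val pending = scala.collection.mutable.Queue[Int]()
  var aborted = false

  def onTaskFailed(partition: Int, countsTowardsFailures: Boolean): Unit = {
    pending.enqueue(partition)            // re-queue for the next scheduling round
    if (countsTowardsFailures) {          // some reasons (e.g. a killed speculative copy) do not count
      numFailures(partition) += 1
      if (numFailures(partition) >= maxTaskFailures) {
        aborted = true                    // too many failures of one partition: give up on the job
      }
    }
  }
}
```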
DAGScheduler.handleTaskCompletion
與處理成功結(jié)果的過程相同,接下來也會(huì)調(diào)用DAGScheduler.taskEnded。DAGSchedulerEventProcessLoop的doOnReceive接收CompletionEvent信號(hào),調(diào)用dagScheduler.handleTaskCompletion(completion)
Let's look at the core part of DAGScheduler.handleTaskCompletion that handles failed tasks:
```scala
// The task was resubmitted
case Resubmitted =>
  logInfo("Resubmitted " + task + ", so marking it as still running")
  // Put the task back into the pending partitions
  stage.pendingPartitions += task.partitionId

// Fetching shuffle output failed
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
  val failedStage = stageIdToStage(task.stageId)
  val mapStage = shuffleIdToMapStage(shuffleId)
  // If the failed attempt id is not the stage's current attempt id, ignore the failure
  if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
    logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
      s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
      s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
  } else {
    // If the failed stage is still in the running queue, mark it as finished
    if (runningStages.contains(failedStage)) {
      logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
        s"due to a fetch failure from $mapStage (${mapStage.name})")
      markStageAsFinished(failedStage, Some(failureMessage))
    } else {
      logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
        s"longer running")
    }
    // If stage retries are not allowed, abort the stage
    if (disallowStageRetryForTest) {
      abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
    } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
      // If the maximum number of stage retries has been reached, abort the stage
      abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
        s"has failed the maximum allowable number of " +
        s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
        s"Most recent failure reason: ${failureMessage}", None)
    } else {
      if (failedStages.isEmpty) {
        // Schedule a ResubmitFailedStages event only if failedStages was empty;
        // if it is non-empty, a resubmit event has already been scheduled by an
        // earlier fetch failure. Task-level re-execution is handled in
        // TaskSetManager.handleFailedTask.
        logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
          s"$failedStage (${failedStage.name}) due to fetch failure")
        messageScheduler.schedule(new Runnable {
          override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
        }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
      }
      failedStages += failedStage
      failedStages += mapStage
    }
    // Remove the output location and unregister it from mapOutputTracker
    if (mapId != -1) {
      mapStage.removeOutputLoc(mapId, bmAddress)
      mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    }
    // When an executor has had repeated fetch failures, mark the executor as lost
    if (bmAddress != null) {
      handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
    }
  }

// The task's output commit was denied
case commitDenied: TaskCommitDenied =>
  // Do nothing here; let the TaskScheduler decide how to handle it

// The task threw an exception
case exceptionFailure: ExceptionFailure =>
  // Update the accumulators
  updateAccumulators(event)

// The task result was lost
case TaskResultLost =>
  // Do nothing here; let the TaskScheduler handle the error and resubmit the task

// The executor was lost, the task was killed, or the reason is unknown
case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
  // Do nothing here; if the task keeps failing, the TaskScheduler will abort the job
```

Summary

This post walked through how the driver processes task results: a successful result flows from TaskSchedulerImpl.statusUpdate through TaskResultGetter.enqueueSuccessfulTask, TaskSetManager.handleSuccessfulTask and DAGScheduler.handleTaskCompletion, while a failed result goes through enqueueFailedTask and handleFailedTask, with retries at the task level (up to maxTaskFailures) in the TaskSetManager and stage resubmission for fetch failures in the DAGScheduler.