Understanding Spark 2.1 Core in Depth (Part 7): Principles and Source Code Analysis of Task Execution in Standalone Mode
In this post, we look at how a Task is executed on an Executor after the Executor has started, and at the follow-up processing that comes afterwards.
Executing the Task
As we saw in Understanding Spark 2.1 Core in Depth (Part 3): Principles and Source Code Analysis of the Task Scheduler, once task scheduling completes, CoarseGrainedSchedulerBackend.DriverEndpoint calls launchTasks to send a LaunchTask message carrying the serializedTask to CoarseGrainedExecutorBackend. Next, we trace how CoarseGrainedExecutorBackend executes the Task after receiving that LaunchTask message.
The call stack is as follows:
- CoarseGrainedExecutorBackend.receive
  - Executor.launchTask
    - Executor.TaskRunner.run
      - Executor.updateDependencies
      - Task.run
        - ShuffleMapTask.runTask
        - ResultTask.runTask
CoarseGrainedExecutorBackend.receive
```scala
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Deserialize the TaskDescription
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Hand the task to executor.launchTask
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
```

Executor.launchTask
```scala
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // Create a TaskRunner
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // Register the TaskRunner under its taskId
  // in a ConcurrentHashMap[Long, TaskRunner]
  runningTasks.put(taskId, tr)
  // Execute the TaskRunner on the thread pool
  threadPool.execute(tr)
}
```
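Two things happen here: the runner is registered in runningTasks so it can later be killed or queried, and it is handed to a thread pool (in Spark, a cached daemon thread pool). The following is a minimal, self-contained sketch of the same registry-plus-pool pattern; the names here are hypothetical, not Spark source:

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory}

// Minimal sketch of the "registry + daemon thread pool" pattern used by
// Executor.launchTask. Names are hypothetical, not Spark source.
object TaskPoolSketch {
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  private val threadPool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "task-launch-worker")
      t.setDaemon(true) // daemon threads never block JVM shutdown
      t
    }
  })

  def launch(taskId: Long, body: () => Unit): Unit = {
    val runner = new Runnable {
      override def run(): Unit =
        try body()
        finally runningTasks.remove(taskId) // mirrors the finally in TaskRunner.run
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }
}
```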
Executor.TaskRunner.run
```scala
override def run(): Unit = {
  val threadMXBean = ManagementFactory.getThreadMXBean
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  // Record when deserialization starts
  val deserializeStartTime = System.currentTimeMillis()
  // Record the CPU time when deserialization starts
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  var taskStartCpu: Long = 0
  // GC time so far, used as a baseline
  startGCTime = computeTotalGcTime()

  try {
    // Deserialize the task metadata
    val (taskFiles, taskJars, taskProps, taskBytes) =
      Task.deserializeWithDependencies(serializedTask)
    // Publish the task properties on this executor
    Executor.taskDeserializationProps.set(taskProps)
    // Based on taskFiles and taskJars, download the files the task
    // needs and load the required jars
    updateDependencies(taskFiles, taskJars)
    // Deserialize the Task instance from taskBytes
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    // Attach the task properties
    task.localProperties = taskProps
    // Attach the task memory manager
    task.setTaskMemoryManager(taskMemoryManager)

    // If the task was killed before we even deserialized it,
    // bail out with an exception
    if (killed) {
      throw new TaskKilledException
    }

    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    // Update the mapOutputTracker epoch to the task's epoch
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Record when the task starts
    taskStart = System.currentTimeMillis()
    // Record the CPU time when the task starts
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true
    val value = try {
      // Run the task
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

      if (freedMemory > 0 && !threwException) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }

      if (releasedLocks.nonEmpty && !threwException) {
        val errMsg =
          s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
            releasedLocks.mkString("[", ", ", "]")
        if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
          throw new SparkException(errMsg)
        } else {
          logWarning(errMsg)
        }
      }
    }
    // Record when the task finishes
    val taskFinish = System.currentTimeMillis()
    // Record the CPU time when the task finishes
    val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L

    // If the task was killed while it was running,
    // bail out with an exception
    if (task.killed) {
      throw new TaskKilledException
    }

    val resultSer = env.serializer.newInstance()
    // Record when result serialization starts
    val beforeSerialization = System.currentTimeMillis()
    // Serialize the result value
    val valueBytes = resultSer.serialize(value)
    // Record when result serialization finishes
    val afterSerialization = System.currentTimeMillis()

    // Deserialization happens in two places:
    // 1. here, where the task metadata and the Task instance are deserialized;
    // 2. inside Task.run after the task starts, where the RDD and the
    //    function are deserialized.
    // Compute the task's deserialization time
    task.metrics.setExecutorDeserializeTime(
      (taskStart - deserializeStartTime) + task.executorDeserializeTime)
    // Compute the task's deserialization CPU time
    task.metrics.setExecutorDeserializeCpuTime(
      (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
    // Compute the task's run time
    task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
    // Compute the task's run CPU time
    task.metrics.setExecutorCpuTime(
      (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
    // Compute the GC time
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    // Compute the result serialization time
    task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

    val accumUpdates = task.collectAccumulatorUpdates()
    // Note an inefficiency here:
    // value is effectively serialized twice
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    // Size of the serialized result
    val resultSize = serializedDirectResult.limit

    // The computed result is handled according to its size:
    // 1. larger than maxResultSize (spark.driver.maxResultSize, 1 GB by default):
    //    the result is dropped entirely and only its size is reported to the driver;
    // 2. between maxDirectResultSize and maxResultSize:
    //    the result is stored in the BlockManager under a TaskResultBlockId derived
    //    from the taskId, and that block id is sent to the driver over Netty;
    //    this threshold is the difference between the Netty transport's maximum
    //    frame size, spark.akka.frameSize (128 MB by default), and Netty's
    //    reserved space, reservedSizeBytes (200 KB);
    // 3. smaller than maxDirectResultSize:
    //    the serialized result is sent to the driver directly over Netty.
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize > maxDirectResultSize) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    // Report the final state back to the backend
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } catch {
    case ffe: FetchFailedException =>
      val reason = ffe.toTaskFailedReason
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case _: TaskKilledException =>
      logInfo(s"Executor killed $taskName (TID $taskId)")
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

    case _: InterruptedException if task.killed =>
      logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

    case CausedBy(cDE: CommitDeniedException) =>
      val reason = cDE.toTaskFailedReason
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case t: Throwable =>
      logError(s"Exception in $taskName (TID $taskId)", t)
      val accums: Seq[AccumulatorV2[_, _]] =
        if (task != null) {
          task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
          task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
          task.collectAccumulatorUpdates(taskFailed = true)
        } else {
          Seq.empty
        }
      val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
      val serializedTaskEndReason = {
        try {
          ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
        } catch {
          case _: NotSerializableException =>
            ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
        }
      }
      setTaskFinishedAndClearInterruptStatus()
      execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
      if (Utils.isFatalError(t)) {
        SparkUncaughtExceptionHandler.uncaughtException(t)
      }
  } finally {
    // Deregister the task once it has finished
    runningTasks.remove(taskId)
  }
}
```
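To make the three-way result-size policy concrete, here is a hedged, standalone sketch of just the routing decision, using the default thresholds quoted in the comments above (the helper and its names are hypothetical, not Spark source):

```scala
sealed trait ResultRoute
case object DroppedTooLarge extends ResultRoute // only the size reaches the driver
case object ViaBlockManager extends ResultRoute // a block id reaches the driver
case object DirectToDriver  extends ResultRoute // the bytes go straight back

// Defaults as quoted above: maxResultSize = 1 GB,
// maxDirectResultSize = 128 MB frame size minus 200 KB reserved space.
def routeResult(
    resultSize: Long,
    maxResultSize: Long = 1L << 30,
    maxDirectResultSize: Long = (128L << 20) - 200L * 1024): ResultRoute = {
  if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
  else if (resultSize > maxDirectResultSize) ViaBlockManager
  else DirectToDriver
}
```

With these defaults, a 512 MB result goes through the BlockManager, a 2 GB result is dropped with only its size reported, and a 1 MB result is sent back directly.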
Executor.updateDependencies
Next, let's look at how the executor's dependencies are updated, i.e. how the files the task needs are downloaded and the required jars are loaded:
```scala
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
  lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  synchronized {
    // Download the files the task needs
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      logInfo("Fetching " + name + " with timestamp " + timestamp)
      Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
        env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
      currentFiles(name) = timestamp
    }
    // Load the jars the task needs
    for ((name, timestamp) <- newJars) {
      val localName = name.split("/").last
      val currentTimeStamp = currentJars.get(name)
        .orElse(currentJars.get(localName))
        .getOrElse(-1L)
      if (currentTimeStamp < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
        currentJars(name) = timestamp
        // Add the jar to the class loader
        val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
        if (!urlClassLoader.getURLs().contains(url)) {
          logInfo("Adding " + url + " to class loader")
          urlClassLoader.addURL(url)
        }
      }
    }
  }
}
```
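The newFiles and newJars maps originate from SparkContext.addFile and SparkContext.addJar calls on the driver, and the fetched copies land under SparkFiles.getRootDirectory() on each executor. A minimal driver-side usage sketch (the paths are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("deps-demo").setMaster("local[2]"))

// Ship a data file and an extra jar to every executor; updateDependencies
// fetches them before the first task that needs them runs.
sc.addFile("/path/to/lookup.txt")
sc.addJar("/path/to/extra-lib.jar")

sc.parallelize(1 to 4).map { i =>
  // On the executor side, resolve the local copy fetched by updateDependencies
  val localPath = SparkFiles.get("lookup.txt")
  s"$i -> $localPath"
}.collect().foreach(println)
```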
Task.run
Next we come to the heart of this post: running the task itself.
```scala
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  // Create the TaskContextImpl
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    localProperties,
    metricsSystem,
    metrics)
  // Install the TaskContextImpl as the thread-local TaskContext
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()

  if (_killed) {
    kill(interruptThread = false)
  }

  new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
    Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()

  try {
    // Delegate to the subclass's runTask
    runTask(context)
  } catch {
    case e: Throwable =>
      try {
        context.markTaskFailed(e)
      } catch {
        case t: Throwable =>
          e.addSuppressed(t)
      }
      throw e
  } finally {
    // Mark the task as completed
    context.markTaskCompleted()
    try {
      Utils.tryLogNonFatalError {
        // Release memory used by this task for unrolling blocks
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
        // Wake up any tasks waiting for memory
        val memoryManager = SparkEnv.get.memoryManager
        memoryManager.synchronized { memoryManager.notifyAll() }
      }
    } finally {
      // Clear the thread-local TaskContext
      TaskContext.unset()
    }
  }
}
```
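A practical consequence of TaskContext.setTaskContext is that user code running inside a task can fetch the installed context through the public TaskContext.get() API. A small usage example, assuming an existing SparkContext sc:

```scala
import org.apache.spark.TaskContext

// Runs on the executors; TaskContext.get() works because Task.run installed
// the thread-local context before invoking runTask.
sc.parallelize(1 to 8, numSlices = 4).map { i =>
  val ctx = TaskContext.get()
  s"value $i handled by partition ${ctx.partitionId()}, attempt ${ctx.attemptNumber()}"
}.collect().foreach(println)
```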
Task has two subclasses: ShuffleMapTask, which runs in every stage except the last, and ResultTask, which runs in the final stage. Both override Task's runTask method, and we will look at each in turn; a small illustration of which stage runs which task type follows below.
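The snippet below is plain user code (not Spark source) with the stage boundaries noted in comments; the shuffle introduced by reduceByKey splits the job into a ShuffleMapTask stage and a ResultTask stage:

```scala
val words = sc.parallelize(Seq("a", "b", "a", "c"))

// Stage 0 runs as ShuffleMapTasks: map-side work whose output is
// partitioned into shuffle files for the downstream stage.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

// Stage 1 runs as ResultTasks: each task reads its shuffle partition and
// applies the result function, here collecting to the driver.
counts.collect()
```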
ShuffleMapTask.runTask
One ShuffleMapTask is created for each partition of its stage. Each ShuffleMapTask produces a set of output files, determined by the number of downstream partitions and by the shuffle strategy.
```scala
override def runTask(context: TaskContext): MapStatus = {
  val threadMXBean = ManagementFactory.getThreadMXBean
  // Record when deserialization starts
  val deserializeStartTime = System.currentTimeMillis()
  // Record the CPU time when deserialization starts
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the RDD and its shuffle dependency
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  // Compute the deserialization time
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  // Compute the deserialization CPU time
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get the ShuffleManager
    val manager = SparkEnv.get.shuffleManager
    // Get a writer for this task's shuffle output
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // writer.write triggers the actual computation of the RDD;
    // we will cover this part in a later post
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // Stop the writer and return the MapStatus
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
```
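As a rough sizing note: under the sort-based shuffle that Spark 2.1 uses by default, each map task writes a single sorted data file plus an index file, regardless of the number of reduce partitions. A back-of-the-envelope sketch:

```scala
// Sort-based shuffle (the default): one data file and one index file per
// map task, so the shuffle file count scales with the map side only.
val numMapTasks = 100
val numShuffleFiles = numMapTasks * 2 // 200 files, however many reducers there are
```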
ResultTask.runTask
```scala
override def runTask(context: TaskContext): U = {
  val threadMXBean = ManagementFactory.getThreadMXBean
  // Record when deserialization starts
  val deserializeStartTime = System.currentTimeMillis()
  // Record the CPU time when deserialization starts
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the RDD and the result function to apply to it
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  // Compute the deserialization time
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  // Compute the deserialization CPU time
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  // rdd.iterator triggers the actual computation;
  // we will cover this part in a later post
  func(context, rdd.iterator(partition, context))
}
```
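For intuition about what the deserialized func is: it is simply the closure that the action installed on the driver. For rdd.collect(), for example, the per-partition function amounts to the following simplified sketch (not the exact Spark source):

```scala
import org.apache.spark.TaskContext

// Roughly what each ResultTask applies for collect(): drain the partition's
// iterator into an array, which becomes the task's result value and is
// serialized back to the driver.
val collectFunc: (TaskContext, Iterator[String]) => Array[String] =
  (context, iter) => iter.toArray
```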
Follow-up Processing

Metrics
How each of the timing metrics is collected was covered above, in TaskRunner.run.
Reclaiming Memory
This was also covered above, in Task.run.
Handling Execution Results
The execBackend.statusUpdate call in Executor.TaskRunner.run was already covered in Understanding Spark 2.1 Core in Depth (Part 4): Principles and Source Code Analysis of Result Handling and Fault Tolerance.