日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Spark源码分析之七:Task运行(一)

發布時間:2023/11/27 生活经验 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark源码分析之七:Task运行(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在Task調度相關的兩篇文章《Spark源碼分析之五:Task調度(一)》與《Spark源碼分析之六:Task調度(二)》中,我們大致了解了Task調度相關的主要邏輯,并且在Task調度邏輯的最后,CoarseGrainedSchedulerBackend的內部類DriverEndpoint中的makeOffers()方法的最后,我們通過調用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[TaskDescription]],相關代碼如下:

?

[java]?view plain?copy ?
  1. //?調用scheduler的resourceOffers()方法,分配資源,并在得到資源后,調用launchTasks()方法,啟動tasks??
  2. ??????//?這個scheduler就是TaskSchedulerImpl??
  3. ??????launchTasks(scheduler.resourceOffers(workOffers))??
[java]?view plain?copy ?
  1. /**?
  2. ???*?Called?by?cluster?manager?to?offer?resources?on?slaves.?We?respond?by?asking?our?active?task?
  3. ???*?sets?for?tasks?in?order?of?priority.?We?fill?each?node?with?tasks?in?a?round-robin?manner?so?
  4. ???*?that?tasks?are?balanced?across?the?cluster.?
  5. ???*?
  6. ???*?被集群manager調用以提供slaves上的資源。我們通過按照優先順序詢問活動task集中的task來回應。?
  7. ???*?我們通過循環的方式將task調度到每個節點上以便tasks在集群中可以保持大致的均衡。?
  8. ???*/??
  9. ??def?resourceOffers(offers:?Seq[WorkerOffer]):?Seq[Seq[TaskDescription]]?=?synchronized?{??

? ? ? ? 這個TaskDescription很簡單,是傳遞到executor上即將被執行的Task的描述,通常由TaskSetManager的resourceOffer()方法生成。代碼如下:

?

?

[java]?view plain?copy ?
  1. /**?
  2. ?*?Description?of?a?task?that?gets?passed?onto?executors?to?be?executed,?usually?created?by?
  3. ?*?[[TaskSetManager.resourceOffer]].?
  4. ?*/??
  5. private[spark]?class?TaskDescription(??
  6. ????val?taskId:?Long,??
  7. ????val?attemptNumber:?Int,??
  8. ????val?executorId:?String,??
  9. ????val?name:?String,??
  10. ????val?index:?Int,????//?Index?within?this?task's?TaskSet??
  11. ????_serializedTask:?ByteBuffer)??
  12. ??extends?Serializable?{??
  13. ??
  14. ??//?Because?ByteBuffers?are?not?serializable,?wrap?the?task?in?a?SerializableBuffer??
  15. ??//?由于ByteBuffers不可以被序列化,所以將task包裝在SerializableBuffer中,_serializedTask為ByteBuffer類型的Task??
  16. ??private?val?buffer?=?new?SerializableBuffer(_serializedTask)??
  17. ????
  18. ??//?序列化后的Task,?取buffer的value??
  19. ??def?serializedTask:?ByteBuffer?=?buffer.value??
  20. ??
  21. ??
  22. ??override?def?toString:?String?=?"TaskDescription(TID=%d,?index=%d)".format(taskId,?index)??
  23. }??

? ? ? ? 此時,得到Seq[Seq[TaskDescription]],即Task被調度到相應executor上后(僅是邏輯調度,實際上并未分配到executor上執行),接下來要做的,便是真正的將Task分配到指定的executor上去執行,也就是本篇我們將要講的Task的運行。而這部分的開端,源于上述提到的CoarseGrainedSchedulerBackend的內部類DriverEndpoint中的launchTasks()方法,代碼如下:

?

?

[java]?view plain?copy ?
  1. //?Launch?tasks?returned?by?a?set?of?resource?offers??
  2. ????private?def?launchTasks(tasks:?Seq[Seq[TaskDescription]])?{??
  3. ??????
  4. ??????//?循環每個task??
  5. ??????for?(task?<-?tasks.flatten)?{??
  6. ??????????
  7. ????????//?序列化Task??
  8. ????????val?serializedTask?=?ser.serialize(task)??
  9. ??????????
  10. ????????//?序列化后的task的大小超出規定的上限??
  11. ????????//?即如果序列化后task的大小大于等于框架配置的Akka消息最大大小減去除序列化task或task結果外,一個Akka消息需要保留的額外大小的值??
  12. ????????if?(serializedTask.limit?>=?akkaFrameSize?-?AkkaUtils.reservedSizeBytes)?{??
  13. ????????????
  14. ??????????//?根據task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中獲取對應的TaskSetManager??
  15. ??????????scheduler.taskIdToTaskSetManager.get(task.taskId).foreach?{?taskSetMgr?=>??
  16. ????????????try?{??
  17. ??????????????var?msg?=?"Serialized?task?%s:%d?was?%d?bytes,?which?exceeds?max?allowed:?"?+??
  18. ????????????????"spark.akka.frameSize?(%d?bytes)?-?reserved?(%d?bytes).?Consider?increasing?"?+??
  19. ????????????????"spark.akka.frameSize?or?using?broadcast?variables?for?large?values."??
  20. ??????????????msg?=?msg.format(task.taskId,?task.index,?serializedTask.limit,?akkaFrameSize,??
  21. ????????????????AkkaUtils.reservedSizeBytes)??
  22. ????????????????
  23. ??????????????//?調用TaskSetManager的abort()方法,標記對應TaskSetManager為失敗??
  24. ??????????????taskSetMgr.abort(msg)??
  25. ????????????}?catch?{??
  26. ??????????????case?e:?Exception?=>?logError("Exception?in?error?callback",?e)??
  27. ????????????}??
  28. ??????????}??
  29. ????????}??
  30. ????????else?{//?序列化后task的大小在規定的大小內??
  31. ????????????
  32. ??????????//?從executorDataMap中,根據task.executorId獲取executor描述信息executorData??
  33. ??????????val?executorData?=?executorDataMap(task.executorId)??
  34. ????????????
  35. ??????????//?executorData中,freeCores做相應減少??
  36. ??????????executorData.freeCores?-=?scheduler.CPUS_PER_TASK??
  37. ????????????
  38. ??????????//?利用executorData中的executorEndpoint,發送LaunchTask事件,LaunchTask事件中包含序列化后的task??
  39. ??????????executorData.executorEndpoint.send(LaunchTask(new?SerializableBuffer(serializedTask)))??
  40. ????????}??
  41. ??????}??
  42. ????}??

? ? ? ??launchTasks的執行邏輯很簡單,針對傳入的TaskDescription序列,循環每個Task,做以下處理:

?

? ? ? ? 1、首先對Task進行序列化,得到serializedTask;

? ? ? ? 2、針對序列化后的Task:serializedTask,判斷其大小:

? ? ? ? ? ? ? 2.1、序列化后的task的大小達到或超出規定的上限,即框架配置的Akka消息最大大小,減去除序列化task或task結果外,一個Akka消息需要保留的額外大小的值,則根據task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中獲取對應的TaskSetManager,并調用其abort()方法,標記對應TaskSetManager為失敗;

? ? ? ? ? ? ? 2.2、序列化后的task的大小未達到上限,在規定的大小范圍內,則:

? ? ? ? ? ? ? ? ? ? ? ?2.2.1、從executorDataMap中,根據task.executorId獲取executor描述信息executorData;

? ? ? ? ? ? ? ? ? ? ? ?2.2.2、在executorData中,freeCores做相應減少;

? ? ? ? ? ? ? ? ? ? ? ?2.2.3、利用executorData中的executorEndpoint,即Driver端executor通訊端點的引用,發送LaunchTask事件,LaunchTask事件中包含序列化后的task,將Task傳遞到executor中去執行。

? ? ? ? 接下來,我們重點分析下上述流程。

? ? ? ? 先說下異常流程,即序列化后Task的大小超過上限時,對TaskSet標記為失敗的處理。入口方法為TaskSetManager的abort()方法,代碼如下:

?

[java]?view plain?copy ?
  1. def?abort(message:?String,?exception:?Option[Throwable]?=?None):?Unit?=?sched.synchronized?{??
  2. ??????
  3. ????//?TODO:?Kill?running?tasks?if?we?were?not?terminated?due?to?a?Mesos?error??
  4. ????//?調用DAGScheduler的taskSetFailed()方法,標記TaskSet運行失敗??
  5. ????sched.dagScheduler.taskSetFailed(taskSet,?message,?exception)??
  6. ??????
  7. ????//?標志位isZombie設置為true??
  8. ????isZombie?=?true??
  9. ??????
  10. ????//?滿足一定條件的情況下,將TaskSet標記為Finished??
  11. ????maybeFinishTaskSet()??
  12. ??}??

? ? ? ??abort()方法處理邏輯共分三步:

?

? ? ? ? 第一,調用DAGScheduler的taskSetFailed()方法,標記TaskSet運行失敗;

? ? ? ? 第二,標志位isZombie設置為true;

? ? ? ? 第三,滿足一定條件的情況下,將TaskSet標記為Finished。

? ? ? ? 首先看下DAGScheduler的taskSetFailed()方法,代碼如下:

?

[java]?view plain?copy ?
  1. /**?
  2. ???*?Called?by?the?TaskSetManager?to?cancel?an?entire?TaskSet?due?to?either?repeated?failures?or?
  3. ???*?cancellation?of?the?job?itself.?
  4. ???*/??
  5. ??def?taskSetFailed(taskSet:?TaskSet,?reason:?String,?exception:?Option[Throwable]):?Unit?=?{??
  6. ????eventProcessLoop.post(TaskSetFailed(taskSet,?reason,?exception))??
  7. ??}??

? ? ? ? 和第二篇文章《Spark源碼分析之二:Job的調度模型與運行反饋》中Job的調度模型一致,都是依靠事件隊列eventProcessLoop來完成事件的調度執行的,這里,我們在事件隊列eventProcessLoop中放入了一個TaskSetFailed事件。在DAGScheduler的事件處理調度函數doOnReceive()方法中,明確規定了事件的處理方法,代碼如下:

?

?

[java]?view plain?copy ?
  1. //?如果是TaskSetFailed事件,調用dagScheduler.handleTaskSetFailed()方法處理??
  2. ????case?TaskSetFailed(taskSet,?reason,?exception)?=>??
  3. ??????dagScheduler.handleTaskSetFailed(taskSet,?reason,?exception)??

? ? ? ? 下面,我們看下handleTaskSetFailed()這個方法。

?

[java]?view plain?copy ?
  1. private[scheduler]?def?handleTaskSetFailed(??
  2. ??????taskSet:?TaskSet,??
  3. ??????reason:?String,??
  4. ??????exception:?Option[Throwable]):?Unit?=?{??
  5. ??????
  6. ????//?根據taskSet的stageId獲取到對應的Stage,循環調用abortStage,終止該Stage??
  7. ????stageIdToStage.get(taskSet.stageId).foreach?{?abortStage(_,?reason,?exception)?}??
  8. ??????
  9. ????//?提交等待的Stages??
  10. ????submitWaitingStages()??
  11. ??}??

? ? ? ? 很簡單,首先通過taskSet的stageId獲取到對應的Stage,針對Stage,循環調用abortStage()方法,終止該Stage,然后調用submitWaitingStages()方法提交等待的Stages。我們先看下abortStage()方法,代碼如下:

?

?

[java]?view plain?copy ?
  1. /**?
  2. ???*?Aborts?all?jobs?depending?on?a?particular?Stage.?This?is?called?in?response?to?a?task?set?
  3. ???*?being?canceled?by?the?TaskScheduler.?Use?taskSetFailed()?to?inject?this?event?from?outside.?
  4. ???*?終止給定Stage上的所有Job。?
  5. ???*/??
  6. ??private[scheduler]?def?abortStage(??
  7. ??????failedStage:?Stage,??
  8. ??????reason:?String,??
  9. ??????exception:?Option[Throwable]):?Unit?=?{??
  10. ??????
  11. ????//?如果stageIdToStage中不存在對應的stage,說明stage已經被移除,直接返回??
  12. ????if?(!stageIdToStage.contains(failedStage.id))?{??
  13. ??????//?Skip?all?the?actions?if?the?stage?has?been?removed.??
  14. ??????return??
  15. ????}??
  16. ??????
  17. ????//?遍歷activeJobs中的ActiveJob,逐個調用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs??
  18. ????val?dependentJobs:?Seq[ActiveJob]?=??
  19. ??????activeJobs.filter(job?=>?stageDependsOn(job.finalStage,?failedStage)).toSeq??
  20. ??????
  21. ????//?標記failedStage的完成時間completionTime??
  22. ????failedStage.latestInfo.completionTime?=?Some(clock.getTimeMillis())??
  23. ??????
  24. ????//?遍歷dependentJobs,調用failJobAndIndependentStages()??
  25. ????for?(job?<-?dependentJobs)?{??
  26. ??????failJobAndIndependentStages(job,?s"Job?aborted?due?to?stage?failure:?$reason",?exception)??
  27. ????}??
  28. ????if?(dependentJobs.isEmpty)?{??
  29. ??????logInfo("Ignoring?failure?of?"?+?failedStage?+?"?because?all?jobs?depending?on?it?are?done")??
  30. ????}??
  31. ??}??

? ? ? ? 這個方法的處理邏輯主要分為四步:

?

? ? ? ? 1、如果stageIdToStage中不存在對應的stage,說明stage已經被移除,直接返回,這是對異常情況下的一種特殊處理;

? ? ? ? 2、遍歷activeJobs中的ActiveJob,逐個調用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs;

? ? ? ? 3、標記failedStage的完成時間completionTime;

? ? ? ? 4、遍歷dependentJobs,調用failJobAndIndependentStages()。

? ? ? ? 其它都好說,我們主要看下stageDependsOn()和failJobAndIndependentStages()這兩個方法。首先看下stageDependsOn()方法,代碼如下:

?

[java]?view plain?copy ?
  1. /**?Return?true?if?one?of?stage's?ancestors?is?target.?*/??
  2. ??//?如果參數stage的祖先是target,返回true??
  3. ??private?def?stageDependsOn(stage:?Stage,?target:?Stage):?Boolean?=?{??
  4. ??????
  5. ????//?如果stage即為target,返回true??
  6. ????if?(stage?==?target)?{??
  7. ??????return?true??
  8. ????}??
  9. ??????
  10. ????//?存儲處理過的RDD??
  11. ????val?visitedRdds?=?new?HashSet[RDD[_]]??
  12. ??????
  13. ????//?We?are?manually?maintaining?a?stack?here?to?prevent?StackOverflowError??
  14. ????//?caused?by?recursively?visiting??
  15. ????//?存儲待處理的RDD??
  16. ????val?waitingForVisit?=?new?Stack[RDD[_]]??
  17. ??????
  18. ????//?定義一個visit()方法??
  19. ????def?visit(rdd:?RDD[_])?{??
  20. ??????//?如果該RDD未被處理過的話,繼續處理??
  21. ??????if?(!visitedRdds(rdd))?{??
  22. ????????//?將RDD添加到visitedRdds中??
  23. ????????visitedRdds?+=?rdd??
  24. ??????????
  25. ????????//?遍歷RDD的依賴??
  26. ????????for?(dep?<-?rdd.dependencies)?{??
  27. ??????????dep?match?{??
  28. ????????????//?如果是ShuffleDependency??
  29. ????????????case?shufDep:?ShuffleDependency[_,?_,?_]?=>??
  30. ??????????????
  31. ??????????????//?獲得mapStage,并且如果stage的isAvailable為false的話,將其壓入waitingForVisit??
  32. ??????????????val?mapStage?=?getShuffleMapStage(shufDep,?stage.firstJobId)??
  33. ??????????????if?(!mapStage.isAvailable)?{??
  34. ????????????????waitingForVisit.push(mapStage.rdd)??
  35. ??????????????}??//?Otherwise?there's?no?need?to?follow?the?dependency?back??
  36. ????????????//?如果是NarrowDependency,直接將其壓入waitingForVisit??
  37. ????????????case?narrowDep:?NarrowDependency[_]?=>??
  38. ??????????????waitingForVisit.push(narrowDep.rdd)??
  39. ??????????}??
  40. ????????}??
  41. ??????}??
  42. ????}??
  43. ??????
  44. ????//?從stage的rdd開始處理,將其入棧waitingForVisit??
  45. ????waitingForVisit.push(stage.rdd)??
  46. ??????
  47. ????//?當waitingForVisit中存在數據,就調用visit()方法進行處理??
  48. ????while?(waitingForVisit.nonEmpty)?{??
  49. ??????visit(waitingForVisit.pop())??
  50. ????}??
  51. ??????
  52. ????//?根據visitedRdds中是否存在target的rdd判斷參數stage的祖先是否為target??
  53. ????visitedRdds.contains(target.rdd)??
  54. ??}??

? ? ? ? 這個方法主要是判斷參數stage是否為參數target的祖先stage,其代碼風格與stage劃分和提交中的部分代碼一樣,這在前面的兩篇文章中也提到過,在此不再贅述。而它主要是通過stage的rdd,并遍歷其上層依賴的rdd鏈,將每個stage的rdd加入到visitedRdds中,最后根據visitedRdds中是否存在target的rdd判斷參數stage的祖先是否為target。值得一提的是,如果RDD的依賴是NarrowDependency,直接將其壓入waitingForVisit,如果為ShuffleDependency,則需要判斷stage的isAvailable,如果為false,則將對應RDD壓入waitingForVisit。關于isAvailable,我在《Spark源碼分析之四:Stage提交》一文中具體闡述過,這里不再贅述。

? ? ? ? 接下來,我們再看下failJobAndIndependentStages()方法,這個方法的主要作用就是使得一個Job和僅被該Job使用的所有stages失敗,并清空有關狀態。代碼如下:

?

[java]?view plain?copy ?
  1. /**?Fails?a?job?and?all?stages?that?are?only?used?by?that?job,?and?cleans?up?relevant?state.?*/??
  2. ??//?使得一個Job和僅被該Job使用的所有stages失敗,并清空有關狀態??
  3. ??private?def?failJobAndIndependentStages(??
  4. ??????job:?ActiveJob,??
  5. ??????failureReason:?String,??
  6. ??????exception:?Option[Throwable]?=?None):?Unit?=?{??
  7. ??????
  8. ????//?構造一個異常,內容為failureReason??
  9. ????val?error?=?new?SparkException(failureReason,?exception.getOrElse(null))??
  10. ??????
  11. ????//?標志位,是否能取消Stages??
  12. ????var?ableToCancelStages?=?true??
  13. ??
  14. ????//?標志位,是否應該中斷線程??
  15. ????val?shouldInterruptThread?=??
  16. ??????if?(job.properties?==?null)?false??
  17. ??????else?job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,?"false").toBoolean??
  18. ??
  19. ????//?Cancel?all?independent,?running?stages.??
  20. ????//?取消所有獨立的,正在運行的stages??
  21. ??????
  22. ????//?根據Job的jobId,獲取其stages??
  23. ????val?stages?=?jobIdToStageIds(job.jobId)??
  24. ??????
  25. ????//?如果stages為空,記錄錯誤日志??
  26. ????if?(stages.isEmpty)?{??
  27. ??????logError("No?stages?registered?for?job?"?+?job.jobId)??
  28. ????}??
  29. ??????
  30. ????//?遍歷stages,循環處理??
  31. ????stages.foreach?{?stageId?=>??
  32. ????????
  33. ??????//?根據stageId,獲取jobsForStage,即每個Job所包含的Stage信息??
  34. ??????val?jobsForStage:?Option[HashSet[Int]]?=?stageIdToStage.get(stageId).map(_.jobIds)??
  35. ????????
  36. ??????//?首先處理異常情況,即jobsForStage為空,或者jobsForStage中不包含當前Job??
  37. ??????if?(jobsForStage.isEmpty?||?!jobsForStage.get.contains(job.jobId))?{??
  38. ????????logError(??
  39. ??????????"Job?%d?not?registered?for?stage?%d?even?though?that?stage?was?registered?for?the?job"??
  40. ????????????.format(job.jobId,?stageId))??
  41. ??????}?else?if?(jobsForStage.get.size?==?1)?{??
  42. ????????//?如果stageId對應的stage不存在??
  43. ????????if?(!stageIdToStage.contains(stageId))?{??
  44. ??????????logError(s"Missing?Stage?for?stage?with?id?$stageId")??
  45. ????????}?else?{??
  46. ??????????//?This?is?the?only?job?that?uses?this?stage,?so?fail?the?stage?if?it?is?running.??
  47. ??????????//???
  48. ??????????val?stage?=?stageIdToStage(stageId)??
  49. ??????????if?(runningStages.contains(stage))?{??
  50. ????????????try?{?//?cancelTasks?will?fail?if?a?SchedulerBackend?does?not?implement?killTask??
  51. ????????????????
  52. ??????????????//?調用taskScheduler的cancelTasks()方法,取消stage內的tasks??
  53. ??????????????taskScheduler.cancelTasks(stageId,?shouldInterruptThread)??
  54. ????????????????
  55. ??????????????//?標記Stage為完成??
  56. ??????????????markStageAsFinished(stage,?Some(failureReason))??
  57. ????????????}?catch?{??
  58. ??????????????case?e:?UnsupportedOperationException?=>??
  59. ????????????????logInfo(s"Could?not?cancel?tasks?for?stage?$stageId",?e)??
  60. ??????????????ableToCancelStages?=?false??
  61. ????????????}??
  62. ??????????}??
  63. ????????}??
  64. ??????}??
  65. ????}??
  66. ??
  67. ????if?(ableToCancelStages)?{//?如果能取消Stages??
  68. ??????
  69. ??????//?調用job監聽器的jobFailed()方法??
  70. ??????job.listener.jobFailed(error)??
  71. ????????
  72. ??????//?為Job和獨立Stages清空狀態,獨立Stages的意思為該stage僅為該Job使用??
  73. ??????cleanupStateForJobAndIndependentStages(job)??
  74. ????????
  75. ??????//?發送一個SparkListenerJobEnd事件??
  76. ??????listenerBus.post(SparkListenerJobEnd(job.jobId,?clock.getTimeMillis(),?JobFailed(error)))??
  77. ????}??
  78. ??}??

? ? ? ? 處理過程還是很簡單的,讀者可以通過上述源碼和注釋自行補腦,這里就先略過了。

?

? ? ? ? 下面,再說下正常情況下,即序列化后Task大小未超過上限時,LaunchTask事件的發送及executor端的響應。代碼再跳轉到CoarseGrainedSchedulerBackend的內部類DriverEndpoint中的launchTasks()方法。正常情況下處理流程主要分為三大部分:

? ? ? ? 1、從executorDataMap中,根據task.executorId獲取executor描述信息executorData;

? ? ? ? 2、在executorData中,freeCores做相應減少;

? ? ? ? 3、利用executorData中的executorEndpoint,即Driver端executor通訊端點的引用,發送LaunchTask事件,LaunchTask事件中包含序列化后的task,將Task傳遞到executor中去執行。

? ? ? ? 我們重點看下第3步,利用Driver端持有的executor描述信息executorData中的executorEndpoint,即Driver端executor通訊端點的引用,發送LaunchTask事件給executor,將Task傳遞到executor中去執行。那么executor中是如何接收LaunchTask事件的呢?答案就在CoarseGrainedExecutorBackend中。

? ? ? ? 我們先說下這個CoarseGrainedExecutorBackend,類的定義如下所示:

?

[java]?view plain?copy ?
  1. private[spark]?class?CoarseGrainedExecutorBackend(??
  2. ????override?val?rpcEnv:?RpcEnv,??
  3. ????driverUrl:?String,??
  4. ????executorId:?String,??
  5. ????hostPort:?String,??
  6. ????cores:?Int,??
  7. ????userClassPath:?Seq[URL],??
  8. ????env:?SparkEnv)??
  9. ??extends?ThreadSafeRpcEndpoint?with?ExecutorBackend?with?Logging?{??

? ? ? ? 由上面的代碼我們可以知道,它實現了ThreadSafeRpcEndpoint和ExecutorBackend兩個trait,而ExecutorBackend的定義如下:

?

?

[java]?view plain?copy ?
  1. /**?
  2. ?*?A?pluggable?interface?used?by?the?Executor?to?send?updates?to?the?cluster?scheduler.?
  3. ?*?一個被Executor用來發送更新到集群調度器的可插拔接口。?
  4. ?*/??
  5. private[spark]?trait?ExecutorBackend?{??
  6. ????
  7. ??//?唯一的一個statusUpdate()方法??
  8. ??//?需要Long類型的taskId、TaskState類型的state、ByteBuffer類型的data三個參數??
  9. ??def?statusUpdate(taskId:?Long,?state:?TaskState,?data:?ByteBuffer)??
  10. }??

?

? ? ? ? 那么它自然就有兩種主要的任務,第一,作為endpoint提供driver與executor間的通訊功能;第二,提供了executor任務執行時狀態匯報的功能。

? ? ? ??CoarseGrainedExecutorBackend到底是什么呢?這里我們先不深究,留到以后分析,你只要知道它是Executor的一個后臺輔助進程,和Executor是一對一的關系,向Executor提供了與Driver通訊、任務執行時狀態匯報兩個基本功能即可。

? ? ? ? 接下來,我們看下CoarseGrainedExecutorBackend是如何處理LaunchTask事件的。做為RpcEndpoint,在其處理各類事件或消息的receive()方法中,定義如下:

?

[java]?view plain?copy ?
  1. case?LaunchTask(data)?=>??
  2. ??????if?(executor?==?null)?{??
  3. ????????logError("Received?LaunchTask?command?but?executor?was?null")??
  4. ????????System.exit(1)??
  5. ??????}?else?{??
  6. ????????
  7. ????????//?反序列話task,得到taskDesc??
  8. ????????val?taskDesc?=?ser.deserialize[TaskDescription](data.value)??
  9. ????????logInfo("Got?assigned?task?"?+?taskDesc.taskId)??
  10. ??????????
  11. ????????//?調用executor的launchTask()方法加載task??
  12. ????????executor.launchTask(this,?taskId?=?taskDesc.taskId,?attemptNumber?=?taskDesc.attemptNumber,??
  13. ??????????taskDesc.name,?taskDesc.serializedTask)??
  14. ??????}??

? ? ? ? 首先,會判斷對應的executor是否為空,為空的話,記錄錯誤日志并退出,不為空的話,則按照如下流程處理:

?

? ? ? ? 1、反序列話task,得到taskDesc;

? ? ? ? 2、調用executor的launchTask()方法加載task。

? ? ? ? 那么,重點就落在了Executor的launchTask()方法中,代碼如下:

?

[java]?view plain?copy ?
  1. def?launchTask(??
  2. ??????context:?ExecutorBackend,??
  3. ??????taskId:?Long,??
  4. ??????attemptNumber:?Int,??
  5. ??????taskName:?String,??
  6. ??????serializedTask:?ByteBuffer):?Unit?=?{??
  7. ????????
  8. ????//?新建一個TaskRunner??
  9. ????val?tr?=?new?TaskRunner(context,?taskId?=?taskId,?attemptNumber?=?attemptNumber,?taskName,??
  10. ??????serializedTask)??
  11. ????????
  12. ????//?將taskId與TaskRunner的對應關系存入runningTasks??
  13. ????runningTasks.put(taskId,?tr)??
  14. ??????
  15. ????//?線程池執行TaskRunner??
  16. ????threadPool.execute(tr)??
  17. ??}??

? ? ? ? 非常簡單,創建一個TaskRunner對象,然后將taskId與TaskRunner的對應關系存入runningTasks,將TaskRunner扔到線程池中去執行即可。

?

? ? ? ? 我們先看下這個TaskRunner類。我們先看下Class及其成員變量的定義,如下:

?

[java]?view plain?copy ?
  1. class?TaskRunner(??
  2. ??????execBackend:?ExecutorBackend,??
  3. ??????val?taskId:?Long,??
  4. ??????val?attemptNumber:?Int,??
  5. ??????taskName:?String,??
  6. ??????serializedTask:?ByteBuffer)??
  7. ????extends?Runnable?{??
  8. ??????
  9. ????//?TaskRunner繼承了Runnable??
  10. ??
  11. ????/**?Whether?this?task?has?been?killed.?*/??
  12. ????//?標志位,task是否被殺掉??
  13. ????@volatile?private?var?killed?=?false??
  14. ??
  15. ????/**?How?much?the?JVM?process?has?spent?in?GC?when?the?task?starts?to?run.?*/??
  16. ????@volatile?var?startGCTime:?Long?=?_??
  17. ??
  18. ????/**?
  19. ?????*?The?task?to?run.?This?will?be?set?in?run()?by?deserializing?the?task?binary?coming?
  20. ?????*?from?the?driver.?Once?it?is?set,?it?will?never?be?changed.?
  21. ?????*??
  22. ?????*?需要運行的task。它將在反序列化來自driver的task二進制數據時在run()方法被設置,一旦被設置,它將不會再發生改變。?
  23. ?????*/??
  24. ????@volatile?var?task:?Task[Any]?=?_??
  25. }??

? ? ? ? 由類的定義我們可以看出,TaskRunner繼承了Runnable,所以它本質上是一個線程,故其可以被放到線程池中去運行。它所包含的成員變量,主要有以下幾個:

?

? ? ? ? 1、execBackend:Executor后臺輔助進程,提供了與Driver通訊、狀態匯報等兩大基本功能,實際上傳入的是CoarseGrainedExecutorBackend實例;

? ? ? ? 2、taskId:Task的唯一標識;

? ? ? ? 3、attemptNumber:Task運行的序列號,Spark與MapReduce一樣,可以為拖后腿任務啟動備份任務,即推測執行原理,如此,就需要通過taskId加attemptNumber來唯一標識一個Task運行實例;

? ? ? ? 4、serializedTask:ByteBuffer類型,序列化后的Task,包含的是Task的內容,通過發序列化它來得到Task,并運行其中的run()方法來執行Task;

? ? ? ? 5、killed:Task是否被殺死的標志位;

? ? ? ? 6、task:Task[Any]類型,需要運行的Task,它將在反序列化來自driver的task二進制數據時在run()方法被設置,一旦被設置,它將不會再發生改變;

? ? ? ?7、startGCTime:JVM在task開始運行后,進行垃圾回收的時間。

? ? ? ? 另外,既然是一個線程,TaskRunner必須得提供run()方法,該run()方法就是TaskRunner線程在線程池中被調度時,需要執行的方法,我們來看下它的定義:

?

[java]?view plain?copy ?
  1. override?def?run():?Unit?=?{??
  2. ??????
  3. ??????//?Step1:Task及其運行時需要的輔助對象構造??
  4. ????????
  5. ??????//?獲取任務內存管理器??
  6. ??????val?taskMemoryManager?=?new?TaskMemoryManager(env.memoryManager,?taskId)??
  7. ????????
  8. ??????//?反序列化開始時間??
  9. ??????val?deserializeStartTime?=?System.currentTimeMillis()??
  10. ????????
  11. ??????//?當前線程設置上下文類加載器??
  12. ??????Thread.currentThread.setContextClassLoader(replClassLoader)??
  13. ????????
  14. ??????//?從SparkEnv中獲取序列化器??
  15. ??????val?ser?=?env.closureSerializer.newInstance()??
  16. ??????logInfo(s"Running?$taskName?(TID?$taskId)")??
  17. ????????
  18. ??????//?execBackend更新狀態TaskState.RUNNING??
  19. ??????execBackend.statusUpdate(taskId,?TaskState.RUNNING,?EMPTY_BYTE_BUFFER)??
  20. ??????var?taskStart:?Long?=?0??
  21. ????????
  22. ??????//?計算垃圾回收的時間??
  23. ??????startGCTime?=?computeTotalGcTime()??
  24. ??
  25. ??????try?{??
  26. ????????//?調用Task的deserializeWithDependencies()方法,反序列化Task,得到Task運行需要的文件taskFiles、jar包taskFiles和Task二進制數據taskBytes??
  27. ????????val?(taskFiles,?taskJars,?taskBytes)?=?Task.deserializeWithDependencies(serializedTask)??
  28. ????????updateDependencies(taskFiles,?taskJars)??
  29. ??????????
  30. ????????//?反序列化Task二進制數據taskBytes,得到task實例??
  31. ????????task?=?ser.deserialize[Task[Any]](taskBytes,?Thread.currentThread.getContextClassLoader)??
  32. ??????????
  33. ????????//?設置Task的任務內存管理器??
  34. ????????task.setTaskMemoryManager(taskMemoryManager)??
  35. ??
  36. ????????//?If?this?task?has?been?killed?before?we?deserialized?it,?let's?quit?now.?Otherwise,??
  37. ????????//?continue?executing?the?task.??
  38. ????????//?如果此時Task被kill,拋出異常,快速退出??
  39. ????????if?(killed)?{??
  40. ??????????//?Throw?an?exception?rather?than?returning,?because?returning?within?a?try{}?block??
  41. ??????????//?causes?a?NonLocalReturnControl?exception?to?be?thrown.?The?NonLocalReturnControl??
  42. ??????????//?exception?will?be?caught?by?the?catch?block,?leading?to?an?incorrect?ExceptionFailure??
  43. ??????????//?for?the?task.??
  44. ??????????throw?new?TaskKilledException??
  45. ????????}??
  46. ??
  47. ????????logDebug("Task?"?+?taskId?+?"'s?epoch?is?"?+?task.epoch)??
  48. ????????//?mapOutputTracker更新Epoch??
  49. ????????env.mapOutputTracker.updateEpoch(task.epoch)??
  50. ??
  51. ????????//?Run?the?actual?task?and?measure?its?runtime.??
  52. ????????//?運行真正的task,并度量它的運行時間??
  53. ??????????
  54. ????????//?Step2:Task運行??
  55. ??????????
  56. ????????//?task開始時間??
  57. ????????taskStart?=?System.currentTimeMillis()??
  58. ??????????
  59. ????????//?標志位threwException設置為true,標識Task真正執行過程中是否拋出異常??
  60. ????????var?threwException?=?true??
  61. ??????????
  62. ????????//?調用Task的run()方法,真正執行Task,并獲得運行結果value??
  63. ????????val?(value,?accumUpdates)?=?try?{??
  64. ??????????
  65. ??????????//?調用Task的run()方法,真正執行Task??
  66. ??????????val?res?=?task.run(??
  67. ????????????taskAttemptId?=?taskId,??
  68. ????????????attemptNumber?=?attemptNumber,??
  69. ????????????metricsSystem?=?env.metricsSystem)??
  70. ????????????
  71. ??????????//?標志位threwException設置為false??
  72. ??????????threwException?=?false??
  73. ????????????
  74. ??????????//?返回res,Task的run()方法中,res的定義為(T,?AccumulatorUpdates)??
  75. ??????????//?這里,前者為任務運行結果,后者為累加器更新??
  76. ??????????res??
  77. ????????}?finally?{??
  78. ????????????
  79. ??????????//?通過任務內存管理器清理所有的分配的內存??
  80. ??????????val?freedMemory?=?taskMemoryManager.cleanUpAllAllocatedMemory()??
  81. ??????????if?(freedMemory?>?0)?{??
  82. ????????????val?errMsg?=?s"Managed?memory?leak?detected;?size?=?$freedMemory?bytes,?TID?=?$taskId"??
  83. ????????????if?(conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak",?false)?&&?!threwException)?{??
  84. ??????????????throw?new?SparkException(errMsg)??
  85. ????????????}?else?{??
  86. ??????????????logError(errMsg)??
  87. ????????????}??
  88. ??????????}??
  89. ????????}??
  90. ??????????
  91. ????????//?task完成時間??
  92. ????????val?taskFinish?=?System.currentTimeMillis()??
  93. ??
  94. ????????//?If?the?task?has?been?killed,?let's?fail?it.??
  95. ????????//?如果task被殺死,拋出TaskKilledException異常??
  96. ????????if?(task.killed)?{??
  97. ??????????throw?new?TaskKilledException??
  98. ????????}??
  99. ??
  100. ????????//?Step3:Task運行結果處理??
  101. ??????????
  102. ????????//?通過Spark獲取Task運行結果序列化器??
  103. ????????val?resultSer?=?env.serializer.newInstance()??
  104. ??????????
  105. ????????//?結果序列化前的時間點??
  106. ????????val?beforeSerialization?=?System.currentTimeMillis()??
  107. ??????????
  108. ????????//?利用Task運行結果序列化器序列化Task運行結果,得到valueBytes??
  109. ????????val?valueBytes?=?resultSer.serialize(value)??
  110. ??????????
  111. ????????//?結果序列化后的時間點??
  112. ????????val?afterSerialization?=?System.currentTimeMillis()??
  113. ??
  114. ????????//?度量指標體系相關,暫不介紹??
  115. ????????for?(m?<-?task.metrics)?{??
  116. ??????????//?Deserialization?happens?in?two?parts:?first,?we?deserialize?a?Task?object,?which??
  117. ??????????//?includes?the?Partition.?Second,?Task.run()?deserializes?the?RDD?and?function?to?be?run.??
  118. ??????????m.setExecutorDeserializeTime(??
  119. ????????????(taskStart?-?deserializeStartTime)?+?task.executorDeserializeTime)??
  120. ??????????//?We?need?to?subtract?Task.run()'s?deserialization?time?to?avoid?double-counting??
  121. ??????????m.setExecutorRunTime((taskFinish?-?taskStart)?-?task.executorDeserializeTime)??
  122. ??????????m.setJvmGCTime(computeTotalGcTime()?-?startGCTime)??
  123. ??????????m.setResultSerializationTime(afterSerialization?-?beforeSerialization)??
  124. ??????????m.updateAccumulators()??
  125. ????????}??
  126. ??
  127. ????????//?構造DirectTaskResult,同時包含Task運行結果valueBytes和累加器更新值accumulator?updates??
  128. ????????val?directResult?=?new?DirectTaskResult(valueBytes,?accumUpdates,?task.metrics.orNull)??
  129. ??????????
  130. ????????//?序列化DirectTaskResult,得到serializedDirectResult??
  131. ????????val?serializedDirectResult?=?ser.serialize(directResult)??
  132. ??????????
  133. ????????//?獲取Task運行結果大小??
  134. ????????val?resultSize?=?serializedDirectResult.limit??
  135. ??
  136. ????????//?directSend?=?sending?directly?back?to?the?driver??
  137. ????????//?directSend的意思就是直接發送結果至Driver端??
  138. ????????val?serializedResult:?ByteBuffer?=?{??
  139. ??????????
  140. ??????????//?如果Task運行結果大小大于所有Task運行結果的最大大小,序列化IndirectTaskResult??
  141. ??????????//?IndirectTaskResult為存儲在Worker上BlockManager中DirectTaskResult的一個引用??
  142. ??????????if?(maxResultSize?>?0?&&?resultSize?>?maxResultSize)?{??
  143. ????????????logWarning(s"Finished?$taskName?(TID?$taskId).?Result?is?larger?than?maxResultSize?"?+??
  144. ??????????????s"(${Utils.bytesToString(resultSize)}?>?${Utils.bytesToString(maxResultSize)}),?"?+??
  145. ??????????????s"dropping?it.")??
  146. ????????????ser.serialize(new?IndirectTaskResult[Any](TaskResultBlockId(taskId),?resultSize))??
  147. ??????????}??
  148. ??????????//?如果?Task運行結果大小超過Akka除去需要保留的字節外最大大小,則將結果寫入BlockManager??
  149. ??????????//?即運行結果無法通過消息傳遞??
  150. ??????????else?if?(resultSize?>=?akkaFrameSize?-?AkkaUtils.reservedSizeBytes)?{??
  151. ??????????????
  152. ????????????val?blockId?=?TaskResultBlockId(taskId)??
  153. ????????????env.blockManager.putBytes(??
  154. ??????????????blockId,?serializedDirectResult,?StorageLevel.MEMORY_AND_DISK_SER)??
  155. ????????????logInfo(??
  156. ??????????????s"Finished?$taskName?(TID?$taskId).?$resultSize?bytes?result?sent?via?BlockManager)")??
  157. ????????????ser.serialize(new?IndirectTaskResult[Any](blockId,?resultSize))??
  158. ??????????}???
  159. ??????????//?Task運行結果比較小的話,直接返回,通過消息傳遞??
  160. ??????????else?{??
  161. ????????????logInfo(s"Finished?$taskName?(TID?$taskId).?$resultSize?bytes?result?sent?to?driver")??
  162. ????????????serializedDirectResult??
  163. ??????????}??
  164. ????????}??
  165. ??
  166. ????????//?execBackend更新狀態TaskState.FINISHED??
  167. ????????execBackend.statusUpdate(taskId,?TaskState.FINISHED,?serializedResult)??
  168. ??
  169. ??????}?catch?{//?處理各種異常信息??
  170. ??????????
  171. ????????case?ffe:?FetchFailedException?=>??
  172. ??????????val?reason?=?ffe.toTaskEndReason??
  173. ??????????execBackend.statusUpdate(taskId,?TaskState.FAILED,?ser.serialize(reason))??
  174. ??
  175. ????????case?_:?TaskKilledException?|?_:?InterruptedException?if?task.killed?=>??
  176. ??????????logInfo(s"Executor?killed?$taskName?(TID?$taskId)")??
  177. ??????????execBackend.statusUpdate(taskId,?TaskState.KILLED,?ser.serialize(TaskKilled))??
  178. ??
  179. ????????case?cDE:?CommitDeniedException?=>??
  180. ??????????val?reason?=?cDE.toTaskEndReason??
  181. ??????????execBackend.statusUpdate(taskId,?TaskState.FAILED,?ser.serialize(reason))??
  182. ??
  183. ????????case?t:?Throwable?=>??
  184. ??????????//?Attempt?to?exit?cleanly?by?informing?the?driver?of?our?failure.??
  185. ??????????//?If?anything?goes?wrong?(or?this?was?a?fatal?exception),?we?will?delegate?to??
  186. ??????????//?the?default?uncaught?exception?handler,?which?will?terminate?the?Executor.??
  187. ??????????logError(s"Exception?in?$taskName?(TID?$taskId)",?t)??
  188. ??
  189. ??????????val?metrics:?Option[TaskMetrics]?=?Option(task).flatMap?{?task?=>??
  190. ????????????task.metrics.map?{?m?=>??
  191. ??????????????m.setExecutorRunTime(System.currentTimeMillis()?-?taskStart)??
  192. ??????????????m.setJvmGCTime(computeTotalGcTime()?-?startGCTime)??
  193. ??????????????m.updateAccumulators()??
  194. ??????????????m??
  195. ????????????}??
  196. ??????????}??
  197. ??????????val?serializedTaskEndReason?=?{??
  198. ????????????try?{??
  199. ??????????????ser.serialize(new?ExceptionFailure(t,?metrics))??
  200. ????????????}?catch?{??
  201. ??????????????case?_:?NotSerializableException?=>??
  202. ????????????????//?t?is?not?serializable?so?just?send?the?stacktrace??
  203. ????????????????ser.serialize(new?ExceptionFailure(t,?metrics,?false))??
  204. ????????????}??
  205. ??????????}??
  206. ????????????
  207. ??????????//?execBackend更新狀態TaskState.FAILED??
  208. ??????????execBackend.statusUpdate(taskId,?TaskState.FAILED,?serializedTaskEndReason)??
  209. ??
  210. ??????????//?Don't?forcibly?exit?unless?the?exception?was?inherently?fatal,?to?avoid??
  211. ??????????//?stopping?other?tasks?unnecessarily.??
  212. ??????????if?(Utils.isFatalError(t))?{??
  213. ????????????SparkUncaughtExceptionHandler.uncaughtException(t)??
  214. ??????????}??
  215. ??
  216. ??????}?finally?{??
  217. ????????
  218. ????????//?最后,無論運行成功還是失敗,將task從runningTasks中移除??
  219. ????????runningTasks.remove(taskId)??
  220. ??????}??
  221. ????}??

? ? ? ? 如此長的一個方法,好長好大,哈哈!不過,縱觀全篇,無非三個Step就可搞定:

?

? ? ? ? 1、Step1:Task及其運行時需要的輔助對象構造;

? ? ? ? 2、Step2:Task運行;

? ? ? ? 3、Step3:Task運行結果處理。

? ? ? ? 對, 就這么簡單!鑒于時間與篇幅問題,我們這里先講下主要流程,細節方面的東西留待下節繼續。

? ? ? ? 下面,我們一個個Step來看,首先看下Step1:Task及其運行時需要的輔助對象構造,主要包括以下步驟:

? ? ? ? 1.1、構造TaskMemoryManager任務內存管理器,即taskMemoryManager;

? ? ? ??1.2、記錄反序列化開始時間;

? ? ? ??1.3、當前線程設置上下文類加載器;

? ? ? ??1.4、從SparkEnv中獲取序列化器ser;

? ? ? ??1.5、execBackend更新狀態TaskState.RUNNING;

? ? ? ??1.6、計算垃圾回收時間;

? ? ? ??1.7、調用Task的deserializeWithDependencies()方法,反序列化Task,得到Task運行需要的文件taskFiles、jar包taskFiles和Task二進制數據taskBytes;

? ? ? ? 1.8、反序列化Task二進制數據taskBytes,得到task實例;

? ? ? ??1.9、設置Task的任務內存管理器;

? ? ? ? 1.10、如果此時Task被kill,拋出異常,快速退出;

? ? ? ??

? ? ? ? 接下來,是Step2:Task運行,主要流程如下:

? ? ? ??2.1、獲取task開始時間;

? ? ? ??2.2、標志位threwException設置為true,標識Task真正執行過程中是否拋出異常;

? ? ? ??2.3、調用Task的run()方法,真正執行Task,并獲得運行結果value,和累加器更新accumUpdates;

? ? ? ??2.4、標志位threwException設置為false;

? ? ? ??2.5、通過任務內存管理器taskMemoryManager清理所有的分配的內存;

? ? ? ??2.6、獲取task完成時間;

? ? ? ? 2.7、如果task被殺死,拋出TaskKilledException異常。

? ? ? ??

? ? ? ? 最后一步,Step3:Task運行結果處理,大體流程如下:

? ? ? ? 3.1、通過SparkEnv獲取Task運行結果序列化器;

? ? ? ? 3.2、獲取結果序列化前的時間點;

? ? ? ? 3.3、利用Task運行結果序列化器序列化Task運行結果value,得到valueBytes;

? ? ? ??3.4、獲取結果序列化后的時間點;

? ? ? ??3.5、度量指標體系相關,暫不介紹;

? ? ? ??3.6、構造DirectTaskResult,同時包含Task運行結果valueBytes和累加器更新值accumulator updates;

? ? ? ? 3.7、序列化DirectTaskResult,得到serializedDirectResult;

? ? ? ? 3.8、獲取Task運行結果大小;

? ? ? ? 3.9、處理Task運行結果:

? ? ? ? ? ? ? ? ?3.9.1、如果Task運行結果大小大于所有Task運行結果的最大大小,序列化IndirectTaskResult,IndirectTaskResult為存儲在Worker上BlockManager中DirectTaskResult的一個引用;

? ? ? ? ? ? ? ? ?3.9.2、如果 Task運行結果大小超過Akka除去需要保留的字節外最大大小,則將結果寫入BlockManager,Task運行結果比較小的話,直接返回,通過消息傳遞;

? ? ? ? ? ? ? ? ?3.9.3、Task運行結果比較小的話,直接返回,通過消息傳遞

? ? ? ??3.10、execBackend更新狀態TaskState.FINISHED;

? ? ? ??最后,無論運行成功還是失敗,將task從runningTasks中移除。

? ? ? ? 至此,Task的運行主體流程已經介紹完畢,剩余的部分細節,包括Task內run()方法的具體執行,還有任務內存管理器、序列化器、累加更新,還有部分異常情況處理,狀態匯報等等其他更為詳細的內容留到下篇再講吧!

? ? ? ? 明天還要工作,洗洗睡了!

?

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50726216

轉載于:https://www.cnblogs.com/jirimutu01/p/5274461.html

總結

以上是生活随笔為你收集整理的Spark源码分析之七:Task运行(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。