日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark技术内幕: Task向Executor提交的源代码解析

發布時間:2023/12/9 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark技术内幕: Task向Executor提交的源代码解析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在上文《Spark技術內幕:Stage劃分及提交源代碼分析》中,我們分析了Stage的生成和提交。可是Stage的提交,僅僅是DAGScheduler完畢了對DAG的劃分,生成了一個計算拓撲,即須要依照順序計算的Stage,Stage中包括了能夠以partition為單位并行計算的Task。我們并沒有分析Stage中得Task是怎樣生成而且終于提交到Executor中去的。

這就是本文的主題。

從org.apache.spark.scheduler.DAGScheduler#submitMissingTasks開始,分析Stage是怎樣生成TaskSet的。

假設一個Stage的全部的parent stage都已經計算完畢或者存在于cache中。那么他會調用submitMissingTasks來提交該Stage所包括的Tasks。

org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的計算流程例如以下:

  • 首先得到RDD中須要計算的partition,對于Shuffle類型的stage。須要推斷stage中是否緩存了該結果;對于Result類型的Final Stage。則推斷計算Job中該partition是否已經計算完畢。
  • 序列化task的binary。Executor能夠通過廣播變量得到它。每一個task執行的時候首先會反序列化。這樣在不同的executor上執行的task是隔離的,不會相互影響。
  • 為每一個須要計算的partition生成一個task:對于Shuffle類型依賴的Stage,生成ShuffleMapTask類型的task;對于Result類型的Stage,生成一個ResultTask類型的task
  • 確保Task是能夠被序列化的。由于不同的cluster有不同的taskScheduler,在這里推斷能夠簡化邏輯。保證TaskSet的task都是能夠序列化的
  • 通過TaskScheduler提交TaskSet。

  • TaskSet就是能夠做pipeline的一組全然同樣的task,每一個task的處理邏輯全然同樣。不同的是處理數據。每一個task負責處理一個partition。

    pipeline。能夠稱為大數據處理的基石。僅僅有數據進行pipeline處理,才干將其放到集群中去執行。

    對于一個task來說,它從數據源獲得邏輯。然后依照拓撲順序,順序執行(實際上是調用rdd的compute)。

    TaskSet是一個數據結構,存儲了這一組task:private[spark] class TaskSet(val tasks: Array[Task[_]],val stageId: Int,val attempt: Int,val priority: Int,val properties: Properties) {val id: String = stageId + "." + attemptoverride def toString: String = "TaskSet " + id }

    管理調度這個TaskSet的時org.apache.spark.scheduler.TaskSetManager。TaskSetManager會負責task的失敗重試。跟蹤每一個task的執行狀態。處理locality-aware的調用。具體的調用堆棧例如以下:
  • org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
  • org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
  • org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
  • org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
  • org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
  • org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
  • org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
  • org.apache.spark.executor.Executor#launchTask

  • 首先看一下org.apache.spark.executor.Executor#launchTask: def launchTask(context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {val tr = new TaskRunner(context, taskId, taskName, serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr) // 開始在executor中執行}

    TaskRunner會從序列化的task中反序列化得到task。這個須要看?org.apache.spark.executor.Executor.TaskRunner#run 的實現:task.run(taskId.toInt)。而task.run的實現是: final def run(attemptId: Long): T = {context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)context.taskMetrics.hostname = Utils.localHostName()taskThread = Thread.currentThread()if (_killed) {kill(interruptThread = false)}runTask(context)}
    對于原來提到的兩種Task,即
  • ?org.apache.spark.scheduler.ShuffleMapTask
  • ?org.apache.spark.scheduler.ResultTask
  • 分別實現了不同的runTask:org.apache.spark.scheduler.ResultTask#runTask即順序調用rdd的compute,通過rdd的拓撲順序依次對partition進行計算:
    override def runTask(context: TaskContext): U = {// Deserialize the RDD and the func using the broadcast variables.val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)metrics = Some(context.taskMetrics)try {func(context, rdd.iterator(partition, context))} finally {context.markTaskCompleted()}}

    而org.apache.spark.scheduler.ShuffleMapTask#runTask則是寫shuffle的結果。
    override def runTask(context: TaskContext): MapStatus = {// Deserialize the RDD using the broadcast variable.val ser = SparkEnv.get.closureSerializer.newInstance()val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)//此處的taskBinary即為在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的廣播變量取得的metrics = Some(context.taskMetrics)var writer: ShuffleWriter[Any, Any] = nulltry {val manager = SparkEnv.get.shuffleManagerwriter = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 將rdd計算的結果寫入memory或者diskreturn writer.stop(success = true).get} catch {case e: Exception =>if (writer != null) {writer.stop(success = false)}throw e} finally {context.markTaskCompleted()}}

    這兩個task都不要依照拓撲順序調用rdd的compute來完畢對partition的計算。不同的是ShuffleMapTask須要shuffle write。以供child stage讀取shuffle的結果。

    對于這兩個task都用到的taskBinary,即為在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的廣播變量取得的。


    通過上述幾篇博文,實際上我們已經粗略的分析了從用戶定義SparkContext開始。集群是假設為每一個Application分配Executor的,回想一下這個序列圖:
    還有就是用戶觸發某個action,集群是怎樣生成DAG,假設將DAG劃分為能夠成Stage,已經Stage是怎樣將這些能夠pipeline執行的task提交到Executor去執行的。當然了,具體細節還是很值得推敲的。

    以后的每一個周末。都會奉上某個細節的實現。

    歇息了。明天又會開始忙碌的一周。


    轉載于:https://www.cnblogs.com/llguanli/p/8601055.html

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的Spark技术内幕: Task向Executor提交的源代码解析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。