spark内核揭秘-09-RDD的count操作 触发Job全生命周期-01
RDD源碼的count方法:
從上面代碼可以看出來,count方法觸發(fā)SparkContext的runJob方法的調(diào)用:
進(jìn)入?runJob(rdd, func, 0 until rdd.partitions.size, false)方法:
進(jìn)一步跟蹤runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)方法:
繼續(xù)跟蹤進(jìn)入runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)方法:
代碼分析:
1、getCallSite :
2、clean(func):
3、dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,?resultHandler, localProperties.get):
代碼分析:
3.1、進(jìn)入submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties):
上面代碼分析:
3.1.1、 進(jìn)入new JobWaiter(this, jobId, partitions.size, resultHandler)方法
3.1.2、進(jìn)入eventProcessActor ! JobSubmitted(?jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)方法
我們可以看出來,是給自己發(fā)消息的
3.1.3、進(jìn)入? dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,listener, properties)方法
首先構(gòu)建finalStage,然后又一個getMissingParentsStages方法,可以發(fā)現(xiàn)運行有本地運行和集群運行兩種模式,本地運行主要用于本地實驗和調(diào)試:
3.1.3.1、進(jìn)入??finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)方法:
3.1.3.2、進(jìn)入?runLocally(job)方法:
3.1.3.2.1、?runLocallyWithinThread(job)方法:
3.1.3.3、進(jìn)入 submitStage(finalStage)方法:
上面代碼分析:submitStage第一次傳入的參數(shù)是Job的最后一個Stage,然后判斷一下是否缺失父Stage,如果沒有依賴的parent Stage的話就可以submitMissingTasks運行,如果有parent Stage的話就要再一次submitStage做遞歸操作,最終會導(dǎo)致submitMissingTasks的調(diào)用:
3.1.3.3.1、進(jìn)入??activeJobForStage(stage)?方法:
3.1.3.3.2、進(jìn)入??getMissingParentStages(stage).sortBy(_.id)?方法:
跟進(jìn)getShuffleMapState方法:
進(jìn)入registerShuffleDependencies方法:
3.1.3.3.3、進(jìn)入submitMissingTasks(stage, jobId.get)?方法:
PS:分析代碼太多,下篇繼續(xù)分析源碼
版權(quán)聲明:本文為博主原創(chuàng)文章,未經(jīng)博主允許不得轉(zhuǎn)載。
轉(zhuǎn)載于:https://www.cnblogs.com/stark-summer/p/4829813.html
總結(jié)
以上是生活随笔為你收集整理的spark内核揭秘-09-RDD的count操作 触发Job全生命周期-01的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: holer实现外网访问内网数据库
- 下一篇: weblogic jprofile配置