日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Stage划分和Task最佳位置

發布時間:2024/7/5 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Stage划分和Task最佳位置 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

?

1、Job Stage劃分

2、Task最佳位置

3、總結

3.1 Stage劃分總結:

3.2 Task最佳位置總結:


1、Job Stage劃分

Spark Application中因為不同的Action觸發眾多的Job,也就是說一個Application中可以有很多的Job,每個Job是由是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。而Stage劃分的依據就是寬依賴。下面以RDD的collect方法為例:

(1)他是一個action會觸發一個具體的作業runJob

(2)runJob有很多重載方法,不斷地往里調用,最后交給dagScheduler的runJob,在dagScheduler的runJob交給了submitJob,后面還有一個等待作業結果看成功還是失敗,會有相應的動作。

(3)在submitJob中首先看一下分區長度,是因為要進行計算,這個肯定是RDD導致的action他要校驗一下是不是在運行的時候相應的Partition存在。

eventProcessLoop調用post的時候有個Jobsubmitted的參數,他是一個case class,因為一個application中可能有很多的Job,不同的job的Jobsubmitted實例不一樣所以不能用case object。他里面封裝了job的id,最后一個RDD,具體對RDD操作的函數,有哪些Partition要被計算,監聽作業狀態等。

他的核心就是將Jobsubmitted交給eventProcessLoop。他是通過post方法post給eventProcessLoop,這個post其實就是發往EventLoop里面的eventQueue

(4)發現在EventLoop里面開辟了一個線程,他是setDaemon方式作為后臺線程,因為要在后臺做不斷的循環(如果是前臺線程的話對垃圾回收是有影響的),在run方法里面會不斷的循環我們的消息隊列,從eventQueue(是一個LinkedBlockingDeque,我們可以往他里面信息)中獲得消息,調用了onReceive,發現在里面沒有具體的實現所以在DAGSchedulerEventProcessLoop中對onReceive進行了實現,這里就收到了DAGSchedulerEvent,這里面再調用doOnReceive。doOnReceive收到信息就開始處理

(5)接下來就是HandleJobSubmited。這個時候Stage就開始了。我們知道最后一個Stage一定是ResultStage,前面所有的Stage都是ShuffleMapStage。

(6)發現有個getOrCreateParentStages的方法,開始創建ResultStage的父stage,里面有多個嵌套獲取shuffle依賴和循環創建shuffleMapStage,若沒有shuffle,操作則返回空list

進入到創建父Stage的方法getOrCreateParentStages,這里僅僅是抽取當前RDD的shuffle依賴,shuffleMapStage,如果不是shuffleDependency就繼續抽取父RDD,迭代遍歷一直到抽取出為止或者沒有

進入getOrCreateShuffleMapStage方法中,進行匹配能不能取到ParentStage的值,當沒有parentStage的時候會返回空,能取到就返回stage,ShuffleMapStage是根據遍歷出的ShuffleDependencies一次次創建出來的

進入createShuffleMapStage方法 此方法是遞歸循環創建shuffleMapStage的過程

這個時候ShuffleMapStage已經創建完成了,并不是一次就創建完成,而是遇見shuffle的時候會由下往上遞歸創建ShuffleMapStage

(7)構建完所有的ShuffleMapStage后,將其作為參數創建ResultStage

(8)最后將Stage和id關聯,更新job所有的Stage,并將Stage返回出去。

(9)回到handleJobsubmited方法中,finalStage構建完之后,新建一個ActiveJob保存了當前job的一些信息,打印一堆日志之類。getMissingParentStages(finalStage)根據finalStage,剛才找父Stage的時候如果有的話直接返回,如果沒有的話就會創建,所以如果曾經有就不需要再去做。listenerBus.post監聽事件,最后submitStage(finalStage)。

首先獲得id,如果jobId是defined的話再次getMissingParentStages(stage)獲得missing的stage之后判斷一下是否為空,如果為空的話就submitMissingTasks(stage, jobId.get)個就是沒有前置性的Tasks,也就是沒有父Stage。在這個底層其實是DAGScheduler把這個處理的過程交給具體的TaskScheduler去處理

2、Task最佳位置

(1)在handleJobsubmited方法中最后是最后調用submitStage,在他里面會調用submitMissingTasks

(2)這里面有很多代碼,我們要關心Stage本身的算法以及Task任務本地性把當前的Stage加進去,然后對Stage進行判斷,一種是ShuffleMapStage,一種是ResultStage。繼續往下走會看到taskIdToLocations這是關鍵的代碼,taskIdToLocations是一個Map

partitionsToCompute這里面獲得是具體的要計算的PartitionID,我們我們這邊看到的map里面的id是Partition的id。這里面匿名函數,產生的是tuple根據Partition的id。后面toMap就是Partition的id和TaskLocation的位置。

(3)進入到getPreferredLocs(stage.rdd, id),進來的是RDD,PartitionID返回的是一個集合。

再進入getPreferredLocsInternal

visited: HashSet[(RDD[_], Int)]這個HashSet開始是空,所以直接傳進來一個new HashSet,然后判斷visited如果已經有的話,那么添加就不成功,那么就是已經計算了數據本地性了,就返回Nil。

下面的cached就是已經在DAGScheduler的內存數據結構中了。進入getCacheLocs,這邊返回的是序列,cacheLocs是一個HashMap,這包含了每個RDD的Partition的id以及id對應的taskLocation,這個包含了Stage本身也包含了Stage內部任務的本地性

(4)回到getPreferredLocsInternal中,上面是看一下DAGScheduler中有沒有緩存根據Partition而保存的數據本地性的內容,如果不為空的話就把內容返回。然后調用下面的getpreferdLocations(如果自定義一個RDD的話是一定要寫這個方法的)

(5)最后判斷一下如果是窄依賴的話就自己調用自己

3、總結

3.1 Stage劃分總結:

(1)Action觸發Job,開始逆向分析job執行過程Action中利用SparkContext runJob路由到dagScheduler.runJob(rdd,func,分區數,其他),提交Job作業;

(2)DAGScheduler的runJob中調用submitJob并返回監聽waiter,生命周期內監聽Job狀態;

(3)在submitJob內部,將該獲取到的Job(已有JobId),插入到名為eventProcessLoop的LinkedBlockingDeque結構的事件處理隊列中;

(4)eventProcessLoop放入新事件后,調起底層的DAGSchedulerEventProcessLoop的onReceive方法;

(5)執行doOnReceive,根據DAGSchedulerEvent的具體類型如JobSubmitted事件或者MapStageSubmitted事件,調取具體的Submitted handle函數提交具體的Job;

(6)以JobSubmitted為例,在handleJobSubmitted內部,返回從ResultStage 建立stage 建立finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite),finalStage激活Job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties),同時開始逆向構建缺失的stage;

(7)DAG構建完畢,提交stage,submitStage(finalStage),submitStage中stage提交為tasks,submitMissingTasks(),submitMissingTasks,根據ShuffleMapStage還是ResultStage創建 ShuffleMapTask 或 ResultTask。

(7)taskScheduler.submitTasks()開始調起具體的task

3.2 Task最佳位置總結:

(1)在劃分Stage的時候submitMissingTasks方法中會有一個taskIdToLocations的屬性,他的結構為 Map[Int, Seq[TaskLocation]],他保存的就是PartitionID及其對應的最佳位置

(2)在對taskIdToLocations賦值的時候會調用getPreferredLocs方法,再路由到getPreferredLocsInternal返回最佳位置Seq[TaskLocation]

(3)在getPreferredLocsInternal方法中

①判斷rdd的partition是否被訪問過,如果被訪問過,則什么都不做

②然后判斷DAGScheduler的內存中是否cache了在當前Paritition的信息,如果有的話直接返回

③如果沒有cache,則調用rdd.getPreferredLocations方法,獲取RDD partition的最佳位置

④遍歷RDD的依賴,如果有窄依賴,遍歷父依賴的partition,對遍歷到的每個partition,遞歸調用getPreferredLocsInternal方法

即從第一個窄依賴的第一個partition開始,然后將每個partition的最佳位置,添加到序列中,最后返回所有partition的最佳位置序列

注意:DAGScheduler計算數據本地性的時候借助了RDD自身的getPreferredLocations中的數據,因為getPreferredLocations中表明了每個Partition的數據本地性,雖然當前Partition可能被persist或者checkpoint,但是persist或者checkpoint默認情況下肯定是和getPreferredLocations中的Partition的數據本地性是一致的,所以這就極大的簡化Task數據本地性算法的實現和效率的優化。

總結

以上是生活随笔為你收集整理的Stage划分和Task最佳位置的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。