SparkContext源码分析
SparkContext源碼分析
粗略的說明一下SparkContext源碼! createTaskScheduler()針對不同的提交模式,執行不同的方法(local,standalone、yanr等)standalone模式===》》創建一個TaskSchedulerImpl
1、???????底層通過操作SchedulerBackend,針對不同種類的cluster(standalone、yarn。mesoso(亞馬遜))調度task
2、???????他也可以通過一個LoaclBackend,并且將isLocal設置為true,來在本地模式下工作
3、???????他負責處理一下通用的邏輯,比如說決定多個job的調度順序(FIFO),啟動推測任務執行
4、???????客戶端首先應該調用它的initialize()方法和start()方法,然后通過runTasks()方法提交tasksets
創建SparkDeploySchedulerBackend()
initializer方法中創建一個Pool調度池,FIFO、FAIR
taskScher。start()方法=====》調用了一下SparkDeploySchedulerBackend的start方法
此時:val AppDesc = newApplicationDescription(sc.appName、maxCores,sc.executorMemory,command,appUIaddress)
創建一個ApplicationDescription,非常重要!它代表了當前執行的Application的一下情況,包括Application最大需要多少CPU core? 每個slave上需要多大內存。
創建APPclient(Application與spark之間通信)
一個借口。
它負責接收一個spark master的url,以及一個ApplicationDescription,和一個集群事件的監聽器,以及各種事件發生時,監聽器的回調函數!
start()方法,創建一個clientActor
調用registerWithMaster()里面調用tryRegisterAllMasters(),里面去連接所有的master。
DAGScheduler:實現了面向stage的調度機制的高層次的調度層,他會為每一個job計算一個stage的DAG(有向無環圖),追蹤RDD和stage的輸出是否被物化(寫入磁盤或者內存等地方),并且尋找一個最少消耗(最優、最小)調度機制來運行job,他會將stage作為tasksets提交到底層的TaskScheduler上,來在集群上運行他們(task)。
除了處理stage的DAG,還負責決定運行每個task的最佳位置,基于當前的緩存狀態,將這些最佳位置提交給底層的TaskSchedulerImpl,此外,他會處理由于shuffle輸出文件丟失導致的失敗,在這種情況下,舊的stage可能會被重新提交,一個stage內部的失敗,如果不是由于shuffle文件丟失導致的,會被TaskScheduler處理,他會多次重復每一個task,知道最后實在不行,才會去取消整個stage。
SparkUI:jetty工具類。
總結
以上是生活随笔為你收集整理的SparkContext源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark之Master主备切换机制原理
- 下一篇: spark submit参数及调优