远程连接spark_spark内部原理篇之计算引擎和调度管理
本篇主要內容包括spark 計算引擎與調度管理的實現方式,
- Spark 計算引擎原理
- Spark 調度管理原理
- Spark 存儲管理原理
- Spark 監控管理
一 :Spark 計算引擎原理
通過上面圖可以很清楚的看到從Job的action到中間調度在到最后的具體執行的過程,下面針對該圖做一個實例,來更加清楚的理解。
圖片
首先,我們啟動了spark-shell 來讀取本地的文件,然后做wordcount操作,然后統計出一共多少行。
那么通過這么一個簡單的job操作,來看一下spark ui 里面具體的DAGScheduler方式
圖片
從上圖我們可以看出flatmap 和 map 為一個stage0,在reducebykey的時候,又劃分了一個stage1 ,那么stage的劃分是根據shuffle或者說根據依賴關系來的,后面會更加詳細的說到。
接下來說一下shuffle,shuffle是什么呢?在第一節的時候,有提到shuffle整個概念,并且簡單的說到了寬依賴和窄依賴,或者我們叫做完全依賴和部分依賴。
shuffle的目的或者我們說shuffle的作用就是數據分類和數據聚合。通俗而言,就是講跨節點間的數據進行聚合和歸并操作,?Shuffle是分布式計算框架的核心數據交換方式,其實現方式直接決定了計算框架的性能和擴展性,shuffle操作是會導致數據計算的效率有所降低,那么如何講shuffle所帶來的損失降到最低呢?下面來一起了解一下spark中對于shuffle處理逐步改進的方案。
spark shuffle分為兩個階段,一個是write階段,一個是read階段
spark shuffle write階段
write階段分為兩種:Hash-based 和 Sort-based
Hash-based:這個是最初的spark版本時,使用的shuffle write 方式
Hash-based 實現結構圖(摘自網絡):
如上圖所示,每一個Task在計算完之后,會將結果集存儲到本地的一份文件中,那么在進行shuffle操作時,這種實現方式會有M*N條鏈接,如果我們的bucket數量比較多的話,那么這個是很耗費資源的。所以后來spark shuffle write 改為sort-based方式
sort-based 實現結構圖(摘自網絡)
如上圖所示,每一個task在計算完之后,會生成一個文件,每次的結果集會追加到該文件中,同時,會有一個索引文件記錄了該塊數據的位置,那么在進行write時,連接數的數量就大大減少了。
spark shuffle read階段
在進行shuffle操作的時候,spark內部隱式的創建了一個transformation操作,用于做shuffle操作
shuffle read階段,spark內部有一個單獨的類BlockStoreShuffleFetcher去獲取數據,之后獲取到mata信息,存入到Set中,如果數據是在本地那么直接通過BlockManager.getBlockData進行本地數據讀取,如果數據實在遠程Executor中,那么會通過NettyBlockTransferService.fetchBlocks去獲取。
二:Spark調度管理原理
Spark 調度管理系統是Spark程序得以運轉的核心,其中作業調度是調度管理模塊的樞紐,調度的前提是判斷多個作業任務的依賴關系(Stage),作業任務之間存在因果的依賴關系,也就是說,有些任務必須要先執行,然后相關依賴的任務才能執行,任務之間不能出現循環依賴,所以本質上就是DAG。
作業調度相關類型,以DAGScheduler,也就是基于DAG圖的調度類為核心
Spark 調度相關概念
- Task(任務):單個分區數據集上的最小處理單元
- TaskSet(任務集):有一組關聯的,但互相直接沒有Shuffle依賴關系的任務組成
- Stage(調度階段):一個任務集對應的調度階段
- Job (作業):由一個RDD Action 生成的一個或多個調度階段所組成的一次計算作業
- Application(應用程序):Spark 應用程序,有一個或者多個作業組成
Spark 調度相關概念邏輯關系圖
Spark 作業調度頂層邏輯
每個RDD Action類型的算子,內部都是一次隱式的作業提交
DAGScheduler最重要的任務之一就是計算作業和任務的依賴關系,制定調度邏輯。
DAGScheduler在SparkContext初始化的過程中被實例化,一個SparkContext應創建一個DAGScheduler
DAGScheduler內部維護著各種“任務/調度階段/作業”的狀態互相之間的映射表,用于在任務狀態,集群狀態更新時,能夠正確的維護作業的運行邏輯
Spark 作業調度流程圖
Spark作業調度交互流程
Spark 作業調度-調度階段的拆分
當一個RDD操作觸發計算,向DAGScheduler提交作業時,DAGScheduler需要從RDD依賴鏈的末端RDD出發,遍歷整個RDD依賴鏈,劃分調度階段,并決定各個調度階段之間的依賴關系調度階段的劃分是以ShuffleDependency為依據,也就是說當某個RDD的運算需要將數據進行shuffle操作時,整個包含了Shuffle依賴關系的RDD將被用來作為輸入信息,構建一個新的調度階段Spark 作業調度-finalStage的提交在劃分調度階段的步驟中會得到一個或多個有依賴關系的調度階段,其中直接觸發RDD關聯的調度階段稱為FinalStage,然后DAGScheduler進一步從這個FinalStage生成一個作業實例,這兩者的關系進一步存儲在映射表中,用于在該調度階段全部完成做一些后續處理,比如:狀態報告,清理作業相關數據等。
Spark 作業調度-狀態監控&任務結果獲取
DAGScheduler對外暴露了一系列的回調函數,對于TaskScheduler而言,這些回調函數主要包括任務的開始結束失敗,任務集的失敗,DAGScheduler根據這些任務的生命周期進一步維護作業呵調度階段的狀態信息
Spark 作業調度-任務結果獲取
一個具體任務在Executor中執行完畢后,其結果需要以某種形式返回給DAGScheduler根據調度的方式不同,返回的方式也不同。對于FinalStage所對應的任務,返回給DAGScheduler的是運算結果本身,而對于中間調度階段對應的任務ShuffleMapTask返回給DAGScheduler的是一個MapStatus對象,MapStatus對象管理了ShuffleMapTask的運算輸出結果在Blockmanager里的項目存儲信息,而非結果本身。根據任務結果的大小不同,ResultTask返回的結果又非為兩類,如果結果足夠小,則直接放在DirectTaskResult對象內,如果超過特定尺寸則在Executor端會將
DirectTaskResult先序列化,再把序列化的結果作為一個數據快存放在BlockManager中,然后將BlockManager返回的BlockID放在IndirectTaskResult對象中,返回給TaskScheduler。TaskScheduler進而調用TaskResultGetter將IndirectTaskResult中的BlockID取出并通過BlockManager最終取得對應的DirectTaskResult。
Spark 作業調度總結
Spark的調度管理是Spark作業運行和資源分配的核心,調度的層次依次是底層計算資源,任務調度,作業調度,應用調度。了解這些層次之間的邏輯關系,可以更方便的對Spark的運行狀態監控以及對于集群的配置優化。
希望本文對你有幫助!
感謝關注“碼農星球”。本文版權屬于“碼農星球”。我們提供咨詢和培訓服務,關于本文有任何困惑,請關注并聯系我們。
總結
以上是生活随笔為你收集整理的远程连接spark_spark内部原理篇之计算引擎和调度管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 界面无小事(八):RecyclerVie
- 下一篇: c语言将一个已知头结点的单链表逆序_C语