【Spark 深入学习 04】再说Spark底层运行机制
本節內容
· spark底層執行機制
· 細說RDD構建過程
· Job Stage的劃分算法
· Task最佳計算位置算法
?
一、spark底層執行機制
? ? 對于Spark底層的運行原理,找到了一副很好的圖,先貼上
?
?
? ??客戶端提交應用后,spark是如何執行的要有一個整體的概念,做到心中有數,先整體把握,才能更好的分模塊開墾細節,廢話不多說,先來看該圖如何更好的理解。
????1)提交前的聯系
?????Worker向Master或則ResourceManager匯報自己有哪些資源(內存、CPU、磁盤空間、網絡等),Master或則ResourceManager與Worker一直保持心跳
? ? 2)應用提交后
???Spark通過RDD對分布式的數據進行管理,RDD記錄了轉換成“spark格式”后的數據分區(記錄數據的存儲位置)和數據分區對應的操作
???· 應用提交后,形成RDD Graph,并且在后臺創建DAG對象(spark不僅僅用DAG建模,而且還會執行它,并且里面不是用對象表示,而是用RDD對象之間的關系)
???· DAG Scheduler 優先使用pipeline方法,把RDD的transformation壓縮,當碰到wide transformation 時,narrow無法和wide pipeline,那DAG scheduler會把前面的transformation定義成一個stage,DAG Scheduler的工作結果就是將RDD產生一組stages
???· 將DAG Scheduler產生的stages傳送給task scheduler,task scheduler使用集群管理器依次執行task,task被分配到各個work下執行,當所有的task執行完畢,一個stage標記完成,再運行下一個stage,直到整個spark job完成。
?
? ? 簡單理解, Spark 把要處理的數據,處理中間結果,和輸出結果都定義成 RDD. 這樣一個常見的 Spark job 就類似于:
????? 從數據源讀取數據,把輸入生成一個 RDD;
????? 通過運算把輸入 RDD 轉換成另一個RDD;
????? 再通過運算把生成的 RDD 轉換成另一個RDD,重復需要進行的 RDD 轉換操作 (此處省略一千遍);
????? 最后運算成結果 RDD,處理結果;
?
? ? ?Spark的運行流程: Client提交應用,master找到一個worker啟動driver[也可以其他],driver向master請求資源,之后將應用轉化為RDD Graph,再由DAGScheduler將RDD Graph轉換為stage的DAG提交給TaskScheduler,由TaskScheduler提交任務給executor。
? ? ?從調度來看,經歷了如下調度:application調度 -> Job調度 -> Stage調度 -> Task調度
?
二、細說RDD構建過程
? ? ?從前面的學習我們發現?RDD 其實就是數據集,是一組數據被處理到一個階段的狀態。
? ? ?每一個?Spark Job 就是定義了由輸入 RDD,如何把它轉化成下一個狀態,再下一個狀態 …… 直到轉化成我們的輸出。這些轉化就是對 RDD 里每一個 data record 的操作。用個高大上點的語言,一個 Spark job 就是一系列的 RDD 以及他們之間的轉換關系。那么用戶如何才能定義 RDD 和轉換關系呢?換句話說,用戶如何使用 Spark 呢?
? ? ?用戶需要定義一個包含主函數的?Java (main) 類。在這個 main 函數中,無論業務邏輯多么復雜,無論你需要使用多少 Java 類,如果從 Spark 的角度簡化你的程序,那么其實就是:
? ? ???首先生成?JavaSparkContext 類的對象.
? ?????從?JavaSparkContext 類的對象里產生第一個輸入RDD. 以讀取 HDFS 作為數據源為例,調用 JavaSparkContext.textFile() 就生成第一個 RDD.
? ?????每個?RDD 都定義了一些標準的常用的變化,比如我們上面提到的 map, filter, reduceByKey …… 這些變化在 Spark 里叫做 transformation.
? ?????之后可以按照業務邏輯,調用這些函數。這些函數返回的也是?RDD, 然后繼續調用,產生新的RDD …… 循環往復,構建你的 RDD 關系圖。
? ?????注意?RDD 還定義了其他一些函數,比如 collect, count, saveAsTextFile 等等,他們的返回值不是 RDD. 這些函數在 Spark 里叫做 actions, 他們通常作為 job 的結尾處理。
?? ????用戶調用?actions 產生輸出結果,Job 結束。
? ? Action 都是類似于 “數數這個 RDD 里有幾個 data record”, 或者 ”把這個 RDD 存入一個文件” 等等。想想他們作為結尾其實非常合理:我們使用 Spark 總是來實現業務邏輯的吧?處理得出的結果自然需要寫入文件,或者存入數據庫,或者數數有多少元素,或者其他一些統計什么的。所以 Spark 要求只有用戶使用了一個 action,一個 job 才算結束。當然,一個 job 可以有多個 action,比如我們的數據既要存入文件,我們又期望知道有多少個元素。
? ? ?這些?RDD 組成的關系在 Spark 里叫做 DAG,就是有向無循環圖,圖論里的一個概念,大家有興趣可以專門翻翻這個概念。可以發現,實踐中絕大部分業務邏輯都可以用 DAG 表示,所以 spark 把 job 定義成 DAG 也就不足為奇了。
?
RDD 的兩種變化
? ? ?我們上面剛剛介紹了?transformation 的概念。在 Spark 眼中,transformation 被分成 narrow transformation 和 wide transformation. 這是什么東西呢?
上文提到過?RDD 被分成幾個分區,分散在多臺機器上。當我們把一個 RDD A 轉化成下一個 RDD B 時,這里有兩種情況:
?· 有時候只需要一個?A 里面的一個分區,就可以產生 B 里的一個分區了,比如 map 的例子:A 和 B 之間每個分區是一一對應的關系,這就是 narrow transofmration.
?· 還有一類?transformation,可能需要 A 里面所有的分區,才能產生 B 里的一個分區,比如 reduceByKey的例子,這就是 wide transformation.
?
Narrow 或者 Wide 有什么關系嗎?
? ? 一個?Spark job 中可能需要連續地調用 transformation, 比如先 map,后 filter,然后再 map …… 那這些 RDD 的變化用圖表示就是:
?
?
? ? 我們可以大膽設想一下,如果每個分區里的數據就待在那臺機器的內存里,我們逐一的調用?map, filter, map 函數到這些分區里,Job 就很好的完成。
更重要的是,由于數據沒有轉移到別的機器,我們避免了?Network IO 或者 Disk IO. 唯一的任務就是把 map / filter 的運行環境搬到這些機器上運行,這對現代計算機來說,overhead 幾乎可以忽略不計。
? ? ?這種把多個操作合并到一起,在數據上一口氣運行的方法在?Spark 里叫 pipeline (其實 pipeline 被廣泛應用的很多領域,比如 CPU)。這時候不同就出現了:只有 narrow transformation 才可以進行 pipleline 操作。對于 wide transformation, RDD 轉換需要很多分區運算,包括數據在機器間搬動,所以失去了 pipeline 的前提。
RDD 的執行
? ?當用戶調用?actions 函數時,Spark 會在后臺創建出一個 DAG. 就是說 Spark 不僅用 DAG 建模,而且真正地創建出一個 DAG, 然后執行它(順便說一句 DAG 在 Spark 里不是用一個對象表示的,而是用 RDD 對象之間的關系)。?
?
?
? ? ?Spark 會把這個 DAG 交給一個叫 DAG scheduler 的模塊,DAG scheduler 會優先使用 pipeline 方法,把 RDD 的 transformation 壓縮;當我們遇到 wide transformation 時,由于之前的 narrow transformation 無法和 wide transformation pipeline, 那 DAG scheduler 會把前面的 transformation 定義成一個 stage.
重要的事情說三遍:DAG scheduler 會分析 Spark Job 所有的 transformation, 用 wide transformation 作為邊界,把所有 transformation 分成若干個stages. 一個 stage 里的一個分區就被 Spark 叫做一個task. 所以一個 task 是一個分區的數據和數據上面的操作,這些操作可能包括一個 transformation,也可能是多個,但一定是 narrow transformation.
? ? ?DAG scheduler 工作的結果就是產生一組 stages. 這組 stages 被傳到 Spark 的另一個組件 task scheduler, task scheduler 會使用集群管理器依次執行 task, 當所有的 task 執行完畢,一個 stage 標記完成;再運行下一個 stage …… 直到整個 Spark job 完成。
?
?
三、Job Stage的劃分算法
? ? ?從前文了解到的處理流程,RDD Graph->DAG Scheduler->Task Scheduler,DAG Scheduler將RDD轉換為Job Stage。
? ? ?由于Spark的算子構建一般都是鏈式的,這就涉及了要如何進行這些鏈式計算,Spark的策略是對這些算子,先劃分Stage,然后在進行計算。
?????? 由于數據是分布式的存儲在各個節點上的,所以為了減少網絡傳輸的開銷,就必須最大化的追求數據本地性,所謂的數據本地性是指,在計算時,數據本身已經在內存中或者利用已有緩存無需計算的方式獲取數據。
1. ?Stage劃分算法思想
(1)一個Job由多個Stage構成
? ? ?一個Job可以有一個或者多個Stage,Stage劃分的依據就是寬依賴,產生寬依賴的算子:reduceByKey、groupByKey等等
(2)根據依賴關系,從前往后依次執行多個Stage
?????? SparkApplication 中可以因為不同的Action觸發眾多的Job,也就是說一個Application中可以有很多的Job,每個Job是有一個或者多個Stage構成,后面的Stage依賴前面的Stage,也就是說只有前面的Stage計算完后,后面的Stage才會運行。
?(3)Stage的執行時Lazy級別的
?????? 所有的Stage會形成一個DAG(有向無環圖),由于RDD的Lazy特性,導致Stage也是Lazy級別的,只有遇到了Action才會真正發生作業的執行,在Action之前,Spark框架只是將要進行的計算記錄下來,并沒有真的執行。Action導致作業執行的代碼如下:觸發作業,發送消息。消息的接收和處理:
(1)DAGScheduler啟動一個線程EventLoop(消息循環器),不斷地從消息隊列中取消息。消息是通過EventLoop的put方法放入消息隊列,當EventLoop拿到消息后會回調DAGScheduler的OnReceive,進而調用doOnReceive方法進行處理。
為什么要開辟線程來執行消息的讀、取?這樣可以提交更多的Job,異步處理多Job,處理的業務邏輯一致(調用自己方法也是發送消息),解耦合,擴展性好。
(2)在doOnReceive中通過模式匹配的方式把JobSubmitted封裝的內容路由到handleJobSubmitted。
(3)在handleJobSubmitted中首先創建finalStage。
(4)通過遞歸的方式創建DAG。
?
四、Task最佳計算位置算法
1.Task任務本算法運用場景??
? ? ? ?在上一節,我們介紹了Job Stage劃分算法,并最終得到了DAG圖中的Result Stage(final Stage)。接下來我們通過查看Task任務本地性(為了保證Data Locality)的運用場景----Task的運行調度處理,來引入Task任務本地性算法。
? ? ? 在得到邏輯上Result Stage,Spark為了進行計算就必須先報任務以一定的集群可識別形式提交給集群進行計算。Spark的任務提交過程如下:
(1)生成ActiveJob,為提交finalStage做準備。
(2)提交finalStage
提交Stage,如果有未提交的ParentStage,則會遞歸提交這些ParentStage,只有所有ParentStage都計算完了,才能提交當前Stag
(3)提交MissingTask
??missingTask會最先會再到需要計算的分片,然后對Stage的運行環境進行設定,然后取得Task計算的本地性級別,最后會根據這些信息建立Tasks來處理每個分片,在提交給底層TaskScheduler之前,Spark還會將Tasks封裝成TaskSet。最后提交TaskSet給TaskScheduler,等待TaskScheduler最終向集群提交這些Task,并且DAGScheduler會監聽這些Task的狀態。
2.數據本地性
(1)這里我們來著重講解獲取數據本地性部分的代碼:
?
? ? ? ?這里會將要計算的分片(Partition)轉換為(id, getPreferredLocs(stage.rdd, id)) 類型的truple,進而由truple轉換未一個Map映射,在Task構造時需要一個locs參數,便可以利用這個映射由id得到相應Partition的本地性級別。
? ? ? 在每個分片(Partition)內部則是通過getPreferredLocs方法得到的
在具體算法實現的時候,首先查詢DAGScheduler的內存數據結構中是否存在當前partition的數據本地性信息,若有的話就直接放回該信息;若沒有首先會調用rdd.getPreferredLocations來得到數據的本地性。
? ? ? 例如想讓Spark運行在Hbase上或者是一種現在Spark還沒有直接支持的數據庫上,此時開發者需要自定義RDD,為了保證Task計算的數據本地性,最為關鍵的方式就是必須實現RDD的getPreferredLocations方法,來支持各種來源的數據。
? ? ?DAGScheduler計算數據本地性時,巧妙的借助了RDD自身的getPreferredLocations中的數據,最大化的優化效率,因為getPreferredLocations中表明了每個Partition的數據本地性。雖然當然Partition可能被persist或checkpoint,但是persist或checkpoint默認情況下肯定和getPreferredLocations中的partition的數據本地性是一致的。所以,這中算法就極大的簡化了Task數據本地性算法的實現,并且優化了效率
?
五、參考資料
1.http://mp.weixin.qq.com/s/nDRt1VQTYmsYcW98q4cvEQ-五分鐘深入 Spark 運行機制
2.http://blog.csdn.net/sinat_25306771/article/details/51429984
?
轉載于:https://www.cnblogs.com/licheng/p/6815297.html
總結
以上是生活随笔為你收集整理的【Spark 深入学习 04】再说Spark底层运行机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Tomcat】Tomcat 系统架构与
- 下一篇: Jakarta Commons Logg