日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

本地提交spark_Spark 数据本地化级别

發布時間:2025/3/11 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 本地提交spark_Spark 数据本地化级别 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

?

RDD 源碼

大家可以看到源碼中的第五條注釋說明,翻譯過來的大概意思是提供一系列的最佳計算位置。

我之前一直不太清楚 spark 是如何內部實現的,今天就帶領大家來看一看 spark 的本地數據化級別在任務執行中的演變過程。

1 數據的本地化級別有哪些?

Spark 中任務的處理需要考慮數據的本地性,以 spark 1.6 為例,目前支持一下幾種。(中英文排版很頭疼,誰來幫幫我啊)

PROCESS_LOCAL 進程本地化,表示 task 要計算的數據在同一個 Executor 中。

NODE_LOCAL 節點本地化,速度稍慢,因為數據需要在不同的進程之間傳遞或從文件中讀取。分為兩種情況,第一種:task 要計算的數據是在同一個 worker 的不同 Executor 進程中。第二種:task 要計算的數據是在同一個 worker 的磁盤上,或在 HDFS 上恰好有 block 在同一個節點上。如果 Spark 要計算的數據來源于 HDFSD 上,那么最好的本地化級別就是 NODE_LOCAL。

NO_PREF 沒有最佳位置,數據從哪訪問都一樣快,不需要位置優先。比如 Spark SQL 從 Mysql 中讀取數據。

RACK_LOCAL 機架本地化,數據在同一機架的不同節點上。需要通過網絡傳輸數據以及文件 IO,比 NODE_LOCAL 慢。情況一:task 計算的數據在 worker2 的 EXecutor 中。情況二:task 計算的數據在 work2 的磁盤上。

ANY 跨機架,數據在非同一機架的網絡上,速度最慢。

如果不是很清楚,我畫(造)了一張圖放在這以供大家理解。

?

?

2 Spark 的數據本地化由誰來負責呢?

val rdd1 = sc.textFile("hdfs://tsl...") rdd1.cache()rdd1.map.filter.count()

?

上面這段簡單的代碼,背后其實做什么很多事情。Driver 的 TaskScheduler 在發送 task 之前,首先應該拿到 rdd1 數據所在的位置,rdd1 封裝了這個文件所對應的 block 的位置,DAGScheduler 通過調用 getPrerredLocations() 拿到 partition 所對應的數據的位置,TaskScheduler 根據這些位置來發送相應的 task。

具體的解釋:

DAGScheduler 切割Job,劃分Stage, 通過調用 submitStage 來提交一個Stage 對應的 tasks,submitStage 會調用 submitMissingTasks, submitMissingTasks 確定每個需要計算的 task 的preferredLocations,通過調用 getPreferrdeLocations() 得到 partition 的優先位置,就是這個 partition 對應的 task 的優先位置,對于要提交到 TaskScheduler 的 TaskSet 中的每一個task,該 task 優先位置與其對應的 partition 對應的優先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 會為每個 TaskSet 創建一個 TaskSetManager 對象,該對象包含taskSet 所有 tasks,并管理這些 tasks 的執行,其中就包括計算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在調度和延遲調度 tasks 時發揮作用。

總的來說,Spark 中的數據本地化是由 DAGScheduler 和 TaskScheduler 共同負責的。

3

計算節點與輸入數據位置的關系,下面以一個圖來展開 spark 是如何讓進行調度的。這一個過程會涉及 RDD, DAGScheduler , TaskScheduler。

?

?

第一步:PROCESS_LOCAL

TaskScheduler 根據數據的位置向數據節點發送 task 任務。如果這個任務在 worker1 的 Executor 中等待了 3 秒。(默認的,可以通過spark.locality.wait 來設置),可以通過 SparkConf() 來修改,重試了 5 次之后,還是無法執行,TaskScheduler 就會降低數據本地化的級別,從 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:NODE_LOCAL

TaskScheduler 重新發送 task 到 worker1 中的 Executor2 中執行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重試了 5 次,還是無法執行,TaskScheduler 就會降低數據本地化的級別,從 NODE_LOCAL 降到 RACK_LOCAL。

第三步:RACK_LOCAL

TaskScheduler重新發送 task 到 worker2 中的 Executor1 中執行。

第四步:

當 task 分配完成之后,task 會通過所在的 worker 的 Executor 中的 BlockManager 來獲取數據。如果 BlockManager 發現自己沒有數據,那么它會調用 getRemote() 方法,通過 ConnectionManager 與原 task 所在節點的 BlockManager 中的 ConnectionManager先建立連接,然后通過TransferService(網絡傳輸組件)獲取數據,通過網絡傳輸回task所在節點(這時候性能大幅下降,大量的網絡IO占用資源),計算后的結果返回給Driver。這一步很像 shuffle 的文件尋址流程,Spark 的 shuffle 文件尋址流程

4

TaskScheduler在發送task的時候,會根據數據所在的節點發送task,這時候的數據本地化的級別是最高的,如果這個task在這個Executor中等待了三秒,重試發射了5次還是依然無法執行,那么TaskScheduler就會認為這個Executor的計算資源滿了,TaskScheduler會降低一級數據本地化的級別,重新發送task到其他的Executor中執行,如果還是依然無法執行,那么繼續降低數據本地化的級別...

如果想讓每一個 task 都能拿到最好的數據本地化級別,那么調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每一個 task 都拿到了最好的數據本地化級別,但是我們 job 執行的時間也會隨之延長。

下面是官方提供的參數說明:

?

?

可以在代碼里面這樣設置:

?

new SparkConf.set("spark.locality.wait

總結

以上是生活随笔為你收集整理的本地提交spark_Spark 数据本地化级别的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。