spark性能优化 -- spark工作原理
從本篇文章開始,將開啟 spark 學習和總結之旅,專門針對如何提高 spark 性能進行總結,力圖總結出一些干貨。
無論你是從事算法工程師,還是數據分析又或是其他與數據相關工作,利用 spark 進行海量數據處理和建模都是非常重要和必須掌握的一門技術,我感覺編寫 spark 代碼是比較簡單的,特別是利用 Spark SQL 下的 DataFrame 接口進行數據處理,只要有 python 基礎都是非常容易入門的,但是在性能調優上,許多人都是一知半解,寫的 spark 程序經常陷入 OOM 或卡死狀態。這時深入了解 spark 原理就顯得非常有必要了。
本系列總結主要針對 Hadoop YARN 模式。
RDD(Resilient Distributed Datasets)
RDD 是 spark 中最基本的數據抽象,存儲在 exector 或 node 中,它代表一個 “惰性,”“靜態”,“不可變”,“分布式“的數據集合,RDD 基本介紹在網上上太多了,這里就不做詳細介紹了,主要講下以下內容:
transform(轉換)與 action(執行)的區別
轉換操作:返回的是一個新的 RDD,常見的如:map、filter、flatMap、groupByKey 等等 執行操作:返回的是一個結果,一個數值或者是寫入操作等,如 reduce、collect、count、first 等等
惰性計算
spark 中計算 RDD 是惰性的,也即 RDD 真正被計算(執行操作,例如寫入存儲操作、collect 操作等)時,其轉換操作才會真正被執行。spark 為什么采用惰性計算:
在 MapReduce 中,大量的開發人員浪費在最小化 MapReduce 通過次數上。通過將操作合并在一起來實現。在 Spark 中,我們不創建單個執行圖,而是將許多簡單的操作結合在一起。因此,它造成了 Hadoop MapReduce 與 Apache Spark 之間的差異。
惰性設計的好處:
① 提高可管理性 可以查看整個 DAG(將對數據執行的所有轉換的圖形),并且可以使用該信息來優化計算。
② 降低時間復雜度和加快計算速度 只運算真正要計算的轉換操作,并且可以根據 DAG 圖,合并不需要與 drive 通信的操作(連續的依賴轉換),例如在一個 RDD 上同時調用 map 和 filter 轉換操作,spark 可以將 map 和 filter 指令發送到每個 executor 上,spark 程序在真正執行 map 和 filter 時,只需訪問一次 record,而不是發送兩組指令并兩次訪問分區。理論上相對于非惰性,將時間復雜度降低了一半。例如:
val list1 = list.map(i -> i * 3) // Transformation1 val list2 = list1.map(i -> i + 3) // Transformation1 val list3 = list1.map(i -> i / 3) // Transformation1 list3.collect() // ACTION假設原始列表(list) 很大,其中包含數百萬個元素。如果沒有懶惰的評估,我們將完成三遍如此龐大的計算。如果我們假設一次這樣的列表迭代需要 10 秒,那么整個評估就需要 30 秒。并且每個 RDD 都會緩存下來,浪費內存。使用惰性評估,Spark 可以將這三個轉換像這樣合并到一個轉換中,如下:
val list3 = list.map(i -> i + 1)它將只執行一次該操作。只需一次迭代即可完成,這意味著只需要 10 秒的時間。
容錯性
RDD 本身包含其復制所需的所有依賴信息,一旦該 RDD 中某個分區丟失了,該 RDD 有足夠需要重新計算的信息,可以去并行的,很快的重新計算丟失的分區。
運行在內存
在 spark application 的生命周期中,RDD 始終常駐內存(在所在的節點內存),這也是其比 MapReduce 更快的重要原因。
spark 中提供了三種內存管理機制:
① in-memory as deserialized data 這種常駐內存方式速度快(因為去掉了序列化時間),但是內存利用效率低。
② in-memory as serialized data 該方法內存利用效率高,但是速度慢
③ 直接存在 disk 上 對于那些較大容量的 RDD,沒辦法直接存在內存中,需要寫入到 DISK 上。該方法僅適用于大容量 RDD。要持久化一個 RDD,只要調用其 cache()或者 persist()方法即可。在該 RDD 第一次被計算出來時,就會直接緩存在每個節點中。而且 Spark 的持久化機制還是自動容錯的,如果持久化的 RDD 的任何 partition 丟失了,那么 Spark 會自動通過其源 RDD,使用 transformation 操作重新計算該 partition。
cache()和 persist()的區別在于,cache()是 persist()的一種簡化方式,cache()的底層就是調用的 persist()的無參版本,同時就是調用 persist(MEMORY_ONLY),將數據持久化到內存中。如果需要從內存中清楚緩存,那么可以使用 unpersist()方法。
我們來仔細分析下持久化和非持久化的區別:
非持久化:持久化:
顯然對于要復用多次的 RDD,要將其進行持久化操作,此時 Spark 就會根據你的持久化策略,將 RDD 中的數據保存到內存或者磁盤中。以后每次對這個 RDD 進行算子操作時,都會直接從內存或磁盤中提取持久化的 RDD 數據,然后執行算子,而不會從源頭處重新計算一遍這個 RDD,再執行算子操作。 所以在寫 spark 代碼時:盡可能復用同一個 RDD。
這里常有個誤區:
val rdd1 = ... // 讀取hdfs數據,加載成RDD rdd1.cache // 持久化操作val rdd2 = rdd1.map(...) val rdd3 = rdd1.filter(...)rdd1.unpersist // 釋放緩存rdd2.take(10).foreach(println) rdd3.take(10).foreach(println)如果按上述代碼進行持久化,則效果就如同沒有持久化一樣。原因就在于 spark 的 lazy 計算。
代碼應該如下:
val rdd1 = ... // 讀取hdfs數據,加載成RDD rdd1.cacheval rdd2 = rdd1.map(...) val rdd3 = rdd1.filter(...)rdd2.take(10).foreach(println) rdd3.take(10).foreach(println)rdd1.unpersistrdd2 執行 take 時,會先緩存 rdd1,接下來直接 rdd3 執行 take 時,直接利用緩存的 rdd1,最后,釋放掉 rdd1。所以在何處釋放 RDD 也是非常需要細心的。 請在 action 之后 unpersisit!!!
Spark Job Scheduling
窄依賴 與 寬依賴
shuffle 過程,簡單來說,就是將分布在集群中多個節點上的同一個 key,拉取到同一個節點上,進行聚合或 join 等操作。比如 reduceByKey、join 等算子,都會觸發 shuffle 操作。shuffle 操作需要將數據進行重新聚合和劃分,然后分配到集群的各個節點上進行下一個 stage 操作,這里會涉及集群不同節點間的大量數據交換。由于不同節點間的數據通過網絡進行傳輸時需要先將數據寫入磁盤,因此集群中每個節點均有大量的文件讀寫操作,從而導致 shuffle 操作十分耗時(相對于 map 操作)。
窄依賴:父 RDD 與 子 RDD 的分區是一對一(map 操作)或多對一(coalesce)的,不會有 shuffle 過程;并且子 RDD 的分區結果與其 key 和 value 值無關,每個分區與其他分區亦無關。
上面左圖可對應 map 操作分區,右圖對應 coalesce 操作。
寬依賴:父 RDD 與子 RDD 的分區是一對多的關系,并且是按一定方式進行重分區,會有 shuffle 過程產生,比較耗時,可能會引發 spark 性能問題。常見的寬依賴操作如:groupByKey、reduceByKey、sort、sortByKey 等等。注意:coalesce 操作如果是將 10 個分區換成 100 個分區,由少分區轉成大分區將會發生 shuffle 過程。coalesce 操作場景主要是 rdd 經過多層過濾后的小文件合并。rdd 的 reparation 方法與 coalesce 相反,主要是為了 處理數據傾斜,增加 partiton 的數量使得每個 task 處理的數據量減少,肯定會有 shuffle 過程產生(repartition 其實調用的就是 coalesce,只不過 shuffle = true (coalesce 中 shuffle: Boolean = false))。
Spark Application
一個 spark 應用主要由一系列的 spark Job 組成,而這些 spark Job 由 sparkContext 定義而來。當 SparkContent 啟動時,一個 driver 和一系列的 executor 會在集群的工作節點上啟動。每個 executor 都有個 JVM 虛擬環境,一個 executor 不能跨越多個節點。
上圖表示在一個分布式系統上啟動一個 spark application 的物理硬件層面流程。
啟動一個 SparkContext
驅動程序(driver program)會定義一個集群管理(cluster manager)
cluster manager 會在工作節點上啟動一些 executor,運行提交的代碼(注意:一個節點 node 上會有多個 executor,但是一個 executor 不能跨越多個 node)
需要注意以下兩點:
一個節點 node 上會有多個 executor,但是一個 executor 不能跨越多個 node
每個 executor 會有多個分區,但是一個分區不能跨越多個 executor
DAG(Directed Acyclic Graph)詳解
spark Application tree
簡而言之:一個 spark Application 由多個 Job 組成,Job 由提交代碼中的 Action 操作定義,而一個 Action 操作由多個 Stage 組成,Stage 的分割由寬依賴進行分割的,而每個 Stage 又由多個 Task 組成。一個 Task 對應一個分區,一個 task 會被分配到一個 executor 上執行。
每個 Job 都對應一個 DAG 圖,每個 DAG 有一系列的 Stage 組成。
Job:每個 Job 對應一個 Action 操作,在 spark execution Graph 中,其邊是基于代碼中的 transform 操作的依賴關系定義的。
Stages:每個 Action 中可能包含一個或多個 transform 操作,其中寬依賴又將 Job 劃分成多個 Stage。因為 Stages 的邊緣需要和 driver 進行通信,故通常一個 Job 里,必須順序的執行 Stages 而非并行。并且會將多個窄依賴步驟合并成一個步驟,因為其中沒有的轉換操作沒有 shuffle 過程,可以通過只訪問一次數據,連續執行多個 transform 操作,這也是上面提到的惰性計算的優點。
其代碼中對應的 Stage 如下:
Task:task 是 spark 中最小最基本的執行單元,每個 task 代表一個局部的計算任務。在 executor 中可以有多個 core,而每個 core 可以對應一個 task,每個 task 針對一個分區。 每次針對不同的一塊分區,執行相同的代碼。
注意:
spark 中同時并行的 task 數量不能超過所有 executor core 數量。 其中 所有 executor cores 數量= 每個 executor 中 core 數量 * executor 數量。
task 的并行化是有 executor 數量 × core 數量決定的。task 過多,并行化過小,就會浪費時間;反之就會浪費資源。所以設置參數是一個需要權衡的過程,原則就是在已有的資源情況下,充分利用內存和并行化。
總結
對于 DAG 的深刻理解非常重要,如果理解不深刻則可能定位問題的效率不高。比如常見的數據傾斜。當理解了這些,如果出現了數據傾斜,可以分析 job,stage 和 task,找到部分 task 輸入的嚴重不平衡,最終定位是數據問題或計算邏輯問題。
參考
High Performance Spark
https://www.quora.com/What-is-the-reason-behind-keeping-lazy-evaluation-in-Apache-Spark
https://data-flair.training/blogs/apache-spark-lazy-evaluation/
http://bourneli.github.io/scala/spark/2016/06/17/spark-unpersist-after-action.html
備注:公眾號菜單包含了整理了一本AI小抄,非常適合在通勤路上用學習。
往期精彩回顧2019年公眾號文章精選適合初學者入門人工智能的路線及資料下載機器學習在線手冊深度學習在線手冊AI基礎下載(第一部分)備注:加入本站微信群或者qq群,請回復“加群”加入知識星球(4500+用戶,ID:92416895),請回復“知識星球”喜歡文章,點個在看
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的spark性能优化 -- spark工作原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 50题matplotlib从入门到精通
- 下一篇: 案例 | 用pdpipe搭建pandas