Spark Core
Spark Core
?? ?DAG概念
?? ??? ?有向無環圖
?? ??? ?Spark會根據用戶提交的計算邏輯中的RDD的轉換(變換方法)和動作(action方法)來生成RDD之間的依賴關系,同時這個計算鏈也就生成了邏輯上的DAG。
?? ??? ?RDD之間的關系可以從兩個維度來理解:一個是RDD是從哪些RDD轉換而來,也就是RDD的parent?RDD(s)是什么;還有就是依賴于parent?RDD(s)的哪些Partition(s)。這個關系,就是RDD之間的依賴,org.apache.spark.Dependency。根據依賴于parent?RDD(s)的Partitions的不同情況,Spark將這種依賴分為兩種,一種是寬依賴,一種是窄依賴。
?? ?DAG的生成與Stage的劃分
?? ??? ?DAG的生成
?? ??? ??? ?原始的RDD(s)通過一系列轉換就形成了DAG。RDD之間的依賴關系,包含了RDD由哪些Parent?RDD(s)轉換而來和它依賴parent?RDD(s)的哪些Partitions,是DAG的重要屬性。
?? ??? ??? ?借助這些依賴關系,DAG可以認為這些RDD之間形成了Lineage(血統,血緣關系)。借助Lineage,能保證一個RDD被計算前,它所依賴的parent?RDD都已經完成了計算;同時也實現了RDD的容錯性,即如果一個RDD的部分或者全部的計算結果丟失了,那么就需要重新計算這部分丟失的數據。
?? ??? ?Spark的Stage(階段)
?? ??? ??? ?Spark在執行任務(job)時,首先會根據依賴關系,將DAG劃分為不同的階段(Stage)
?? ??? ??? ?處理流程是:
?? ??? ??? ??? ?1)Spark在執行Transformation類型操作時都不會立即執行,而是懶執行(計算)
?? ??? ??? ??? ?2)執行若干步的Transformation類型的操作后,一旦遇到Action類型操作時,才會真正觸發執行(計算)
?? ??? ??? ??? ?3)執行時,從當前Action方法向前回溯,如果遇到的是窄依賴則應用流水線優化,繼續向前找,直到碰到某一個寬依賴
?? ??? ??? ??? ?4)因為寬依賴必須要進行shuffle,無法實現優化,所以將這一次段執行過程組裝為一個stage
?? ??? ??? ??? ?5)再從當前寬依賴開始繼續向前找。重復剛才的步驟,從而將這個DAG還分為若干的stage
?? ??? ??? ?在stage內部可以執行流水線優化,而在stage之間沒辦法執行流水線優化,因為有shuffle。但是這種機制已經盡力的去避免了shuffle
?? ??? ?Spark的Job和Task
?? ??? ??? ?原始的RDD經過一系列轉換后(一個DAG),會在最后一個RDD上觸發一個動作,這個動作會生成一個Job。
?? ??? ??? ?所以可以這樣理解:一個DAG對應一個Spark的Job。
?? ??? ??? ?在Job被劃分為一批計算任務(Task)后,這批Task會被提交到集群上的計算節點去計算Spark的Task分為兩種:
?? ??? ??? ??? ?1)org.apache.spark.scheduler.ShuffleMapTask
?? ??? ??? ??? ?2)org.apache.spark.scheduler.ResultTask
?? ??? ??? ?簡單來說,DAG的最后一個階段會為每個結果的Partition生成一個ResultTask,其余所有的階段都會生成ShufffleMapTask。
?? ?RDD
?? ??? ?RDD就是帶有分區的集合類型
?? ??? ??? ?RDD是分布式的,彈性的,容錯的數據結構
?? ??? ??? ?彈性分布式數據集(RDD),特點是可以并行操作,并且是容錯的。有兩種方法可以創建RDD:
?? ??? ??? ??? ?1)執行Transform操作(變換操作),
?? ??? ??? ??? ?2)讀取外部存儲系統的數據集,如HDFS,HBase,或任何與Hadoop有關的數據源。
?? ??? ??? ??? ?注:創建RDD的方式有多種,比如案例一中是基于一個基本的集合類型(Array)轉換而來,像parallelize這樣的方法還有很多此外,我們也可以在讀取數據集時就創建RDD。
?? ??? ??? ?分區概念
?? ??? ??? ??? ?可以在不同的機器上并行處理
?? ??? ??? ?它是spark提供的一個特殊集合類。諸如普通的集合類型,如傳統的Array:(1,2,3,4,5)是一個整體,但轉換成RDD后,我們可以對數據進行Partition(分區)處理,這樣做的目的就是為了分布式。
?? ??? ??? ??? ?你可以讓這個RDD有兩個分區,那么有可能是這個形式:RDD(1,2) (3,4)。
?? ??? ??? ??? ?這樣設計的目的在于:可以進行分布式運算。
?? ??? ?RDD操作
?? ??? ??? ?針對RDD的操作,分兩種,一種是Transformation(變換),一種是Actions(執行)。
?? ??? ??? ?Transformation(變換)操作屬于懶操作(算子),不會真正觸發RDD的處理計算。
?? ??? ??? ?變換方法的共同點:1.不會馬上觸發計算 2.每當調用一次變換方法,都會產生一個新的RDD,Actions(執行)操作才會真正觸發。
?? ??? ?RDD的依賴關系
?? ??? ??? ?RDD和它依賴的parent?RDD(s)的關系有兩種不同的類型,即窄依賴(narrow?dependency)和寬依賴(wide?dependency)。
?? ??? ??? ?1)窄依賴指的是每一個parent?RDD的Partition最多被子RDD的一個Partition使用
?? ??? ??? ??? ?對于窄依賴操作,它們只是將Partition的數據根據轉換的規則進行轉化,并不涉及其他的處理,可以簡單地認為只是將數據從一個形式轉換到另一個形式。
?? ??? ??? ??? ?所以對于窄依賴,并不會引入昂貴的Shuffle。所以執行效率非常高。如果整個DAG中存在多個連續的窄依賴,則可以將這些連續的窄依賴整合到一起連續執行,中間不執行shuffle 從而提高效率,這樣的優化方式稱之為流水線優化。
?? ??? ??? ??? ?此外,針對窄依賴,如果子RDD某個分區數據丟失,只需要找到父RDD對應依賴的分區,恢復即可。但如果是寬依賴,當分區丟失時,最糟糕的情況是要重算所有父RDD的所有分區。
?? ??? ??? ?2)寬依賴指的是多個子RDD的Partition會依賴同一個parent?RDD的Partition。
?? ??? ??? ??? ?對于groupByKey這樣的操作,子RDD的所有Partition(s)會依賴于parent?RDD的所有Partition(s),子RDD的Partition是parent?RDD的所有Partition?Shuffle的結果。
?? ??? ??? ?Shuffle概述
?? ??? ??? ??? ?spark中一旦遇到寬依賴就需要進行shuffle的操作,所謂的shuffle的操作的本質就是將數據匯總后重新分發的過程
?? ??? ??? ??? ?這個過程數據要匯總到一起,數據量可能很大所以不可避免的需要進行數據落磁盤的操作,會降低程序的性能,所以spark并不是完全內存不讀寫磁盤,只能說它盡力避免這樣的過程來提高效率 。
?? ??? ??? ??? ?spark中的shuffle,在早期的版本中,會產生多個臨時文件,但是這種多臨時文件的策略造成大量文件的同時的讀寫,磁盤的性能被分攤給多個文件,每個文件讀寫效率都不高,影響spark的執行效率。所以在后續的spark中(1.2.0之后的版本)的shuffle中,只會產生一個文件,并且數據會經過排序再附加索引信息,減少了文件的數量并通過排序索引的方式提升了性能。
?? ??? ?RDD容錯機制
?? ??? ??? ?分布式系統通常在一個機器集群上運行,同時運行的幾百臺機器中某些出問題的概率大大增加,所以容錯設計是分布式系統的一個重要能力。
?? ??? ??? ?Spark以前的集群容錯處理模型,像MapReduce,將計算轉換為一個有向無環圖(DAG)的任務集合,這樣可以通過重復執行DAG里的一部分任務來完成容錯恢復。但是由于主要的數據存儲在分布式文件系統中,沒有提供其他存儲的概念,容錯過程需要在網絡上進行數據復制,從而增加了大量的消耗。所以,分布式編程中經常需要做檢查點,即將某個時機的中間數據寫到存儲(通常是分布式文件系統)中。
?? ??? ??? ?RDD也是一個DAG,每一個RDD都會記住創建該數據集需要哪些操作,跟蹤記錄RDD的繼承關系,這個關系在Spark里面叫lineage(血緣關系)。當一個RDD的某個分區丟失時,RDD是有足夠的信息記錄其如何通過其他RDD進行計算,且只需重新計算該分區,這是Spark的一個創新。
?? ??? ?RDD的緩存
?? ??? ??? ?相比Hadoop?MapReduce來說,Spark計算具有巨大的性能優勢,其中很大一部分原因是Spark對于內存的充分利用,以及提供的緩存機制
?? ??? ??? ?RDD持久化(緩存)
?? ??? ??? ??? ?持久化在早期被稱作緩存(cache),但緩存一般指將內容放在內存中。雖然持久化操作在絕大部分情況下都是將RDD緩存在內存中,但一般都會在內存不夠時用磁盤頂上去(比操作系統默認的磁盤交換性能高很多)。當然,也可以選擇不使用內存,而是僅僅保存到磁盤中。所以,現在Spark使用持久化(persistence)這一更廣泛的名稱。
?? ??? ??? ?默認情況下,RDD只使用一次,用完即扔,再次使用時需要重新計算得到,而持久化(緩存)操作避免了這里的重復計算,實際測試也顯示持久化對性能提升明顯,這也是Spark剛出現時被人稱為內存計算框架的原因。
?? ??? ??? ?持久化的方法是調用persist()函數,除了持久化至內存中,還可以在persist()中指定storage?level參數使用其他的類型,具體如下:
?? ??? ??? ??? ?1)MEMORY_ONLY : 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中. 如果內存空間不夠,部分數據分區將不會被緩存,在每次需要用到這些數據時重新進行計算. 這是默認的級別。
?? ??? ??? ??? ?cache()方法對應的級別就是MEMORY_ONLY級別
?? ??? ??? ??? ?2)MEMORY_AND_DISK:將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。如果內存空間不夠,將未緩存的數據分區存儲到磁盤,在需要使用這些分區時從磁盤讀取。
?? ??? ??? ??? ?3)MEMORY_ONLY_SER :將 RDD 以序列化的 Java 對象的形式進行存儲(每個分區為一個 byte 數組)。這種方式會比反序列化對象的方式節省很多空間,尤其是在使用 fast serialize時會節省更多的空間,但是在讀取時會使得 CPU 的 read 變得更加密集。如果內存空間不夠,部分數據分區將不會被緩存,在每次需要用到這些數據時重新進行計算。
?? ??? ??? ??? ?4)MEMORY_AND_DISK_SER :類似于 MEMORY_ONLY_SER ,但是溢出的分區會存儲到磁盤,而不是在用到它們時重新計算。如果內存空間不夠,將未緩存的數據分區存儲到磁盤,在需要使用這些分區時從磁盤讀取。
?? ??? ??? ??? ?5)DISK_ONLY:只在磁盤上緩存 RDD。
?? ??? ??? ??? ?6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :與上面的級別功能相同,只不過每個分區在集群中兩個節點上建立副本。
?? ??? ??? ??? ?7)OFF_HEAP 將數據存儲在 off-heap memory 中。使用堆外內存,這是Java虛擬機里面的概念,堆外內存意味著把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。使用堆外內存的好處:可能會利用到更大的內存存儲空間。但是對于數據的垃圾回收會有影響,需要程序員來處理
?? ??? ??? ??? ?注意,可能帶來一些GC回收問題。
?? ??? ??? ?緩存數據的清除
?? ??? ??? ??? ?Spark?會自動監控每個節點上的緩存數據,然后使用?least-recently-used?(LRU)?機制來處理舊的緩存數據。如果你想手動清理這些緩存的?RDD?數據而不是去等待它們被自動清理掉,
?? ??? ??? ??? ?可以使用?RDD.unpersist(?)?方法。
?? ??? ??? ?Spark?也會自動持久化一些在?shuffle?操作過程中產生的臨時數據(比如?reduceByKey),即便是用戶并沒有調用持久化的方法。這樣做可以避免當?shuffle?階段時如果一個節點掛掉了就得重新計算整個數據的問題。如果用戶打算多次重復使用這些數據,我們仍然建議用戶自己調用持久化方法對數據進行持久化。
?? ?Spark框架核心概念
?? ??? ?1.RDD。彈性分布式數據集,是Spark最核心的數據結構。有分區機制,所以可以分布式進行處理。有容錯機制,通過RDD之間的依賴關系來恢復數據。
?? ??? ?2.依賴關系。RDD的依賴關系是通過各種Transformation(變換)來得到的。父RDD和子RDD之間的依賴關系分兩種:①窄依賴? ②寬依賴
?? ??? ??? ?①針對窄依賴:父RDD的分區和子RDD的分區關系是:一對一
?? ??? ??? ?窄依賴不會發生Shuffle,執行效率高,spark框架底層會針對多個連續的窄依賴執行流水線優化,從而提高性能。例如 map? flatMap等方法都是窄依賴方法
?? ??? ??? ?②針對寬依賴:父RDD的分區和子RDD的分區關系是:一對多
?? ??? ??? ?寬依賴會產生shuffle,會產生磁盤讀寫,無法優化。
?? ??? ?3.DAG。有向無環圖,當一整條RDD的依賴關系形成之后,就形成了一個DAG。一般來說,一個DAG,最后都至少會觸發一個Action操作,觸發執行。一個Action對應一個Job任務。
?? ??? ?4.Stage。一個DAG會根據RDD之間的依賴關系進行Stage劃分,流程是:以Action為基準,向前回溯,遇到寬依賴,就形成一個Stage。遇到窄依賴,則執行流水線優化(將多個連續的窄依賴放到一起執行)
?? ??? ?5.task。任務。一個分區對應一個task??梢赃@樣理解:一個Stage是一組Task的集合
?? ??? ?6.RDD的Transformation(變換)操作:懶執行,并不會立即執行
?? ??? ?7.RDD的Action(執行)操作:觸發真正的執行
?? ?Spark Shuffle詳解
?? ??? ?Shuffle,翻譯成中文就是洗牌。之所以需要Shuffle,還是因為具有某種共同特征的一類數據需要最終匯聚(aggregate)到一個計算節點上進行計算。這些數據分布在各個存儲節點上并且由不同節點的計算單元處理。
?? ??? ?數據重新打亂然后匯聚到不同節點的過程就是Shuffle。但是實際上,Shuffle過程可能會非常復雜:
?? ??? ??? ?1)數據量會很大,比如單位為TB或PB的數據分散到幾百甚至數千、數萬臺機器上。
?? ??? ??? ?2)為了將這個數據匯聚到正確的節點,需要將這些數據放入正確的Partition,因為數據大小已經大于節點的內存,因此這個過程中可能會發生多次硬盤續寫。
?? ??? ??? ?3)為了節省帶寬,這個數據可能需要壓縮,如何在壓縮率和壓縮解壓時間中間做一個比較好的選擇?
?? ??? ??? ?4)數據需要通過網絡傳輸,因此數據的序列化和反序列化也變得相對復雜。
?? ??? ??? ?一般來說,每個Task處理的數據可以完全載入內存(如果不能,可以減小每個Partition的大小),因此Task可以做到在內存中計算。但是對于Shuffle來說,如果不持久化這個中間結果,一旦數據丟失,就需要重新計算依賴的全部RDD,因此有必要持久化這個中間結果。所以這就是為什么Shuffle過程會產生文件的原因。
?? ??? ??? ?如果Shuffle過程不落地,①可能會造成內存溢出 ②當某分區丟失時,會重新計算所有父分區數據
?? ??? ?Shuffle Write
?? ??? ??? ?Shuffle?Write,即數據是如何持久化到文件中,以使得下游的Task可以獲取到其需要處理的數據的(即Shuffle?Read)。在Spark 0.8之前,Shuffle Write是持久化到緩存的,但后來發現實際應用中,shuffle過程帶來的數據通常是巨量的,所以經常會發生內存溢出的情況,所以在Spark?0.8以后,Shuffle?Write會將數據持久化到硬盤,再之后Shuffle?Write不斷進行演進優化,但是數據落地到本地文件系統的實現并沒有改變。
?? ??? ??? ?1)Hash?Based?Shuffle?Write
?? ??? ??? ??? ?在Spark?1.0以前,Spark只支持Hash?Based?Shuffle。因為在很多運算場景中并不需要排序,因此多余的排序只能使性能變差,比如Hadoop的Map?Reduce就是這么實現的,也就是Reducer拿到的數據都是已經排好序的。實際上Spark的實現很簡單:每個Shuffle?Map?Task根據key的哈希值,計算出每個key需要寫入的Partition然后將數據單獨寫入一個文件,這個Partition實際上就對應了下游的一個Shuffle?Map?Task或者Result?Task。因此下游的Task在計算時會通過網絡(如果該Task與上游的Shuffle?Map?Task運行在同一個節點上,那么此時就是一個本地的硬盤讀寫)讀取這個文件并進行計算。
?? ??? ??? ??? ?Hash?Based?Shuffle?Write存在的問題
?? ??? ??? ??? ??? ?1)每個節點可能會同時打開多個文件,每次打開文件都會占用一定內存。假設每個Write?Handler的默認需要100KB的內存,那么同時打開這些文件需要50GB的內存,對于一個集群來說,還是有一定的壓力的。尤其是如果Shuffle?Map?Task和下游的Task同時增大10倍,那么整體的內存就增長到5TB。
?? ??? ??? ??? ??? ?2)從整體的角度來看,打開多個文件對于系統來說意味著隨機讀,尤其是每個文件比較小但是數量非常多的情況。而現在機械硬盤在隨機讀方面的性能特別差,非常容易成為性能的瓶頸。如果集群依賴的是固態硬盤,也許情況會改善很多,但是隨機寫的性能肯定不如順序寫的。
?? ??? ??? ??? ?Hash?Based?Shuffle的每個Mapper都需要為每個Reducer寫一個文件,供Reducer讀取,即需要產生M*R個數量的文件,如果Mapper和Reducer的數量比較大,產生的文件數會非常多。
?? ??? ??? ?2)Sort?Based Shuffle?Write
?? ??? ??? ??? ?Spark?Core的一個重要的升級就是將默認的Hash?Based?Shuffle換成了Sort?Based?Shuffle,即spark.shuffle.manager從Hash換成了Sort
?? ??? ??? ??? ?對應的實現類分別是
?? ??? ??? ??? ??? ?org.apache.spark.shuffle.hash.HashShuffleManager
?? ??? ??? ??? ??? ?org.apache.spark.shuffle.sort.SortShuffleManager。
?? ??? ??? ??? ?Sort?Based?Shuffle的模式是:每個Shuffle?Map?Task不會為每個Reducer生成一個單獨的文件;相反,它會將所有的結果寫到一個文件里,同時會生成一個Index文件,
?? ??? ??? ??? ?Reducer可以通過這個Index文件取得它需要處理的數據。避免產生大量文件的直接收益就是節省了內存的使用和順序Disk?IO帶來的低延時。節省內存的使用可以減少GC的風險和頻率。而減少文件的數量可以避免同時寫多個文件給系統帶來的壓力。
?? ??? ??? ??? ?Sort?Based?Write實現詳解
?? ??? ??? ??? ??? ?Shuffle?Map?Task會按照key相對應的Partition?ID進行Sort,其中屬于同一個Partition的key不會Sort。因為對于不需要Sort的操作來說,這個Sort是負收益的;要知道之前Spark剛開始使用Hash?Based的Shuffle而不是Sort?Based就是為了避免Hadoop?Map?Reduce對于所有計算都會Sort的性能損耗。對于那些需要Sort的運算,
?? ??? ??? ??? ??? ?比如sortByKey,這個Sort在Spark?1.2.0里還是由Reducer完成的。
?? ??? ??? ??? ??? ?①答出shuffle的定義
?? ??? ??? ??? ??? ?②spark shuffle的特點
?? ??? ??? ??? ??? ?③spark shuffle的目的
?? ??? ??? ??? ??? ?④spark shuffel的實現類,即對應優缺點
?? ??? ?Shuffle 相關參數配置
?? ??? ??? ?Shuffle是Spark?Core比較復雜的模塊,它也是非常影響性能的操作之一。
?? ??? ??? ?1)spark.shuffle.manager
?? ??? ??? ??? ?兩種方式的Shuffle 即Hash?Based?Shuffle和Sort?Based?Shuffle
?? ??? ??? ?2)spark.shuffle.spill
?? ??? ??? ??? ?這個參數的默認值是true,用于指定Shuffle過程中如果內存中的數據超過閾值(參考spark.shuffle.memoryFraction的設置)時是否需要將部分數據臨時寫入外部存儲。
?? ??? ??? ??? ?如果設置為false,那么這個過程就會一直使用內存,會有內存溢出的風險。因此只有在確定內存足夠使用時,才可以將這個選項設置為false。
?? ??? ??? ?3)spark.shuffle.memoryFraction
?? ??? ??? ??? ?在啟用spark.shuffle.spill的情況下,spark.shuffle.memoryFraction決定了當Shuffle過程中使用的內存達到總內存多少比例的時候開始spill。在Spark?1.2.0里,這個值是0.2
?? ??? ??? ??? ?此參數可以適當調大,可以控制在0.4~0.6。
?? ??? ??? ??? ?通過這個參數可以設置Shuffle過程占用內存的大小,它直接影響了寫入到外部存儲的頻率和垃圾回收的頻率。
?? ??? ??? ??? ?可以適當調大此值,可以減少磁盤I/O次數。
?? ??? ??? ?4)spark.shuffle.blockTransferService
?? ??? ??? ??? ?在Spark?1.2.0中這個配置的默認值是netty,而在之前的版本中是nio。它主要是用于在各個Executor之間傳輸Shuffle數據。netty的實現更加簡潔,但實際上用戶不用太關心這個選項。除非有特殊需求,否則采用默認配置即可。
?? ??? ??? ?5)spark.shuffle.consolidateFiles
?? ??? ??? ??? ?這個配置的默認值是false。主要是為了解決在Hash?Based?Shuffle過程中產生過多文件的問題。如果配置選項為true,那么對于同一個Core上運行的Shuffle?Map?Task不會產生一個新的Shuffle文件而是重用原來的
?? ??? ??? ?6)spark.shuffle.compress和spark.shuffle.spill.compress
?? ??? ??? ??? ?這兩個參數的默認配置都是true。都是用來設置Shuffle過程中是否對Shuffle數據進行壓縮
?? ??? ??? ??? ?前者針對最終寫入本地文件系統的輸出文件
?? ??? ??? ??? ?后者針對在處理過程需要寫入到外部存儲的中間數據,即針對最終的shuffle輸出文件。
?? ??? ??? ?7)spark.reducer.maxMbInFlight
?? ??? ??? ??? ?這個參數用于限制一個Result?Task向其他的Executor請求Shuffle數據時所占用的最大內存數,默認是64MB。尤其是如果網卡是千兆和千兆以下的網卡時。默認值是 設置這個值需要綜合考慮網卡帶寬和內存。
?? ?Spark調優
?? ??? ?更好的序列化實現
?? ??? ??? ?Spark用到序列化的地方
?? ??? ??? ??? ?1)Shuffle時需要將對象寫入到外部的臨時文件。
?? ??? ??? ??? ?2)每個Partition中的數據要發送到worker上,spark先把RDD包裝成task對象,將task通過網絡發給worker。
?? ??? ??? ??? ?3)RDD如果支持內存+硬盤,只要往硬盤中寫數據也會涉及序列化。
?? ??? ??? ?默認使用的是java的序列化。但java的序列化有兩個問題,一個是性能相對比較低,另外它序列化完二進制的內容長度也比較大,造成網絡傳輸時間拉長。業界現在有很多更好的實現,如kryo,比java的序列化快10倍以上。而且生成內容長度也短。時間快,空間小,自然選擇它了。
?? ??? ?通過代碼使用Kryo
?? ??? ?配置多臨時文件目錄
?? ??? ??? ?spark.local.dir參數。當shuffle、歸并排序(sort、merge)時都會產生臨時文件。這些臨時文件都在這個指定的目錄下。那這個文件夾有很多臨時文件,如果都發生讀寫操作,有的線程在讀這個文件,有的線程在往這個文件里寫,磁盤I/O性能就非常低。
?? ??? ??? ?可以創建多個文件夾,每個文件夾都對應一個真實的硬盤。假如原來是3個程序同時讀寫一個硬盤,效率肯定低,現在讓三個程序分別讀取3個磁盤,這樣沖突減少,效率就提高了。這樣就有效提高外部文件讀和寫的效率。怎么配置呢?只需要在這個配置時配置多個路徑就可以。中間用逗號分隔。
?? ??? ??? ?spark.local.dir=/home/tmp,/home/tmp2
?? ??? ?啟用推測執行機制
?? ??? ??? ?可以設置spark.speculation? true
?? ??? ??? ?開啟后,spark會檢測執行較慢的Task,并復制這個Task在其他節點運行,最后哪個節點先運行完,就用其結果,然后將慢Task 殺死
?? ??? ?collect速度慢
?? ??? ??? ?collect只適合在測試時,因為把結果都收集到Driver服務器上,數據要跨網絡傳輸,同時要求Driver服務器內存大,所以收集過程慢。解決辦法就是直接輸出到分布式文件系統中。
?? ??? ?有些情況下,RDD操作使用MapPartitions替代map
?? ??? ??? ?map方法對RDD的每一條記錄逐一操作。mapPartitions是對RDD里的每個分區操作
?? ??? ??? ?rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
?? ??? ??? ?這樣頻繁的鏈接、斷開數據庫,效率差。
?? ??? ??? ?rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
?? ??? ??? ?這樣就一次鏈接一次斷開,中間批量操作,效率提升。
?? ??? ?Spark的GC調優
?? ??? ??? ?由于Spark立足于內存計算,常常需要在內存中存放大量數據,因此也更依賴JVM的垃圾回收機制(GC)。并且同時,它也支持兼容批處理和流式處理,對于程序吞吐量和延遲都有較高要求,因此GC參數的調優在Spark應用實踐中顯得尤為重要。
?? ??? ??? ?主要有兩種策略——Parallel?GC(吞吐量優先)和CMS?GC(低延遲響應)。
?? ??? ??? ?GC算法原理
?? ??? ??? ??? ?對于內存較大的環境非常友好。因為G1 GC對于內存的使用率特別高,內存越大,此優勢越明顯。
?? ??? ??? ?選擇垃圾收集器
?? ??? ??? ??? ?park默認使用的是Parallel?GC。經調研我們發現,Parallel?GC常常受困于Full?GC,而每次Full?GC都給性能帶來了較大的下降。而Parallel?GC可以進行參數調優的空間也非常有限,我們只能通過調節一些基本參數來提高性能,如各年代分區大小比例、進入老年代前的拷貝次數等。而且這些調優策略只能推遲Full?GC的到來,如果是長期運行的應用,Parallel?GC調優的意義就非常有限了。
?? ??? ??? ?將InitiatingHeapOccupancyPercent參數調低(默認值是45),可以使G1 GC收集器更早開始Mixed GC(Minor GC);但另一方面,會增加GC發生頻率。(啟動并發GC周期時的堆內存占用百分比. G1之類的垃圾收集器用它來觸發并發GC周期,基于整個堆的使用率,而不只是某一代內存的使用比. 值為 0 則表示"一直執行GC循環". 默認值為 45.)降低此值,會提高Minor GC的頻率,但是會推遲Full GC的到來。
?? ??? ??? ?提高ConcGCThreads的值,在Mixed GC階段投入更多的并發線程,爭取提高每次暫停的效率。但是此參數會占用一定的有效工作線程資源。
?? ??? ??? ?調試這兩個參數可以有效降低Full GC出現的概率。Full GC被消除之后,最終的性能獲得了大幅提升。
?? ??? ?Spark的內存管理
?? ??? ??? ?Spark的核心概念是RDD,實際運行中內存消耗都與RDD密切相關。Spark允許用戶將應用中重復使用的RDD數據持久化緩存起來,從而避免反復計算的開銷,而RDD的持久化形態之一就是將全部或者部分數據緩存在JVM的Heap中。當我們觀察到GC延遲影響效率時,應當先檢查Spark應用本身是否有效利用有限的內存空間。RDD占用的內存空間比較少的話,程序運行的heap空間也會比較寬松,GC效率也會相應提高;而RDD如果占用大量空間的話,則會帶來巨大的性能損失
?? ??? ?總結
?? ??? ??? ?對于大量依賴于內存計算的Spark應用,GC調優顯得尤為重要。在發現GC問題的時候,不要著急調試GC。而是先考慮是否存在Spark進程內存管理的效率問題,例如RDD緩存的持久化和釋放。至于GC參數的調試,首先我們比較推薦使用G1 GC來運行Spark應用。相較于傳統的垃圾收集器,隨著G1的不斷成熟,需要配置的選項會更少,能同時滿足高吞吐量和低延遲的尋求。當然,GC的調優不是絕對的,不同的應用會有不同應用的特性,掌握根據GC日志進行調優的方法,才能以不變應萬變。最后,也不能忘了先對程序本身的邏輯和代碼編寫進行考量,例如減少中間變量的創建或者復制,控制大對象的創建,將長期存活對象放在Off-heap中等等。
?? ?Checkpoint機制
?? ??? ?checkpoint的意思就是建立檢查點,類似于快照,例如在spark計算里面 計算流程DAG特別長,服務器需要將整個DAG計算完成得出結果,但是如果在這很長的計算流程中突然中間算出的數據丟失了,spark又會根據RDD的依賴關系從頭到尾計算一遍,這樣子就很費性能,當然我們可以將中間的計算結果通過cache或者persist放到內存或者磁盤中,但是這樣也不能保證數據完全不會丟失,存儲的這個內存出問題了或者磁盤壞了,也會導致spark從頭再根據RDD計算一遍,所以就有了checkpoint,其中checkpoint的作用就是將DAG中比較重要的中間數據做一個檢查點將結果存儲到一個高可用的地方
?? ??? ?總結:Spark的CheckPoint機制很重要,也很常用,尤其在機器學習中的一些迭代算法中很常見。比如一個算法迭代10000次,如果不適用緩沖機制,如果某分區數據丟失,會導致整個計算鏈重新計算,所以引入緩存機制。但是光引入緩存,也不完全可靠,比如緩存丟失或緩存存儲不下,也會導致重新計算,所以使用CheckPoint機制再做一層保證。
?? ??? ?補充:檢查目錄的路徑,一般都是設置到HDFS上
?? ??? ?Spark懶執行的意義
?? ??? ??? ?Spark中,Transformation方法都是懶操作方法,比如map,flatMap,reduceByKey等。當觸發某個Action操作時才真正執行。
?? ??? ??? ?懶操作的意義:
?? ??? ??? ??? ?①不運行job就觸發計算,避免了大量的無意義的計算,即避免了大量的無意義的中間結果的產生,即避免產生無意義的磁盤I/O及網絡傳輸
?? ??? ??? ??? ?②更深層次的意義在于,執行運算時,看到之前的計算操作越多,執行優化的可能性就越高
?? ?Spark共享變量
?? ??? ?Spark程序的大部分操作都是RDD操作,通過傳入函數給RDD操作函數來計算。這些函數在不同的節點上并發執行,但每個內部的變量有不同的作用域,不能相互訪問,所以有時會不太方便,Spark提供了兩類共享變量供編程使用——廣播變量和計數器
?? ??? ?1.?廣播變量
?? ??? ??? ?這是一個只讀對象,在所有節點上都有一份緩存,創建方法是SparkContext.broadcast()
?? ??? ??? ?注意,廣播變量是只讀的,所以創建之后再更新它的值是沒有意義的,一般用val修飾符來定義廣播變量。
?? ??? ?2.?計數器
?? ??? ??? ?計數器只能增加,是共享變量,用于計數或求和。
?? ??? ??? ?計數器變量的創建方法是SparkContext.accumulator(v,?name),其中v是初始值,name是名稱。
?? ?spark解決數據傾斜問題
?? ??? ?將少量的數據轉化為Map進行廣播,廣播會將此 Map 發送到每個節點中,如果不進行廣播,每個task執行時都會去獲取該Map數據,造成了性能浪費。
轉載于:https://www.cnblogs.com/Striverchen/p/10557905.html
總結
以上是生活随笔為你收集整理的Spark Core的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux01
- 下一篇: 计算机考研2017真题408,2017计