日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

Spark性能相关参数配置详解

發(fā)布時間:2024/1/17 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark性能相关参数配置详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

隨著Spark的逐漸成熟完善,?越來越多的可配置參數(shù)被添加到Spark中來,?本文試圖通過闡述這其中部分參數(shù)的工作原理和配置思路,?和大家一起探討一下如何根據(jù)實際場合對Spark進(jìn)行配置優(yōu)化。

?

由于篇幅較長,所以在這里分篇組織,如果要看最新完整的網(wǎng)頁版內(nèi)容,可以戳這里:http://spark-config.readthedocs.org/,主要是便于更新內(nèi)容

?

schedule調(diào)度相關(guān)

?

調(diào)度相關(guān)的參數(shù)設(shè)置,大多數(shù)內(nèi)容都很直白,其實無須過多的額外解釋,不過基于這些參數(shù)的常用性(大概會是你針對自己的集群第一步就會配置的參數(shù)),這里多少就其內(nèi)部機制做一些解釋。

?

spark.cores.max

?

一個集群最重要的參數(shù)之一,當(dāng)然就是CPU計算資源的數(shù)量。spark.cores.max這個參數(shù)決定了在Standalone和Mesos模式下,一個Spark應(yīng)用程序所能申請的CPU Core的數(shù)量。如果你沒有并發(fā)跑多個Spark應(yīng)用程序的需求,那么可以不需要設(shè)置這個參數(shù),默認(rèn)會使用spark.deploy.defaultCores的值(而spark.deploy.defaultCores的值默認(rèn)為Int.Max,也就是不限制的意思)從而應(yīng)用程序可以使用所有當(dāng)前可以獲得的CPU資源。

?

針對這個參數(shù)需要注意的是,這個參數(shù)對Yarn模式不起作用,YARN模式下,資源由Yarn統(tǒng)一調(diào)度管理,一個應(yīng)用啟動時所申請的CPU資源的數(shù)量由另外兩個直接配置Executor的數(shù)量和每個Executor中core數(shù)量的參數(shù)決定。(歷史原因造成,不同運行模式下的一些啟動參數(shù)個人認(rèn)為還有待進(jìn)一步整合)

?

此外,在Standalone模式等后臺分配CPU資源時,目前的實現(xiàn)中,在spark.cores.max允許的范圍內(nèi),基本上是優(yōu)先從每個Worker中申請所能得到的最大數(shù)量的CPU core給每個Executor,因此如果人工限制了所申請的Max Core的數(shù)量小于Standalone和Mesos模式所管理的CPU數(shù)量,可能發(fā)生應(yīng)用只運行在集群中部分節(jié)點上的情況(因為部分節(jié)點所能提供的最大CPU資源數(shù)量已經(jīng)滿足應(yīng)用的要求),而不是平均分布在集群中。通常這不會是太大的問題,但是如果涉及數(shù)據(jù)本地性的場合,有可能就會帶來一定的必須進(jìn)行遠(yuǎn)程數(shù)據(jù)讀取的情況發(fā)生。理論上,這個問題可以通過兩種途徑解決:一是Standalone和Mesos的資源管理模塊自動根據(jù)節(jié)點資源情況,均勻分配和啟動Executor,二是和Yarn模式一樣,允許用戶指定和限制每個Executor的Core的數(shù)量。社區(qū)中有一個PR試圖走第二種途徑來解決類似的問題,不過截至我寫下這篇文檔為止(2014.8),還沒有被Merge。

?

spark.task.cpus

?

這個參數(shù)在字面上的意思就是分配給每個任務(wù)的CPU的數(shù)量,默認(rèn)為1。實際上,這個參數(shù)并不能真的控制每個任務(wù)實際運行時所使用的CPU的數(shù)量,比如你可以通過在任務(wù)內(nèi)部創(chuàng)建額外的工作線程來使用更多的CPU(至少目前為止,將來任務(wù)的執(zhí)行環(huán)境是否能通過LXC等技術(shù)來控制還不好說)。它所發(fā)揮的作用,只是在作業(yè)調(diào)度時,每分配出一個任務(wù)時,對已使用的CPU資源進(jìn)行計數(shù)。也就是說只是理論上用來統(tǒng)計資源的使用情況,便于安排調(diào)度。因此,如果你期望通過修改這個參數(shù)來加快任務(wù)的運行,那還是趕緊換個思路吧。這個參數(shù)的意義,個人覺得還是在你真的在任務(wù)內(nèi)部自己通過任何手段,占用了更多的CPU資源時,讓調(diào)度行為更加準(zhǔn)確的一個輔助手段。

?

?

spark.scheduler.mode

?

這個參數(shù)決定了單個Spark應(yīng)用內(nèi)部調(diào)度的時候使用FIFO模式還是Fair模式。是的,你沒有看錯,這個參數(shù)只管理一個Spark應(yīng)用內(nèi)部的多個沒有依賴關(guān)系的Job作業(yè)的調(diào)度策略。

?

如果你需要的是多個Spark應(yīng)用之間的調(diào)度策略,那么在Standalone模式下,這取決于每個應(yīng)用所申請和獲得的CPU資源的數(shù)量(暫時沒有獲得資源的應(yīng)用就Pending在那里了),基本上就是FIFO形式的,誰先申請和獲得資源,誰就占用資源直到完成。而在Yarn模式下,則多個Spark應(yīng)用間的調(diào)度策略由Yarn自己的策略配置文件所決定。

?

那么這個內(nèi)部的調(diào)度邏輯有什么用呢?如果你的Spark應(yīng)用是通過服務(wù)的形式,為多個用戶提交作業(yè)的話,那么可以通過配置Fair模式相關(guān)參數(shù)來調(diào)整不同用戶作業(yè)的調(diào)度和資源分配優(yōu)先級。

?

?

spark.locality.wait

?

spark.locality.wait和spark.locality.wait.process,spark.locality.wait.node,spark.locality.wait.rack這幾個參數(shù)影響了任務(wù)分配時的本地性策略的相關(guān)細(xì)節(jié)。

?

Spark中任務(wù)的處理需要考慮所涉及的數(shù)據(jù)的本地性的場合,基本就兩種,一是數(shù)據(jù)的來源是HadoopRDD;二是RDD的數(shù)據(jù)來源來自于RDD Cache(即由CacheManager從BlockManager中讀取,或者Streaming數(shù)據(jù)源RDD)。其它情況下,如果不涉及shuffle操作的RDD,不構(gòu)成劃分Stage和Task的基準(zhǔn),不存在判斷Locality本地性的問題,而如果是ShuffleRDD,其本地性始終為No Prefer,因此其實也無所謂Locality。

?

在理想的情況下,任務(wù)當(dāng)然是分配在可以從本地讀取數(shù)據(jù)的節(jié)點上時(同一個JVM內(nèi)部或同一臺物理機器內(nèi)部)的運行時性能最佳。但是每個任務(wù)的執(zhí)行速度無法準(zhǔn)確估計,所以很難在事先獲得全局最優(yōu)的執(zhí)行策略,當(dāng)Spark應(yīng)用得到一個計算資源的時候,如果沒有可以滿足最佳本地性需求的任務(wù)可以運行時,是退而求其次,運行一個本地性條件稍差一點的任務(wù)呢,還是繼續(xù)等待下一個可用的計算資源已期望它能更好的匹配任務(wù)的本地性呢?

?

這幾個參數(shù)一起決定了Spark任務(wù)調(diào)度在得到分配任務(wù)時,選擇暫時不分配任務(wù),而是等待獲得滿足進(jìn)程內(nèi)部/節(jié)點內(nèi)部/機架內(nèi)部這樣的不同層次的本地性資源的最長等待時間。默認(rèn)都是3000毫秒。

?

基本上,如果你的任務(wù)數(shù)量較大和單個任務(wù)運行時間比較長的情況下,單個任務(wù)是否在數(shù)據(jù)本地運行,代價區(qū)別可能比較顯著,如果數(shù)據(jù)本地性不理想,那么調(diào)大這些參數(shù)對于性能優(yōu)化可能會有一定的好處。反之如果等待的代價超過帶來的收益,那就不要考慮了。

?

特別值得注意的是:在處理應(yīng)用剛啟動后提交的第一批任務(wù)時,由于當(dāng)作業(yè)調(diào)度模塊開始工作時,處理任務(wù)的Executors可能還沒有完全注冊完畢,因此一部分的任務(wù)會被放置到No Prefer的隊列中,這部分任務(wù)的優(yōu)先級僅次于數(shù)據(jù)本地性滿足Process級別的任務(wù),從而被優(yōu)先分配到非本地節(jié)點執(zhí)行,如果的確沒有Executors在對應(yīng)的節(jié)點上運行,或者的確是No Prefer的任務(wù)(如shuffleRDD),這樣做確實是比較優(yōu)化的選擇,但是這里的實際情況只是這部分Executors還沒來得及注冊上而已。這種情況下,即使加大本節(jié)中這幾個參數(shù)的數(shù)值也沒有幫助。針對這個情況,有一些已經(jīng)完成的和正在進(jìn)行中的PR通過例如動態(tài)調(diào)整No Prefer隊列,監(jiān)控節(jié)點注冊比例等等方式試圖來給出更加智能的解決方案。不過,你也可以根據(jù)自身集群的啟動情況,通過在創(chuàng)建SparkContext之后,主動Sleep幾秒的方式來簡單的解決這個問題。

?

spark.speculation

?

spark.speculation以及spark.speculation.interval,spark.speculation.quantile, spark.speculation.multiplier等參數(shù)調(diào)整Speculation行為的具體細(xì)節(jié),Speculation是在任務(wù)調(diào)度的時候,如果沒有適合當(dāng)前本地性要求的任務(wù)可供運行,將跑得慢的任務(wù)在空閑計算資源上再度調(diào)度的行為,這些參數(shù)調(diào)整這些行為的頻率和判斷指標(biāo),默認(rèn)是不使用Speculation的。

?

通常來說很難正確的判斷是否需要Speculation,能真正發(fā)揮Speculation用處的場合,往往是某些節(jié)點由于運行環(huán)境原因,比如CPU資源由于某種原因被占用,磁盤損壞導(dǎo)致IO緩慢造成任務(wù)執(zhí)行速度異常的情況,當(dāng)然前提是你的分區(qū)任務(wù)不存在僅能被執(zhí)行一次,或者不能同時執(zhí)行多個拷貝等情況。Speculation任務(wù)參照的指標(biāo)通常是其它任務(wù)的執(zhí)行時間,而實際的任務(wù)可能由于分區(qū)數(shù)據(jù)尺寸不均勻,本來就會有時間差異,加上一定的調(diào)度和IO的隨機性,所以如果一致性指標(biāo)定得過嚴(yán),Speculation可能并不能真的發(fā)現(xiàn)問題,反而增加了不必要的任務(wù)開銷,定得過寬,大概又基本相當(dāng)于沒用。

?

個人覺得,如果你的集群規(guī)模比較大,運行環(huán)境復(fù)雜,的確可能經(jīng)常發(fā)生執(zhí)行異常,加上數(shù)據(jù)分區(qū)尺寸差異不大,為了程序運行時間的穩(wěn)定性,那么可以考慮仔細(xì)調(diào)整這些參數(shù)。否則還是考慮如何排除造成任務(wù)執(zhí)行速度異常的因數(shù)比較靠鋪一些。

?

當(dāng)然,我沒有實際在很大規(guī)模的集群上運行過Spark,所以如果看法有些偏頗,還請有實際經(jīng)驗的XD指正。

?

壓縮和序列化相關(guān)

?

spark.serializer

?

默認(rèn)為org.apache.spark.serializer.JavaSerializer,可選org.apache.spark.serializer.KryoSerializer,實際上只要是org.apache.spark.serializer的子類就可以了,不過如果只是應(yīng)用,大概你不會自己去實現(xiàn)一個的。

?

序列化對于spark應(yīng)用的性能來說,還是有很大影響的,在特定的數(shù)據(jù)格式的情況下,KryoSerializer的性能可以達(dá)到JavaSerializer的10倍以上,當(dāng)然放到整個Spark程序中來考量,比重就沒有那么大了,但是以Wordcount為例,通常也很容易達(dá)到30%以上的性能提升。而對于一些Int之類的基本類型數(shù)據(jù),性能的提升就幾乎可以忽略了。KryoSerializer依賴Twitter的Chill庫來實現(xiàn),相對于JavaSerializer,主要的問題在于不是所有的Java Serializable對象都能支持。

?

需要注意的是,這里可配的Serializer針對的對象是Shuffle數(shù)據(jù),以及RDD Cache等場合,而Spark Task的序列化是通過spark.closure.serializer來配置,但是目前只支持JavaSerializer,所以等于沒法配置啦。

?

更多Kryo序列化相關(guān)優(yōu)化配置,可以參考http://spark.apache.org/docs/latest/tuning.html#data-serialization一節(jié)

?

?

spark.rdd.compress

?

這個參數(shù)決定了RDD Cache的過程中,RDD數(shù)據(jù)在序列化之后是否進(jìn)一步進(jìn)行壓縮再儲存到內(nèi)存或磁盤上。當(dāng)然是為了進(jìn)一步減小Cache數(shù)據(jù)的尺寸,對于Cache在磁盤上而言,絕對大小大概沒有太大關(guān)系,主要是考慮Disk的IO帶寬。而對于Cache在內(nèi)存中,那主要就是考慮尺寸的影響,是否能夠Cache更多的數(shù)據(jù),是否能減小Cache數(shù)據(jù)對GC造成的壓力等。

?

這兩者,前者通常不會是主要問題,尤其是在RDD Cache本身的目的就是追求速度,減少重算步驟,用IO換CPU的情況下。而后者,GC問題當(dāng)然是需要考量的,數(shù)據(jù)量小,占用空間少,GC的問題大概會減輕,但是是否真的需要走到RDDCache壓縮這一步,或許用其它方式來解決可能更加有效。

?

所以這個值默認(rèn)是關(guān)閉的,但是如果在磁盤IO的確成為問題或者GC問題真的沒有其它更好的解決辦法的時候,可以考慮啟用RDD壓縮。

?

?

spark.broadcast.compress

?

是否對Broadcast的數(shù)據(jù)進(jìn)行壓縮,默認(rèn)值為True。

?

Broadcast機制是用來減少運行每個Task時,所需要發(fā)送給TASK的RDD所使用到的相關(guān)數(shù)據(jù)的尺寸,一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數(shù)據(jù),之后的Task都從本地的BlockManager中獲取相關(guān)數(shù)據(jù)。在1.1最新版本的代碼中,RDD本身也改為以Broadcast的形式發(fā)送給Executor(之前的實現(xiàn)RDD本身是隨每個任務(wù)發(fā)送的),因此基本上不太需要顯式的決定哪些數(shù)據(jù)需要broadcast了。

?

因為Broadcast的數(shù)據(jù)需要通過網(wǎng)絡(luò)發(fā)送,而在Executor端又需要存儲在本地BlockMananger中,加上最新的實現(xiàn),默認(rèn)RDD通過Boradcast機制發(fā)送,因此大大增加了Broadcast變量的比重,所以通過壓縮減小尺寸,來減少網(wǎng)絡(luò)傳輸開銷和內(nèi)存占用,通常都是有利于提高整體性能的。

?

什么情況可能不壓縮更好呢,大致上個人覺得同樣還是在網(wǎng)絡(luò)帶寬和內(nèi)存不是問題的時候,如果Driver端CPU資源很成問題(畢竟壓縮的動作基本都在Driver端執(zhí)行),那或許有調(diào)整的必要。

?

?

spark.io.compression.codec

?

RDD Cache和Shuffle數(shù)據(jù)壓縮所采用的算法Codec,默認(rèn)值曾經(jīng)是使用LZF作為默認(rèn)Codec,最近因為LZF的內(nèi)存開銷的問題,默認(rèn)的Codec已經(jīng)改為Snappy。

?

LZF和Snappy相比較,前者壓縮率比較高(當(dāng)然要看具體數(shù)據(jù)內(nèi)容了,通常要高20%左右),但是除了內(nèi)存問題以外,CPU代價也大一些(大概也差20%~50%?)

?

在用于Shuffle數(shù)據(jù)的場合下,內(nèi)存方面,應(yīng)該主要是在使用HashShuffleManager的時候有可能成為問題,因為如果Reduce分區(qū)數(shù)量巨大,需要同時打開大量的壓縮數(shù)據(jù)流用于寫文件,進(jìn)而在Codec方面需要大量的buffer。但是如果使用SortShuffleManager,由于shuffle文件數(shù)量大大減少,不會產(chǎn)生大量的壓縮數(shù)據(jù)流,所以內(nèi)存開銷大概不會成為主要問題。

?

剩下的就是CPU和壓縮率的權(quán)衡取舍,和前面一樣,取決于CPU/網(wǎng)絡(luò)/磁盤的能力和負(fù)載,個人認(rèn)為CPU通常更容易成為瓶頸。所以要調(diào)整性能,要不不壓縮,要不使用Snappy可能性大一些?

?

對于RDD Cache的場合來說,絕大多數(shù)場合都是內(nèi)存操作或者本地IO,所以CPU負(fù)載的問題可能比IO的問題更加突出,這也是為什么spark.rdd.compress本身默認(rèn)為不壓縮,如果要壓縮,大概也是Snappy合適一些?

?

Storage相關(guān)配置參數(shù)

?

spark.local.dir

?

這個看起來很簡單,就是Spark用于寫中間數(shù)據(jù),如RDD Cache,Shuffle,Spill等數(shù)據(jù)的位置,那么有什么可以注意的呢。

?

首先,最基本的當(dāng)然是我們可以配置多個路徑(用逗號分隔)到多個磁盤上增加整體IO帶寬,這個大家都知道。

?

其次,目前的實現(xiàn)中,Spark是通過對文件名采用hash算法分布到多個路徑下的目錄中去,如果你的存儲設(shè)備有快有慢,比如SSD+HDD混合使用,那么你可以通過在SSD上配置更多的目錄路徑來增大它被Spark使用的比例,從而更好地利用SSD的IO帶寬能力。當(dāng)然這只是一種變通的方法,終極解決方案還是應(yīng)該像目前HDFS的實現(xiàn)方向一樣,讓Spark能夠感知具體的存儲設(shè)備類型,針對性的使用。

?

需要注意的是,在Spark 1.0以后,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)參數(shù)會覆蓋這個配置。比如Spark On YARN的時候,Spark Executor的本地路徑依賴于Yarn的配置,而不取決于這個參數(shù)。

?

spark.executor.memory

?

Executor?內(nèi)存的大小,和性能本身當(dāng)然并沒有直接的關(guān)系,但是幾乎所有運行時性能相關(guān)的內(nèi)容都或多或少間接和內(nèi)存大小相關(guān)。這個參數(shù)最終會被設(shè)置到Executor的JVM的heap尺寸上,對應(yīng)的就是Xmx和Xms的值

?

理論上Executor?內(nèi)存當(dāng)然是多多益善,但是實際受機器配置,以及運行環(huán)境,資源共享,JVM GC效率等因素的影響,還是有可能需要為它設(shè)置一個合理的大小。多大算合理,要看實際情況

?

Executor的內(nèi)存基本上是Executor內(nèi)部所有任務(wù)共享的,而每個Executor上可以支持的任務(wù)的數(shù)量取決于Executor所管理的CPU Core資源的多少,因此你需要了解每個任務(wù)的數(shù)據(jù)規(guī)模的大小,從而推算出每個Executor大致需要多少內(nèi)存即可滿足基本的需求。

?

如何知道每個任務(wù)所需內(nèi)存的大小呢,這個很難統(tǒng)一的衡量,因為除了數(shù)據(jù)集本身的開銷,還包括算法所需各種臨時內(nèi)存空間的使用,而根據(jù)具體的代碼算法等不同,臨時內(nèi)存空間的開銷也不同。但是數(shù)據(jù)集本身的大小,對最終所需內(nèi)存的大小還是有一定的參考意義的。

?

通常來說每個分區(qū)的數(shù)據(jù)集在內(nèi)存中的大小,可能是其在磁盤上源數(shù)據(jù)大小的若干倍(不考慮源數(shù)據(jù)壓縮,Java對象相對于原始裸數(shù)據(jù)也還要算上用于管理數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)的額外開銷),需要準(zhǔn)確的知道大小的話,可以將RDD cache在內(nèi)存中,從BlockManager的Log輸出可以看到每個Cache分區(qū)的大小(其實也是估算出來的,并不完全準(zhǔn)確)

?

如: BlockManagerInfo: Added rdd_0_1?on disk on sr438:41134(size: 495.3 MB)

?

反過來說,如果你的Executor的數(shù)量和內(nèi)存大小受機器物理配置影響相對固定,那么你就需要合理規(guī)劃每個分區(qū)任務(wù)的數(shù)據(jù)規(guī)模,例如采用更多的分區(qū),用增加任務(wù)數(shù)量(進(jìn)而需要更多的批次來運算所有的任務(wù))的方式來減小每個任務(wù)所需處理的數(shù)據(jù)大小。

?

spark.storage.memoryFraction

?

如前面所說spark.executor.memory決定了每個Executor可用內(nèi)存的大小,而spark.storage.memoryFraction則決定了在這部分內(nèi)存中有多少可以用于Memory Store管理RDD Cache數(shù)據(jù),剩下的內(nèi)存用來保證任務(wù)運行時各種其它內(nèi)存空間的需要。

?

spark.executor.memory默認(rèn)值為0.6,官方文檔建議這個比值不要超過JVM Old Gen區(qū)域的比值。這也很容易理解,因為RDD Cache數(shù)據(jù)通常都是長期駐留內(nèi)存的,理論上也就是說最終會被轉(zhuǎn)移到Old Gen區(qū)域(如果該RDD還沒有被刪除的話),如果這部分?jǐn)?shù)據(jù)允許的尺寸太大,勢必把Old Gen區(qū)域占滿,造成頻繁的FULL GC。

?

如何調(diào)整這個比值,取決于你的應(yīng)用對數(shù)據(jù)的使用模式和數(shù)據(jù)的規(guī)模,粗略的來說,如果頻繁發(fā)生Full GC,可以考慮降低這個比值,這樣RDD Cache可用的內(nèi)存空間減少(剩下的部分Cache數(shù)據(jù)就需要通過Disk Store寫到磁盤上了),會帶來一定的性能損失,但是騰出更多的內(nèi)存空間用于執(zhí)行任務(wù),減少Full GC發(fā)生的次數(shù),反而可能改善程序運行的整體性能

?

spark.streaming.blockInterval

?

這個參數(shù)用來設(shè)置Spark Streaming里Stream Receiver生成Block的時間間隔,默認(rèn)為200ms。具體的行為表現(xiàn)是具體的Receiver所接收的數(shù)據(jù),每隔這里設(shè)定的時間間隔,就從Buffer中生成一個StreamBlock放進(jìn)隊列,等待進(jìn)一步被存儲到BlockManager中供后續(xù)計算過程使用。理論上來說,為了每個StreamingBatch間隔里的數(shù)據(jù)是均勻的,這個時間間隔當(dāng)然應(yīng)該能被Batch的間隔時間長度所整除。總體來說,如果內(nèi)存大小夠用,Streaming的數(shù)據(jù)來得及處理,這個blockInterval時間間隔的影響不大,當(dāng)然,如果數(shù)據(jù)Cache Level是Memory+Ser,即做了序列化處理,那么BlockInterval的大小會影響序列化后數(shù)據(jù)塊的大小,對于Java的GC的行為會有一些影響。

?

此外spark.streaming.blockQueueSize決定了在StreamBlock被存儲到BlockMananger之前,隊列中最多可以容納多少個StreamBlock。默認(rèn)為10,因為這個隊列Poll的時間間隔是100ms,所以如果CPU不是特別繁忙的話,基本上應(yīng)該沒有問題。

?

Shuffle相關(guān)

?

Shuffle操作大概是對Spark性能影響最大的步驟之一(因為可能涉及到排序,磁盤IO,網(wǎng)絡(luò)IO等眾多CPU或IO密集的操作),這也是為什么在Spark 1.1的代碼中對整個Shuffle框架代碼進(jìn)行了重構(gòu),將Shuffle相關(guān)讀寫操作抽象封裝到Pluggable的Shuffle Manager中,便于試驗和實現(xiàn)不同的Shuffle功能模塊。例如為了解決Hash Based的Shuffle Manager在文件讀寫效率方面的問題而實現(xiàn)的Sort Base的Shuffle Manager。

?

spark.shuffle.manager

?

用來配置所使用的Shuffle Manager,目前可選的Shuffle Manager包括默認(rèn)的org.apache.spark.shuffle.sort.HashShuffleManager(配置參數(shù)值為hash)和新的org.apache.spark.shuffle.sort.SortShuffleManager(配置參數(shù)值為sort)。

?

這兩個ShuffleManager如何選擇呢,首先需要了解他們在實現(xiàn)方式上的區(qū)別。

?

HashShuffleManager,故名思義也就是在Shuffle的過程中寫數(shù)據(jù)時不做排序操作,只是將數(shù)據(jù)根據(jù)Hash的結(jié)果,將各個Reduce分區(qū)的數(shù)據(jù)寫到各自的磁盤文件中。帶來的問題就是如果Reduce分區(qū)的數(shù)量比較大的話,將會產(chǎn)生大量的磁盤文件。如果文件數(shù)量特別巨大,對文件讀寫的性能會帶來比較大的影響,此外由于同時打開的文件句柄數(shù)量眾多,序列化,以及壓縮等操作需要分配的臨時內(nèi)存空間也可能會迅速膨脹到無法接受的地步,對內(nèi)存的使用和GC帶來很大的壓力,在Executor內(nèi)存比較小的情況下尤為突出,例如Spark on Yarn模式。

?

SortShuffleManager,是1.1版本之后實現(xiàn)的一個試驗性(也就是一些功能和接口還在開發(fā)演變中)的ShuffleManager,它在寫入分區(qū)數(shù)據(jù)的時候,首先會根據(jù)實際情況對數(shù)據(jù)采用不同的方式進(jìn)行排序操作,底線是至少按照Reduce分區(qū)Partition進(jìn)行排序,這樣來至于同一個Map任務(wù)Shuffle到不同的Reduce分區(qū)中去的所有數(shù)據(jù)都可以寫入到同一個外部磁盤文件中去,用簡單的Offset標(biāo)志不同Reduce分區(qū)的數(shù)據(jù)在這個文件中的偏移量。這樣一個Map任務(wù)就只需要生成一個shuffle文件,從而避免了上述HashShuffleManager可能遇到的文件數(shù)量巨大的問題

?

兩者的性能比較,取決于內(nèi)存,排序,文件操作等因素的綜合影響。

?

對于不需要進(jìn)行排序的Shuffle操作來說,如repartition等,如果文件數(shù)量不是特別巨大,HashShuffleManager面臨的內(nèi)存問題不大,而SortShuffleManager需要額外的根據(jù)Partition進(jìn)行排序,顯然HashShuffleManager的效率會更高。

?

而對于本來就需要在Map端進(jìn)行排序的Shuffle操作來說,如ReduceByKey等,使用HashShuffleManager雖然在寫數(shù)據(jù)時不排序,但在其它的步驟中仍然需要排序,而SortShuffleManager則可以將寫數(shù)據(jù)和排序兩個工作合并在一起執(zhí)行,因此即使不考慮HashShuffleManager的內(nèi)存使用問題,SortShuffleManager依舊可能更快。

?

spark.shuffle.sort.bypassMergeThreshold

?

這個參數(shù)僅適用于SortShuffleManager,如前所述,SortShuffleManager在處理不需要排序的Shuffle操作時,由于排序帶來性能的下降。這個參數(shù)決定了在這種情況下,當(dāng)Reduce分區(qū)的數(shù)量小于多少的時候,在SortShuffleManager內(nèi)部不使用Merge Sort的方式處理數(shù)據(jù),而是與Hash Shuffle類似,直接將分區(qū)文件寫入單獨的文件,不同的是,在最后一步還是會將這些文件合并成一個單獨的文件。這樣通過去除Sort步驟來加快處理速度,代價是需要并發(fā)打開多個文件,所以內(nèi)存消耗量增加,本質(zhì)上是相對HashShuffleMananger一個折衷方案。這個參數(shù)的默認(rèn)值是200個分區(qū),如果內(nèi)存GC問題嚴(yán)重,可以降低這個值。

?

spark.shuffle.consolidateFiles

?

這個配置參數(shù)僅適用于HashShuffleMananger的實現(xiàn),同樣是為了解決生成過多文件的問題,采用的方式是在不同批次運行的Map任務(wù)之間重用Shuffle輸出文件,也就是說合并的是不同批次的Map任務(wù)的輸出數(shù)據(jù),但是每個Map任務(wù)所需要的文件還是取決于Reduce分區(qū)的數(shù)量,因此,它并不減少同時打開的輸出文件的數(shù)量,因此對內(nèi)存使用量的減少并沒有幫助。只是HashShuffleManager里的一個折中的解決方案。

?

需要注意的是,這部分的代碼實現(xiàn)盡管原理上說很簡單,但是涉及到底層具體的文件系統(tǒng)的實現(xiàn)和限制等因素,例如在并發(fā)訪問等方面,需要處理的細(xì)節(jié)很多,因此一直存在著這樣那樣的bug或者問題,導(dǎo)致在例如EXT3上使用時,特定情況下性能反而可能下降,因此從Spark 0.8的代碼開始,一直到Spark 1.1的代碼為止也還沒有被標(biāo)志為Stable,不是默認(rèn)采用的方式。此外因為并不減少同時打開的輸出文件的數(shù)量,因此對性能具體能帶來多大的改善也取決于具體的文件數(shù)量的情況。所以即使你面臨著Shuffle文件數(shù)量巨大的問題,這個配置參數(shù)是否使用,在什么版本中可以使用,也最好還是實際測試以后再決定。

?

spark.shuffle.spill

?

shuffle的過程中,如果涉及到排序,聚合等操作,勢必會需要在內(nèi)存中維護(hù)一些數(shù)據(jù)結(jié)構(gòu),進(jìn)而占用額外的內(nèi)存。如果內(nèi)存不夠用怎么辦,那只有兩條路可以走,一就是out of memory出錯了,二就是將部分?jǐn)?shù)據(jù)臨時寫到外部存儲設(shè)備中去,最后再合并到最終的Shuffle輸出文件中去。

?

這里spark.shuffle.spill決定是否Spill到外部存儲設(shè)備(默認(rèn)打開),如果你的內(nèi)存足夠使用,或者數(shù)據(jù)集足夠小,當(dāng)然也就不需要Spill,畢竟Spill帶來了額外的磁盤操作。

?

spark.shuffle.memoryFraction/ spark.shuffle.safetyFraction

?

在啟用Spill的情況下,spark.shuffle.memoryFraction(1.1后默認(rèn)為0.2)決定了當(dāng)Shuffle過程中使用的內(nèi)存達(dá)到總內(nèi)存多少比例的時候開始Spill。

?

通過spark.shuffle.memoryFraction可以調(diào)整Spill的觸發(fā)條件,即Shuffle占用內(nèi)存的大小,進(jìn)而調(diào)整Spill的頻率和GC的行為。總的來說,如果Spill太過頻繁,可以適當(dāng)增加spark.shuffle.memoryFraction的大小,增加用于Shuffle的內(nèi)存,減少Spill的次數(shù)。當(dāng)然這樣一來為了避免內(nèi)存溢出,對應(yīng)的可能需要減少RDD cache占用的內(nèi)存,即減小spark.storage.memoryFraction的值,這樣RDD cache的容量減少,有可能帶來性能影響,因此需要綜合考慮。

?

由于Shuffle數(shù)據(jù)的大小是估算出來的,一來為了降低開銷,并不是每增加一個數(shù)據(jù)項都完整的估算一次,二來估算也會有誤差,所以實際暫用的內(nèi)存可能比估算值要大,這里spark.shuffle.safetyFraction(默認(rèn)為0.8)用來作為一個保險系數(shù),降低實際Shuffle使用的內(nèi)存閥值,增加一定的緩沖,降低實際內(nèi)存占用超過用戶配置值的概率。

?

spark.shuffle.spill.compress/ spark.shuffle.compress

?

這兩個配置參數(shù)都是用來設(shè)置Shuffle過程中是否使用壓縮算法對Shuffle數(shù)據(jù)進(jìn)行壓縮,前者針對Spill的中間數(shù)據(jù),后者針對最終的shuffle輸出文件,默認(rèn)都是True

?

理論上說,spark.shuffle.compress設(shè)置為True通常都是合理的,因為如果使用千兆以下的網(wǎng)卡,網(wǎng)絡(luò)帶寬往往最容易成為瓶頸。此外,目前的Spark任務(wù)調(diào)度實現(xiàn)中,以Shuffle劃分Stage,下一個Stage的任務(wù)是要等待上一個Stage的任務(wù)全部完成以后才能開始執(zhí)行,所以shuffle數(shù)據(jù)的傳輸和CPU計算任務(wù)之間通常不會重疊,這樣Shuffle數(shù)據(jù)傳輸量的大小和所需的時間就直接影響到了整個任務(wù)的完成速度。但是壓縮也是要消耗大量的CPU資源的,所以打開壓縮選項會增加Map任務(wù)的執(zhí)行時間,因此如果在CPU負(fù)載的影響遠(yuǎn)大于磁盤和網(wǎng)絡(luò)帶寬的影響的場合下,也可能將spark.shuffle.compress設(shè)置為False才是最佳的方案

?

對于spark.shuffle.spill.compress而言,情況類似,但是spill數(shù)據(jù)不會被發(fā)送到網(wǎng)絡(luò)中,僅僅是臨時寫入本地磁盤,而且在一個任務(wù)中同時需要執(zhí)行壓縮和解壓縮兩個步驟,所以對CPU負(fù)載的影響會更大一些,而磁盤帶寬(如果標(biāo)配12HDD的話)可能往往不會成為Spark應(yīng)用的主要問題,所以這個參數(shù)相對而言,或許更有機會需要設(shè)置為False。

?

總之,Shuffle過程中數(shù)據(jù)是否應(yīng)該壓縮,取決于CPU/DISK/NETWORK的實際能力和負(fù)載,應(yīng)該綜合考慮。

?

?

原文鏈接:http://spark-config.readthedocs.io/en/latest/

總結(jié)

以上是生活随笔為你收集整理的Spark性能相关参数配置详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。