日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

【转载】Apache Spark Jobs 性能调优(二)

發(fā)布時(shí)間:2025/6/17 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【转载】Apache Spark Jobs 性能调优(二) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

調(diào)試資源分配

?

Spark 的用戶郵件郵件列表中經(jīng)常會(huì)出現(xiàn) “我有一個(gè)500個(gè)節(jié)點(diǎn)的集群,為什么但是我的應(yīng)用一次只有兩個(gè) task 在執(zhí)行”,鑒于 Spark 控制資源使用的參數(shù)的數(shù)量,這些問(wèn)題不應(yīng)該出現(xiàn)。但是在本章中,你將學(xué)會(huì)壓榨出你集群的每一分資源。推薦的配置將根據(jù)不同的集群管理系統(tǒng)(YARN、Mesos、Spark Standalone)而有所不同,我們將主要集中在YARN?上,因?yàn)檫@個(gè)Cloudera?推薦的方式。


Spark(以及YARN) 需要關(guān)心的兩項(xiàng)主要的資源是 CPU 和 內(nèi)存, 磁盤(pán) 和 IO 當(dāng)然也影響著 Spark 的性能,但是不管是 Spark 還是 Yarn 目前都沒(méi)法對(duì)他們做實(shí)時(shí)有效的管理。

在一個(gè) Spark 應(yīng)用中,每個(gè) Spark executor 擁有固定個(gè)數(shù)的 core 以及固定大小的堆大小。core 的個(gè)數(shù)可以在執(zhí)行 spark-submit 或者 pyspark 或者 spark-shell 時(shí),通過(guò)參數(shù) --executor-cores 指定,或者在 spark-defaults.conf 配置文件或者 SparkConf 對(duì)象中設(shè)置 spark.executor.cores 參數(shù)。同樣地,堆的大小可以通過(guò) --executor-memory 參數(shù)或者 spark.executor.memory 配置項(xiàng)。core 配置項(xiàng)控制一個(gè) executor 中task的并發(fā)數(shù)。 --executor-cores 5 意味著每個(gè) executor 中最多同時(shí)可以有5個(gè) task 運(yùn)行。memory 參數(shù)影響 Spark 可以緩存的數(shù)據(jù)的大小,也就是在 group aggregate 以及 join 操作時(shí) shuffle 的數(shù)據(jù)結(jié)構(gòu)的最大值。

--num-executors 命令行參數(shù)或者spark.executor.instances 配置項(xiàng)控制需要的 executor 個(gè)數(shù)。從 CDH 5.4/Spark 1.3 開(kāi)始,你可以避免使用這個(gè)參數(shù),只要你通過(guò)設(shè)置 spark.dynamicAllocation.enabled 參數(shù)打開(kāi) 動(dòng)態(tài)分配 。動(dòng)態(tài)分配可以使的 Spark 的應(yīng)用在有后續(xù)積壓的在等待的 task 時(shí)請(qǐng)求 executor,并且在空閑時(shí)釋放這些 executor。

同時(shí) Spark 需求的資源如何跟?YARN?中可用的資源配合也是需要著重考慮的,YARN?相關(guān)的參數(shù)有:

  • yarn.nodemanager.resource.memory-mb 控制在每個(gè)節(jié)點(diǎn)上 container 能夠使用的最大內(nèi)存;
  • yarn.nodemanager.resource.cpu-vcores 控制在每個(gè)節(jié)點(diǎn)上 container 能夠使用的最大core個(gè)數(shù);

請(qǐng)求5個(gè) core 會(huì)生成向YARN?要5個(gè)虛擬core的請(qǐng)求。從YARN 請(qǐng)求內(nèi)存相對(duì)比較復(fù)雜因?yàn)橐韵碌囊恍┰?#xff1a;

--executor-memory/spark.executor.memory 控制 executor 的堆的大小,但是 JVM 本身也會(huì)占用一定的堆空間,比如內(nèi)部的 String 或者直接 byte buffer,executor memory 的 spark.yarn.executor.memoryOverhead 屬性決定向YARN?請(qǐng)求的每個(gè) executor 的內(nèi)存大小,默認(rèn)值為max(384, 0.7 * spark.executor.memory);
YARN?可能會(huì)比請(qǐng)求的內(nèi)存高一點(diǎn),YARN?的 yarn.scheduler.minimum-allocation-mb 和 yarn.scheduler.increment-allocation-mb 屬性控制請(qǐng)求的最小值和增加量。
下面展示的是 Spark on?YARN?內(nèi)存結(jié)構(gòu):


如果這些還不夠決定Spark executor 個(gè)數(shù),還有一些概念還需要考慮的:

  • 應(yīng)用的master,是一個(gè)非 executor 的容器,它擁有特殊的從YARN?請(qǐng)求資源的能力,它自己本身所占的資源也需要被計(jì)算在內(nèi)。在 yarn-client 模式下,它默認(rèn)請(qǐng)求 1024MB 和 1個(gè)core。在 yarn-cluster 模式中,應(yīng)用的 master 運(yùn)行 driver,所以使用參數(shù) --driver-memory 和 --driver-cores 配置它的資源常常很有用。
  • 在 executor 執(zhí)行的時(shí)候配置過(guò)大的 memory 經(jīng)常會(huì)導(dǎo)致過(guò)長(zhǎng)的GC延時(shí),64G是推薦的一個(gè) executor 內(nèi)存大小的上限。
  • 我們注意到?HDFS?client 在大量并發(fā)線程是時(shí)性能問(wèn)題。大概的估計(jì)是每個(gè) executor 中最多5個(gè)并行的 task 就可以占滿的寫(xiě)入帶寬。
  • 在運(yùn)行微型 executor 時(shí)(比如只有一個(gè)core而且只有夠執(zhí)行一個(gè)task的內(nèi)存)扔掉在一個(gè)JVM上同時(shí)運(yùn)行多個(gè)task的好處。比如 broadcast 變量需要為每個(gè) executor 復(fù)制一遍,這么多小executor會(huì)導(dǎo)致更多的數(shù)據(jù)拷貝。

為了讓以上的這些更加具體一點(diǎn),這里有一個(gè)實(shí)際使用過(guò)的配置的例子,可以完全用滿整個(gè)集群的資源。假設(shè)一個(gè)集群有6個(gè)節(jié)點(diǎn)有NodeManager在上面運(yùn)行,每個(gè)節(jié)點(diǎn)有16個(gè)core以及64GB的內(nèi)存。那么 NodeManager的容量:yarn.nodemanager.resource.memory-mb 和 yarn.nodemanager.resource.cpu-vcores 可以設(shè)為 63 * 1024 = 64512 (MB) 和 15。我們避免使用 100% 的?YARN?container 資源因?yàn)檫€要為 OS 和 hadoop 的 Daemon 留一部分資源。在上面的場(chǎng)景中,我們預(yù)留了1個(gè)core和1G的內(nèi)存給這些進(jìn)程。Cloudera Manager 會(huì)自動(dòng)計(jì)算并且配置。

所以看起來(lái)我們最先想到的配置會(huì)是這樣的:--num-executors 6 --executor-cores 15 --executor-memory 63G。但是這個(gè)配置可能無(wú)法達(dá)到我們的需求,因?yàn)?#xff1a;?
- 63GB+ 的 executor memory 塞不進(jìn)只有 63GB 容量的 NodeManager;?
- 應(yīng)用的 master 也需要占用一個(gè)core,意味著在某個(gè)節(jié)點(diǎn)上,沒(méi)有15個(gè)core給 executor 使用;?
- 15個(gè)core會(huì)影響 HDFS IO的吞吐量。?
配置成 --num-executors 17 --executor-cores 5 --executor-memory 19G 可能會(huì)效果更好,因?yàn)?#xff1a;?
- 這個(gè)配置會(huì)在每個(gè)節(jié)點(diǎn)上生成3個(gè) executor,除了應(yīng)用的master運(yùn)行的機(jī)器,這臺(tái)機(jī)器上只會(huì)運(yùn)行2個(gè) executor?

- --executor-memory 被分成3份(63G/每個(gè)節(jié)點(diǎn)3個(gè)executor)=21。 21 * (1 - 0.07) ~ 19。

?

調(diào)試并發(fā)

?

我們知道 Spark 是一套數(shù)據(jù)并行處理的引擎。但是 Spark 并不是神奇得能夠?qū)⑺杏?jì)算并行化,它沒(méi)辦法從所有的并行化方案中找出最優(yōu)的那個(gè)。每個(gè) Spark stage 中包含若干個(gè) task,每個(gè) task 串行地處理數(shù)據(jù)。在調(diào)試 Spark 的job時(shí),task 的個(gè)數(shù)可能是決定程序性能的最重要的參數(shù)。

那么這個(gè)數(shù)字是由什么決定的呢?在之前的博文中介紹了 Spark 如何將 RDD 轉(zhuǎn)換成一組 stage。task 的個(gè)數(shù)與 stage 中上一個(gè) RDD 的 partition 個(gè)數(shù)相同。而一個(gè) RDD 的 partition 個(gè)數(shù)與被它依賴的 RDD 的 partition 個(gè)數(shù)相同,除了以下的情況:coalesce?transformation 可以創(chuàng)建一個(gè)具有更少 partition 個(gè)數(shù)的 RDD,union?transformation 產(chǎn)出的 RDD 的 partition 個(gè)數(shù)是它父 RDD 的 partition 個(gè)數(shù)之和,cartesian?返回的 RDD 的 partition 個(gè)數(shù)是它們的積。

如果一個(gè) RDD 沒(méi)有父 RDD 呢? 由?textFile?或者?hadoopFile?生成的 RDD 的 partition 個(gè)數(shù)由它們底層使用的 MapReduce InputFormat 決定的。一般情況下,每讀到的一個(gè) HDFS block 會(huì)生成一個(gè) partition。通過(guò)parallelize?接口生成的 RDD 的 partition 個(gè)數(shù)由用戶指定,如果用戶沒(méi)有指定則由參數(shù) spark.default.parallelism 決定。

要想知道 partition 的個(gè)數(shù),可以通過(guò)接口 rdd.partitions().size() 獲得。

這里最需要關(guān)心的問(wèn)題在于 task 的個(gè)數(shù)太小。如果運(yùn)行時(shí) task 的個(gè)數(shù)比實(shí)際可用的 slot 還少,那么程序解沒(méi)法使用到所有的 CPU 資源。

過(guò)少的 task 個(gè)數(shù)可能會(huì)導(dǎo)致在一些聚集操作時(shí), 每個(gè) task 的內(nèi)存壓力會(huì)很大。任何?joincogroup*ByKey?操作都會(huì)在內(nèi)存生成一個(gè) hash-map或者 buffer 用于分組或者排序。joincogroup?,groupByKey?會(huì)在 shuffle 時(shí)在 fetching 端使用這些數(shù)據(jù)結(jié)構(gòu),reduceByKey?,aggregateByKey?會(huì)在 shuffle 時(shí)在兩端都會(huì)使用這些數(shù)據(jù)結(jié)構(gòu)。

當(dāng)需要進(jìn)行這個(gè)聚集操作的 record 不能完全輕易塞進(jìn)內(nèi)存中時(shí),一些問(wèn)題會(huì)暴露出來(lái)。首先,在內(nèi)存 hold 大量這些數(shù)據(jù)結(jié)構(gòu)的 record 會(huì)增加 GC的壓力,可能會(huì)導(dǎo)致流程停頓下來(lái)。其次,如果數(shù)據(jù)不能完全載入內(nèi)存,Spark 會(huì)將這些數(shù)據(jù)寫(xiě)到磁盤(pán),這會(huì)引起磁盤(pán) IO和排序。在 Cloudera 的用戶中,這可能是導(dǎo)致 Spark Job 慢的首要原因。

那么如何增加你的 partition 的個(gè)數(shù)呢?如果你的問(wèn)題 stage 是從 Hadoop 讀取數(shù)據(jù),你可以做以下的選項(xiàng):?
- 使用?repartition?選項(xiàng),會(huì)引發(fā) shuffle;?
- 配置 InputFormat 用戶將文件分得更小;?
- 寫(xiě)入 HDFS 文件時(shí)使用更小的block。

如果問(wèn)題 stage 從其他 stage 中獲得輸入,引發(fā) stage 邊界的操作會(huì)接受一個(gè) numPartitions 的參數(shù),比如

<span style="font-family:Microsoft YaHei;"><span style="font-family:Microsoft YaHei;font-size:14px;">val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)</span></span>

  


X 應(yīng)該取什么值?最直接的方法就是做實(shí)驗(yàn)。不停的將 partition 的個(gè)數(shù)從上次實(shí)驗(yàn)的 partition 個(gè)數(shù)乘以1.5,直到性能不再提升為止。

同時(shí)也有一些原則用于計(jì)算 X,但是也不是非常的有效是因?yàn)橛行﹨?shù)是很難計(jì)算的。這里寫(xiě)到不是因?yàn)樗鼈兒軐?shí)用,而是可以幫助理解。這里主要的目標(biāo)是啟動(dòng)足夠的 task 可以使得每個(gè) task 接受的數(shù)據(jù)能夠都塞進(jìn)它所分配到的內(nèi)存中。

每個(gè) task 可用的內(nèi)存通過(guò)這個(gè)公式計(jì)算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores 。 memoryFraction 和 safetyFractio 默認(rèn)值分別 0.2 和 0.8.

在內(nèi)存中所有 shuffle 數(shù)據(jù)的大小很難確定。最可行的是找出一個(gè) stage 運(yùn)行的 Shuffle Spill(memory) 和 Shuffle Spill(Disk) 之間的比例。在用所有shuffle 寫(xiě)乘以這個(gè)比例。但是如果這個(gè) stage 是 reduce 時(shí),可能會(huì)有點(diǎn)復(fù)雜:?

在往上增加一點(diǎn)因?yàn)榇蠖鄶?shù)情況下 partition 的個(gè)數(shù)會(huì)比較多。

試試在,在有所疑慮的時(shí)候,使用更多的 task 數(shù)(也就是 partition 數(shù))都會(huì)效果更好,這與 MapRecuce 中建議 task 數(shù)目選擇盡量保守的建議相反。這個(gè)因?yàn)?MapReduce 在啟動(dòng) task 時(shí)相比需要更大的代價(jià)。

?

壓縮你的數(shù)據(jù)結(jié)構(gòu)

?

Spark 的數(shù)據(jù)流由一組 record 構(gòu)成。一個(gè) record 有兩種表達(dá)形式:一種是反序列化的 Java 對(duì)象另外一種是序列化的二進(jìn)制形式。通常情況下,Spark 對(duì)內(nèi)存中的 record 使用反序列化之后的形式,對(duì)要存到磁盤(pán)上或者需要通過(guò)網(wǎng)絡(luò)傳輸?shù)?record 使用序列化之后的形式。也有計(jì)劃在內(nèi)存中存儲(chǔ)序列化之后的 record。

spark.serializer 控制這兩種形式之間的轉(zhuǎn)換的方式。Kryo serializer,org.apache.spark.serializer.KryoSerializer 是推薦的選擇。但不幸的是它不是默認(rèn)的配置,因?yàn)?KryoSerializer 在早期的 Spark 版本中不穩(wěn)定,而 Spark 不想打破版本的兼容性,所以沒(méi)有把 KryoSerializer 作為默認(rèn)配置,但是 KryoSerializer 應(yīng)該在任何情況下都是第一的選擇。

你的 record 在這兩種形式切換的頻率對(duì)于 Spark 應(yīng)用的運(yùn)行效率具有很大的影響。去檢查一下到處傳遞數(shù)據(jù)的類(lèi)型,看看能否擠出一點(diǎn)水分是非常值得一試的。

過(guò)多的反序列化之后的 record 可能會(huì)導(dǎo)致數(shù)據(jù)到處到磁盤(pán)上更加頻繁,也使得能夠 Cache 在內(nèi)存中的 record 個(gè)數(shù)減少。點(diǎn)擊這里查看如何壓縮這些數(shù)據(jù)。

過(guò)多的序列化之后的 record 導(dǎo)致更多的 磁盤(pán)和網(wǎng)絡(luò) IO,同樣的也會(huì)使得能夠 Cache 在內(nèi)存中的 record 個(gè)數(shù)減少,這里主要的解決方案是把所有的用戶自定義的 class 都通過(guò) SparkConf#registerKryoClasses 的API定義和傳遞的。

?

數(shù)據(jù)格式

?

任何時(shí)候你都可以決定你的數(shù)據(jù)如何保持在磁盤(pán)上,使用可擴(kuò)展的二進(jìn)制格式比如:Avro,Parquet,Thrift或者Protobuf,從中選擇一種。當(dāng)人們?cè)谡務(wù)撛贖adoop上使用Avro,Thrift或者Protobuf時(shí),都是認(rèn)為每個(gè) record 保持成一個(gè) Avro/Thrift/Protobuf 結(jié)構(gòu)保存成 sequence file。而不是JSON。

每次當(dāng)時(shí)試圖使用JSON存儲(chǔ)大量數(shù)據(jù)時(shí),還是先放棄吧...

【轉(zhuǎn)載自:http://blog.csdn.net/u012102306/article/details/51700664】

轉(zhuǎn)載于:https://www.cnblogs.com/laoqing/p/7460145.html

總結(jié)

以上是生活随笔為你收集整理的【转载】Apache Spark Jobs 性能调优(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。