spark shuffle再补充
?
shuffle概覽
一個(gè)spark的RDD有一組固定的分區(qū)組成,每個(gè)分區(qū)有一系列的記錄組成。對(duì)于由窄依賴(lài)變換(例如map和filter)返回的RDD,會(huì)延續(xù)父RDD的分區(qū)信息,以pipeline的形式計(jì)算。每個(gè)對(duì)象僅依賴(lài)于父RDD中的單個(gè)對(duì)象。諸如coalesce之類(lèi)的操作可能導(dǎo)致任務(wù)處理多個(gè)輸入分區(qū),但轉(zhuǎn)換仍然被認(rèn)為是窄依賴(lài)的,因?yàn)橐粋€(gè)父RDD的分區(qū)只會(huì)被一個(gè)子RDD分區(qū)繼承。
Spark還支持寬依賴(lài)的轉(zhuǎn)換,例如groupByKey和reduceByKey。在這些依賴(lài)項(xiàng)中,計(jì)算單個(gè)分區(qū)中的記錄所需的數(shù)據(jù)可以來(lái)自于父數(shù)據(jù)集的許多分區(qū)中。要執(zhí)行這些轉(zhuǎn)換,具有相同key的所有元組必須最終位于同一分區(qū)中,由同一任務(wù)處理。為了滿(mǎn)足這一要求,Spark產(chǎn)生一個(gè)shuffle,它在集群內(nèi)部傳輸數(shù)據(jù),并產(chǎn)生一個(gè)帶有一組新分區(qū)的新stage。
可以看下面的代碼片段:
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()上面的代碼片段只有一個(gè)action操作,count,從輸入textfile到action經(jīng)過(guò)了三個(gè)轉(zhuǎn)換操作。這段代碼只會(huì)在一個(gè)stage中運(yùn)行,因?yàn)?#xff0c;三個(gè)轉(zhuǎn)換操作沒(méi)有shuffle,也即是三個(gè)轉(zhuǎn)換操作的每個(gè)分區(qū)都是只依賴(lài)于它的父RDD的單個(gè)分區(qū)。
但是,下面的單詞統(tǒng)計(jì)就跟上面有很大區(qū)別:
val tokenized = sc.textFile(args(0)).flatMap(_.split(' ')) val wordCounts = tokenized.map((_,?1)).reduceByKey(_ + _) val filtered = wordCounts.filter(_._2 >=?1000) val charCounts = filtered.flatMap(_._1.toCharArray).map((_,?1)).reduceByKey(_ + _) charCounts.collect()這段代碼里有兩個(gè)reducebykey操作,三個(gè)stage。
下面圖更復(fù)雜,因?yàn)橛幸粋€(gè)join操作:
粉框圈住的就是整個(gè)DAG的stage劃分。
在每個(gè)stage的邊界,父stage的task會(huì)將數(shù)據(jù)寫(xiě)入磁盤(pán),子stage的task會(huì)將數(shù)據(jù)通過(guò)網(wǎng)絡(luò)讀取。由于它們會(huì)導(dǎo)致很高的磁盤(pán)和網(wǎng)絡(luò)IO,所以shuffle代價(jià)相當(dāng)高,應(yīng)該盡量避免。父stage的數(shù)據(jù)分區(qū)往往和子stage的分區(qū)數(shù)不同。觸發(fā)shuffle的操作算子往往可以指定分區(qū)數(shù)的,也即是numPartitions代表下個(gè)stage會(huì)有多少個(gè)分區(qū)。就像mr任務(wù)中reducer的數(shù)據(jù)是非常重要的一個(gè)參數(shù)一樣,shuffle的時(shí)候指定分區(qū)數(shù)也將在很大程度上決定一個(gè)應(yīng)用程序的性能。
優(yōu)化shuffle
通常情況可以選擇使用產(chǎn)生相同結(jié)果的action和transform相互替換。但是并不是產(chǎn)生相同結(jié)果的算子就會(huì)有相同的性能。通常避免常見(jiàn)的陷阱并選擇正確的算子可以顯著提高應(yīng)用程序的性能。
當(dāng)選擇轉(zhuǎn)換操作的時(shí)候,應(yīng)最小化shuffle次數(shù)和shuffle的數(shù)據(jù)量。shuffle是非常消耗性能的操作。所有的shuffle數(shù)據(jù)都會(huì)被寫(xiě)入磁盤(pán),然后通過(guò)網(wǎng)絡(luò)傳輸。repartition , join, cogroup, 和 ?*By 或者 *ByKey 類(lèi)型的操作都會(huì)產(chǎn)生shuffle。我們可以對(duì)一下幾個(gè)操作算子進(jìn)行優(yōu)化:
1. groupByKey某些情況下可以被reducebykey代替。
2. reduceByKey某些情況下可以被 aggregatebykey代替。
3. flatMap-join-groupBy某些情況下可以被cgroup代替。
?
no shuffle
在某些情況下,前面描述的轉(zhuǎn)換操作不會(huì)導(dǎo)致shuffle。當(dāng)先前的轉(zhuǎn)換操作已經(jīng)使用了和shuffle相同的分區(qū)器分區(qū)數(shù)據(jù)的時(shí)候,spark就不會(huì)產(chǎn)生shuffle。
舉個(gè)例子:
rdd1?= someRdd.reduceByKey(...)rdd2?= someOtherRdd.reduceByKey(...)rdd3?= rdd1.join(rdd2)由于使用redcuebykey的時(shí)候沒(méi)有指定分區(qū)器,所以都是使用的默認(rèn)分區(qū)器,會(huì)導(dǎo)致rdd1和rdd2都采用的是hash分區(qū)器。兩個(gè)reducebykey操作會(huì)產(chǎn)生兩個(gè)shuffle過(guò)程。如果,數(shù)據(jù)集有相同的分區(qū)數(shù),執(zhí)行join操作的時(shí)候就不需要進(jìn)行額外的shuffle。由于數(shù)據(jù)集的分區(qū)相同,因此rdd1的任何單個(gè)分區(qū)中的key集合只能出現(xiàn)在rdd2的單個(gè)分區(qū)中。因此,rdd3的任何單個(gè)輸出分區(qū)的內(nèi)容僅取決于rdd1中單個(gè)分區(qū)的內(nèi)容和rdd2中的單個(gè)分區(qū),并且不需要第三個(gè)shuffle。
例如,如果someRdd有四個(gè)分區(qū),someOtherRdd有兩個(gè)分區(qū),而reduceByKeys都使用三個(gè)分區(qū),運(yùn)行的任務(wù)集如下所示:
如果rdd1和rdd2使用不同的分區(qū)器或者相同的分區(qū)器不同的分區(qū)數(shù),僅僅一個(gè)數(shù)據(jù)集在join的過(guò)程中需要重新shuffle
?
在join的過(guò)程中為了避免shuffle,可以使用廣播變量。當(dāng)executor內(nèi)存可以存儲(chǔ)數(shù)據(jù)集,在driver端可以將其加載到一個(gè)hash表中,然后廣播到executor。然后,map轉(zhuǎn)換可以引用哈希表來(lái)執(zhí)行查找。
增加shuffle
有時(shí)候需要打破最小化shuffle次數(shù)的規(guī)則。
當(dāng)增加并行度的時(shí)候,額外的shuffle是有利的。例如,數(shù)據(jù)中有一些文件是不可分割的,那么該大文件對(duì)應(yīng)的分區(qū)就會(huì)有大量的記錄,而不是說(shuō)將數(shù)據(jù)分散到盡可能多的分區(qū)內(nèi)部來(lái)使用所有已經(jīng)申請(qǐng)cpu。在這種情況下,使用reparition重新產(chǎn)生更多的分區(qū)數(shù),以滿(mǎn)足后面轉(zhuǎn)換算子所需的并行度,這會(huì)提升很大性能。
使用reduce和aggregate操作將數(shù)據(jù)聚合到driver端,也是修改區(qū)數(shù)的很好的例子。
在對(duì)大量分區(qū)執(zhí)行聚合的時(shí)候,在driver的單線(xiàn)程中聚合會(huì)成為瓶頸。要減driver的負(fù)載,可以首先使用reducebykey或者aggregatebykey執(zhí)行一輪分布式聚合,同時(shí)將結(jié)果數(shù)據(jù)集分區(qū)數(shù)減少。實(shí)際思路是首先在每個(gè)分區(qū)內(nèi)部進(jìn)行初步聚合,同時(shí)減少分區(qū)數(shù),然后再將聚合的結(jié)果發(fā)到driver端實(shí)現(xiàn)最終聚合。典型的操作是treeReduce 和 treeAggregate。
當(dāng)聚合已經(jīng)按照key進(jìn)行分組時(shí),此方法特別適用。例如,假如一個(gè)程序計(jì)算語(yǔ)料庫(kù)中每個(gè)單詞出現(xiàn)的次數(shù),并將結(jié)果使用map返回到driver。一種方法是可以使用聚合操作完成在每個(gè)分區(qū)計(jì)算局部map,然后在driver中合并map。可以用aggregateByKey以完全分布的方式進(jìn)行統(tǒng)計(jì),然后簡(jiǎn)單的用collectAsMap將結(jié)果返回到driver。
?
轉(zhuǎn)自:https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzA3MDY0NTMxOQ==&scene=124#wechat_redirect
超強(qiáng)干貨來(lái)襲 云風(fēng)專(zhuān)訪:近40年碼齡,通宵達(dá)旦的技術(shù)人生總結(jié)
以上是生活随笔為你收集整理的spark shuffle再补充的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Spark _22 _创建DataFra
- 下一篇: 阿里云Spark Shuffle的优化