日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

spark shuffle再补充

發(fā)布時間:2024/2/28 66 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark shuffle再补充 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

shuffle概覽

一個spark的RDD有一組固定的分區(qū)組成,每個分區(qū)有一系列的記錄組成。對于由窄依賴變換(例如map和filter)返回的RDD,會延續(xù)父RDD的分區(qū)信息,以pipeline的形式計算。每個對象僅依賴于父RDD中的單個對象。諸如coalesce之類的操作可能導致任務處理多個輸入分區(qū),但轉換仍然被認為是窄依賴的,因為一個父RDD的分區(qū)只會被一個子RDD分區(qū)繼承。

Spark還支持寬依賴的轉換,例如groupByKey和reduceByKey。在這些依賴項中,計算單個分區(qū)中的記錄所需的數(shù)據(jù)可以來自于父數(shù)據(jù)集的許多分區(qū)中。要執(zhí)行這些轉換,具有相同key的所有元組必須最終位于同一分區(qū)中,由同一任務處理。為了滿足這一要求,Spark產(chǎn)生一個shuffle,它在集群內(nèi)部傳輸數(shù)據(jù),并產(chǎn)生一個帶有一組新分區(qū)的新stage。

可以看下面的代碼片段:

sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()

上面的代碼片段只有一個action操作,count,從輸入textfile到action經(jīng)過了三個轉換操作。這段代碼只會在一個stage中運行,因為,三個轉換操作沒有shuffle,也即是三個轉換操作的每個分區(qū)都是只依賴于它的父RDD的單個分區(qū)。

但是,下面的單詞統(tǒng)計就跟上面有很大區(qū)別:

val tokenized = sc.textFile(args(0)).flatMap(_.split(' ')) val wordCounts = tokenized.map((_,?1)).reduceByKey(_ + _) val filtered = wordCounts.filter(_._2 >=?1000) val charCounts = filtered.flatMap(_._1.toCharArray).map((_,?1)).reduceByKey(_ + _) charCounts.collect()

這段代碼里有兩個reducebykey操作,三個stage。

下面圖更復雜,因為有一個join操作:

粉框圈住的就是整個DAG的stage劃分。

在每個stage的邊界,父stage的task會將數(shù)據(jù)寫入磁盤,子stage的task會將數(shù)據(jù)通過網(wǎng)絡讀取。由于它們會導致很高的磁盤和網(wǎng)絡IO,所以shuffle代價相當高,應該盡量避免。父stage的數(shù)據(jù)分區(qū)往往和子stage的分區(qū)數(shù)不同。觸發(fā)shuffle的操作算子往往可以指定分區(qū)數(shù)的,也即是numPartitions代表下個stage會有多少個分區(qū)。就像mr任務中reducer的數(shù)據(jù)是非常重要的一個參數(shù)一樣,shuffle的時候指定分區(qū)數(shù)也將在很大程度上決定一個應用程序的性能。

優(yōu)化shuffle

通常情況可以選擇使用產(chǎn)生相同結果的action和transform相互替換。但是并不是產(chǎn)生相同結果的算子就會有相同的性能。通常避免常見的陷阱并選擇正確的算子可以顯著提高應用程序的性能。

當選擇轉換操作的時候,應最小化shuffle次數(shù)和shuffle的數(shù)據(jù)量。shuffle是非常消耗性能的操作。所有的shuffle數(shù)據(jù)都會被寫入磁盤,然后通過網(wǎng)絡傳輸。repartition , join, cogroup, 和 ?*By 或者 *ByKey 類型的操作都會產(chǎn)生shuffle。我們可以對一下幾個操作算子進行優(yōu)化:

1. groupByKey某些情況下可以被reducebykey代替。

2. reduceByKey某些情況下可以被 aggregatebykey代替。

3. flatMap-join-groupBy某些情況下可以被cgroup代替。

?

no shuffle

在某些情況下,前面描述的轉換操作不會導致shuffle。當先前的轉換操作已經(jīng)使用了和shuffle相同的分區(qū)器分區(qū)數(shù)據(jù)的時候,spark就不會產(chǎn)生shuffle。

舉個例子:

rdd1?= someRdd.reduceByKey(...)rdd2?= someOtherRdd.reduceByKey(...)rdd3?= rdd1.join(rdd2)

由于使用redcuebykey的時候沒有指定分區(qū)器,所以都是使用的默認分區(qū)器,會導致rdd1和rdd2都采用的是hash分區(qū)器。兩個reducebykey操作會產(chǎn)生兩個shuffle過程。如果,數(shù)據(jù)集有相同的分區(qū)數(shù),執(zhí)行join操作的時候就不需要進行額外的shuffle。由于數(shù)據(jù)集的分區(qū)相同,因此rdd1的任何單個分區(qū)中的key集合只能出現(xiàn)在rdd2的單個分區(qū)中。因此,rdd3的任何單個輸出分區(qū)的內(nèi)容僅取決于rdd1中單個分區(qū)的內(nèi)容和rdd2中的單個分區(qū),并且不需要第三個shuffle。

例如,如果someRdd有四個分區(qū),someOtherRdd有兩個分區(qū),而reduceByKeys都使用三個分區(qū),運行的任務集如下所示:

如果rdd1和rdd2使用不同的分區(qū)器或者相同的分區(qū)器不同的分區(qū)數(shù),僅僅一個數(shù)據(jù)集在join的過程中需要重新shuffle

?

在join的過程中為了避免shuffle,可以使用廣播變量。當executor內(nèi)存可以存儲數(shù)據(jù)集,在driver端可以將其加載到一個hash表中,然后廣播到executor。然后,map轉換可以引用哈希表來執(zhí)行查找。

增加shuffle

有時候需要打破最小化shuffle次數(shù)的規(guī)則。

當增加并行度的時候,額外的shuffle是有利的。例如,數(shù)據(jù)中有一些文件是不可分割的,那么該大文件對應的分區(qū)就會有大量的記錄,而不是說將數(shù)據(jù)分散到盡可能多的分區(qū)內(nèi)部來使用所有已經(jīng)申請cpu。在這種情況下,使用reparition重新產(chǎn)生更多的分區(qū)數(shù),以滿足后面轉換算子所需的并行度,這會提升很大性能。

使用reduce和aggregate操作將數(shù)據(jù)聚合到driver端,也是修改區(qū)數(shù)的很好的例子。

在對大量分區(qū)執(zhí)行聚合的時候,在driver的單線程中聚合會成為瓶頸。要減driver的負載,可以首先使用reducebykey或者aggregatebykey執(zhí)行一輪分布式聚合,同時將結果數(shù)據(jù)集分區(qū)數(shù)減少。實際思路是首先在每個分區(qū)內(nèi)部進行初步聚合,同時減少分區(qū)數(shù),然后再將聚合的結果發(fā)到driver端實現(xiàn)最終聚合。典型的操作是treeReduce 和 treeAggregate。

當聚合已經(jīng)按照key進行分組時,此方法特別適用。例如,假如一個程序計算語料庫中每個單詞出現(xiàn)的次數(shù),并將結果使用map返回到driver。一種方法是可以使用聚合操作完成在每個分區(qū)計算局部map,然后在driver中合并map。可以用aggregateByKey以完全分布的方式進行統(tǒng)計,然后簡單的用collectAsMap將結果返回到driver。

?

轉自:https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzA3MDY0NTMxOQ==&scene=124#wechat_redirect

超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

總結

以上是生活随笔為你收集整理的spark shuffle再补充的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。