Spark数据倾斜的完美解决
數據傾斜解決方案
數據傾斜的解決,跟之前講解的性能調優,有一點異曲同工之妙。
性能調優中最有效最直接最簡單的方式就是加資源加并行度,并注意RDD架構(復用同一個RDD,加上cache緩存)。相對于前面,shuffle、jvm等是次要的。
6.1、原理以及現象分析
6.1.1、數據傾斜怎么出現的
在執行shuffle操作的時候,是按照key,來進行values的數據的輸出、拉取和聚合的。
同一個key的values,一定是分配到一個reduce task進行處理的。
多個key對應的values,比如一共是90萬。可能某個key對應了88萬數據,被分配到一個task上去面去執行。
另外兩個task,可能各分配到了1萬數據,可能是數百個key,對應的1萬條數據。
這樣就會出現數據傾斜問題。
想象一下,出現數據傾斜以后的運行的情況。很糟糕!
其中兩個task,各分配到了1萬數據,可能同時在10分鐘內都運行完了。另外一個task有88萬條,88 * 10 =? 880分鐘 = 14.5個小時。
大家看,本來另外兩個task很快就運行完畢了(10分鐘),但是由于一個拖后腿的家伙,第三個task,要14.5個小時才能運行完,就導致整個spark作業,也得14.5個小時才能運行完。
數據傾斜,一旦出現,是不是性能殺手?!
6.1.2、發生數據傾斜以后的現象
Spark數據傾斜,有兩種表現:
1、你的大部分的task,都執行的特別特別快,(你要用client模式,standalone client,yarn client,本地機器一執行spark-submit腳本,就會開始打印log),task175 finished,剩下幾個task,執行的特別特別慢,前面的task,一般1s可以執行完5個,最后發現1000個task,998,999 task,要執行1個小時,2個小時才能執行完一個task。
出現以上loginfo,就表明出現數據傾斜了。
這樣還算好的,因為雖然老牛拉破車一樣非常慢,但是至少還能跑。
2、另一種情況是,運行的時候,其他task都執行完了,也沒什么特別的問題,但是有的task,就是會突然間報了一個OOM,JVM Out Of Memory,內存溢出了,task failed,task lost,resubmitting task。反復執行幾次都到了某個task就是跑不通,最后就掛掉。
某個task就直接OOM,那么基本上也是因為數據傾斜了,task分配的數量實在是太大了!所以內存放不下,然后你的task每處理一條數據,還要創建大量的對象,內存爆掉了。
這樣也表明出現數據傾斜了。
這種就不太好了,因為你的程序如果不去解決數據傾斜的問題,壓根兒就跑不出來。
作業都跑不完,還談什么性能調優這些東西?!
6.1.3、定位數據傾斜出現的原因與出現問題的位置
根據log去定位
出現數據傾斜的原因,基本只可能是因為發生了shuffle操作,在shuffle的過程中,出現了數據傾斜的問題。因為某個或者某些key對應的數據,遠遠的高于其他的key。
1、你在自己的程序里面找找,哪些地方用了會產生shuffle的算子,groupByKey、countByKey、reduceByKey、join
2、看log
log一般會報是在你的哪一行代碼,導致了OOM異常。或者看log,看看是執行到了第幾個stage。spark代碼,是怎么劃分成一個一個的stage的。哪一個stage生成的task特別慢,就能夠自己用肉眼去對你的spark代碼進行stage的劃分,就能夠通過stage定位到你的代碼,到底哪里發生了數據傾斜。
?1、使用Hive ETL預處理數據
方案適用場景:
如果導致數據傾斜的是Hive表。如果該Hive表中的數據本身很不均勻(比如某個key對應了100萬數據,其他key才對應了10條數據),而且業務場景需要頻繁使用Spark對Hive表執行某個分析操作,那么比較適合使用這種技術方案。
方案實現思路:
此時可以評估一下,是否可以通過Hive來進行數據預處理(即通過Hive ETL預先對數據按照key進行聚合,或者是預先和其他表進行join),然后在Spark作業中針對的數據源就不是原來的Hive表了,而是預處理后的Hive表。此時由于數據已經預先進行過聚合或join操作了,那么在Spark作業中也就不需要使用原先的shuffle類算子執行這類操作了。
方案實現原理:
這種方案從根源上解決了數據傾斜,因為徹底避免了在Spark中執行shuffle類算子,那么肯定就不會有數據傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標不治本。因為畢竟數據本身就存在分布不均勻的問題,所以Hive ETL中進行group by或者join等shuffle操作時,還是會出現數據傾斜,導致Hive ETL的速度很慢。我們只是把數據傾斜的發生提前到了Hive ETL中,避免Spark程序發生數據傾斜而已。
?
2、過濾少數導致傾斜的key
方案適用場景:
如果發現導致傾斜的key就少數幾個,而且對計算本身的影響并不大的話,那么很適合使用這種方案。比如99%的key就對應10條數據,但是只有一個key對應了100萬數據,從而導致了數據傾斜。
方案實現思路:
如果我們判斷那少數幾個數據量特別多的key,對作業的執行和計算結果不是特別重要的話,那么干脆就直接過濾掉那少數幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的數據量最多然后再進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數量,取數據量最多的key過濾掉即可。
方案實現原理:
將導致數據傾斜的key給過濾掉之后,這些key就不會參與計算了,自然不可能產生數據傾斜。
?
3、提高shuffle操作的并行度
方案實現思路:
在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的并行度,該值默認是200,對于很多場景來說都有點過小。
方案實現原理:
增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個不同的key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。
?
4、雙重聚合
方案適用場景:
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
方案實現思路:
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數后的數據,執行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。
方案實現原理:
將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。如果一個RDD中有一個key導致數據傾斜,同時還有其他的key,那么一般先對數據集進行抽樣,然后找出傾斜的key,再使用filter對原始的RDD進行分離為兩個RDD,一個是由傾斜的key組成的RDD1,一個是由其他的key組成的RDD2,那么對于RDD1可以使用加隨機前綴進行多分區多task計算,對于另一個RDD2正常聚合計算,最后將結果再合并起來。
隨機前綴加幾,ReduceByKey分幾個區。
?
5、將reduce join轉為map join(徹底避免數據傾斜)
BroadCast+filter(或者map)
方案適用場景:
在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數據量比較小(比如幾百M或者一兩G),比較適用此方案。
方案實現思路:
不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接著對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。
方案實現原理:
普通的join是會走shuffle過程的,而一旦shuffle,就相當于會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。
?
6、采樣傾斜key并分拆join操作
方案適用場景:
兩個RDD/Hive表進行join的時候,如果數據量都比較大,無法采用“解決方案五”,那么此時可以看一下兩個RDD/Hive表中的key分布情況。如果出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。
方案實現思路:
對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數量,計算出來數據量最大的是哪幾個key。然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據并形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。而另外兩個普通的RDD就照常join即可。最后將兩次join的結果使用union算子合并起來即可,就是最終的join結果 。
?
7、使用隨機前綴和擴容RDD進行join
?
方案適用場景:
如果在進行join操作時,RDD中有大量的key導致數據傾斜,那么進行分拆key也沒什么意義,此時就只能使用最后一種方案來解決問題了。
方案實現思路:
該方案的實現思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數據分布情況,找到那個造成數據傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條數據。然后將該RDD的每條數據都打上一個n以內的隨機前綴。同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。最后將兩個處理后的RDD進行join即可。
?
總結
以上是生活随笔為你收集整理的Spark数据倾斜的完美解决的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面试中常见的查找与排序
- 下一篇: Spark分区器HashPartitio