日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

3万字细品数据倾斜(建议收藏)

發(fā)布時間:2025/3/21 编程问答 66 豆豆
生活随笔 收集整理的這篇文章主要介紹了 3万字细品数据倾斜(建议收藏) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、前言

1.1?緒論

數(shù)據(jù)傾斜是大數(shù)據(jù)領(lǐng)域繞不開的攔路虎,當你所需處理的數(shù)據(jù)量到達了上億甚至是千億條的時候,數(shù)據(jù)傾斜將是橫在你面前一道巨大的坎。

邁的過去,將會海闊天空!邁不過去,就要做好準備:很可能有幾周甚至幾月都要頭疼于數(shù)據(jù)傾斜導致的各類詭異的問題。

1.2 鄭重聲明

話題比較大,技術(shù)要求也比較高,筆者盡最大的能力來寫出自己的理解,寫的不對和不好的地方大家一起交流。

有些例子不是特別嚴謹,一些小細節(jié)對文章理解沒有影響,不要太在意。(比如我在算機器內(nèi)存的時候,就不把Hadoop自身的進程算到使用內(nèi)存中)

1.3 文章結(jié)構(gòu)

1.先大致解釋一下什么是數(shù)據(jù)傾斜

2.再根據(jù)幾個場景來描述一下數(shù)據(jù)傾斜產(chǎn)生的情況

3.詳細分析一下在Hadoop和Spark中產(chǎn)生數(shù)據(jù)傾斜的原因

4.如何解決(優(yōu)化)數(shù)據(jù)傾斜問題?

二、什么是數(shù)據(jù)傾斜

簡單的講,數(shù)據(jù)傾斜就是我們在計算數(shù)據(jù)的時候,數(shù)據(jù)的分散度不夠,導致大量的數(shù)據(jù)集中到了集群中的一臺或者幾臺機器上計算,而集群中的其他節(jié)點空閑。這些傾斜了的數(shù)據(jù)的計算速度遠遠低于平均計算速度,導致整個計算過程過慢。

2.1 關(guān)鍵字:數(shù)據(jù)傾斜

相信大部分做數(shù)據(jù)的童鞋們都會遇到數(shù)據(jù)傾斜,數(shù)據(jù)傾斜會發(fā)生在數(shù)據(jù)開發(fā)的各個環(huán)節(jié)中,比如:

1.用Hive算數(shù)據(jù)的時候reduce階段卡在99.99%

2.用SparkStreaming做實時算法時候,一直會有executor出現(xiàn)OOM的錯誤,但是其余的executor內(nèi)存使用率卻很低。

3.這些問題經(jīng)常會困擾我們,辛辛苦苦等了幾個小時的數(shù)據(jù)就是跑不出來,心里多難過啊。

2.2 關(guān)鍵字:千億級

為什么要突出這么大數(shù)據(jù)量?先說一下筆者自己最初對數(shù)據(jù)量的理解:

數(shù)據(jù)量大就了不起了?數(shù)據(jù)量少,機器也少,計算能力也是有限的,因此難度也是一樣的。憑什么數(shù)據(jù)量大就會有數(shù)據(jù)傾斜,數(shù)據(jù)量小就沒有?

這樣理解也有道理,但是比較片面,舉兩個場景來對比:

公司一:總用戶量1000萬,5臺64G內(nèi)存的的服務(wù)器。

公司二:總用戶量10億,1000臺64G內(nèi)存的服務(wù)器。

兩個公司都部署了Hadoop集群。假設(shè)現(xiàn)在遇到了數(shù)據(jù)傾斜,發(fā)生什么?

1.公司一的數(shù)據(jù)分析師在做join的時候發(fā)生了數(shù)據(jù)傾斜,會導致有幾百萬用戶的相關(guān)數(shù)據(jù)集中到了一臺服務(wù)器上,幾百萬的用戶數(shù)據(jù),說大也不大,正常字段量的數(shù)據(jù)的話64G還是能輕松處理掉的。

2.公司二的數(shù)據(jù)分析師在做join的時候也發(fā)生了數(shù)據(jù)傾斜,可能會有1個億的用戶相關(guān)數(shù)據(jù)集中到了一臺機器上了(相信我,這很常見)。這時候一臺機器就很難搞定了,最后會很難算出結(jié)果。

三、數(shù)據(jù)傾斜長什么樣

下面會分幾個場景來描述一下數(shù)據(jù)傾斜的特征,方便讀者辨別。由于Hadoop和Spark是最常見的兩個計算平臺,下面就以這兩個平臺說明。

3.1 Hadoop中的數(shù)據(jù)傾斜

3.1.1 概述

Hadoop中直接貼近用戶使用使用的時Mapreduce程序和Hive程序,雖說Hive最后也是用MR來執(zhí)行(至少目前Hive內(nèi)存計算并不普及),但是畢竟寫的內(nèi)容邏輯區(qū)別很大,一個是程序,一個是Sql,因此這里稍作區(qū)分。

3.1.2 表現(xiàn)

Hadoop中的數(shù)據(jù)傾斜主要表現(xiàn)在、Reduce階段卡在99.99%,一直不能結(jié)束。

這里如果詳細的看日志或者和監(jiān)控界面的話會發(fā)現(xiàn):

有一個多幾個Reduce卡住

各種container報錯OOM

異常的Reducer讀寫的數(shù)據(jù)量極大,至少遠遠超過其它正常的Reducer

伴隨著數(shù)據(jù)傾斜,會出現(xiàn)任務(wù)被kill等各種詭異的表現(xiàn)。

3.1.2 經(jīng)驗

Hive的數(shù)據(jù)傾斜,一般都發(fā)生在Sql中g(shù)roup by和join on上,而且和數(shù)據(jù)邏輯綁定比較深。

3.2 Spark中的數(shù)據(jù)傾斜

Spark中的數(shù)據(jù)傾斜也很常見,這里包括Spark Streaming和Spark Sql,表現(xiàn)主要有下面幾種:

Executor lost,OOM,Shuffle過程出錯

Driver OOM

單個Executor執(zhí)行時間特別久,整體任務(wù)卡在某個階段不能結(jié)束

正常運行的任務(wù)突然失敗

注意,在Spark streaming程序中,數(shù)據(jù)傾斜更容易出現(xiàn),特別是在程序中包含一些類似sql的join、group這種操作的時候。?因為Spark Streaming程序在運行的時候,我們一般不會分配特別多的內(nèi)存,因此一旦在這個過程中出現(xiàn)一些數(shù)據(jù)傾斜,就十分容易造成OOM。

四、數(shù)據(jù)傾斜的原理

4.1 數(shù)據(jù)傾斜產(chǎn)生原因概述

我們以Spark和Hive的使用場景為例。

他們在做數(shù)據(jù)運算的時候會涉及到,count distinct、group by、join on等操作,這些都會觸發(fā)Shuffle動作。一旦觸發(fā)Shuffle,所有相同key的值就會被拉到一個或幾個Reducer節(jié)點上,容易發(fā)生單點計算問題,導致數(shù)據(jù)傾斜。

一般來說,數(shù)據(jù)傾斜原因有以下幾方面:

key分布不均勻

業(yè)務(wù)數(shù)據(jù)本身的特性

建表時考慮不周

某些SQL語句本身就有數(shù)據(jù)傾斜

4.2 Shuffle與數(shù)據(jù)傾斜

Hadoop和Spark在Shuffle過程中產(chǎn)生數(shù)據(jù)傾斜的原理基本類似即數(shù)據(jù)不均勻。如下圖:

大部分數(shù)據(jù)傾斜的原理就類似于上圖,很明了,因為數(shù)據(jù)分布不均勻,導致大量的數(shù)據(jù)分配到了一個節(jié)點。

4.3 數(shù)據(jù)本身與數(shù)據(jù)傾斜

我們舉一個例子,就說數(shù)據(jù)默認值的設(shè)計吧,假設(shè)我們有兩張表:

user(用戶信息表):userid,register_ip

ip(IP表):ip,register_user_cnt

這可能是兩個不同的人開發(fā)的數(shù)據(jù)表。如果我們的數(shù)據(jù)規(guī)范不太完善的話,會出現(xiàn)一種情況:

user表中的register_ip字段,如果獲取不到這個信息,我們默認為null;

但是在ip表中,我們在統(tǒng)計這個值的時候,為了方便,我們把獲取不到ip的用戶,統(tǒng)一認為他們的ip為0。

兩邊其實都沒有錯的,但是一旦我們做關(guān)聯(lián)了,這個任務(wù)會在做關(guān)聯(lián)的階段,也就是sql的on的階段卡死。

4.4 業(yè)務(wù)邏輯與數(shù)據(jù)傾斜

數(shù)據(jù)往往和業(yè)務(wù)是強相關(guān)的,業(yè)務(wù)的場景直接影響到了數(shù)據(jù)的分布。

再舉一個例子,比如就說訂單場景吧,我們在某一天在北京和上海兩個城市多了強力的推廣,結(jié)果可能是這兩個城市的訂單量增長了10000%,其余城市的數(shù)據(jù)量不變。

然后我們要統(tǒng)計不同城市的訂單情況,這樣,一做group操作,可能直接就數(shù)據(jù)傾斜了。

五、 解決數(shù)據(jù)傾斜思路

5.1 概述

數(shù)據(jù)傾斜的產(chǎn)生是有一些討論的,解決它們也是有一些討論的,本章會先給出幾個解決數(shù)據(jù)傾斜的思路,然后對Hadoop和Spark分別給出一些解決數(shù)據(jù)傾斜的方案。

注意:?很多數(shù)據(jù)傾斜的問題,都可以用和平臺無關(guān)的方式解決,比如更好的數(shù)據(jù)預(yù)處理, 異常值的過濾等,因此筆者認為,解決數(shù)據(jù)傾斜的重點在于對數(shù)據(jù)設(shè)計和業(yè)務(wù)的理解,這兩個搞清楚了,數(shù)據(jù)傾斜就解決了大部分了。

5.2 解決思路

解決數(shù)據(jù)傾斜有這幾個思路:

5.2.1 業(yè)務(wù)邏輯

我們從業(yè)務(wù)邏輯的層面上來優(yōu)化數(shù)據(jù)傾斜,比如上面的兩個城市做推廣活動導致那兩個城市數(shù)據(jù)量激增的例子,我們可以單獨對這兩個城市來做count,單獨做時可用兩次MR,第一次打散計算,第二次再最終聚合計算。完成后和其它城市做整合。

5.2.2 程序?qū)用?/p>

比如說在Hive中,經(jīng)常遇到count(distinct)操作,這樣會導致最終只有一個Reduce任務(wù)。

我們可以先group by,再在外面包一層count,就可以了。比如計算按用戶名去重后的總用戶量:

// 優(yōu)化前 只有一個reduce,先去重再count負擔比較大:

select?name,count(distinct?name)from?user;

//優(yōu)化后

// 設(shè)置該任務(wù)的每個job的reducer個數(shù)為3個。Hive默認-1,自動推斷。

set?mapred.reduce.tasks=3;

// 啟動兩個job,一個負責子查詢(可以有多個reduce),另一個負責count(1):

select?count(1)?from?(select?name?from?user?group?by?name)?tmp;

5.2.3 調(diào)參方面

Hadoop和Spark都自帶了很多的參數(shù)和機制來調(diào)節(jié)數(shù)據(jù)傾斜,合理利用它們就能解決大部分問題。

5.3 從業(yè)務(wù)和數(shù)據(jù)上解決數(shù)據(jù)傾斜

很多數(shù)據(jù)傾斜都是在數(shù)據(jù)的使用上造成的。我們舉幾個場景,并分別給出它們的解決方案。

數(shù)據(jù)分布不均勻:

前面提到的“從數(shù)據(jù)角度來理解數(shù)據(jù)傾斜”和“從業(yè)務(wù)計角度來理解數(shù)據(jù)傾斜”中的例子,其實都是數(shù)據(jù)分布不均勻的類型,這種情況和計算平臺無關(guān),我們能通過設(shè)計的角度嘗試解決它。

有損的方法:

找到異常數(shù)據(jù),比如ip為0的數(shù)據(jù),過濾掉

無損的方法:

對分布不均勻的數(shù)據(jù),單獨計算

先對key做一層hash,先將數(shù)據(jù)隨機打散讓它的并行度變大,再匯集

數(shù)據(jù)預(yù)處理

六、MR解決數(shù)據(jù)傾斜具體方法

6.1 大量相同key沒有combine就傳到Reducer

combiner函數(shù)

思想:提前在map進行combine,減少傳輸?shù)臄?shù)據(jù)量

在Mapper加上combiner相當于提前進行reduce,即把一個Mapper中的相同key進行了聚合,減少shuffle過程中傳輸?shù)臄?shù)據(jù)量,以及Reducer端的計算量。

如果導致數(shù)據(jù)傾斜的key 大量分布在不同的mapper的時候,這種方法就不是很有效了。

6.2 導致數(shù)據(jù)傾斜的key 大量分布在不同的mapper

局部聚合加全局聚合。

第一次在map階段對那些導致了數(shù)據(jù)傾斜的key 加上1到n的隨機前綴,這樣本來相同的key 也會被分到多個Reducer中進行局部聚合,數(shù)量就會大大降低。

第二次mapreduce,去掉key的隨機前綴,進行全局聚合。

思想:二次mr,第一次將key隨機散列到不同reducer進行處理達到負載均衡目的。第二次再根據(jù)去掉key的隨機前綴,按原key進行reduce處理。

該方法進行兩次mapreduce:

這個方法進行兩次mapreduce,性能稍差。

增加Reducer
思想:增加Reducer,提升并行度
JobConf.setNumReduceTasks(int)

實現(xiàn)custom partitioner
思想:根據(jù)數(shù)據(jù)分布情況,自定義散列函數(shù),將key均勻分配到不同Reducer

七、Hive解決數(shù)據(jù)傾斜具體方法

7.1 場景

7.1.1 group by

注:group by 優(yōu)于distinct group

情形:group by 維度過小,某值的數(shù)量過多

后果:處理某值的reduce非常耗時

解決方式:采用sum() group by的方式來替換count(distinct)完成計算。

7.1.2 count(distinct)

count(distinct xx)

情形:某特殊值過多

后果:處理此特殊值的reduce耗時;只有一個reduce任務(wù)

解決方式:count distinct時,將值為空的情況單獨處理,比如可以直接過濾空值的行,在最后結(jié)果中加1。如果還有其他計算,需要進行g(shù)roup by,可以先將值為空的記錄單獨處理,再和其他計算結(jié)果進行union。

7.1.3 join

情形1:小表與大表join,但較小表key集中
后果:shuffle分發(fā)到某一個或幾個Reducer上的數(shù)據(jù)量遠高于平均值。想象極端情況,小表的join列全部為一個值,那么shuffle后全部到一個Reducer節(jié)點,其他節(jié)點無負載。這就是極端的數(shù)據(jù)傾斜了。
解決方式:mapjoin

情形2:大表與大表join,但是分桶的判斷字段0值或空值過多
后果:這些空值/0值都由一個Reducer處理,非常慢
解決方式:把空值的key變成一個字符串加上隨機數(shù),把傾斜的數(shù)據(jù)分到不同的reduce上,由于null值關(guān)聯(lián)不上,處理后并不影響最終結(jié)果。

7.1.4 不同數(shù)據(jù)類型關(guān)聯(lián)產(chǎn)生數(shù)據(jù)傾斜

情形:比如用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時。

后果:處理此特殊值的reduce耗時;只有一個reduce任務(wù)
默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。

解決方式:把數(shù)字類型轉(zhuǎn)換成字符串類型

select?*?from?users?aleft?outer?join?logs?bon?a.usr_id?=?cast(b.user_id?as?string)

7.2 調(diào)優(yōu)

7.2.1 hive.map.aggr=true

#?開啟map端combiner set?hive.map.aggr=true;

思想
開啟map combiner。在map中會做部分聚集操作,效率更高但需要更多的內(nèi)存。

點評
假如map各條數(shù)據(jù)基本上不一樣, 聚合沒什么意義,做combiner反而畫蛇添足,hive里也考慮的比較周到通過參數(shù):

hive.groupby.mapaggr.checkinterval?= 100000 (默認)

hive.map.aggr.hash.min.reduction=0.5(默認)

7.2.2?hive.groupby.skewindata=true

#?開啟數(shù)據(jù)傾斜時負載均衡 set?hive.groupby.skewindata=true;

思想
就是先隨機分發(fā)并處理,再按照key group by來分發(fā)處理。

操作
當選項設(shè)定為true,生成的查詢計劃會有兩個MRJob。

第一個MRJob 中,Map的輸出結(jié)果集合會隨機分布到Reduce中,每個Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的GroupBy Key有可能被分發(fā)到不同的Reduce中,從而達到負載均衡的目的;

第二個MRJob再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的原始GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。

點評
它使計算變成了兩個mapreduce,先在第一個中在 shuffle 過程 partition 時隨機給 key 打標記,使每個key 隨機均勻分布到各個 reduce 上計算,但是這樣只能完成部分計算,因為相同key沒有分配到相同reduce上。

所以需要第二次的mapreduce,這次就回歸正常 shuffle,但是數(shù)據(jù)分布不均勻的問題在第一次mapreduce已經(jīng)有了很大的改善,因此基本解決數(shù)據(jù)傾斜。因為大量計算已經(jīng)在第一次mr中隨機分布到各個節(jié)點完成。

7.2.3 Join

7.2.3.1 關(guān)于驅(qū)動表的選取

選用join key分布最均勻的表作為驅(qū)動表。

7.2.3.2 做好列裁剪和filter操作

以達到兩表做join的時候,數(shù)據(jù)量相對變小的效果。

7.2.3.3 left semi join

7.2.3.4 大小表Join - MapJoin

思想
小表關(guān)聯(lián)一個超大表時,容易發(fā)生數(shù)據(jù)傾斜,使用?MapJoin把小表全部加載到內(nèi)存在map端進行join。如果需要的數(shù)據(jù)在 Map 的過程中可以訪問到則不再需要Reduce。

實例分析
原始sql:

select?c.channel_name,count(t.requesturl)?PVfrom?ods.cms_channel?cjoin(select?host,requesturl?from??dms.tracklog_5min?where?day='20151111'?)?ton?c.channel_name=t.hostgroup?by?c.channel_nameorder?by?c.channel_name;

以上為小表join大表的操作,可以使用mapjoin把小表c放到內(nèi)存中處理,語法很簡單只需要增加?/*+ MAPJOIN(小標) */,把需要分發(fā)的表放入到內(nèi)存中。

select?/*+?MAPJOIN(c)?*/ c.channel_name,count(t.requesturl)?PVfrom?ods.cms_channel?cjoin(select?host,requesturl?from??dms.tracklog_5min?where?day='20151111'?)?ton?c.channel_name=t.hostgroup?by?c.channel_nameorder?by?c.channel_name;

7.2.3.5 大表Join大表 - skewjoin

當key值都是有效值時可使用hive配置:

set hive.optimize.skewjoin=true;
指定是否開啟數(shù)據(jù)傾斜的join運行時優(yōu)化,默認不開啟即false。

set hive.skewjoin.key=100000;
判斷數(shù)據(jù)傾斜的閾值,如果在join中發(fā)現(xiàn)同樣的key超過該值,則認為是該key是傾斜key。

默認100000。一般可以設(shè)置成處理的總記錄數(shù)/reduce個數(shù)的2-4倍。

set hive.optimize.skewjoin.compiletime=true;
指定是否開啟數(shù)據(jù)傾斜的join編譯時優(yōu)化,默認不開啟即false。

具體來說,會基于存儲在原數(shù)據(jù)中的傾斜key,來在編譯時為導致傾斜的key單獨創(chuàng)建執(zhí)行計劃,而其他key也有一個執(zhí)行計劃用來join。然后,對上面生成的兩個join執(zhí)行后求并集。因此,除非相同的傾斜key同時存在于這兩個join表中,否則對于引起傾斜的key的join就會優(yōu)化為map-side join。

此外,該參數(shù)與hive.optimize.skewjoin之間的主要區(qū)別在于,此參數(shù)使用存儲在metastore中的傾斜信息在編譯時來優(yōu)化執(zhí)行計劃。如果元數(shù)據(jù)中沒有傾斜信息,則此參數(shù)無效。一般可將這兩個參數(shù)都設(shè)為true。如果元數(shù)據(jù)中有傾斜信息,則hive.optimize.skewjoin不做任何操作。

7.2.3.6 小結(jié)

以上方式,都是根據(jù)數(shù)據(jù)傾斜形成的原因進行的一些變化。要么將 reduce 端的隱患在 map 端就解決,要么就是對 key 的操作,以減緩reduce 的壓力。了解了原因再去尋找解決之道就相對思路多了些,方法肯定不止這幾種。

7.2.4 先group再count

能先進行?group?操作的時候先進行g(shù)roup操作,把 key 先進行一次 reduce,之后再進行 count 或者 distinct count 操作。

7.2.5 控制空值分布

將為空的key轉(zhuǎn)變?yōu)樽址与S機數(shù)或純隨機數(shù),將因空值而造成傾斜的數(shù)據(jù)分不到多個Reducer。

注:對于異常值如果不需要的話,最好是提前在where條件里過濾掉,這樣可以使計算量大大減少

實踐中,可以使用case when對空值賦上隨機值。此方法比直接寫is not null更好,因為前者job數(shù)為1,后者為2.

使用case when實例1:

select?userid,?name fromuser_info?a join?( select?case when?userid?is?null??then??cast?(rand(47)*?100000?as?int?) else?userid end from?user_read_log )?b on?a.userid?=?b.userid

使用case when實例2:

select'${date}'?as?thedate,a.search_type,a.query,a.category,a.cat_name,a.brand_id,a.brand_name,a.dir_type,a.rewcatid,a.new_cat_name,a.new_brand_id,f.brand_name?as?new_brand_name,a.pv,a.uv,a.ipv,a.ipvuv,a.trans_amt,a.trans_num,a.alipay_uv from?fdi_search_query_cat_qp_temp?a left?outer?join?brand?f onf.pt='${date}000000'and?case?when?a.new_brand_id?is?null?then?concat('hive',rand()?)?else?a.new_brand_id?end?=?f.brand_id

如果上述的方法還不能解決,比如當有多個JOIN的時候,建議建立臨時表,然后拆分HIVE SQL語句。

7.2.6 壓縮

設(shè)置map端輸出、中間結(jié)果壓縮。(不完全是解決數(shù)據(jù)傾斜的問題,但是減少了IO讀寫和網(wǎng)絡(luò)傳輸,能提高很多效率)

7.2.7 增加Reuducer個數(shù)

默認是由參數(shù)hive.exec.reducers.bytes.per.reducer來推斷需要的Reducer個數(shù)。

可通過mapred.reduce.tasks控制,默認-

八、Spark解決數(shù)據(jù)傾斜具體方法

8.1 概述

mapjoin

設(shè)置rdd壓縮

合理設(shè)置driver的內(nèi)存

Spark Sql中的優(yōu)化和Hive類似,可以參考Hive

8.2 Spark數(shù)據(jù)傾斜表現(xiàn)

絕大多數(shù)task執(zhí)行得都非???#xff0c;但個別task執(zhí)行極慢。比如,總共有1000個task,997個task都在1分鐘之內(nèi)執(zhí)行完了,但是剩余兩三個task卻要一兩個小時。這種情況很常見。

原本能夠正常執(zhí)行的Spark作業(yè),某天突然報出OOM(內(nèi)存溢出)異常,觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的。這種情況比較少見。

8.3 Spark數(shù)據(jù)傾斜的原理

Shuffle必須將各個節(jié)點上相同的key拉取到某個節(jié)點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應(yīng)的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜。

比如大部分key對應(yīng)10條數(shù)據(jù),但是個別key卻對應(yīng)了100萬條數(shù)據(jù),那么大部分task可能就只會分配到10條數(shù)據(jù),然后1秒鐘就運行完了;但是個別task可能分配到了100萬數(shù)據(jù),要運行一兩個小時。因此,整個Spark作業(yè)的運行進度是由運行時間最長的那個task決定的。

因此出現(xiàn)數(shù)據(jù)傾斜的時候,Spark作業(yè)看起來會運行得非常緩慢,甚至可能因為某個task處理的數(shù)據(jù)量過大導致OOM。

8.4 Spark數(shù)據(jù)傾斜例子

下圖就是一個很清晰的例子:

hello這個key,在三個節(jié)點上對應(yīng)了總共7條數(shù)據(jù),這些數(shù)據(jù)都會被拉取到同一個task中進行處理;

而world和you這兩個key分別才對應(yīng)1條數(shù)據(jù),所以這兩個task只要分別處理1條數(shù)據(jù)即可。

此時第一個task的運行時間可能是另外兩個task的7倍,而整個stage的運行速度也由運行最慢的那個task所決定。

8.4 定位導致數(shù)據(jù)傾斜代碼

Spark數(shù)據(jù)傾斜只會發(fā)生在shuffle過程中。

這里給大家羅列一些常用的并且可能會觸發(fā)shuffle操作的算子:
distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

出現(xiàn)數(shù)據(jù)傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導致的。

8.4.1 某個task執(zhí)行特別慢的情況

首先要看的,就是數(shù)據(jù)傾斜發(fā)生在第幾個stage中:

如果是用yarn-client模式提交,那么在提交的機器本地是直接可以看到log,可以在log中找到當前運行到了第幾個stage;

如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當前運行到了第幾個stage。

此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當前這個stage各個task分配的數(shù)據(jù)量,從而進一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。

看task運行時間和數(shù)據(jù)量

task運行時間

比如下圖中,倒數(shù)第三列顯示了每個task的運行時間。明顯可以看到,有的task運行特別快,只需要幾秒鐘就可以運行完;而有的task運行特別慢,需要幾分鐘才能運行完,此時單從運行時間上看就已經(jīng)能夠確定發(fā)生數(shù)據(jù)傾斜了。

task數(shù)據(jù)量

此外,倒數(shù)第一列顯示了每個task處理的數(shù)據(jù)量,明顯可以看到,運行時間特別短的task只需要處理幾百KB的數(shù)據(jù)即可,而運行時間特別長的task需要處理幾千KB的數(shù)據(jù),處理的數(shù)據(jù)量差了10倍。此時更加能夠確定是發(fā)生了數(shù)據(jù)傾斜。

推斷傾斜代碼

知道數(shù)據(jù)傾斜發(fā)生在哪一個stage之后,接著我們就需要根據(jù)stage劃分原理,推算出來發(fā)生傾斜的那個stage對應(yīng)代碼中的哪一部分,這部分代碼中肯定會有一個shuffle類算子。

精準推算stage與代碼的對應(yīng)關(guān)系,需要對Spark的源碼有深入的理解,這里我們可以介紹一個相對簡單實用的推算方法:只要看到Spark代碼中出現(xiàn)了一個shuffle類算子或者是Spark SQL的SQL語句中出現(xiàn)了會導致shuffle的語句(比如group by語句),那么就可以判定,以那個地方為界限劃分出了前后兩個stage。

這里我們就以如下單詞計數(shù)來舉例。

val?conf?=?new?SparkConf() val?sc?=?new?SparkContext(conf) val?lines?=?sc.textFile("hdfs://...") val?words?=?lines.flatMap(_.split("?")) val?pairs?=?words.map((_,?1)) val?wordCounts?=?pairs.reduceByKey(_?+?_) wordCounts.collect().foreach(println(_))

在整個代碼中只有一個reduceByKey是會發(fā)生shuffle的算子,也就是說這個算子為界限劃分出了前后兩個stage:

stage0,主要是執(zhí)行從textFile到map操作,以及shuffle write操作(對pairs RDD中的數(shù)據(jù)進行分區(qū)操作,每個task處理的數(shù)據(jù)中,相同的key會寫入同一個磁盤文件內(nèi))。

stage1,主要是執(zhí)行從reduceByKey到collect操作,以及stage1的各個task一開始運行,就會首先執(zhí)行shuffle read操作(會從stage0的各個task所在節(jié)點拉取屬于自己處理的那些key,然后對同一個key進行全局性的聚合或join等操作,在這里就是對key的value值進行累加)

stage1在執(zhí)行完reduceByKey算子之后,就計算出了最終的wordCounts RDD,然后會執(zhí)行collect算子,將所有數(shù)據(jù)拉取到Driver上,供我們遍歷和打印輸出。

通過對單詞計數(shù)程序的分析,希望能夠讓大家了解最基本的stage劃分的原理,以及stage劃分后shuffle操作是如何在兩個stage的邊界處執(zhí)行的。然后我們就知道如何快速定位出發(fā)生數(shù)據(jù)傾斜的stage對應(yīng)代碼的哪一個部分了。

比如我們在Spark Web UI或者本地log中發(fā)現(xiàn),stage1的某幾個task執(zhí)行得特別慢,判定stage1出現(xiàn)了數(shù)據(jù)傾斜,那么就可以回到代碼中,定位出stage1主要包括了reduceByKey這個shuffle類算子,此時基本就可以確定是是該算子導致了數(shù)據(jù)傾斜問題。

此時,如果某個單詞出現(xiàn)了100萬次,其他單詞才出現(xiàn)10次,那么stage1的某個task就要處理100萬數(shù)據(jù),整個stage的速度就會被這個task拖慢。

8.4.2 某個task莫名其妙內(nèi)存溢出的情況

這種情況下去定位出問題的代碼就比較容易了。我們建議直接看yarn-client模式下本地log的異常棧,或者是通過YARN查看yarn-cluster模式下的log中的異常棧。一般來說,通過異常棧信息就可以定位到你的代碼中哪一行發(fā)生了內(nèi)存溢出。然后在那行代碼附近找找,一般也會有shuffle類算子,此時很可能就是這個算子導致了數(shù)據(jù)傾斜。

但是大家要注意的是,不能單純靠偶然的內(nèi)存溢出就判定發(fā)生了數(shù)據(jù)傾斜。因為自己編寫的代碼的bug,以及偶然出現(xiàn)的數(shù)據(jù)異常,也可能會導致內(nèi)存溢出。因此還是要按照上面所講的方法,通過Spark Web UI查看報錯的那個stage的各個task的運行時間以及分配的數(shù)據(jù)量,才能確定是否是由于數(shù)據(jù)傾斜才導致了這次內(nèi)存溢出。

8.5 查看導致數(shù)據(jù)傾斜的key分布情況

知道了數(shù)據(jù)傾斜發(fā)生在哪里之后,通常需要分析一下那個執(zhí)行了shuffle操作并且導致了數(shù)據(jù)傾斜的RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術(shù)方案提供依據(jù)。針對不同的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術(shù)方案來解決。

此時根據(jù)你執(zhí)行操作的情況不同,可以有很多種查看key分布的方式:

如果是Spark SQL中的group by、join語句導致的數(shù)據(jù)傾斜,那么就查詢一下SQL中使用的表的key分布情況。

如果是對Spark RDD執(zhí)行shuffle算子導致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看key分布的代碼,比如RDD.countByKey()。然后對統(tǒng)計出來的各個key出現(xiàn)的次數(shù),collect/take到客戶端打印一下,就可以看到key的分布情況。

舉例來說,對于上面所說的單詞計數(shù)程序,如果確定了是stage1的reduceByKey算子導致了數(shù)據(jù)傾斜,那么就應(yīng)該看看進行reduceByKey操作的RDD中的key分布情況,在這個例子中指的就是pairs RDD。如下示例,我們可以先對pairs采樣10%的樣本數(shù)據(jù),然后使用countByKey算子統(tǒng)計出每個key出現(xiàn)的次數(shù),最后在客戶端遍歷和打印樣本數(shù)據(jù)中各個key的出現(xiàn)次數(shù)。

val?sampledPairs?=?pairs.sample(false,?0.1) val?sampledWordCounts?=?sampledPairs.countByKey() sampledWordCounts.foreach(println(_))

8.6 Spark 數(shù)據(jù)傾斜的解決方案

8.6.1 使用Hive ETL預(yù)處理數(shù)據(jù)

8.6.1.1 適用場景

導致數(shù)據(jù)傾斜的是Hive表。如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個key對應(yīng)了100萬數(shù)據(jù),其他key才對應(yīng)了10條數(shù)據(jù)),而且業(yè)務(wù)場景需要頻繁使用Spark對Hive表執(zhí)行某個分析操作,那么比較適合使用這種技術(shù)方案。

8.6.1.2 實現(xiàn)思路

此時可以評估一下,是否可以通過Hive來進行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對數(shù)據(jù)按照key進行聚合,或者是預(yù)先和其他表進行join),然后在Spark作業(yè)中針對的數(shù)據(jù)源就不是原來的Hive表了,而是預(yù)處理后的Hive表。此時由于數(shù)據(jù)已經(jīng)預(yù)先進行過聚合或join操作了,那么在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。

8.6.1.3 方案實現(xiàn)原理

這種方案從根源上解決了數(shù)據(jù)傾斜,因為徹底避免了在Spark中執(zhí)行shuffle類算子,那么肯定就不會有數(shù)據(jù)傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標不治本。因為畢竟數(shù)據(jù)本身就存在分布不均勻的問題,所以Hive ETL中進行g(shù)roup by或者join等shuffle操作時,還是會出現(xiàn)數(shù)據(jù)傾斜,導致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中,避免Spark程序發(fā)生數(shù)據(jù)傾斜而已。

8.6.1.4 方案優(yōu)缺點

優(yōu)點
實現(xiàn)起來簡單便捷,效果還非常好,完全規(guī)避掉了數(shù)據(jù)傾斜,Spark作業(yè)的性能會大幅度提升。

缺點
治標不治本,Hive ETL中還是會發(fā)生數(shù)據(jù)傾斜。

8.6.1.5 方案實踐經(jīng)驗

在一些Java系統(tǒng)與Spark結(jié)合使用的項目中,會出現(xiàn)Java代碼頻繁調(diào)用Spark作業(yè)的場景,而且對Spark作業(yè)的執(zhí)行性能要求很高,就比較適合使用這種方案。將數(shù)據(jù)傾斜提前到上游的Hive ETL,每天僅執(zhí)行一次,只有那一次是比較慢的,而之后每次Java調(diào)用Spark作業(yè)時,執(zhí)行速度都會很快,能夠提供更好的用戶體驗。

8.6.1.6 項目實踐經(jīng)驗

在美團·點評的交互式用戶行為分析系統(tǒng)中使用了這種方案,該系統(tǒng)主要是允許用戶通過Java Web系統(tǒng)提交數(shù)據(jù)分析統(tǒng)計任務(wù),后端通過Java提交Spark作業(yè)進行數(shù)據(jù)分析統(tǒng)計。要求Spark作業(yè)速度必須要快,盡量在10分鐘以內(nèi),否則速度太慢,用戶體驗會很差。所以我們將有些Spark作業(yè)的shuffle操作提前到了Hive ETL中,從而讓Spark直接使用預(yù)處理的Hive中間表,盡可能地減少Spark的shuffle操作,大幅度提升了性能,將部分作業(yè)的性能提升了6倍以上。

8.6.2 過濾少數(shù)導致傾斜的key

8.6.2.1 方案適用場景

如果發(fā)現(xiàn)導致傾斜的key就少數(shù)幾個,而且對計算本身的影響并不大的話,那么很適合使用這種方案。比如99%的key就對應(yīng)10條數(shù)據(jù),但是只有一個key對應(yīng)了100萬數(shù)據(jù),從而導致了數(shù)據(jù)傾斜。

8.6.2.2 方案實現(xiàn)思路

如果我們判斷那少數(shù)幾個數(shù)據(jù)量特別多的key,對作業(yè)的執(zhí)行和計算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)幾個key。

比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執(zhí)行filter算子過濾掉這些key。

如果需要每次作業(yè)執(zhí)行時,動態(tài)判定哪些key的數(shù)據(jù)量最多然后再進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數(shù)量,取數(shù)據(jù)量最多的key過濾掉即可。

8.6.2.3 方案實現(xiàn)原理

將導致數(shù)據(jù)傾斜的key給過濾掉之后,這些key就不會參與計算了,自然不可能產(chǎn)生數(shù)據(jù)傾斜。

8.6.2.4 方案優(yōu)缺點

優(yōu)點
實現(xiàn)簡單,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜。

缺點
適用場景不多,大多數(shù)情況下,導致傾斜的key還是很多的,并不是只有少數(shù)幾個。

8.6.2.5 方案實踐經(jīng)驗

在項目中我們也采用過這種方案解決數(shù)據(jù)傾斜。有一次發(fā)現(xiàn)某一天Spark作業(yè)在運行的時候突然OOM了,追查之后發(fā)現(xiàn),是Hive表中的某一個key在那天數(shù)據(jù)異常,導致數(shù)據(jù)量暴增。因此就采取每次執(zhí)行前先進行采樣,計算出樣本中數(shù)據(jù)量最大的幾個key之后,直接在程序中將那些key給過濾掉。

8.6.3 提高shuffle操作的并行度

8.6.3.1 方案適用場景

如果我們必須要對數(shù)據(jù)傾斜迎難而上,那么建議優(yōu)先使用這種方案,因為這是處理數(shù)據(jù)傾斜最簡單的一種方案。

8.6.3.2 方案實現(xiàn)思路

在對RDD執(zhí)行shuffle算子時,給shuffle算子傳入一個參數(shù),比如reduceByKey(1000),該參數(shù)就設(shè)置了這個shuffle算子執(zhí)行時shuffle read task的數(shù)量,即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,默認是200,對于很多場景來說都有點過小。

8.6.3.3 方案實現(xiàn)原理

增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。舉例來說,如果原本有5個key,每個key對應(yīng)10條數(shù)據(jù),這5個key都是分配給一個task的,那么這個task就要處理50條數(shù)據(jù)。

而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數(shù)據(jù),那么自然每個task的執(zhí)行時間都會變短了。具體原理如下圖所示。

8.6.3.4 方案優(yōu)缺點

優(yōu)點
實現(xiàn)起來比較簡單,可以有效緩解和減輕數(shù)據(jù)傾斜的影響。

缺點
只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題,根據(jù)實踐經(jīng)驗來看,其效果有限。

8.6.3.5 方案實踐經(jīng)驗

該方案通常無法徹底解決數(shù)據(jù)傾斜,因為如果出現(xiàn)一些極端情況,比如某個key對應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個對應(yīng)著100萬數(shù)據(jù)的key肯定還是會分配到一個task中去處理,因此注定還是會發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時嘗試使用的第一種手段,嘗試去用最簡單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。

8.6.4 兩階段聚合(局部聚合+全局聚合)

8.6.4.1 方案適用場景

對RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。

8.6.4.2 方案實現(xiàn)思路

這個方案的核心實現(xiàn)思路就是進行兩階段聚合:

第一次是局部聚合,先給每個key都打上一個隨機數(shù),比如10以內(nèi)的隨機數(shù),此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。

接著對打上隨機數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結(jié)果,就會變成了(1_hello, 2) (2_hello, 2)。

然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結(jié)果了,比如(hello, 4)。

示例代碼如下:

//?第一步,給RDD中的每個key都打上一個隨機前綴。 JavaPairRDD<String,?Long>?randomPrefixRdd?=?rdd.mapToPair(new?PairFunction<Tuple2<Long,Long>,?String,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?Long>?call(Tuple2<Long,?Long>?tuple)throws?Exception?{Random?random?=?new?Random();int?prefix?=?random.nextInt(10);return?new?Tuple2<String,?Long>(prefix?+?"_"?+?tuple._1,?tuple._2);}});//?第二步,對打上隨機前綴的key進行局部聚合。 JavaPairRDD<String,?Long>?localAggrRdd?=?randomPrefixRdd.reduceByKey(new?Function2<Long,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Long?call(Long?v1,?Long?v2)?throws?Exception?{return?v1?+?v2;}});//?第三步,去除RDD中每個key的隨機前綴。 JavaPairRDD<Long,?Long>?removedRandomPrefixRdd?=?localAggrRdd.mapToPair(new?PairFunction<Tuple2<String,Long>,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Long>?call(Tuple2<String,?Long>?tuple)throws?Exception?{long?originalKey?=?Long.valueOf(tuple._1.split("_")[1]);return?new?Tuple2<Long,?Long>(originalKey,?tuple._2);}});//?第四步,對去除了隨機前綴的RDD進行全局聚合。 JavaPairRDD<Long,?Long>?globalAggrRdd?=?removedRandomPrefixRdd.reduceByKey(new?Function2<Long,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Long?call(Long?v1,?Long?v2)?throws?Exception?{return?v1?+?v2;}});

8.6.4.3 方案實現(xiàn)原理

將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合,進而解決單個task處理數(shù)據(jù)量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結(jié)果。具體原理見下圖。

8.6.4.4 方案優(yōu)缺點

優(yōu)點
對于聚合類的shuffle操作導致的數(shù)據(jù)傾斜,效果是非常不錯的。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜,將Spark作業(yè)的性能提升數(shù)倍以上。

缺點
僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

8.6.5 將reduce join轉(zhuǎn)為map join

8.6.5.1 方案適用場景

在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數(shù)據(jù)量比較小(比如幾百M或者一兩G),比較適用此方案。

8.6.5.2 方案實現(xiàn)思路

不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現(xiàn)join操作,進而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來,然后對其創(chuàng)建一個Broadcast變量,廣播給其他Executor節(jié)點;

接著對另外一個RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當前RDD的每一條數(shù)據(jù)按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來。

示例如下:

//?首先將數(shù)據(jù)量比較小的RDD的數(shù)據(jù),collect到Driver中來。 List<Tuple2<Long,?Row>>?rdd1Data?=?rdd1.collect() //?然后使用Spark的廣播功能,將小RDD的數(shù)據(jù)轉(zhuǎn)換成廣播變量,這樣每個Executor就只有一份RDD的數(shù)據(jù)。 //?可以盡可能節(jié)省內(nèi)存空間,并且減少網(wǎng)絡(luò)傳輸性能開銷。 final?Broadcast<List<Tuple2<Long,?Row>>>?rdd1DataBroadcast?=?sc.broadcast(rdd1Data);//?對另外一個RDD執(zhí)行map類操作,而不再是join類操作。 JavaPairRDD<String,?Tuple2<String,?Row>>?joinedRdd?=?rdd2.mapToPair(new?PairFunction<Tuple2<Long,String>,?String,?Tuple2<String,?Row>>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?Tuple2<String,?Row>>?call(Tuple2<Long,?String>?tuple)throws?Exception?{//?在算子函數(shù)中,通過廣播變量,獲取到本地Executor中的rdd1數(shù)據(jù)。List<Tuple2<Long,?Row>>?rdd1Data?=?rdd1DataBroadcast.value();//?可以將rdd1的數(shù)據(jù)轉(zhuǎn)換為一個Map,便于后面進行join操作。Map<Long,?Row>?rdd1DataMap?=?new?HashMap<Long,?Row>();for(Tuple2<Long,?Row>?data?:?rdd1Data)?{rdd1DataMap.put(data._1,?data._2);}//?獲取當前RDD數(shù)據(jù)的key以及value。String?key?=?tuple._1;String?value?=?tuple._2;//?從rdd1數(shù)據(jù)Map中,根據(jù)key獲取到可以join到的數(shù)據(jù)。Row?rdd1Value?=?rdd1DataMap.get(key);return?new?Tuple2<String,?String>(key,?new?Tuple2<String,?Row>(value,?rdd1Value));}});//?這里得提示一下。 //?上面的做法,僅僅適用于rdd1中的key沒有重復,全部是唯一的場景。 //?如果rdd1中有多個相同的key,那么就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數(shù)據(jù)進行join。 // rdd2中每條數(shù)據(jù)都可能會返回多條join后的數(shù)據(jù)。

8.6.5.3 方案實現(xiàn)原理

普通的join是會走shuffle過程的,而一旦shuffle,就相當于會將相同key的數(shù)據(jù)拉取到一個shuffle read task中再進行join,此時就是reduce join。

但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。具體原理如下圖所示。

8.6.5.4 方案優(yōu)缺點

優(yōu)點
對join操作導致的數(shù)據(jù)傾斜,效果非常好,因為根本就不會發(fā)生shuffle,也就根本不會發(fā)生數(shù)據(jù)傾斜。

缺點
適用場景較少,因為這個方案只適用于一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內(nèi)存資源,driver和每個Executor內(nèi)存中都會駐留一份小RDD的全量數(shù)據(jù)。如果我們廣播出去的RDD數(shù)據(jù)比較大,比如10G以上,那么就可能發(fā)生內(nèi)存溢出了。因此并不適合兩個都是大表的情況。

8.6.6 采樣傾斜key并分拆join操作

8.6.6.1 方案適用場景

兩個RDD/Hive表進行join的時候,如果數(shù)據(jù)量都比較大,無法采用“解決方案五”,那么此時可以看一下兩個RDD/Hive表中的key分布情況。

如果出現(xiàn)數(shù)據(jù)傾斜,是因為其中某一個RDD/Hive表中的少數(shù)幾個key的數(shù)據(jù)量過大,而另一個RDD/Hive表中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。

8.6.6.2 方案實現(xiàn)思路

對包含少數(shù)幾個數(shù)據(jù)量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統(tǒng)計一下每個key的數(shù)量,計算出來數(shù)據(jù)量最大的是哪幾個key。

然后將這幾個key對應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key都打上n以內(nèi)的隨機數(shù)作為前綴;

而不會導致傾斜的大部分key形成另外一個RDD。

接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應(yīng)的數(shù)據(jù)并形成一個單獨的RDD,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù),這n條數(shù)據(jù)都按順序附加一個0~n的前綴;

不會導致傾斜的大部分key也形成另外一個RDD。

再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。

而另外兩個普通的RDD就照常join即可。

最后將兩次join的結(jié)果使用union算子合并起來即可,就是最終的join結(jié)果。

示例如下:

//?首先從包含了少數(shù)幾個導致數(shù)據(jù)傾斜key的rdd1中,采樣10%的樣本數(shù)據(jù)。 JavaPairRDD<Long,?String>?sampledRDD?=?rdd1.sample(false,?0.1);//?對樣本數(shù)據(jù)RDD統(tǒng)計出每個key的出現(xiàn)次數(shù),并按出現(xiàn)次數(shù)降序排序。 //?對降序排序后的數(shù)據(jù),取出top 1或者top 100的數(shù)據(jù),也就是key最多的前n個數(shù)據(jù)。 //?具體取出多少個數(shù)據(jù)量最多的key,由大家自己決定,我們這里就取1個作為示范。//?每行數(shù)據(jù)變?yōu)?lt;key,1> JavaPairRDD<Long,?Long>?mappedSampledRDD?=?sampledRDD.mapToPair(new?PairFunction<Tuple2<Long,String>,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Long>?call(Tuple2<Long,?String>?tuple)throws?Exception?{return?new?Tuple2<Long,?Long>(tuple._1,?1L);}?????});//?按key累加行數(shù) JavaPairRDD<Long,?Long>?countedSampledRDD?=?mappedSampledRDD.reduceByKey(new?Function2<Long,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Long?call(Long?v1,?Long?v2)?throws?Exception?{return?v1?+?v2;}});//?反轉(zhuǎn)key和value,變?yōu)?lt;value,key> JavaPairRDD<Long,?Long>?reversedSampledRDD?=?countedSampledRDD.mapToPair(?new?PairFunction<Tuple2<Long,Long>,?Long,?Long>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Long>?call(Tuple2<Long,?Long>?tuple)throws?Exception?{return?new?Tuple2<Long,?Long>(tuple._2,?tuple._1);}});//?以行數(shù)排序key,取最多行數(shù)的key final?Long?skewedUserid?=?reversedSampledRDD.sortByKey(false).take(1).get(0)._2;//?從rdd1中分拆出導致數(shù)據(jù)傾斜的key,形成獨立的RDD。 JavaPairRDD<Long,?String>?skewedRDD?=?rdd1.filter(new?Function<Tuple2<Long,String>,?Boolean>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Boolean?call(Tuple2<Long,?String>?tuple)?throws?Exception?{return?tuple._1.equals(skewedUserid);}});//?從rdd1中分拆出不導致數(shù)據(jù)傾斜的普通key,形成獨立的RDD。 JavaPairRDD<Long,?String>?commonRDD?=?rdd1.filter(new?Function<Tuple2<Long,String>,?Boolean>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Boolean?call(Tuple2<Long,?String>?tuple)?throws?Exception?{return?!tuple._1.equals(skewedUserid);}?});// rdd2,就是那個所有key的分布相對較為均勻的rdd。 //?這里將rdd2中,前面獲取到的key對應(yīng)的數(shù)據(jù),過濾出來,分拆成單獨的rdd,并對rdd中的數(shù)據(jù)使用flatMap算子都擴容100倍。 //?對擴容的每條數(shù)據(jù),都打上0~100的前綴。 JavaPairRDD<String,?Row>?skewedRdd2?=?rdd2.filter(new?Function<Tuple2<Long,Row>,?Boolean>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Boolean?call(Tuple2<Long,?Row>?tuple)?throws?Exception?{return?tuple._1.equals(skewedUserid);}}).flatMapToPair(new?PairFlatMapFunction<Tuple2<Long,Row>,?String,?Row>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Iterable<Tuple2<String,?Row>>?call(Tuple2<Long,?Row>?tuple)?throws?Exception?{Random?random?=?new?Random();List<Tuple2<String,?Row>>?list?=?new?ArrayList<Tuple2<String,?Row>>();for(int?i?=?0;?i?<?100;?i++)?{list.add(new?Tuple2<String,?Row>(i?+?"_"?+?tuple._1,?tuple._2));}return?list;}});//?將rdd1中分拆出來的導致傾斜的key的獨立rdd,每條數(shù)據(jù)都打上100以內(nèi)的隨機前綴。 //?然后將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。 JavaPairRDD<Long,?Tuple2<String,?Row>>?joinedRDD1?=?skewedRDD.mapToPair(new?PairFunction<Tuple2<Long,String>,?String,?String>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?String>?call(Tuple2<Long,?String>?tuple)throws?Exception?{Random?random?=?new?Random();int?prefix?=?random.nextInt(100);return?new?Tuple2<String,?String>(prefix?+?"_"?+?tuple._1,?tuple._2);}}).join(skewedUserid2infoRDD).mapToPair(new?PairFunction<Tuple2<String,Tuple2<String,Row>>,?Long,?Tuple2<String,?Row>>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<Long,?Tuple2<String,?Row>>?call(Tuple2<String,?Tuple2<String,?Row>>?tuple)throws?Exception?{long?key?=?Long.valueOf(tuple._1.split("_")[1]);return?new?Tuple2<Long,?Tuple2<String,?Row>>(key,?tuple._2);}});//?將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。 JavaPairRDD<Long,?Tuple2<String,?Row>>?joinedRDD2?=?commonRDD.join(rdd2);//?將傾斜key join后的結(jié)果與普通key join后的結(jié)果,uinon起來。 //?就是最終的join結(jié)果。 JavaPairRDD<Long,?Tuple2<String,?Row>>?joinedRDD?=?joinedRDD1.union(joinedRDD2);

8.6.6.3 方案實現(xiàn)原理

對于join導致的數(shù)據(jù)傾斜,如果只是某幾個key導致了傾斜,可以將少數(shù)幾個key分拆成獨立RDD,并附加隨機前綴打散成n份去進行join,此時這幾個key對應(yīng)的數(shù)據(jù)就不會集中在少數(shù)幾個task上,而是分散到多個task進行join了。具體原理見下圖。

8.6.6.4 方案優(yōu)缺點

優(yōu)點
對于join導致的數(shù)據(jù)傾斜,如果只是某幾個key導致了傾斜,采用該方式可以用最有效的方式打散key進行join。而且只需要針對少數(shù)傾斜key對應(yīng)的數(shù)據(jù)進行擴容n倍,不需要對全量數(shù)據(jù)進行擴容。避免了占用過多內(nèi)存。

缺點
如果導致傾斜的key特別多的話,比如成千上萬個key都導致數(shù)據(jù)傾斜,那么這種方式也不適合。

8.6.7 使用隨機前綴和擴容RDD進行join

8.6.7.1 方案適用場景

如果在進行join操作時,RDD中有大量的key導致數(shù)據(jù)傾斜,那么進行分拆key也沒什么意義,此時就只能使用最后一種方案來解決問題了。

8.6.7.2 方案實現(xiàn)思路

該方案的實現(xiàn)思路基本和“解決方案六”類似,首先查看RDD/Hive表中的數(shù)據(jù)分布情況,找到那個造成數(shù)據(jù)傾斜的RDD/Hive表,比如有多個key都對應(yīng)了超過1萬條數(shù)據(jù)。

然后將該RDD的每條數(shù)據(jù)都打上一個n以內(nèi)的隨機前綴。

同時對另外一個正常的RDD進行擴容,將每條數(shù)據(jù)都擴容成n條數(shù)據(jù),擴容出來的每條數(shù)據(jù)都依次打上一個0~n的前綴。

最后將兩個處理后的RDD進行join即可。

示例代碼如下:

//?首先將其中一個key分布相對較為均勻的RDD膨脹100倍。 JavaPairRDD<String,?Row>?expandedRDD?=?rdd1.flatMapToPair(new?PairFlatMapFunction<Tuple2<Long,Row>,?String,?Row>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Iterable<Tuple2<String,?Row>>?call(Tuple2<Long,?Row>?tuple)throws?Exception?{List<Tuple2<String,?Row>>?list?=?new?ArrayList<Tuple2<String,?Row>>();for(int?i?=?0;?i?<?100;?i++)?{list.add(new?Tuple2<String,?Row>(0?+?"_"?+?tuple._1,?tuple._2));}return?list;}});//?其次,將另一個有數(shù)據(jù)傾斜key的RDD,每條數(shù)據(jù)都打上100以內(nèi)的隨機前綴。 JavaPairRDD<String,?String>?mappedRDD?=?rdd2.mapToPair(new?PairFunction<Tuple2<Long,String>,?String,?String>()?{private?static?final?long?serialVersionUID?=?1L;@Overridepublic?Tuple2<String,?String>?call(Tuple2<Long,?String>?tuple)throws?Exception?{Random?random?=?new?Random();int?prefix?=?random.nextInt(100);return?new?Tuple2<String,?String>(prefix?+?"_"?+?tuple._1,?tuple._2);}});//?將兩個處理后的RDD進行join即可。 JavaPairRDD<String,?Tuple2<String,?Row>>?joinedRDD?=?mappedRDD.join(expandedRDD);

8.6.7.3 方案實現(xiàn)原理

將原先一樣的key通過附加隨機前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。

該方案與“解決方案六”的不同之處就在于,上一種方案是盡量只對少數(shù)傾斜key對應(yīng)的數(shù)據(jù)進行特殊處理,由于處理過程需要擴容RDD,因此上一種方案擴容RDD后對內(nèi)存的占用并不大;

而這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行數(shù)據(jù)擴容,對內(nèi)存資源要求很高。

8.6.7.4 方案優(yōu)缺點

優(yōu)點
對join類型的數(shù)據(jù)傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。

缺點
該方案更多的是緩解數(shù)據(jù)傾斜,而不是徹底避免數(shù)據(jù)傾斜。而且需要對整個RDD進行擴容,對內(nèi)存資源要求很高。

8.6.7.5 方案實踐經(jīng)驗

曾經(jīng)開發(fā)一個數(shù)據(jù)需求的時候,發(fā)現(xiàn)一個join導致了數(shù)據(jù)傾斜。優(yōu)化之前,作業(yè)的執(zhí)行時間大約是60分鐘左右;使用該方案優(yōu)化之后,執(zhí)行時間縮短到10分鐘左右,性能提升了6倍。

8.6.8 多種方案組合使用

在實踐中發(fā)現(xiàn),很多情況下,如果只是處理較為簡單的數(shù)據(jù)傾斜場景,那么使用上述方案中的某一種基本就可以解決。但是如果要處理一個較為復雜的數(shù)據(jù)傾斜場景,那么可能需要將多種方案組合起來使用。

比如說,我們針對出現(xiàn)了多個數(shù)據(jù)傾斜環(huán)節(jié)的Spark作業(yè),可以先運用解決方案一HiveETL預(yù)處理和過濾少數(shù)導致傾斜的k,預(yù)處理一部分數(shù)據(jù),并過濾一部分數(shù)據(jù)來緩解;

其次可以對某些shuffle操作提升并行度,優(yōu)化其性能;

最后還可以針對不同的聚合或join操作,選擇一種方案來優(yōu)化其性能。

大家需要對這些方案的思路和原理都透徹理解之后,在實踐中根據(jù)各種不同的情況,靈活運用多種方案,來解決自己的數(shù)據(jù)傾斜問題。

8.7 Spark數(shù)據(jù)傾斜處理小結(jié)

總結(jié)

以上是生活随笔為你收集整理的3万字细品数据倾斜(建议收藏)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。