日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大表与大表join数据倾斜_技术分享|大数据技术初探之Spark数据倾斜调优

發布時間:2023/12/10 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大表与大表join数据倾斜_技术分享|大数据技术初探之Spark数据倾斜调优 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

侯亞南

數據技術處

支宸嘯

數據技術處

在大數據計算中,我們可能會遇到一個很棘手的問題——數據傾斜,此時spark任務的性能會比預期要差很多:絕大多數task都很快執行完成,但個別task執行極慢或者報OOM(內存溢出)。數據傾斜調優,就是使用各種技術方案解決不同類型的數據傾斜問題,以保證Spark作業的性能。

01

#??原 理??#

數據傾斜只會發生在shuffle過程中,在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合(groupByKey、reduceByKey、aggregateByKey)或join等操作。此時如果某個key對應的數據量特別大的話,就會發生數據傾斜,導致個別task執行極慢,整個Spark作業的運行進度是由運行時間最長的那個task決定的。

02

#??定位問題??#

當某個task運行過慢時,需要定位數據傾斜發生在第幾個stage中。如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到當前運行到了第幾個stage;如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當前運行到了第幾個stage。此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當前這個stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。

03

#??解決方案??#

3.1? 使用hive ETL進行預處理

如果hive表中的數據本身很不均勻,而且業務場景需要頻繁使用Spark對hive表執行某個分析操作,此時可以嘗試通過hive來進行預處理(即通過hive ETL預先對數據按照key進行聚合,或者是預先和其他表進行join),然后在Spark作業中針對的數據源是預處理后的hive表,這樣在Spark作業中就不需要使用原先的shuffle類算子執行這類操作了。但這種方式屬于治標不治本,只是把數據傾斜的發生提前到了hive ETL中,避免Spark程序發生數據傾斜而已。

3.2? 過濾少數導致傾斜的key

如果發現導致傾斜的key就少數幾個,而且對計算本身的影響并不大的話,那么可以過濾掉這些key。比如,在Spark SQL中可以使用where子句過濾掉這些key,或者在Spark Core中對RDD使用filter算子過濾掉這些key。如果需要通過動態判定哪些key的數據量最多來進行過濾,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數量,取數據量最多的key過濾掉即可。該方案實現簡單,而且效果也很好,可以完全規避掉數據傾斜。但大多數情況下,導致數據傾斜的key還是很多的,并不是只有少數幾個。

3.3? 提高shuffle操作的并行度

此方案是一種對數據傾斜迎難而上的方案,通過增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據,可以通過修改spark.sql.shuffle.partitions的值來增加shuffle read task的并行度。此方案可以有效緩解數據傾斜,但是沒有徹底解決問題,如果出現極端情況,比如某個key對應的數據量巨大,那么無論task數量增加到多少,這個key對應的數據還是可能會分配到一個task中去處理。

3.4? 局部聚合+全局聚合

該方案適用于對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合。通過將原本相同的key附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題,最后再去除掉隨機前綴,進行全局聚合,就可以得到最終的結果。但如果是join類的shuffle操作,還得用其他的解決方案。

3.5? 將reduce join轉為map join

此方案適用于大表join小表的情況,通過不使用join算子進行連接操作,而使用Broadcast變量與map類算子來實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。通過將較小RDD中的數據直接用collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接著對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照key進行比對。如果key相同的話,那么就將兩個RDD的數據按照需要的方式連接起來。此方案不適用于兩個大表join的情況。

3.6? 采樣傾斜key并分拆join操作

如果兩個RDD/hive表進行join的時候,數據量都比較大,其中某一個RDD/hive表中的少數幾個key的數據量過大,而另一個RDD/hive表中的所有key都分布比較均勻,可以將數據量大的幾個key分拆成獨立RDD,并附加隨機前綴打散成n份去進行join。此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task上去join了,最后將結果使用union算子合并起來即可。但如果導致傾斜的key特別多的話,不適合該方案。

3.7? 使用隨機前綴和擴容RDD進行join

如果在進行join操作的時候,RDD中有大量的key導致數據傾斜,可以將該RDD的每條數據都打上一個n以內的隨機前綴,同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴,最后將兩個處理后的RDD進行join即可。該方案與上一種方案的不同之處在于,上種方案是盡量只對少數傾斜key對應的數據進行特殊處理(擴容RDD),對內存的占用并不大;而該方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,只能對整個RDD進行數據擴容,對內存資源要求很高。

實際項目中,應該綜合分析數據的特征、需要進行的操作等來合理選取方案,可以多種方案組合使用。

推薦閱讀

大數據技術初探之sparkstreaming與flink技術對比
技術分享|大數據技術初探之流計算框架

總結

以上是生活随笔為你收集整理的大表与大表join数据倾斜_技术分享|大数据技术初探之Spark数据倾斜调优的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。