spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案
Spark SQL小文件是指文件大小顯著小于hdfs block塊大小的的文件。過于繁多的小文件會(huì)給HDFS帶來很嚴(yán)重的性能瓶頸,對(duì)任務(wù)的穩(wěn)定和集群的維護(hù)會(huì)帶來極大的挑戰(zhàn)。
一般來說,通過Hive調(diào)度的MR任務(wù)都可以簡(jiǎn)單設(shè)置如下幾個(gè)小文件合并的參數(shù)來解決任務(wù)產(chǎn)生的小文件問題:
set?hive.merge.mapfiles=true;set?hive.merge.mapredfiles=true;
set?hive.merge.size.per.task=xxxx;
set?hive.merge.smallfiles.avgsize=xxx;
然而在我們將離線調(diào)度任務(wù)逐步從Hive遷移到Spark的過程中,由于Spark本身并不支持小文件合并功能,小文件問題日益突出,對(duì)集群穩(wěn)定性造成很大影響,一度阻礙了我們的遷移工作。
為了解決小文件問題,我們經(jīng)歷了從開始的不斷調(diào)整參數(shù)到后期的代碼開發(fā)等不同階段,這里給大家做一個(gè)簡(jiǎn)單的分享。
1. Spark為什么會(huì)產(chǎn)生小文件
Spark生成的文件數(shù)量直接取決于RDD里partition的數(shù)量和表分區(qū)數(shù)量。注意這里的兩個(gè)分區(qū)概念并不相同,RDD的分區(qū)與任務(wù)并行度相關(guān),而表分區(qū)則是Hive的分區(qū)數(shù)目。生成的文件數(shù)目一般是RDD分區(qū)數(shù)和表分區(qū)的乘積。因此,當(dāng)任務(wù)并行度過高或者分區(qū)數(shù)目很大時(shí),很容易產(chǎn)生很多的小文件。
圖1:Spark RDD分區(qū)數(shù)
因此,如果需要從參數(shù)調(diào)整來減少生成的文件數(shù)目,就只能通過減少最后一個(gè)階段RDD的分區(qū)數(shù)來達(dá)到了(減少分區(qū)數(shù)目限制于歷史數(shù)據(jù)和上下游關(guān)系,難以修改)
2. 基于社區(qū)版本的參數(shù)進(jìn)行調(diào)整的方案
2.1?不含有Shuffle算子的簡(jiǎn)單靜態(tài)分區(qū)SQL?
這樣的SQL比較簡(jiǎn)單,主要是filter上游表一部分?jǐn)?shù)據(jù)寫入到下游表,或者是兩張表簡(jiǎn)單UNION起來的任務(wù),這種任務(wù)的分區(qū)數(shù)目主要是由讀取文件時(shí)Partition數(shù)目決定的。
?因?yàn)閺腟park 2.4以來,對(duì)Hive orc表和parquet支持已經(jīng)很不錯(cuò)了,為了加快運(yùn)行速率,我們開啟了將Hive orc/parquet表自動(dòng)轉(zhuǎn)為DataSource的參數(shù)。對(duì)于這種DataSource表的類型,partition數(shù)目主要是由如下三個(gè)參數(shù)控制其關(guān)系。
spark.sql.files.opencostinbytes;
spark.default.parallelism;
其關(guān)系如下圖所示,因此可以通過調(diào)整這三個(gè)參數(shù)來輸入數(shù)據(jù)的分片進(jìn)行調(diào)整:
? ? ? ?
而非DataSource表,使用CombineInputFormat來讀取數(shù)據(jù),因此主要是通過MR參數(shù)來進(jìn)行分片調(diào)整:
mapreduce.input.fileinputformat.split.minsize
雖然我們可以通過調(diào)整輸入數(shù)據(jù)的分片來對(duì)最終文件數(shù)量進(jìn)行調(diào)整,但是這樣的調(diào)整是不穩(wěn)定的,上游數(shù)據(jù)大小發(fā)生一些輕微的變化,就可能帶來參數(shù)的重新適配。
為了簡(jiǎn)單粗暴的解決這個(gè)問題,我們對(duì)這樣的SQL加了repartition的hint,引入了新的shuffle,保證文件數(shù)量是一個(gè)固定值。
2.2?帶有Shuffle算子的靜態(tài)分區(qū)任務(wù)?
在ISSUE SPARK-9858中,引入了一個(gè)新的參數(shù):
spark.sql.adaptive.shuffle.targetPostShuffleInputSize,
后期基于spark adaptive又對(duì)這個(gè)參數(shù)做了進(jìn)一步增強(qiáng),可以動(dòng)態(tài)的調(diào)整partition數(shù)量,盡可能保證每個(gè)task處理targetPostShuffleInputSize大小的數(shù)據(jù),因此這個(gè)參數(shù)我們也可以用來在一定程度上控制生成的文件數(shù)量。
2.3?動(dòng)態(tài)分區(qū)任務(wù)??
動(dòng)態(tài)分區(qū)任務(wù)因?yàn)榇嬖谥謪^(qū)這一變量,單純調(diào)整rdd這邊的partition數(shù)目很難把控整體的文件數(shù)量。
在hive里,我們可以通過設(shè)置hive.optimize.sort.dynamic.partition來緩解動(dòng)態(tài)分區(qū)產(chǎn)生文件過多導(dǎo)致任務(wù)執(zhí)行時(shí)task節(jié)點(diǎn)經(jīng)常oom的狀況。這樣的參數(shù)會(huì)引入新的的shuffle,來對(duì)數(shù)據(jù)進(jìn)行重排序,將相同的partition分給同一個(gè)task處理,從而避免了一個(gè)task同時(shí)持有多個(gè)文件句柄。
因此,我們可以借助這樣的思想,使用distribute by語句來修改sql,從而控制文件數(shù)量。一般而言,假設(shè)我們想對(duì)于每個(gè)分區(qū)生成不超過N個(gè)文件,則可以在SQL末尾增加DISTRIBUTE BY [動(dòng)態(tài)分區(qū)列],ceil(rand() * N)。
3. 自研可合并文件的commitProtocol方案
綜上種種,每個(gè)方法都存在一定的弊端,眾多規(guī)則也在實(shí)際使用過程中對(duì)業(yè)務(wù)方造成很大困擾。
因此我們產(chǎn)生了想在spark這邊實(shí)現(xiàn)和hive類似的小文件合并機(jī)制。在幾個(gè)可能的方案選型中,我們最終選擇了:重寫spark.sql.sources.commitProtocolClass方法。
一方面,該方案對(duì)Spark代碼無侵入,便于Spark源碼的維護(hù),另一方面,該方案對(duì)業(yè)務(wù)方使用友好,可以動(dòng)態(tài)通過set命令設(shè)置,如果出現(xiàn)問題回滾也十分方便。業(yè)務(wù)方在使用過程中,只需要簡(jiǎn)單設(shè)置:
spark.sql.sources.commitProtocolClass,即可控制是否開啟小文件合并。
在開啟小文件合并參數(shù)后,我們會(huì)在commit階段拿到生成的所有文件,引入兩個(gè)新的job來對(duì)這些文件進(jìn)行處理。首先我們?cè)诘谝粋€(gè)job獲取到所有大小小于spark.compact.smallfile.size的文件,在查找完成后按照spark.compact.size參數(shù)值對(duì)組合文件,并在第二個(gè)job中對(duì)這些文件進(jìn)行合并。
☆?END?☆
招聘信息OPPO互聯(lián)網(wǎng)技術(shù)團(tuán)隊(duì)招聘一大波崗位,涵蓋C++、Go、OpenJDK、Java、DevOps、容器、Linux內(nèi)核開發(fā)、產(chǎn)品經(jīng)理、項(xiàng)目經(jīng)理等多個(gè)方向,請(qǐng)?jiān)诠娞?hào)后臺(tái)回復(fù)關(guān)鍵詞“招聘”查看查詳細(xì)信息。
你可能還喜歡OPPO自研ESA DataFlow架構(gòu)與實(shí)踐
OPPO 實(shí)時(shí)數(shù)倉揭秘:從頂層設(shè)計(jì)實(shí)現(xiàn)離線與實(shí)時(shí)的平滑遷移
OPPO異地多活實(shí)踐——緩存篇
OPPO百萬級(jí)高并發(fā)MongoDB集群性能數(shù)十倍提升優(yōu)化實(shí)踐(上)
OPPO百萬級(jí)高并發(fā)MongoDB集群性能數(shù)十倍提升優(yōu)化實(shí)踐(下)
更多技術(shù)干貨
掃碼關(guān)注
OPPO互聯(lián)網(wǎng)技術(shù)
?我就知道你“在看”總結(jié)
以上是生活随笔為你收集整理的spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: qt显示rgba8888 如何改 fra
- 下一篇: centos8安装MySQL依赖_cen