Hive On Spark优化
1.Hive官方建議的Hive On Spark優(yōu)化
?mapreduce.input.fileinputformat.split.maxsize=750000000hive.vectorized.execution.enabled=true?hive.cbo.enable=truehive.optimize.reducededuplication.min.reducer=4hive.optimize.reducededuplication=truehive.orc.splits.include.file.footer=falsehive.merge.mapfiles=truehive.merge.sparkfiles=falsehive.merge.smallfiles.avgsize=16000000hive.merge.size.per.task=256000000hive.merge.orcfile.stripe.level=truehive.auto.convert.join=truehive.auto.convert.join.noconditionaltask=truehive.auto.convert.join.noconditionaltask.size=894435328hive.optimize.bucketmapjoin.sortedmerge=falsehive.map.aggr.hash.percentmemory=0.5hive.map.aggr=truehive.optimize.sort.dynamic.partition=falsehive.stats.autogather=truehive.stats.fetch.column.stats=truehive.vectorized.execution.reduce.enabled=falsehive.vectorized.groupby.checkinterval=4096hive.vectorized.groupby.flush.percent=0.1hive.compute.query.using.stats=truehive.limit.pushdown.memory.usage=0.4hive.optimize.index.filter=truehive.exec.reducers.bytes.per.reducer=67108864hive.smbjoin.cache.rows=10000hive.exec.orc.default.stripe.size=67108864hive.fetch.task.conversion=morehive.fetch.task.conversion.threshold=1073741824hive.fetch.task.aggr=falsemapreduce.input.fileinputformat.list-status.num-threads=5spark.kryo.referenceTracking=falsespark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCDH 建議的hive on spark優(yōu)化
?https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/admin_hos_tuning.html#hos_tuning2.集群規(guī)劃
HA: HDFS,YARN
2臺(tái),3臺(tái)節(jié)點(diǎn) ,運(yùn)行master
其他節(jié)點(diǎn)運(yùn)行 Datanode , Nodemanger 。
Kafka 一般選擇 擁有SSD的節(jié)點(diǎn)。
中小公司 10 - 20 足夠。
3.YARN配置
單個(gè)Nodemanger貢獻(xiàn)給YARN的core,mem
?在Nodemanger節(jié)點(diǎn),配置yarn-site.xml?yarn.nodemanager.resource.memory-mb?yarn.nodemanager.resource.cpu-vcoresYARN的資源總和等于所有NM提供的core和mem的總和。
4.Container資源的上下限
?yarn.scheduler.maximum-allocation-mbyarn.scheduler.minimum-allocation-mb?yarn.scheduler.minimum-allocation-cpu-vcoresyarn.scheduler.maximum-allocation-cpu-vcores需要參考集群中所運(yùn)行的最大App需要的上下限。
配置上下限后,所有提交的APP所申請(qǐng)的資源,不能超過(guò)Container的上下限,超過(guò),提交失敗。
5.Executor的參數(shù)
CPU: 每個(gè)Executor運(yùn)行的容器中,配置core >= 4。 如果希望YARN的資源充分利用(沒(méi)有空閑), CPU設(shè)置為 集群總CPU數(shù)的因子(約數(shù))
Executor內(nèi)存: 業(yè)界經(jīng)驗(yàn),CPU : MEM = 1:4 。
Driver內(nèi)存:
?yarn.nodemanager.resource.memory-mb設(shè)置為X,若X>50G,則Driver可設(shè)置為12G,若12G<X<50G,則Driver可設(shè)置為4G。若1G<X<12G,則Driver可設(shè)置為1G。Spark的內(nèi)存模型,每個(gè)容器中的內(nèi)存,不管是Executor還是Driver,都要默認(rèn)使用 10%作為預(yù)留內(nèi)存。
配置預(yù)留內(nèi)存 : ceil( 容器中申請(qǐng)的總的內(nèi)存數(shù) / 10 )
spark.executor.memoryOverhead
spark.yarn.driver.memoryOverhead
配置非預(yù)留內(nèi)存: 容器中申請(qǐng)的總的內(nèi)存數(shù) - ceil( 容器中申請(qǐng)的總的內(nèi)存數(shù) / 10 )
spark.driver.memory
spark.executor.memory
如果希望只是影響Hive On Spark,在Hive的conf目錄中的 spark-defaults.conf中配置
如果希望影響所有的SparkApp,在Spark的conf目錄中的 spark-defaults.conf中配置
6.Executor的個(gè)數(shù)
靜態(tài)配置: 指定起多少個(gè)
?--num-executors n?--conf spark.executor.instances n動(dòng)態(tài)配置:
具體參考3.3
?spark.dynamicAllocation.enabled truespark.shuffle.service.enabled truespark.dynamicAllocation.executorIdleTimeout 60sspark.dynamicAllocation.initialExecutors ? 1spark.dynamicAllocation.minExecutors 1spark.dynamicAllocation.maxExecutors 11spark.dynamicAllocation.schedulerBacklogTimeout 1sspark.shuffle.useOldFetchProtocol ? true....7.group by優(yōu)化
什么都不用干,默認(rèn)開(kāi)啟了map端聚合
?set hive.map.aggr=true;8.Join優(yōu)化
map join 僅僅適用于 大表 join 小表
大表 join 大表 : 表不是分桶表,只能走 common(reduce) join 如果都是分桶表,分桶的字段就是join的字段,走 SMB join 大表 join 小表 : 表不是分桶表,能走 map join 是分桶表,分桶的字段就是join的字段, 走 bucket map join(特殊情形的map join)
?--啟用map join自動(dòng)轉(zhuǎn)換set hive.auto.convert.join=true;--common join轉(zhuǎn)map join小表閾值set hive.auto.convert.join.noconditionaltask.size = xxxx 參考小表的加入到內(nèi)存中大小 rawDataSizemap join涵蓋數(shù)倉(cāng)中99%的場(chǎng)景。
數(shù)倉(cāng)中基于維度建模創(chuàng)建表,最經(jīng)典的是 星型模型(99%的業(yè)務(wù)場(chǎng)景)
事實(shí)表 join 維度表
9.數(shù)據(jù)傾斜優(yōu)化
小文件和文件小是兩回事。
1B的文件: 如果數(shù)據(jù)總量就是1B , 就是文件小。
如果數(shù)據(jù)總量是1TB, 就是小文件。
本質(zhì): 在shuffle后,大key被分導(dǎo)了一個(gè)ReduceTask,造成這個(gè)ReduceTask運(yùn)行時(shí)間遠(yuǎn)遠(yuǎn)大于其他的ReduceTask,拖累全局。
第一種: 避免大key被shuffle。
16行 是key 大
如果group by場(chǎng)景,開(kāi)啟Map端聚合
第二種: 避免shuffle
join時(shí)傾斜,直接使用MapJoin
第三種: 開(kāi)啟skewjoin(hive默認(rèn)不推薦,效果不明顯)
10.并行度
默認(rèn)使用 CombineHiveInputFormat把輸入目錄中的所有文件合并成一個(gè)整體,以整體為大小去切片。
不修改,防止輸入的小文件過(guò)多。
能調(diào)整:
?set mapreduce.input.fileinputformat.split.maxsize=256M;?希望調(diào)大并行度,設(shè)置以上值變小。反之,調(diào)小。reduce端:
?熟悉數(shù)據(jù),自己設(shè)置?set mapreduce.job.reduces=n;?不熟悉數(shù)據(jù),讓hive推測(cè)(基于準(zhǔn)確的統(tǒng)計(jì)信息去推測(cè)):set mapreduce.job.reduces=-1;--執(zhí)行DML語(yǔ)句時(shí),收集表級(jí)別的統(tǒng)計(jì)信息set hive.stats.autogather=true;--執(zhí)行DML語(yǔ)句時(shí),收集字段級(jí)別的統(tǒng)計(jì)信息set hive.stats.column.autogather=true;--計(jì)算Reduce并行度時(shí),從上游Operator統(tǒng)計(jì)信息獲得輸入數(shù)據(jù)量set hive.spark.use.op.stats=true;--計(jì)算Reduce并行度時(shí),使用列級(jí)別的統(tǒng)計(jì)信息估算輸入數(shù)據(jù)量set hive.stats.fetch.column.stats=true;11.小文件優(yōu)化
?輸出合并小文件:set hive.merge.sparkfiles=true;12.默認(rèn)
?矢量化,只有對(duì)ORC類型的文件才有效總結(jié)
以上是生活随笔為你收集整理的Hive On Spark优化的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【架构设计】阿里开源COLA 4.0 -
- 下一篇: MCC配置问题