HIVE查询优化
所有的調(diào)優(yōu)都離不開對(duì)CPU、內(nèi)存、IO這三樣資源的權(quán)衡及調(diào)整
Hive QL的執(zhí)行本質(zhì)上是MR任務(wù)的運(yùn)行,因此優(yōu)化主要考慮到兩個(gè)方面:Mapreduce任務(wù)優(yōu)化、SQL語(yǔ)句優(yōu)化
一、Mapreduce任務(wù)優(yōu)化
1、設(shè)置合理的task數(shù)量(map task、reduce task)
這里有幾個(gè)考慮的點(diǎn),一方面Hadoop MR task的啟動(dòng)及初始化時(shí)間較長(zhǎng),如果task過(guò)多,可能會(huì)導(dǎo)致任務(wù)啟動(dòng)和初始化時(shí)間遠(yuǎn)超邏輯處理時(shí)間,這種情況白白浪費(fèi)了計(jì)算資源。另一方面,如果任務(wù)復(fù)雜,task過(guò)少又會(huì)導(dǎo)致任務(wù)遲遲不能完成,這種情況又使計(jì)算資源沒(méi)有充分利用。
因?yàn)槠渥x取輸入使用Hadoop API,所以其map數(shù)量由以下參數(shù)共同決定:
minSize = Math.max(job.getLong(“mapred.min.split.size”, 1), minSplitSize);
mapred.min.split.size:設(shè)置每個(gè)map處理的最小數(shù)據(jù)量
minSplitSize:一般默認(rèn)為都為1,可由子類復(fù)寫函數(shù)protected void setMinSplitSize(long minSplitSize) 重新設(shè)置。
blockSize:默認(rèn)的HDFS文件塊大小
goalSize=totalSize/numSplits 期望的每個(gè)Map處理的split大小,僅僅是期望的
numSplits :是在 job 啟動(dòng)時(shí)通過(guò)JobConf.setNumMapTasks(int n) 設(shè)置的值,是給框架的map數(shù)量的提示 totalSize :整個(gè)job所有輸入的總大小
splitSize的計(jì)算方式如下:
max( minSize, min(blockSize, goalSize) )
最終map數(shù)量由以下方式計(jì)算得出:
map數(shù)量=totalSize/splitSize
可以看出在調(diào)整map數(shù)量時(shí),可通過(guò)調(diào)整blockSize和mapred.min.split.size的方式實(shí)現(xiàn),但是調(diào)整blockSize可能并不現(xiàn)實(shí),所以程序執(zhí)行時(shí)通過(guò)設(shè)置mapred.min.split.size參數(shù)來(lái)設(shè)定。
當(dāng)然,需要特別注意的是,如果文件特別大,需要支持分割才能進(jìn)行分片,產(chǎn)生多個(gè)map,否則單個(gè)文件不可分割那就一個(gè)map。如果文件都特別小(比blockSize都小),可以使用CombineFileInputFormat將input path中的小文件合并成再送給mapper處理。
reduce task個(gè)數(shù)確定:
1)設(shè)置set mapred.reduce.tasks=?
2)若1)沒(méi)有設(shè)置,hive通過(guò)下面兩個(gè)參數(shù)來(lái)計(jì)算
set hive.exec.reducers.maxs=? (每個(gè)job最大的task數(shù)量)
set hive.exec.reducers.bytes.per.reducer = ? (每個(gè)reduce任務(wù)處理的數(shù)據(jù)量,默認(rèn)為1G)
2、對(duì)小文件合并
過(guò)多的小文件對(duì)于執(zhí)行引擎起map reduce任務(wù)進(jìn)行處理非常不利,一個(gè)是每個(gè)小文件起一個(gè)task去處理,非常浪費(fèi)資源,另一個(gè)是過(guò)多的小文件也對(duì)NameNode造成很大壓力(每個(gè)小文件都要記錄一個(gè)元數(shù)據(jù) 150 byte)。所以,減少小文件的數(shù)量也是一個(gè)優(yōu)化措施。
盡可能避免產(chǎn)生:
1)在執(zhí)行hive查詢時(shí)合理設(shè)置reduce的數(shù)量
2)可以使用hadoop archive命令將多個(gè)小文件進(jìn)行歸檔
3)動(dòng)態(tài)分區(qū)插入數(shù)據(jù)可能會(huì)產(chǎn)生大量小文件,所以在使用時(shí)盡量避免動(dòng)態(tài)分區(qū)
4)對(duì)map和reduce的輸出進(jìn)行merge
set hive.merge.mapfiles = true //設(shè)置map端輸出進(jìn)行合并,默認(rèn)為true
set hive.merge.mapredfiles = true //設(shè)置reduce端輸出進(jìn)行合并,默認(rèn)為false
set hive.merge.size.per.task = 25610001000 //設(shè)置合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 //當(dāng)輸出文件的平均大小小于該值時(shí),啟動(dòng)一個(gè)獨(dú)立的MapReduce任務(wù)進(jìn)行文件merge
若已產(chǎn)生小文件:
1)set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 用于設(shè)置在執(zhí)行map前對(duì)小文件進(jìn)行合并,以下參數(shù)設(shè)置決定了合并文件的大小,并最終減小map任務(wù)數(shù)
set mapred.max.split.size=256000000 //每個(gè)Map最大輸入大小(這個(gè)值決定了合并后文件的數(shù)量)
set mapred.min.split.size.per.node=100000000 //一個(gè)節(jié)點(diǎn)上split的最小大小(這個(gè)值決定了多個(gè)DataNode上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000 //一個(gè)機(jī)架下split的最小大小(這個(gè)值決定了多個(gè)機(jī)架上的文件是否需要合并)
3、避免數(shù)據(jù)傾斜
不管task數(shù)量是多少,發(fā)生數(shù)據(jù)傾斜就會(huì)大大影響查詢效率。
1)設(shè)置hive.map.aggr=true
該配置可將頂層的聚合操作放在map端進(jìn)行,以減輕reduce端的壓力。
4、并行執(zhí)行
Hive會(huì)將查詢轉(zhuǎn)化成一個(gè)或者多個(gè)階段,這些階段可能包括:MR階段、抽樣階段、合并階段、limit階段等。默認(rèn)單次執(zhí)行一個(gè)階段,可進(jìn)行如下配置,使得兩個(gè)并不沖突的階段可以并行執(zhí)行,提高查詢效率。
set hive.exec.parallel=true // 可以開啟并行執(zhí)行
set hive.exec.parallel.thread.number=16 // 同一個(gè)sql允許最大并行度,默認(rèn)為8
5、本地模式
這一情況適用于數(shù)據(jù)集小的情況,并且啟動(dòng)和運(yùn)行MR任務(wù)消耗的時(shí)間可能超過(guò)邏輯處理的時(shí)候,可配置hive在適當(dāng)情況下自動(dòng)進(jìn)行此項(xiàng)優(yōu)化
條件如下:
輸入數(shù)據(jù)量大小必須小于 hive.exec.mode.local.auto.inputbytes.max(默認(rèn)128MB)
map任務(wù)數(shù)量必須小于 hive.exec.mode.local.auto.tasks.max(默認(rèn)4)
需開啟設(shè)置:
set hive.exec.mode.local.auto=true // 默認(rèn)為false
6、JVM重用
hadoop也通過(guò)JVM來(lái)執(zhí)行map或者reduce任務(wù),當(dāng)任務(wù)非常多時(shí),啟動(dòng)并初始化任務(wù)的耗時(shí)會(huì)大大增加,有時(shí)甚至比邏輯執(zhí)行時(shí)間都要長(zhǎng),所以可通過(guò)JVM重用使得JVM實(shí)例在同一個(gè)job時(shí)可以由多task重用,避免增加過(guò)多的啟動(dòng)和初始化時(shí)間。可通過(guò)如下參數(shù)設(shè)置:
set mapred.job.reuse.jvm.num.tasks=10; // 10為重用個(gè)數(shù)
二、SQL語(yǔ)句優(yōu)化
SQL優(yōu)化有些原則是通用的,部分可參考前一篇總結(jié):
這里針對(duì)HIVE來(lái)進(jìn)行一些說(shuō)明:
1、列裁剪分區(qū)裁剪這些基本縮小查找范圍的自不必說(shuō)。
2、join優(yōu)化
1)在查詢語(yǔ)句中如果小表在join的左邊,那么hive可能將小表放入分布式緩存,實(shí)現(xiàn)map-side join。我們不用考慮哪張表是小表而調(diào)整join順序,新版hive提供了 hive.auto.convert.join 可以來(lái)優(yōu)化,默認(rèn)是啟動(dòng)的。
hive.mapjoin.smalltable.filesize=25000000(默認(rèn)值) // 小于該值的表在join時(shí)被hive認(rèn)為是小表
2)在特定情況下,大表也可以使用map-side join,需要滿足以下條件
- 表數(shù)據(jù)必須按照ON語(yǔ)句中的鍵進(jìn)行分桶
- 其中一張表的分桶數(shù)必須是另一張表的分桶數(shù)的倍數(shù)
hive這種情況下在map階段可以進(jìn)行分桶連接,這一功能需要通過(guò)如下設(shè)置進(jìn)行開啟
set hive.enforce.sortmergebucketmapjoin=true
如果分桶表數(shù)據(jù)是按照連接鍵或者桶的鍵進(jìn)行排序的,hive可以進(jìn)行一個(gè)更快的合并-連接(sort-merge join),需要如下配置
hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat
hive.auto.convert.sortmerge.join=true
hive.enforce.sortmergebucketmapjoin=true
3、group by操作
可先進(jìn)行map端部分聚合,然后在reduce端最終聚合,進(jìn)行如下設(shè)置
set hive.map.aggr=true; // 開啟Map端聚合參數(shù)設(shè)置
set hive.grouby.mapaggr.checkinterval=100000; // 聚合的鍵對(duì)應(yīng)的記錄條數(shù)超過(guò)這個(gè)值則會(huì)進(jìn)行分拆
如下設(shè)置可在出現(xiàn)數(shù)據(jù)傾斜時(shí)分兩個(gè)MR作業(yè)來(lái)完成,而不是在一個(gè)job中完成
set hive.groupby.skewindata = true (默認(rèn)為false) // 在第一步mapreduce中map的結(jié)果隨機(jī)分布于到reduce,于是這一步中每個(gè)reduce只做部分聚合,在下一個(gè)mapreduce中再進(jìn)行最終聚合。
4、使用LEFT SEMI JOIN (左半連接)替代 IN/EXISTS 子查詢
hive中前者 LEFT SEMI JOIN 是后者 IN/EXISTS 更高效的實(shí)現(xiàn)
例如:
select a.id, a.name from a where a.id in (select b.id from b);
應(yīng)該改為 select a.id, a.name from a left semi job b on a.id=b.id;
5、選擇合適的分區(qū)、分桶、排序方式
distribute by : 用來(lái)在map端按鍵對(duì)數(shù)據(jù)進(jìn)行拆分,根據(jù)reduce的個(gè)數(shù)進(jìn)行數(shù)據(jù)分發(fā),默認(rèn)是采用hash算法
cluster by :除了有distribute by的功能外,還能對(duì)查詢結(jié)果進(jìn)行排序,等于distribute by + sort by
sort by :數(shù)據(jù)在進(jìn)入reducer之前就排好序,根據(jù)指定值對(duì)進(jìn)入到同一reducer的所有行進(jìn)行排序
order by :對(duì)所有數(shù)據(jù)進(jìn)行排序(全局有序),若數(shù)據(jù)量很大,可能需要很多時(shí)間
注:嚴(yán)格模式( hive.mapred.mode=strict 默認(rèn)為 nonstrict ) 用于 hive 在特定情況下阻止任務(wù)的提交,針對(duì)以下三種情況
(1)對(duì)于分區(qū)表,不加分區(qū)字段過(guò)濾條件,不能執(zhí)行
(2)對(duì)于order by語(yǔ)句,必須使用limit語(yǔ)句
(3)限制笛卡爾積的查詢(join的時(shí)候不使用on,而使用where的)
三、其它
1、存儲(chǔ)格式
優(yōu)先選擇列式存儲(chǔ)格式如orc、parquet,這將使hive查詢時(shí)避免全部掃描,列數(shù)據(jù)存儲(chǔ)在一起,只需拿相應(yīng)列即可,能夠大大縮短響應(yīng)時(shí)間
2、壓縮方式
在mapreduce的世界中,性能瓶頸主要來(lái)自于磁盤IO和網(wǎng)絡(luò)IO,CPU基本不構(gòu)成壓力,根據(jù)需要選擇合適的壓縮方式,如下圖可參考
3、vectorized query( 向量化查詢 Hive 0.13中引入 )
通過(guò)一次性批量執(zhí)行1024行而不是每次單行執(zhí)行,提供掃描、聚合、篩選器和連接等操作的性能。通過(guò)如下方式啟用:
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
4、CBO(cost based query optimization)基于成本的優(yōu)化
hive 0.14.0 加入,hive1.1.0 后默認(rèn)開啟,hive底層自動(dòng)優(yōu)化多個(gè)join的順序并選擇合適的join算法,優(yōu)化每個(gè)查詢的執(zhí)行邏輯和物理執(zhí)行計(jì)劃。使用時(shí)需進(jìn)行如下設(shè)置
set hive.cbo.enable = true;
set hive.compute.query.using.stats = true;
set hive.stats.fetch.column.stats = true;
set hive.stats.fetch.partition.stats = true;
5、推測(cè)執(zhí)行
偵測(cè)執(zhí)行緩慢的任務(wù)(通常由于負(fù)載不均衡或者資源分布原因?qū)е?#xff09;,通過(guò)執(zhí)行其備份任務(wù)對(duì)相同數(shù)據(jù)進(jìn)行重算,加速獲取單個(gè)task的結(jié)果,以此來(lái)提高整體的執(zhí)行效率,可由如下參數(shù)控制:
set mapred.map.tasks.speculative.execution=true
set mapred.reduce.tasks.speculative.execution=true
以上內(nèi)容,如有錯(cuò)誤,請(qǐng)看到的小伙伴幫忙指正,多謝。
總結(jié)
- 上一篇: 使用medusa进行ssh爆破
- 下一篇: 跳槽没有20%以上的加薪就等于降薪?我: