Hive Join Strategies hive的连接策略
2019獨角獸企業重金招聘Python工程師標準>>>
Common Join
最為普通的join策略,不受數據量的大小影響,也可以叫做reduce side join ,最沒效率的一種join 方式. 它由一個mapreduce job 完成.
首先將大表和小表分別進行map 操作, 在map shuffle 的階段每一個map output key 變成了table_name_tag_prefix + join_column_value , 但是在進行partition 的時候它仍然只使用join_column_value 進行hash.
每一個reduce 接受所有的map 傳過來的split , 在reducce 的shuffle 階段,它將map output key 前面的table_name_tag_prefix 給舍棄掉進行比較. 因為reduce 的個數可以由小表的大小進行決定,所以對于每一個節點的reduce 一定可以將小表的split 放入內存變成hashtable. 然后將大表的每一條記錄進行一條一條的比較.
真正的Join在reduce階段。
MapJoin
Map Join 的計算步驟分兩步,將小表的數據變成hashtable廣播到所有的map 端,將大表的數據進行合理的切分,然后在map 階段的時候用大表的數據一行一行的去探測(probe) 小表的hashtable. 如果join key 相等,就寫入HDFS.
map join 之所以叫做map join 是因為它所有的工作都在map 端進行計算.
hive 在map join 上做了幾個優化:
hive 0.6 的時候默認認為寫在select 后面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示進行設定. hive 0.7 的時候這個計算是自動化的,它首先會自動判斷哪個是小表,哪個是大表,這個參數由(hive.auto.convert.join=true)來控制. 然后控制小表的大小由(hive.smalltable.filesize=25000000L)參數控制(默認是25M),當小表超過這個大小,hive 會默認轉化成common join. 你可以查看HIVE-1642.
首先小表的Map 階段它會將自己轉化成MapReduce Local Task ,然后從HDFS 取小表的所有數據,將自己轉化成Hashtable file 并壓縮打包放入DistributedCache 里面.
目前hive 的map join 有幾個限制,一個是它打算用BloomFilter 來實現hashtable , BloomFilter 大概比hashtable 省8-10倍的內存, 但是BloomFilter 的大小比較難控制.
現在DistributedCache 里面hashtable默認的復制是3份,對于一個有1000個map 的大表來說,這個數字太小,大多數map 操作都等著DistributedCache 復制.
優化后的map-join
Converting Common Join into Map Join
判斷誰是大表誰是小表(小表的標準就是size小于hive.mapjoin.smalltable.filesize的值)
Hive在Compile階段的時候對每一個common join會生成一個conditional task,并且對于每一個join table,會假設這個table是大表,生成一個mapjoin task,然后把這些mapjoin tasks裝進
conditional task(List<Task<? extends Serializable>> resTasks),同時會映射大表的alias和對應的mapjoin task。在runtime運行時,resolver會讀取每個table alias對應的input file size,如果小表的file size比設定的threshold要低 (hive.mapjoin.smalltable.filesize,默認值為25M),那么就會執行converted mapjoin task。對于每一個mapjoin task同時會設置一個backup task,就是先前的common join task,一旦mapjoin task執行失敗了,則會啟用backup task
Performance Bottleneck
性能瓶頸
1、Distributed Cache is the potential performance bottleneck
分布式緩存是一個潛在的性能瓶頸
A、Large hashtable file will slow down the propagation of Distributed Cache
大的hashtable文件將會減速分布式緩存的傳播
B、Mappers are waiting for the hashtables file from Distributed Cache
Mapper排隊等待從分布式緩存獲取hashtables(因為默認一個hashtable緩存是三份,如果mappers數量太多需要一個一個的等待)
2、Compress and archive all the hashtable file into a tar file.
壓縮和歸檔所有的hashtable文件為一個tar文件。
Bucket Map Join
Why:
Total table/partition size is big, not good for mapjoin.
How:
set hive.optimize.bucketmapjoin = true;
1. Work together with map join
2. All join tables are bucketized, and each small table?s bucket number can be divided by big table?s bucket number.
所有join的表是bucketized并且小表的bucket數量是大表bucket數量的整數倍
3. Bucket columns == Join columns
hive 建表的時候支持hash 分區通過指定clustered by (col_name,xxx ) into number_buckets buckets 關鍵字.
當連接的兩個表的join key 就是bucket column 的時候,就可以通過
hive.optimize.bucketmapjoin= true
來控制hive 執行bucket map join 了, 需要注意的是你的小表的number_buckets 必須是大表的倍數. 無論多少個表進行連接這個條件都必須滿足.(其實如果都按照2的指數倍來分bucket, 大表也可以是小表的倍數,不過這中間需要多計算一次,對int 有效,long 和string 不清楚)
Bucket Map Join 執行計劃分兩步,第一步先將小表做map 操作變成hashtable 然后廣播到所有大表的map端,大表的map端接受了number_buckets 個小表的hashtable并不需要合成一個大的hashtable,直接可以進行map 操作,map 操作會產生number_buckets 個split,每個split 的標記跟小表的hashtable 標記是一樣的, 在執行projection 操作的時候,只需要將小表的一個hashtable 放入內存即可,然后將大表的對應的split 拿出來進行判斷,所以其內存限制為小表中最大的那個hashtable 的大小.
Bucket Map Join 同時也是Map Side Join 的一種實現,所有計算都在Map 端完成,沒有Reduce 的都被叫做Map Side Join ,Bucket 只是hive 的一種hash partition 的實現,另外一種當然是值分區.
create table a (xxx) partition by (col_name)
不過一般hive 中兩個表不一定會有同一個partition key, 即使有也不一定會是join key. 所以hive 沒有這種基于值的map side join, hive 中的list partition 主要是用來過濾數據的而不是分區. 兩個主要參數為(hive.optimize.cp = true 和 hive.optimize.pruner=true)
hadoop 源代碼中默認提供map side join 的實現, 你可以在hadoop 源碼的src/contrib/data_join/src 目錄下找到相關的幾個類. 其中TaggedMapOutput 即可以用來實現hash 也可以實現list , 看你自己決定怎么分區. Hadoop Definitive Guide 第8章關于map side join 和side data distribution 章節也有一個例子示例怎樣實現值分區的map side join.
上圖解釋:b表是大表,a,c是小表并且都是整數倍,將a,c表加入內存先join然后到每個b表的map去做匹配。
Sort Merge Bucket Map Join
Why:
No limit on file/partition/table size.
How:
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
1.Work together with bucket map join
將bucket加入到map join中
2.Bucket columns == Join columns == sort columns
Bucket Map?Join?并沒有解決map?join 在小表必須完全裝載進內存的限制, 如果想要在一個reduce 節點的大表和小表都不用裝載進內存,必須使兩個表都在join?key 上有序才行,你可以在建表的時候就指定sorted byjoin?key?或者使用index 的方式.
做法還是兩邊要做hash bucket,而且每個bucket內部要進行排序。這樣一來當兩邊bucket要做局部join的時候,只需要用類似merge sort算法中的merge操作一樣把兩個bucket順序遍歷一遍即可完成,這樣甚至都不用把一個bucket完整的加載成hashtable,這對性能的提升會有很大幫助。
set hive.optimize.bucketmapjoin?= true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
Bucket columns ==?Join?columns == sort columns
這樣小表的數據可以每次只讀取一部分,然后還是用大表一行一行的去匹配,這樣的join?沒有限制內存的大小. 并且也可以執行全外連接.
Skew Join
Join bottlenecked on the reducer who gets the
skewed key
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold
轉載于:https://my.oschina.net/CostBasedOptimizatio/blog/388277
總結
以上是生活随笔為你收集整理的Hive Join Strategies hive的连接策略的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: find : 路径必须在表达式之前
- 下一篇: android 开发 命名规范