日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hive JOIN使用详解

發布時間:2024/1/17 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hive JOIN使用详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Hive是基于Hadoop平臺的,它提供了類似SQL一樣的查詢語言HQL。有了Hive,如果使用過SQL語言,并且不理解Hadoop MapReduce運行原理,也就無法通過編程來實現MR,但是你仍然可以很容易地編寫出特定查詢分析的HQL語句,通過使用類似SQL的語法,將HQL查詢語句提交Hive系統執行查詢分析,最終Hive會幫你轉換成底層Hadoop能夠理解的MR Job。
對于最基本的HQL查詢我們不再累述,這里主要說明Hive中進行統計分析時使用到的JOIN操作。在說明Hive JOIN之前,我們先簡單說明一下,Hadoop執行MR Job的基本過程(運行機制),能更好的幫助我們理解HQL轉換到底層的MR Job后是如何執行的。我們重點說明MapReduce執行過程中,從Map端到Reduce端這個過程(Shuffle)的執行情況,如圖所示(來自《Hadoop: The Definitive Guide》):

基本執行過程,描述如下:

  • 一個InputSplit輸入到map,會運行我們實現的Mapper的處理邏輯,對數據進行映射操作。
  • map輸出時,會首先將輸出中間結果寫入到map自帶的buffer中(buffer默認大小為100M,可以通過io.sort.mb配置)。
  • map自帶的buffer使用容量達到一定門限(默認0.80或80%,可以通過io.sort.spill.percent配置),一個后臺線程會準備將buffer中的數據寫入到磁盤。
  • 這個后臺線程在將buffer中數據寫入磁盤之前,會首先將buffer中的數據進行partition(分區,partition數為Reducer的個數),對于每個的數據會基于Key進行一個in-memory排序。
  • 排序后,會檢查是否配置了Combiner,如果配置了則直接作用到已排序的每個partition的數據上,對map輸出進行化簡壓縮(這樣寫入磁盤的數據量就會減少,降低I/O操作開銷)。
  • 現在可以將經過處理的buffer中的數據寫入磁盤,生成一個文件(每次buffer容量達到設置的門限,都會對應著一個寫入到磁盤的文件)。
  • map任務結束之前,會對輸出的多個文件進行合并操作,合并成一個文件(若map輸出至少3個文件,在多個文件合并后寫入之前,如果配置了Combiner,則會運行來化簡壓縮輸出的數據,文件個數可以通過min.num.splits.for.combine配置;如果指定了壓縮map輸出,這里會根據配置對數據進行壓縮寫入磁盤),這個文件仍然保持partition和排序的狀態。
  • reduce階段,每個reduce任務開始從多個map上拷貝屬于自己partition(map階段已經做好partition,而且每個reduce任務知道應該拷貝哪個partition;拷貝過程是在不同節點之間,Reducer上拷貝線程基于HTTP來通過網絡傳輸數據)。
  • 每個reduce任務拷貝的map任務結果的指定partition,也是先將數據放入到自帶的一個buffer中(buffer默認大小為Heap內存的70%,可以通過mapred.job.shuffle.input.buffer.percent配置),如果配置了map結果進行壓縮,則這時要先將數據解壓縮后放入buffer中。
  • reduce自帶的buffer使用容量達到一定門限(默認0.66或66%,可以通過mapred.job.shuffle.merge.percent配置),或者buffer中存放的map的輸出的數量達到一定門限(默認1000,可以通過mapred.inmem.merge.threshold配置),buffer中的數據將會被寫入到磁盤中。
  • 在將buffer中多個map輸出合并寫入磁盤之前,如果設置了Combiner,則會化簡壓縮合并的map輸出。
  • 當屬于該reducer的map輸出全部拷貝完成,則會在reducer上生成多個文件,這時開始執行合并操作,并保持每個map輸出數據中Key的有序性,將多個文件合并成一個文件(在reduce端可能存在buffer和磁盤上都有數據的情況,這樣在buffer中的數據可以減少一定量的I/O寫入操作開銷)。
  • 最后,執行reduce階段,運行我們實現的Reducer中化簡邏輯,最終將結果直接輸出到HDFS中(因為Reducer運行在DataNode上,輸出結果的第一個replica直接在存儲在本地節點上)。
  • 通過上面的描述我們看到,在MR執行過程中,存在Shuffle過程的MR需要在網絡中的節點之間(Mapper節點和Reducer節點)拷貝數據,如果傳輸的數據量很大會造成一定的網絡開銷。而且,Map端和Reduce端都會通過一個特定的buffer來在內存中臨時緩存數據,如果無法根據實際應用場景中數據的規模來使用Hive,尤其是執行表的JOIN操作,有可能很浪費資源,降低了系統處理任務的效率,還可能因為內存不足造成OOME問題,導致計算任務失敗。
    下面,我們說明Hive中的JOIN操作,針對不同的JOIN方式,應該如何來實現和優化:

    生成一個MR Job

    多表連接,如果多個表中每個表都使用同一個列進行連接(出現在JOIN子句中),則只會生成一個MR Job,例如:

    SELECT a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key1)

    三個表a、b、c都分別使用了同一個字段進行連接,亦即同一個字段同時出現在兩個JOIN子句中,從而只生成一個MR Job。

    生成多個MR Job

    多表連接,如果多表中,其中存在一個表使用了至少2個字段進行連接(同一個表的至少2個列出現在JOIN子句中),則會至少生成2個MR Job,例如:

    SELECT a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key2)

    三個表基于2個字段進行連接,這兩個字段b.key1和b.key2同時出現在b表中。連接的過程是這樣的:首先a和b表基于a.key和b.key1進行連接,對應著第一個MR Job;表a和b連接的結果,再和c進行連接,對應著第二個MR Job。

    表連接順序優化

    多表連接,會轉換成多個MR Job,每一個MR Job在Hive中稱為JOIN階段(Stage)。在每一個Stage,按照JOIN順序中的最后一個表應該盡量是大表,因為JOIN前一階段生成的數據會存在于Reducer的buffer中,通過stream最后面的表,直接從Reducer的buffer中讀取已經緩沖的中間結果數據(這個中間結果數據可能是JOIN順序中,前面表連接的結果的Key,數據量相對較小,內存開銷就小),這樣,與后面的大表進行連接時,只需要從buffer中讀取緩存的Key,與大表中的指定Key進行連接,速度會更快,也可能避免內存緩沖區溢出。例如:

    SELECT a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key1)

    這個JOIN語句,會生成一個MR Job,在選擇JOIN順序的時候,數據量相比應該是b < c,表a和b基于a.key = b.key1進行連接,得到的結果(基于a和b進行連接的Key)會在Reducer上緩存在buffer中,在與c進行連接時,從buffer中讀取Key(a.key=b.key1)來與表c的c.key進行連接。
    另外,也可以通過給出一些Hint信息來啟發JOIN操作,這指定了將哪個表作為大表,從而得到優化。例如:

    ?

    SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key1)

    ?

    上述JOIN語句中,a表被視為大表,則首先會對表b和c進行JOIN,然后再將得到的結果與表a進行JOIN。

    基于條件的LEFT OUTER JOIN優化

    左連接時,左表中出現的JOIN字段都保留,右表沒有連接上的都為空。對于帶WHERE條件的JOIN語句,例如:

    ?
  • SELECT a.val, b.valFROM a LEFTOUTER JOIN b ON (a.key=b.key)

  • WHERE a.ds='2009-07-07'AND b.ds='2009-07-07'

  • 執行順序是,首先完成2表JOIN,然后再通過WHERE條件進行過濾,這樣在JOIN過程中可能會輸出大量結果,再對這些結果進行過濾,比較耗時??梢赃M行優化,將WHERE條件放在ON后,例如:

    ?
  • SELECT a.val, b.valFROM a LEFTOUTER JOIN b

  • ON (a.key=b.keyAND b.ds='2009-07-07'AND a.ds='2009-07-07')

  • 這樣,在JOIN的過程中,就對不滿足條件的記錄進行了預先過濾,可能會有更好的表現。

    左半連接(LEFT SEMI JOIN)

    左半連接實現了類似IN/EXISTS的查詢語義,使用關系數據庫子查詢的方式實現查詢SQL,例如:

    SELECT a.key, a.valueFROM a WHEREa.key IN (SELECT b.key FROM b);

    使用Hive對應于如下語句:

    SELECT a.key, a.valFROM a LEFTSEMI JOIN b ON (a.key= b.key)

    需要注意的是,在LEFT SEMI JOIN中,表b只能出現在ON子句后面,不能夠出現在SELECT和WHERE子句中。
    關于子查詢,這里提一下,Hive支持情況如下:

    • 在0.12版本,只支持FROM子句中的子查詢;
    • 在0.13版本,也支持WHERE子句中的子查詢。

    Map Side JOIN

    Map Side JOIN優化的出發點是,Map任務輸出后,不需要將數據拷貝到Reducer節點,降低的數據在網絡節點之間傳輸的開銷。
    多表連接,如果只有一個表比較大,其他表都很小,則JOIN操作會轉換成一個只包含Map的Job,例如:

    SELECT /*+ MAPJOIN(b) */ a.key, a.valueFROM a JOINb ON a.key = b.key

    對于表a數據的每一個Map,都能夠完全讀取表b的數據。這里,表a與b不允許執行FULL OUTER JOIN、RIGHT OUTER JOIN。

    BUCKET Map Side JOIN

    我們先看兩個表a和b的DDL,表a為:

    ?
  • CREATE TABLEa(key INT, othera STRING)

  • CLUSTERED BY(key)INTO 4 BUCKETS

  • ROW FORMAT DELIMITED

  • FIELDS TERMINATED BY'\001'

  • COLLECTION ITEMS TERMINATED BY'\002'

  • MAP KEYS TERMINATED BY'\003'

  • STORED ASSEQUENCEFILE;

  • 表b為:

    ?
  • CREATE TABLEb(key INT, otherb STRING)

  • CLUSTERED BY(key)INTO 32 BUCKETS

  • ROW FORMAT DELIMITED

  • FIELDS TERMINATED BY'\001'

  • COLLECTION ITEMS TERMINATED BY'\002'

  • MAP KEYS TERMINATED BY'\003'

  • STORED ASSEQUENCEFILE;

  • 現在要基于a.key和b.key進行JOIN操作,此時JOIN列同時也是BUCKET列,JOIN語句如下:

    SELECT /*+ MAPJOIN(b) */ a.key, a.valueFROM a JOINb ON a.key = b.key

    并且表a有4個BUCKET,表b有32個BUCKET,默認情況下,對于表a的每一個BUCKET,都會去獲取表b中的每一個BUCKET來進行JOIN,這回造成一定的開銷,因為只有表b中滿足JOIN條件的BUCKET才會真正與表a的BUCKET進行連接。
    這種默認行為可以進行優化,通過改變默認JOIN行為,只需要設置變量:

    set hive.optimize.bucketmapjoin =true

    這樣,JOIN的過程是,表a的BUCKET 1只會與表b中的BUCKET 1進行JOIN,而不再考慮表b中的其他BUCKET 2~32。
    如果上述表具有相同的BUCKET,如都是32個,而且還是排序的,亦即,在表定義中在CLUSTERED BY(key)后面增加如下約束:

    SORTED BY(key)

    則上述JOIN語句會執行一個Sort-Merge-Bucket (SMB) JOIN,同樣需要設置如下參數來改變默認行為,優化JOIN時只遍歷相關的BUCKET即可:

    ?

    ?
  • set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

  • set hive.optimize.bucketmapjoin =true;

  • set hive.optimize.bucketmapjoin.sortedmerge =true;

  • ?

    關于更多的有關JOIN優化,可以參考后面的鏈接。

    參考內容

    • 《Hadoop: The Definitive Guide》
    • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
    • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins
    • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
    • https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries
    • https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior

    原文鏈接:http://shiyanjun.cn/archives/588.html

    總結

    以上是生活随笔為你收集整理的Hive JOIN使用详解的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。