日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

SparkSQL的3种Join实现

發(fā)布時間:2025/3/19 数据库 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkSQL的3种Join实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

引言

Join是SQL語句中的常用操作,良好的表結(jié)構(gòu)能夠?qū)?shù)據(jù)分散在不同的表中,使其符合某種范式,減少表冗余、更新容錯等。而建立表和表之間關(guān)系的最佳方式就是Join操作。

對于Spark來說有3中Join的實現(xiàn),每種Join對應(yīng)著不同的應(yīng)用場景:

  • Broadcast Hash Join : 適合一張較小的表和一張大表進(jìn)行join
  • Shuffle Hash Join : ?適合一張小表和一張大表進(jìn)行join,或者是兩張小表之間的join
  • Sort Merge Join : 適合兩張較大的表之間進(jìn)行join
  • 前兩者都基于的是Hash Join,只不過在hash join之前需要先shuffle還是先broadcast。下面將詳細(xì)的解釋一下這三種不同的join的具體原理

    ?

    ?Hash Join

    先來看看這樣一條SQL語句:

    select * from order,item where item.id = order.i_id

    很簡單一個Join節(jié)點,參與join的兩張表是item和order,join key分別是item.id以及order.i_id。現(xiàn)在假設(shè)這個Join采用的是hash join算法,整個過程會經(jīng)歷三步:

    1. 確定Build Table以及Probe Table:這個概念比較重要,Build Table使用join key構(gòu)建Hash Table,而Probe Table使用join key進(jìn)行探測,探測成功就可以join在一起。通常情況下,小表會作為Build Table,大表作為Probe Table。此事例中item為Build Table,order為Probe Table;

    2. 構(gòu)建Hash Table:依次讀取Build Table(item)的數(shù)據(jù),對于每一行數(shù)據(jù)根據(jù)join key(item.id)進(jìn)行hash,hash到對應(yīng)的Bucket,生成hash table中的一條記錄。數(shù)據(jù)緩存在內(nèi)存中,如果內(nèi)存放不下需要dump到外存;

    3. 探測:再依次掃描Probe Table(order)的數(shù)據(jù),使用相同的hash函數(shù)映射Hash Table中的記錄,映射成功之后再檢查join條件(item.id = order.i_id),如果匹配成功就可以將兩者join在一起。

    基本流程可以參考上圖,這里有兩個小問題需要關(guān)注:

    1. hash join性能如何?很顯然,hash join基本都只掃描兩表一次,可以認(rèn)為o(a+b),較之最極端的笛卡爾集運算a*b,不知甩了多少條街;

    2. 為什么Build Table選擇小表?道理很簡單,因為構(gòu)建的Hash Table最好能全部加載在內(nèi)存,效率最高;這也決定了hash join算法只適合至少一個小表的join場景,對于兩個大表的join場景并不適用。

    ?

    上文說過,hash join是傳統(tǒng)數(shù)據(jù)庫中的單機join算法,在分布式環(huán)境下需要經(jīng)過一定的分布式改造,說到底就是盡可能利用分布式計算資源進(jìn)行并行化計算,提高總體效率。hash join分布式改造一般有兩種經(jīng)典方案:

    1. broadcast hash join:將其中一張小表廣播分發(fā)到另一張大表所在的分區(qū)節(jié)點上,分別并發(fā)地與其上的分區(qū)記錄進(jìn)行hash join。broadcast適用于小表很小,可以直接廣播的場景;

    2. shuffler hash join:一旦小表數(shù)據(jù)量較大,此時就不再適合進(jìn)行廣播分發(fā)。這種情況下,可以根據(jù)join key相同必然分區(qū)相同的原理,將兩張表分別按照join key進(jìn)行重新組織分區(qū),這樣就可以將join分而治之,劃分為很多小join,充分利用集群資源并行化。

    ?

    Broadcast Hash Join

    大家知道,在數(shù)據(jù)庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。維度表一般指固定的、變動較少的表,例如聯(lián)系人、物品種類等,一般數(shù)據(jù)有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。

    因為Join操作是對兩個表中key值相同的記錄進(jìn)行連接,在SparkSQL中,對兩個表做Join最直接的方式是先根據(jù)key分區(qū),再在每個分區(qū)中把key值相同的記錄拿出來做連接操作。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操作,我們應(yīng)該盡可能的設(shè)計Spark應(yīng)用使其避免大量的shuffle。

    當(dāng)維度表和事實表進(jìn)行Join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部數(shù)據(jù)分發(fā)到每個節(jié)點上,供事實表使用。executor存儲維度表的全部數(shù)據(jù),一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join,如下圖所示:

    Table B是較小的表,黑色表示將其廣播到每個executor節(jié)點上,Table A的每個partition會通過block manager取到Table A的數(shù)據(jù)。根據(jù)每條記錄的Join Key取到Table B中相對應(yīng)的記錄,根據(jù)Join Type進(jìn)行操作。這個過程比較簡單,不做贅述。

    Broadcast Join的條件有以下幾個:

    1. 被廣播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的值,默認(rèn)是10M (或者加了broadcast join的hint)

    2. 基表不能被廣播,比如left outer join時,只能廣播右表

    看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用于廣播較小的表,否則數(shù)據(jù)的冗余傳輸就遠(yuǎn)大于shuffle的開銷;另外,廣播時需要將被廣播的表現(xiàn)collect到driver端,當(dāng)頻繁有廣播出現(xiàn)時,對driver的內(nèi)存也是一個考驗。

    ?

    如下圖所示,broadcast hash join可以分為兩步:

    1. broadcast階段:將小表廣播分發(fā)到大表所在的所有主機。廣播算法可以有很多,最簡單的是先發(fā)給driver,driver再統(tǒng)一分發(fā)給所有executor;要不就是基于bittorrete的p2p思路;

    2. hash join階段:在每個executor上執(zhí)行單機版hash join,小表映射,大表試探;

    SparkSQL規(guī)定broadcast hash join執(zhí)行的基本條件為被廣播小表必須小于參數(shù)spark.sql.autoBroadcastJoinThreshold,默認(rèn)為10M。

    ?

    Shuffle Hash Join

    當(dāng)一側(cè)的表比較小時,我們選擇將其廣播出去以避免shuffle,提高性能。但因為被廣播的表首先被collect到driver段,然后被冗余分發(fā)到每個executor上,所以當(dāng)表比較大時,采用broadcast join會對driver端和executor端造成較大的壓力。

    但由于Spark是一個分布式的計算引擎,可以通過分區(qū)的形式將大批量的數(shù)據(jù)劃分成n份較小的數(shù)據(jù)集進(jìn)行并行計算。這種思想應(yīng)用到Join上便是Shuffle Hash Join了。利用key相同必然分區(qū)相同的這個原理,兩個表中,key相同的行都會被shuffle到同一個分區(qū)中,SparkSQL將較大表的join分而治之,先將表劃分成n個分區(qū),再對兩個表中相對應(yīng)分區(qū)的數(shù)據(jù)分別進(jìn)行Hash Join,這樣即在一定程度上減少了driver廣播一側(cè)表的壓力,也減少了executor端取整張被廣播表的內(nèi)存消耗。其原理如下圖:

    Shuffle Hash Join分為兩步:

    1. 對兩張表分別按照join keys進(jìn)行重分區(qū),即shuffle,目的是為了讓有相同join keys值的記錄分到對應(yīng)的分區(qū)中

    2. 對對應(yīng)分區(qū)中的數(shù)據(jù)進(jìn)行join,此處先將小表分區(qū)構(gòu)造為一張hash表,然后根據(jù)大表分區(qū)中記錄的join keys值拿出來進(jìn)行匹配

    Shuffle Hash Join的條件有以下幾個:

    1. 分區(qū)的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,默認(rèn)是10M?

    2. 基表不能被廣播,比如left outer join時,只能廣播右表

    3. 一側(cè)的表要明顯小于另外一側(cè),小的一側(cè)將被廣播(明顯小于的定義為3倍小,此處為經(jīng)驗值)

    我們可以看到,在一定大小的表中,SparkSQL從時空結(jié)合的角度來看,將兩個表進(jìn)行重新分區(qū),并且對小表中的分區(qū)進(jìn)行hash化,從而完成join。在保持一定復(fù)雜度的基礎(chǔ)上,盡量減少driver和executor的內(nèi)存壓力,提升了計算時的穩(wěn)定性。

    ?

    在大數(shù)據(jù)條件下如果一張表很小,執(zhí)行join操作最優(yōu)的選擇無疑是broadcast hash join,效率最高。但是一旦小表數(shù)據(jù)量增大,廣播所需內(nèi)存、帶寬等資源必然就會太大,broadcast hash join就不再是最優(yōu)方案。此時可以按照join key進(jìn)行分區(qū),根據(jù)key相同必然分區(qū)相同的原理,就可以將大表join分而治之,劃分為很多小表的join,充分利用集群資源并行化。如下圖所示,shuffle hash join也可以分為兩步:

    1. shuffle階段:分別將兩個表按照join key進(jìn)行分區(qū),將相同join key的記錄重分布到同一節(jié)點,兩張表的數(shù)據(jù)會被重分布到集群中所有節(jié)點。這個過程稱為shuffle

    2. hash join階段:每個分區(qū)節(jié)點上的數(shù)據(jù)單獨執(zhí)行單機hash join算法。

    看到這里,可以初步總結(jié)出來如果兩張小表join可以直接使用單機版hash join;如果一張大表join一張極小表,可以選擇broadcast hash join算法;而如果是一張大表join一張小表,則可以選擇shuffle hash join算法;那如果是兩張大表進(jìn)行join呢?

    ?

    Sort Merge Join

    上面介紹的兩種實現(xiàn)對于一定大小的表比較適用,但當(dāng)兩個表都非常大時,顯然無論適用哪種都會對計算內(nèi)存造成很大壓力。這是因為join時兩者采取的都是hash join,是將一側(cè)的數(shù)據(jù)完全加載到內(nèi)存中,使用hash code取join keys值相等的記錄進(jìn)行連接。

    當(dāng)兩個表都非常大時,SparkSQL采用了一種全新的方案來對表進(jìn)行Join,即Sort Merge Join。這種實現(xiàn)方式不用將一側(cè)數(shù)據(jù)全部加載后再進(jìn)星hash join,但需要在join前將數(shù)據(jù)排序,如下圖所示:

    可以看到,首先將兩張表按照join keys進(jìn)行了重新shuffle,保證join keys值相同的記錄會被分在相應(yīng)的分區(qū)。分區(qū)后對每個分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序后再對相應(yīng)的分區(qū)內(nèi)的記錄進(jìn)行連接,如下圖示:

    看著很眼熟吧?也很簡單,因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續(xù)取左邊,反之取右邊。

    可以看出,無論分區(qū)有多大,Sort Merge Join都不用把某一側(cè)的數(shù)據(jù)全部加載到內(nèi)存中,而是即用即取即丟,從而大大提升了大數(shù)據(jù)量下sql join的穩(wěn)定性。

    ?

    SparkSQL對兩張大表join采用了全新的算法-sort-merge join,如下圖所示,整個過程分為三個步驟:

    1. shuffle階段:將兩張大表根據(jù)join key進(jìn)行重新分區(qū),兩張表數(shù)據(jù)會分布到整個集群,以便分布式并行處理;

    2. sort階段:對單個分區(qū)節(jié)點的兩表數(shù)據(jù),分別進(jìn)行排序;

    3. merge階段:對排好序的兩張分區(qū)表數(shù)據(jù)執(zhí)行join操作。join操作很簡單,分別遍歷兩個有序序列,碰到相同join key就merge輸出,否則取更小一邊,見下圖示意:

    經(jīng)過上文的分析,可以明確每種Join算法都有自己的適用場景,數(shù)據(jù)倉庫設(shè)計時最好避免大表與大表的join查詢,SparkSQL也可以根據(jù)內(nèi)存資源、帶寬資源適量將參數(shù)spark.sql.autoBroadcastJoinThreshold調(diào)大,讓更多join實際執(zhí)行為broadcast hash join。

    ?

    參考來源:

    https://www.cnblogs.com/0xcafedaddy/p/7614299.html

    轉(zhuǎn)載于:https://www.cnblogs.com/JP6907/p/10721436.html

    總結(jié)

    以上是生活随笔為你收集整理的SparkSQL的3种Join实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。