日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

如何查询spark版本_掌握Spark SQL中的查询执行

發(fā)布時(shí)間:2024/1/23 数据库 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何查询spark版本_掌握Spark SQL中的查询执行 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

了解您的查詢計(jì)劃

自從Spark 2.x以來,由于SQL和聲明性DataFrame API,在Spark中查詢數(shù)據(jù)已成為一種奢侈。 僅使用幾行高級(jí)代碼就可以表達(dá)非常復(fù)雜的邏輯并執(zhí)行復(fù)雜的轉(zhuǎn)換。 API的最大好處是用戶無需考慮執(zhí)行問題,而可以讓優(yōu)化器找出執(zhí)行查詢的最有效方法。 有效的查詢執(zhí)行通常是一個(gè)要求,不僅因?yàn)橘Y源可能變得昂貴,而且還通過減少最終用戶等待計(jì)算結(jié)果的時(shí)間,使最終用戶的工作更加舒適。

Spark SQL優(yōu)化器確實(shí)已經(jīng)相當(dāng)成熟,尤其是在即將發(fā)布的3.0版本中,它將引入一些新的內(nèi)部?jī)?yōu)化功能,例如動(dòng)態(tài)分區(qū)修剪和自適應(yīng)查詢執(zhí)行。 優(yōu)化器在內(nèi)部使用查詢計(jì)劃,通常可以簡(jiǎn)化查詢并通過各種規(guī)則進(jìn)行優(yōu)化。 例如,它可以更改某些轉(zhuǎn)換的順序,或者如果最終輸出不需要它們,則可以完全刪除它們。 盡管進(jìn)行了所有聰明的優(yōu)化,但是在某些情況下,人腦仍可以做得更好。 在本文中,我們將研究其中一種情況,并了解如何通過簡(jiǎn)單的技巧使Spark朝著更有效的執(zhí)行方向發(fā)展。

該代碼在當(dāng)前版本為2.4.5的Spark中進(jìn)行了測(cè)試(編寫于2020年6月),并針對(duì)Spark 3.0.0-preview2進(jìn)行了檢查,以查看即將到來的Spark 3.0的可能更改。

型號(hào)范例

現(xiàn)在讓我首先介紹一個(gè)簡(jiǎn)單的例子,我們將嘗試實(shí)現(xiàn)有效的執(zhí)行。 假設(shè)我們有json格式的數(shù)據(jù),其結(jié)構(gòu)如下:

{"id": 1, "user_id": 100, "price": 50}{"id": 2, "user_id": 100, "price": 200}{"id": 3, "user_id": 101, "price": 120}{"id": 4, "price": 120}

每個(gè)記錄都像一個(gè)事務(wù),因此user_id列可能包含很多重復(fù)的值(可能包括空值),除了這三列之外,還可以有許多其他字段來描述事務(wù)。 現(xiàn)在,我們的查詢將基于兩個(gè)相似聚合的并集,其中每個(gè)聚合在某些情況下會(huì)有所不同。 在第一個(gè)聚合中,我們要選擇價(jià)格總和小于50的用戶,在第二個(gè)聚合中,我們要選擇價(jià)格總和大于100的用戶。此外,在第二個(gè)聚合中,我們只考慮 記錄user_id不為null的地方。 這個(gè)模型示例只是實(shí)踐中可能發(fā)生的更復(fù)雜情況的簡(jiǎn)化版本,為簡(jiǎn)單起見,我們將在本文中使用它。 這是一種使用PySpark的DataFrame API表示這種查詢的基本方式(非常相似,我們也可以使用Scala API編寫該查詢):

df = spark.read.json(data_path)df_small = ( df .groupBy("user_id") .agg(sum("price").alias("price")) .filter(col("price") < 50))df_big = ( df .filter(col("user_id").isNotNull()) .groupBy("user_id") .agg(sum("price").alias("price")) .filter(col("price") > 100) )result = df_small.union(df_big)

解釋計(jì)劃

為查詢實(shí)現(xiàn)良好性能的關(guān)鍵是能夠理解和解釋查詢計(jì)劃。 可以通過調(diào)用Spark DataFrame上的explain函數(shù)來顯示計(jì)劃本身,或者如果查詢已經(jīng)在運(yùn)行(或已完成),我們還可以轉(zhuǎn)到Spark UI并在SQL選項(xiàng)卡中找到計(jì)劃。

SQL選項(xiàng)卡包含集群中已完成和正在運(yùn)行的查詢的列表,因此通過選擇查詢,我們將看到物理計(jì)劃的圖形表示(此處,我刪除了指標(biāo)信息以使圖變小):

該計(jì)劃具有樹形結(jié)構(gòu),其中每個(gè)節(jié)點(diǎn)代表一些運(yùn)算符,這些運(yùn)算符包含一些有關(guān)執(zhí)行的信息。 我們可以看到在示例中,有兩個(gè)分支,其根在底部,葉在頂部,開始執(zhí)行。 葉子Scan json表示從源中讀取數(shù)據(jù),然后有一對(duì)HashAggregate運(yùn)算符負(fù)責(zé)聚合,在它們之間存在代表隨機(jī)播放的Exchange。 過濾器運(yùn)算符攜帶有關(guān)過濾條件的信息。

該計(jì)劃具有典型的聯(lián)合操作形狀,聯(lián)合中的每個(gè)DataFrame都有一個(gè)新分支,并且由于在我們的示例中,兩個(gè)DataFrame都基于相同的數(shù)據(jù)源,因此這意味著該數(shù)據(jù)源將被掃描兩次。 現(xiàn)在我們可以看到仍有改進(jìn)的空間。 僅對(duì)數(shù)據(jù)源進(jìn)行一次掃描可以帶來很好的優(yōu)化效果,尤其是在I / O昂貴的情況下。

從概念上講,我們要在這里實(shí)現(xiàn)的是重用一些計(jì)算-掃描數(shù)據(jù)并計(jì)算聚合,因?yàn)檫@些操作在兩個(gè)DataFrame中都是相同的,并且原則上只計(jì)算一次就足夠了。

快取

如何在Spark中重用計(jì)算的一種典型方法是使用緩存。 可以在DataFrame上調(diào)用函數(shù)緩存:

df.cache()

這是一個(gè)懶惰的轉(zhuǎn)換,這意味著在我們調(diào)用某些操作后,數(shù)據(jù)將被放入緩存層。 緩存是Spark中使用的非常普遍的技術(shù),但是它有其局限性,尤其是在緩存的數(shù)據(jù)很大且群集上的資源有限的情況下。 還需要注意的是,將數(shù)據(jù)存儲(chǔ)在緩存層(內(nèi)存或磁盤)中會(huì)帶來一些額外的開銷,并且操作本身并非免費(fèi)的。 從整個(gè)DataFrame df調(diào)用緩存也不是最佳選擇,原因是它會(huì)嘗試將所有列都放入內(nèi)存,而這可能是不必要的。 更謹(jǐn)慎的方法是選擇將在以下查詢中使用的所有列的超集,然后在此選擇之后調(diào)用緩存函數(shù)。

交換重用

除了緩存之外,還有另一種文獻(xiàn)中沒有很好描述的技術(shù),該技術(shù)基于重用Exchange。 Exchange運(yùn)算符表示隨機(jī)播放,它是群集上的物理數(shù)據(jù)移動(dòng)。當(dāng)必須重新組織(重新分區(qū))數(shù)據(jù)時(shí)通常會(huì)發(fā)生這種情況,而聚合,聯(lián)接和其他一些轉(zhuǎn)換通常需要這些數(shù)據(jù)。隨機(jī)播放的重要之處在于,當(dāng)對(duì)數(shù)據(jù)進(jìn)行重新分區(qū)時(shí),Spark始終會(huì)在進(jìn)行隨機(jī)播放寫入時(shí)將其保存在磁盤上(這是內(nèi)部行為,不受最終用戶的控制)。并且由于它已保存在磁盤上,因此以后可以根據(jù)需要重新使用。如果發(fā)現(xiàn)機(jī)會(huì),Spark確實(shí)會(huì)重用數(shù)據(jù)。每當(dāng)Spark檢測(cè)到從葉節(jié)點(diǎn)到Exchange的同一分支在計(jì)劃中的某處重復(fù)時(shí),就會(huì)發(fā)生這種情況。如果存在這種情況,則意味著這些重復(fù)的分支表示相同的計(jì)算,因此僅計(jì)算一次然后重用它就足夠了。我們可以從計(jì)劃中識(shí)別出Spark是否找到了這種情況,因?yàn)檫@些分支將像這樣合并在一起:

在我們的示例中,Spark沒有重用Exchange,但是通過一個(gè)簡(jiǎn)單的技巧,我們可以促使他這樣做。 不能在我們的查詢中重用Exchange的原因是右分支中的過濾器與過濾條件user_id不為null。 過濾器確實(shí)是我們兩個(gè)數(shù)據(jù)幀中唯一的區(qū)別,因此,如果我們可以消除這種區(qū)別并使兩個(gè)分支相同,Spark將負(fù)責(zé)其余部分并重用Exchange。

調(diào)整計(jì)劃

我們?nèi)绾问狗种嗤?#xff1f; 好吧,如果唯一的區(qū)別是過濾器,那么我們當(dāng)然可以切換轉(zhuǎn)換的順序,并在聚合之后調(diào)用過濾器,因?yàn)檫@不會(huì)影響所產(chǎn)生結(jié)果的正確性。 但是有一個(gè)陷阱! 如果我們這樣移動(dòng)過濾器:

df_big = ( df.groupBy("user_id") .agg(sum("price").alias("price")) .filter(col("price") > 100) .filter(col("price").isNotNull()))

并檢查最終查詢計(jì)劃,我們將發(fā)現(xiàn)該計(jì)劃根本沒有改變! 解釋很簡(jiǎn)單-優(yōu)化器將過濾器移回了。

從概念上講,最好了解查詢計(jì)劃有兩種主要類型:邏輯計(jì)劃和物理計(jì)劃。 邏輯計(jì)劃在變成物理計(jì)劃(即將要執(zhí)行的最終計(jì)劃)之前經(jīng)歷優(yōu)化階段。 當(dāng)我們更改某些轉(zhuǎn)換時(shí),它會(huì)反映在邏輯計(jì)劃中,但隨后我們將失去對(duì)后續(xù)步驟的控制。 優(yōu)化器將應(yīng)用一組優(yōu)化規(guī)則,這些規(guī)則主要基于某些啟發(fā)式算法。 與我們的示例相關(guān)的規(guī)則稱為PushDownPredicate,該規(guī)則可確保盡快應(yīng)用過濾器并將其推向源頭。 基于這樣的思想,首先過濾數(shù)據(jù),然后對(duì)精簡(jiǎn)后的數(shù)據(jù)集進(jìn)行計(jì)算,效率更高。 該規(guī)則在大多數(shù)情況下確實(shí)非常有用,但是在這種情況下,它正在與我們作戰(zhàn)。

要在計(jì)劃中實(shí)現(xiàn)過濾器的自定義位置,我們必須限制優(yōu)化器。 自Spark 2.4起,這是可能的,因?yàn)榇嬖谝粋€(gè)配置設(shè)置,該設(shè)置使我們可以列出要從優(yōu)化器中排除的所有優(yōu)化規(guī)則:

spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")

設(shè)置此配置并再次運(yùn)行查詢后,我們將看到過濾器現(xiàn)在保持在所需的位置。 這兩個(gè)分支實(shí)際上是相同的,Spark現(xiàn)在將重用Exchange! 現(xiàn)在,僅對(duì)數(shù)據(jù)集進(jìn)行一次掃描,并且對(duì)聚合進(jìn)行同樣的處理。

在Spark 3.0中,情況有所改變,優(yōu)化規(guī)則現(xiàn)在有了一個(gè)不同的名稱-PushDownPredicates,并且還有一個(gè)附加規(guī)則還負(fù)責(zé)推送過濾器PushPredicateThroughNonJoin,因此我們實(shí)際上需要同時(shí)將它們都排除在外 目標(biāo)。

最后的想法

我們可以看到,通過這種技術(shù),Spark開發(fā)人員使我們能夠控制優(yōu)化器。 但是權(quán)力也伴隨著責(zé)任。 讓我們列出使用此技術(shù)時(shí)要牢記的幾點(diǎn):

· 當(dāng)我們停止PushDownPredicate時(shí),我們將負(fù)責(zé)查詢中的所有過濾器,而不僅僅是我們要重新定位的過濾器。 可能還有其他重要的過濾器要盡快進(jìn)行,例如分區(qū)過濾器,因此我們需要確保它們的位置正確。

· 限制優(yōu)化器并維護(hù)過濾器是用戶方面的一些額外工作,因此值得這樣做。 在我們的模型示例中,可能會(huì)在I / O開銷很大的情況下加快查詢速度,因?yàn)槲覀儗?shí)現(xiàn)僅對(duì)數(shù)據(jù)進(jìn)行一次掃描。 如果數(shù)據(jù)集具有很多列,則對(duì)于非列格式的文件格式(例如json或csv)可能就是這種情況。

· 同樣,如果數(shù)據(jù)集很小,則可能不值得花更多的精力來控制優(yōu)化器,因?yàn)楹?jiǎn)單的緩存就可以完成工作。 但是,當(dāng)數(shù)據(jù)集很大時(shí),將數(shù)據(jù)存儲(chǔ)在緩存層中的開銷將變得顯而易見。 另一方面,重用的Exchange將不會(huì)帶來任何額外的開銷,因?yàn)闊o論如何計(jì)算的混洗都將存儲(chǔ)在磁盤上。

· 該技術(shù)基于Spark的內(nèi)部行為,該行為沒有官方文檔,如果此功能發(fā)生更改,則可能很難找到它。 在我們的示例中,我們可以看到Spark 3.0中實(shí)際上有一個(gè)更改,其中一個(gè)規(guī)則被重命名,而另一個(gè)規(guī)則被添加。

結(jié)論

我們已經(jīng)看到,要獲得最佳性能可能需要了解查詢計(jì)劃。 通過使用一組啟發(fā)式規(guī)則優(yōu)化我們的查詢,Spark優(yōu)化器可以很好地完成工作。 但是,在某些情況下,這些規(guī)則會(huì)錯(cuò)過最佳配置。 有時(shí)重寫查詢就足夠了,但有時(shí)卻不能,因?yàn)橥ㄟ^重寫查詢,我們將獲得不同的邏輯計(jì)劃,但是我們無法直接控制將要執(zhí)行的物理計(jì)劃。 從Spark 2.4開始,我們可以使用排除規(guī)則的配置設(shè)置,該設(shè)置允許我們限制優(yōu)化器,從而將Spark導(dǎo)航到更自定義的物理計(jì)劃。

在許多情況下,依靠?jī)?yōu)化器將導(dǎo)致制定具有相當(dāng)高效執(zhí)行力的可靠計(jì)劃,但是,在大多數(shù)情況下,關(guān)鍵性能工作負(fù)載尤為重要,因此有必要檢查最終計(jì)劃并看看我們是否可以通過采用該計(jì)劃來改進(jìn)它。 控制優(yōu)化器。

(本文翻譯自David Vrba的文章《Be in charge of Query Execution in Spark SQL》,參考:https://towardsdatascience.com/be-in-charge-of-query-execution-in-spark-sql-c83d1e16b9b8)

總結(jié)

以上是生活随笔為你收集整理的如何查询spark版本_掌握Spark SQL中的查询执行的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。