如何查询spark版本_掌握Spark SQL中的查询执行
了解您的查詢計劃
自從Spark 2.x以來,由于SQL和聲明性DataFrame API,在Spark中查詢數據已成為一種奢侈。 僅使用幾行高級代碼就可以表達非常復雜的邏輯并執(zhí)行復雜的轉換。 API的最大好處是用戶無需考慮執(zhí)行問題,而可以讓優(yōu)化器找出執(zhí)行查詢的最有效方法。 有效的查詢執(zhí)行通常是一個要求,不僅因為資源可能變得昂貴,而且還通過減少最終用戶等待計算結果的時間,使最終用戶的工作更加舒適。
Spark SQL優(yōu)化器確實已經相當成熟,尤其是在即將發(fā)布的3.0版本中,它將引入一些新的內部優(yōu)化功能,例如動態(tài)分區(qū)修剪和自適應查詢執(zhí)行。 優(yōu)化器在內部使用查詢計劃,通常可以簡化查詢并通過各種規(guī)則進行優(yōu)化。 例如,它可以更改某些轉換的順序,或者如果最終輸出不需要它們,則可以完全刪除它們。 盡管進行了所有聰明的優(yōu)化,但是在某些情況下,人腦仍可以做得更好。 在本文中,我們將研究其中一種情況,并了解如何通過簡單的技巧使Spark朝著更有效的執(zhí)行方向發(fā)展。
該代碼在當前版本為2.4.5的Spark中進行了測試(編寫于2020年6月),并針對Spark 3.0.0-preview2進行了檢查,以查看即將到來的Spark 3.0的可能更改。
型號范例
現在讓我首先介紹一個簡單的例子,我們將嘗試實現有效的執(zhí)行。 假設我們有json格式的數據,其結構如下:
{"id": 1, "user_id": 100, "price": 50}{"id": 2, "user_id": 100, "price": 200}{"id": 3, "user_id": 101, "price": 120}{"id": 4, "price": 120}每個記錄都像一個事務,因此user_id列可能包含很多重復的值(可能包括空值),除了這三列之外,還可以有許多其他字段來描述事務。 現在,我們的查詢將基于兩個相似聚合的并集,其中每個聚合在某些情況下會有所不同。 在第一個聚合中,我們要選擇價格總和小于50的用戶,在第二個聚合中,我們要選擇價格總和大于100的用戶。此外,在第二個聚合中,我們只考慮 記錄user_id不為null的地方。 這個模型示例只是實踐中可能發(fā)生的更復雜情況的簡化版本,為簡單起見,我們將在本文中使用它。 這是一種使用PySpark的DataFrame API表示這種查詢的基本方式(非常相似,我們也可以使用Scala API編寫該查詢):
df = spark.read.json(data_path)df_small = ( df .groupBy("user_id") .agg(sum("price").alias("price")) .filter(col("price") < 50))df_big = ( df .filter(col("user_id").isNotNull()) .groupBy("user_id") .agg(sum("price").alias("price")) .filter(col("price") > 100) )result = df_small.union(df_big)解釋計劃
為查詢實現良好性能的關鍵是能夠理解和解釋查詢計劃。 可以通過調用Spark DataFrame上的explain函數來顯示計劃本身,或者如果查詢已經在運行(或已完成),我們還可以轉到Spark UI并在SQL選項卡中找到計劃。
SQL選項卡包含集群中已完成和正在運行的查詢的列表,因此通過選擇查詢,我們將看到物理計劃的圖形表示(此處,我刪除了指標信息以使圖變小):
該計劃具有樹形結構,其中每個節(jié)點代表一些運算符,這些運算符包含一些有關執(zhí)行的信息。 我們可以看到在示例中,有兩個分支,其根在底部,葉在頂部,開始執(zhí)行。 葉子Scan json表示從源中讀取數據,然后有一對HashAggregate運算符負責聚合,在它們之間存在代表隨機播放的Exchange。 過濾器運算符攜帶有關過濾條件的信息。
該計劃具有典型的聯合操作形狀,聯合中的每個DataFrame都有一個新分支,并且由于在我們的示例中,兩個DataFrame都基于相同的數據源,因此這意味著該數據源將被掃描兩次。 現在我們可以看到仍有改進的空間。 僅對數據源進行一次掃描可以帶來很好的優(yōu)化效果,尤其是在I / O昂貴的情況下。
從概念上講,我們要在這里實現的是重用一些計算-掃描數據并計算聚合,因為這些操作在兩個DataFrame中都是相同的,并且原則上只計算一次就足夠了。
快取
如何在Spark中重用計算的一種典型方法是使用緩存。 可以在DataFrame上調用函數緩存:
df.cache()這是一個懶惰的轉換,這意味著在我們調用某些操作后,數據將被放入緩存層。 緩存是Spark中使用的非常普遍的技術,但是它有其局限性,尤其是在緩存的數據很大且群集上的資源有限的情況下。 還需要注意的是,將數據存儲在緩存層(內存或磁盤)中會帶來一些額外的開銷,并且操作本身并非免費的。 從整個DataFrame df調用緩存也不是最佳選擇,原因是它會嘗試將所有列都放入內存,而這可能是不必要的。 更謹慎的方法是選擇將在以下查詢中使用的所有列的超集,然后在此選擇之后調用緩存函數。
交換重用
除了緩存之外,還有另一種文獻中沒有很好描述的技術,該技術基于重用Exchange。 Exchange運算符表示隨機播放,它是群集上的物理數據移動。當必須重新組織(重新分區(qū))數據時通常會發(fā)生這種情況,而聚合,聯接和其他一些轉換通常需要這些數據。隨機播放的重要之處在于,當對數據進行重新分區(qū)時,Spark始終會在進行隨機播放寫入時將其保存在磁盤上(這是內部行為,不受最終用戶的控制)。并且由于它已保存在磁盤上,因此以后可以根據需要重新使用。如果發(fā)現機會,Spark確實會重用數據。每當Spark檢測到從葉節(jié)點到Exchange的同一分支在計劃中的某處重復時,就會發(fā)生這種情況。如果存在這種情況,則意味著這些重復的分支表示相同的計算,因此僅計算一次然后重用它就足夠了。我們可以從計劃中識別出Spark是否找到了這種情況,因為這些分支將像這樣合并在一起:
在我們的示例中,Spark沒有重用Exchange,但是通過一個簡單的技巧,我們可以促使他這樣做。 不能在我們的查詢中重用Exchange的原因是右分支中的過濾器與過濾條件user_id不為null。 過濾器確實是我們兩個數據幀中唯一的區(qū)別,因此,如果我們可以消除這種區(qū)別并使兩個分支相同,Spark將負責其余部分并重用Exchange。
調整計劃
我們如何使分支相同? 好吧,如果唯一的區(qū)別是過濾器,那么我們當然可以切換轉換的順序,并在聚合之后調用過濾器,因為這不會影響所產生結果的正確性。 但是有一個陷阱! 如果我們這樣移動過濾器:
df_big = ( df.groupBy("user_id") .agg(sum("price").alias("price")) .filter(col("price") > 100) .filter(col("price").isNotNull()))并檢查最終查詢計劃,我們將發(fā)現該計劃根本沒有改變! 解釋很簡單-優(yōu)化器將過濾器移回了。
從概念上講,最好了解查詢計劃有兩種主要類型:邏輯計劃和物理計劃。 邏輯計劃在變成物理計劃(即將要執(zhí)行的最終計劃)之前經歷優(yōu)化階段。 當我們更改某些轉換時,它會反映在邏輯計劃中,但隨后我們將失去對后續(xù)步驟的控制。 優(yōu)化器將應用一組優(yōu)化規(guī)則,這些規(guī)則主要基于某些啟發(fā)式算法。 與我們的示例相關的規(guī)則稱為PushDownPredicate,該規(guī)則可確保盡快應用過濾器并將其推向源頭。 基于這樣的思想,首先過濾數據,然后對精簡后的數據集進行計算,效率更高。 該規(guī)則在大多數情況下確實非常有用,但是在這種情況下,它正在與我們作戰(zhàn)。
要在計劃中實現過濾器的自定義位置,我們必須限制優(yōu)化器。 自Spark 2.4起,這是可能的,因為存在一個配置設置,該設置使我們可以列出要從優(yōu)化器中排除的所有優(yōu)化規(guī)則:
spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")設置此配置并再次運行查詢后,我們將看到過濾器現在保持在所需的位置。 這兩個分支實際上是相同的,Spark現在將重用Exchange! 現在,僅對數據集進行一次掃描,并且對聚合進行同樣的處理。
在Spark 3.0中,情況有所改變,優(yōu)化規(guī)則現在有了一個不同的名稱-PushDownPredicates,并且還有一個附加規(guī)則還負責推送過濾器PushPredicateThroughNonJoin,因此我們實際上需要同時將它們都排除在外 目標。
最后的想法
我們可以看到,通過這種技術,Spark開發(fā)人員使我們能夠控制優(yōu)化器。 但是權力也伴隨著責任。 讓我們列出使用此技術時要牢記的幾點:
· 當我們停止PushDownPredicate時,我們將負責查詢中的所有過濾器,而不僅僅是我們要重新定位的過濾器。 可能還有其他重要的過濾器要盡快進行,例如分區(qū)過濾器,因此我們需要確保它們的位置正確。
· 限制優(yōu)化器并維護過濾器是用戶方面的一些額外工作,因此值得這樣做。 在我們的模型示例中,可能會在I / O開銷很大的情況下加快查詢速度,因為我們將實現僅對數據進行一次掃描。 如果數據集具有很多列,則對于非列格式的文件格式(例如json或csv)可能就是這種情況。
· 同樣,如果數據集很小,則可能不值得花更多的精力來控制優(yōu)化器,因為簡單的緩存就可以完成工作。 但是,當數據集很大時,將數據存儲在緩存層中的開銷將變得顯而易見。 另一方面,重用的Exchange將不會帶來任何額外的開銷,因為無論如何計算的混洗都將存儲在磁盤上。
· 該技術基于Spark的內部行為,該行為沒有官方文檔,如果此功能發(fā)生更改,則可能很難找到它。 在我們的示例中,我們可以看到Spark 3.0中實際上有一個更改,其中一個規(guī)則被重命名,而另一個規(guī)則被添加。
結論
我們已經看到,要獲得最佳性能可能需要了解查詢計劃。 通過使用一組啟發(fā)式規(guī)則優(yōu)化我們的查詢,Spark優(yōu)化器可以很好地完成工作。 但是,在某些情況下,這些規(guī)則會錯過最佳配置。 有時重寫查詢就足夠了,但有時卻不能,因為通過重寫查詢,我們將獲得不同的邏輯計劃,但是我們無法直接控制將要執(zhí)行的物理計劃。 從Spark 2.4開始,我們可以使用排除規(guī)則的配置設置,該設置允許我們限制優(yōu)化器,從而將Spark導航到更自定義的物理計劃。
在許多情況下,依靠優(yōu)化器將導致制定具有相當高效執(zhí)行力的可靠計劃,但是,在大多數情況下,關鍵性能工作負載尤為重要,因此有必要檢查最終計劃并看看我們是否可以通過采用該計劃來改進它。 控制優(yōu)化器。
(本文翻譯自David Vrba的文章《Be in charge of Query Execution in Spark SQL》,參考:https://towardsdatascience.com/be-in-charge-of-query-execution-in-spark-sql-c83d1e16b9b8)
總結
以上是生活随笔為你收集整理的如何查询spark版本_掌握Spark SQL中的查询执行的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据流程图顶层一层二层_只需三个公式,三
- 下一篇: mysql启动报错2002_mysql登