EMR Spark Runtime Filter性能优化
背景
Join是一個非常耗費資源耗費時間的操作,特別是數(shù)據(jù)量很大的情況下。一般流程上會涉及底層表的掃描/shuffle/Join等過程, 如果我們能夠盡可能的在靠近源頭上減少參與計算的數(shù)據(jù),一方面可以提高查詢性能,另一方面也可以減少資源的消耗(網(wǎng)絡/IO/CPU等),在同樣的資源的情況下可以支撐更多的查詢。
目前在SparkSQL中有Filter下推優(yōu)化,包括兩個維度:
生成Filter
SparkSQL會從用戶的SQL語句中獲取到Filter
-
直接顯示獲取
select * from A where a=1生成Filter(a=1) on A
-
隱式推斷
select * from A, B where A.a = B.b and A.a=1推斷出Filter(b=1) on B
Filter優(yōu)化
利用生成的Filter算子可以優(yōu)化,比如:
- 將Filter盡量下推到靠近DataSource端
-
如果Filter中的列是分區(qū)列,可以提前對DataSource進行分區(qū)裁剪,只掃描需要的分區(qū)數(shù)據(jù)
Runtime Filter是針對Equi-Join場景提出的一種新的生成Filter的方式,通過動態(tài)獲取Filter內(nèi)容來做相關(guān)優(yōu)化。
Runtime Filter原理
優(yōu)化對象
Equi Join, 形如
select x,y from A join B on A.a = B.b其中A是一個小表(如維表),B是一個大表(如事實表)
備注:?A/B也可以是一個簡單的子查詢
優(yōu)化思路
如上述小表A和大表B進行Join,Join條件為A.a=B.b,實際Join過程中需要對大表進行全表掃描才能完成Join操作,極端情況下如A.a僅僅只有一條記錄,也需要對B表全表掃描,影響性能。
如果在B表掃描之前,能獲取A表的a的相關(guān)信息(如所有的a值,或者a的min/max/Bloomfilter等統(tǒng)計信息),并在實際執(zhí)行Join之前將這些信息對B表的數(shù)據(jù)進行過濾,而不是全表掃描,可以大大提高性能。
兩種場景
根據(jù)大表B參與join的key(b)的屬性,可以分別采集小表A參與join的key(a)的信息:
b是分區(qū)列
如上b為大表B的一個分區(qū)列,則可以提前收集A.a列的所有值,然后利用A.a的值對B表的b列進行分區(qū)裁剪
b不是分區(qū)列
不能做分區(qū)裁剪,只能在實際數(shù)據(jù)掃描的過程中進行過濾。可以提前收集A.a列的min/max/Bloomfilter的統(tǒng)計信息,然后利用這些統(tǒng)計信息對B表進行數(shù)據(jù)過濾,這個過濾又可以分成兩種粒度:
- 可下推到存儲層,減少數(shù)據(jù)掃描
如底層文件格式是Parquet/ORC, 可以將相關(guān)過濾謂詞(min/max等)下推到存儲層面,從而減少實際掃描的數(shù)據(jù)。 - 掃描后數(shù)據(jù)過濾
不能下推到存儲層的,可以在數(shù)據(jù)被掃描后做條件過濾,減少后續(xù)參與計算的數(shù)據(jù)量(如shuffle/join等)
Runtime Filter實現(xiàn)
Runtime Filter的實現(xiàn)主要在Catalyst中,分為4個步驟:
謂詞合成
在用戶SQL生成的邏輯執(zhí)行計劃樹(logical plan)中,尋找滿足條件的Equi-Join節(jié)點,然后根據(jù)上面的思路,在Join的大表B側(cè)插入一個新的Filter節(jié)點,如Filter(In(b, Seq(DynamicValue(a, A))), B)
謂詞下推
上面生成的新的Filter會經(jīng)過PushDownPredicate的Rule,盡量下推靠近DataSource附近
物理執(zhí)行計劃生成
該階段會將上面下推的Filter(In(b, Seq(DynamicValue(a, A))), B)轉(zhuǎn)換成物理節(jié)點(FilterExec),根據(jù)上面兩種場景會生成兩種不同的FilterExec
-
b是分區(qū)列
case class DynamicPartitionPruneFilterExec(child: SparkPlan, collectors: Seq[(Expression, SparkPlan)])extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
b是分區(qū)列,采集的是a列的所有值,如:
其中colletors就是用于采集信息的SparkPlan,因為要跑一個SQL來采集a列的所有值(select a from A group by a);
因為有可能會有多個分區(qū)列,所以這個地方是一個Seq.
-
b是非分區(qū)列
case class DynamicMinMaxFilterExec(child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
b是非分區(qū)列,采集的是a列的min/max/bloomfilter統(tǒng)計信息,如
同理上面collectors也是用戶采集信息的SparkPlan,如select min(a),max(a) from A
執(zhí)行
在物理執(zhí)行計劃實際執(zhí)行的過程中,會在DynamicPartitionPruneFilterExec/DynamicMinMaxFilterExec物理算子內(nèi)先執(zhí)行collectors獲取到a列的相關(guān)信息,然后對底層B的執(zhí)行計劃進行改寫,比如利用采集到的信息做分區(qū)裁剪/數(shù)據(jù)過濾等。
Runtime Filter性能測試
以TPC-DS 10TB的Query54為例:
Runtime Filter 關(guān)閉
??
Runtime Filter 打開
經(jīng)過DynamicPartitionPruneFilter對catalog_sales的分區(qū)進行了裁剪,實際對表的掃描從14,327,953,968減少到136,107,053,然后經(jīng)過min/max的過濾繼續(xù)減少到135,564,763;另外Runtime Filter減少了大表的掃描,shuffle的數(shù)據(jù)量以及參加Join的數(shù)據(jù)量,所以對整個集群IO/網(wǎng)絡/CPU有比較大的節(jié)省
總結(jié)
針對Equi-Join的場景,可以額外的采集小表側(cè)的信息,然后在Join之前對大表進行分區(qū)裁剪或者掃描后過濾,從而提高查詢性能,減少資源消耗。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的EMR Spark Runtime Filter性能优化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: containerd与安全沙箱的Kube
- 下一篇: 从踩坑到填坑|淘宝Web 3D应用与游戏