Join是一個很是耗費資源耗費時間的操做,特別是數據量很大的狀況下。通常流程上會涉及底層表的掃描/shuffle/Join等過程, 若是咱們可以儘量的在靠近源頭上減小參與計算的數據,一方面能夠提升查詢性能,另外一方面也能夠減小資源的消耗(網絡/IO/CPU等),在一樣的資源的狀況下能夠支撐更多的查詢。網絡
目前在SparkSQL中有Filter下推優化,包括兩個維度:性能
SparkSQL會從用戶的SQL語句中獲取到Filter測試
直接顯示獲取優化
select * from A where a=1
生成Filter(a=1) on Aui
隱式推斷spa
select * from A, B where A.a = B.b and A.a=1
推斷出Filter(b=1) on Bcode
利用生成的Filter算子能夠優化,好比:對象
若是Filter中的列是分區列,能夠提早對DataSource進行分區裁剪,只掃描須要的分區數據blog
Runtime Filter
是針對Equi-Join場景提出的一種新的生成Filter的方式,經過動態獲取Filter內容來作相關優化。資源
Equi Join, 形如
select x,y from A join B on A.a = B.b
其中A是一個小表(如維表),B是一個大表(如事實表)
備註: A/B也能夠是一個簡單的子查詢
如上述小表A和大表B進行Join,Join條件爲A.a=B.b,實際Join過程當中須要對大表進行全表掃描才能完成Join操做,極端狀況下如A.a僅僅只有一條記錄,也須要對B表全表掃描,影響性能。
若是在B表掃描以前,能獲取A表的a的相關信息(如全部的a值,或者a的min/max/Bloomfilter等統計信息),並在實際執行Join以前將這些信息對B表的數據進行過濾,而不是全表掃描,能夠大大提升性能。
根據大表B參與join的key(b)的屬性,能夠分別採集小表A參與join的key(a)的信息:
如上b爲大表B的一個分區列,則能夠提早收集A.a列的全部值
,而後利用A.a的值對B表的b列進行分區裁剪
不能作分區裁剪,只能在實際數據掃描的過程當中進行過濾。能夠提早收集A.a列的min/max/Bloomfilter的統計信息
,而後利用這些統計信息對B表進行數據過濾,這個過濾又能夠分紅兩種粒度:
Runtime Filter的實現主要在Catalyst中,分爲4個步驟:
在用戶SQL生成的邏輯執行計劃樹(logical plan)中,尋找知足條件的Equi-Join節點,而後根據上面的思路,在Join的大表B側插入一個新的Filter節點,如Filter(In(b, Seq(DynamicValue(a, A))), B)
上面生成的新的Filter會通過PushDownPredicate的Rule,儘可能下推靠近DataSource附近
該階段會將上面下推的Filter(In(b, Seq(DynamicValue(a, A))), B)
轉換成物理節點(FilterExec),根據上面兩種場景會生成兩種不一樣的FilterExec
b是分區列
b是分區列,採集的是a列的全部值,如:
case class DynamicPartitionPruneFilterExec( child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
其中colletors就是用於採集信息的SparkPlan,由於要跑一個SQL來採集a列的全部值(select a from A group by a
);
由於有可能會有多個分區列,因此這個地方是一個Seq.
b是非分區列
b是非分區列,採集的是a列的min/max/bloomfilter統計信息,如
case class DynamicMinMaxFilterExec( child: SparkPlan, collectors: Seq[(Expression, SparkPlan)]) extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper
同理上面collectors也是用戶採集信息的SparkPlan,如select min(a),max(a) from A
在物理執行計劃實際執行的過程當中,會在DynamicPartitionPruneFilterExec/DynamicMinMaxFilterExec物理算子內先執行collectors獲取到a列的相關信息,而後對底層B的執行計劃進行改寫,好比利用採集到的信息作分區裁剪/數據過濾等。
以TPC-DS 10TB的Query54爲例:
通過DynamicPartitionPruneFilter對catalog_sales的分區進行了裁剪,實際對錶的掃描從14,327,953,968減小到136,107,053,而後通過min/max的過濾繼續減小到135,564,763;另外Runtime Filter減小了大表的掃描,shuffle的數據量以及參加Join的數據量,因此對整個集羣IO/網絡/CPU有比較大的節省
針對Equi-Join的場景,能夠額外的採集小表側的信息,而後在Join以前對大表進行分區裁剪或者掃描後過濾,從而提升查詢性能,減小資源消耗。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。