EMR Spark Runtime Filter性能優化

背景

Join是一個很是耗費資源耗費時間的操做,特別是數據量很大的狀況下。通常流程上會涉及底層表的掃描/shuffle/Join等過程, 若是咱們可以儘量的在靠近源頭上減小參與計算的數據,一方面能夠提升查詢性能,另外一方面也能夠減小資源的消耗(網絡/IO/CPU等),在一樣的資源的狀況下能夠支撐更多的查詢。網絡

目前在SparkSQL中有Filter下推優化,包括兩個維度:性能

生成Filter

SparkSQL會從用戶的SQL語句中獲取到Filter測試

  • 直接顯示獲取優化

    select * from A where a=1

    生成Filter(a=1) on Aui

  • 隱式推斷spa

    select * from A, B where A.a = B.b and A.a=1

    推斷出Filter(b=1) on Bcode

Filter優化

利用生成的Filter算子能夠優化,好比:對象

  • 將Filter儘可能下推到靠近DataSource端
  • 若是Filter中的列是分區列,能夠提早對DataSource進行分區裁剪,只掃描須要的分區數據

Runtime Filter是針對Equi-Join場景提出的一種新的生成Filter的方式,經過動態獲取Filter內容來作相關優化。資源

Runtime Filter原理

優化對象

Equi Join, 形如get

select x,y from A join B on A.a = B.b

其中A是一個小表(如維表),B是一個大表(如事實表)
備註: A/B也能夠是一個簡單的子查詢

優化思路

如上述小表A和大表B進行Join,Join條件爲A.a=B.b,實際Join過程當中須要對大表進行全表掃描才能完成Join操做,極端狀況下如A.a僅僅只有一條記錄,也須要對B表全表掃描,影響性能。

若是在B表掃描以前,能獲取A表的a的相關信息(如全部的a值,或者a的min/max/Bloomfilter等統計信息),並在實際執行Join以前將這些信息對B表的數據進行過濾,而不是全表掃描,能夠大大提升性能。

兩種場景

根據大表B參與join的key(b)的屬性,能夠分別採集小表A參與join的key(a)的信息:

b是分區列

如上b爲大表B的一個分區列,則能夠提早收集A.a列的全部值,而後利用A.a的值對B表的b列進行分區裁剪

b不是分區列

不能作分區裁剪,只能在實際數據掃描的過程當中進行過濾。能夠提早收集A.a列的min/max/Bloomfilter的統計信息,而後利用這些統計信息對B表進行數據過濾,這個過濾又能夠分紅兩種粒度:

  • 可下推到存儲層,減小數據掃描
    如底層文件格式是Parquet/ORC, 能夠將相關過濾謂詞(min/max等)下推到存儲層面,從而減小實際掃描的數據。
  • 掃描後數據過濾
    不能下推到存儲層的,能夠在數據被掃描後作條件過濾,減小後續參與計算的數據量(如shuffle/join等)

Runtime Filter實現

Runtime Filter的實現主要在Catalyst中,分爲4個步驟:

謂詞合成

在用戶SQL生成的邏輯執行計劃樹(logical plan)中,尋找知足條件的Equi-Join節點,而後根據上面的思路,在Join的大表B側插入一個新的Filter節點,如Filter(In(b, Seq(DynamicValue(a, A))), B)

謂詞下推

上面生成的新的Filter會通過PushDownPredicate的Rule,儘可能下推靠近DataSource附近

物理執行計劃生成

該階段會將上面下推的Filter(In(b, Seq(DynamicValue(a, A))), B)轉換成物理節點(FilterExec),根據上面兩種場景會生成兩種不一樣的FilterExec

  • b是分區列
    b是分區列,採集的是a列的全部值,如:

    case class DynamicPartitionPruneFilterExec(
      child: SparkPlan, collectors: Seq[(Expression, SparkPlan)])
      extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper

其中colletors就是用於採集信息的SparkPlan,由於要跑一個SQL來採集a列的全部值(select a from A group by a);
由於有可能會有多個分區列,因此這個地方是一個Seq.

  • b是非分區列
    b是非分區列,採集的是a列的min/max/bloomfilter統計信息,如

    case class DynamicMinMaxFilterExec(
      child: SparkPlan, collectors: Seq[(Expression, SparkPlan)])
    extends DynamicFilterUnaryExecNode with CodegenSupport with PredicateHelper

同理上面collectors也是用戶採集信息的SparkPlan,如select min(a),max(a) from A

執行

在物理執行計劃實際執行的過程當中,會在DynamicPartitionPruneFilterExec/DynamicMinMaxFilterExec物理算子內先執行collectors獲取到a列的相關信息,而後對底層B的執行計劃進行改寫,好比利用採集到的信息作分區裁剪/數據過濾等。

Runtime Filter性能測試

以TPC-DS 10TB的Query54爲例:

Runtime Filter 關閉

Runtime Filter 打開

通過DynamicPartitionPruneFilter對catalog_sales的分區進行了裁剪,實際對錶的掃描從14,327,953,968減小到136,107,053,而後通過min/max的過濾繼續減小到135,564,763;另外Runtime Filter減小了大表的掃描,shuffle的數據量以及參加Join的數據量,因此對整個集羣IO/網絡/CPU有比較大的節省

總結

針對Equi-Join的場景,能夠額外的採集小表側的信息,而後在Join以前對大表進行分區裁剪或者掃描後過濾,從而提升查詢性能,減小資源消耗。



本文做者:寒沙牧

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索