Spark SQL / Catalyst 內部原理 與 RBO

原創文章,轉載請務必將下面這段話置於文章開頭處。
本文轉發自技術世界原文連接 http://www.jasongj.com/spark/rbo/sql

本文所述內容均基於 2018年9月10日 Spark 最新 Release 2.3.1 版本。後續將持續更新apache

Spark SQL 架構

Spark SQL 的總體架構以下圖所示
json


Spark SQL Catalyst

從上圖可見,不管是直接使用 SQL 語句仍是使用 DataFrame,都會通過以下步驟轉換成 DAG 對 RDD 的操做架構

  • Parser 解析 SQL,生成 Unresolved Logical Plan
  • 由 Analyzer 結合 Catalog 信息生成 Resolved Logical Plan
  • Optimizer根據預先定義好的規則對 Resolved Logical Plan 進行優化並生成 Optimized Logical Plan
  • Query Planner 將 Optimized Logical Plan 轉換成多個 Physical Plan
  • CBO 根據 Cost Model 算出每一個 Physical Plan 的代價並選取代價最小的 Physical Plan 做爲最終的 Physical Plan
  • Spark 以 DAG 的方法執行上述 Physical Plan
  • 在執行 DAG 的過程當中,Adaptive Execution 根據運行時信息動態調整執行計劃從而提升執行效率

Parser

Spark SQL 使用 Antlr 進行記法和語法解析,並生成 UnresolvedPlan。oop

當用戶使用 SparkSession.sql(sqlText : String) 提交 SQL 時,SparkSession 最終會調用 SparkSqlParser 的 parsePlan 方法。該方法分兩步post

  • 使用 Antlr 生成的 SqlBaseLexer 對 SQL 進行詞法分析,生成 CommonTokenStream
  • 使用 Antlr 生成的 SqlBaseParser 進行語法分析,獲得 LogicalPlan

如今兩張表,分別定義以下性能

CREATE TABLE score (
  id INT,
  math_score INT,
  english_score INT
)
CREATE TABLE people (
  id INT,
  age INT,
  name INT
)

對其進行關聯查詢以下優化

SELECT sum(v)
FROM (
  SELECT score.id,
    100 + 80 + score.math_score + score.english_score AS v
  FROM people
  JOIN score
  ON people.id = score.id
  AND people.age > 10
) tmp

生成的 UnresolvedPlan 以下圖所示。
ui


Spark SQL Parser

從上圖可見spa

  • 查詢涉及的兩張表,被解析成了兩個 UnresolvedRelation,也即只知道這們是兩張表,卻並不知道它們是 EXTERNAL TABLE 仍是 MANAGED TABLE,也不知道它們的數據存在哪兒,更不知道它們的表結構如何
  • sum(v) 的結果未命名
  • Project 部分只知道是選擇出了屬性,卻並不知道這些屬性屬於哪張表,更不知道其數據類型
  • Filter 部分也不知道數據類型

Spark SQL 解析出的 UnresolvedPlan 以下所示

== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias tmp
   +- 'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#493]
      +- 'Filter (('people.id = 'score.id) && ('people.age > 10))
         +- 'Join Inner
            :- 'UnresolvedRelation `people`
            +- 'UnresolvedRelation `score`

Analyzer

從 Analyzer 的構造方法可見

  • Analyzer 持有一個 SessionCatalog 對象的引用
  • Analyzer 繼承自 RuleExecutor[LogicalPlan],所以可對 LogicalPlan 進行轉換
class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {

Analyzer 包含了以下的轉換規則

lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveTimeZone(conf) ::
      ResolvedUuidExpressions ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild(conf)),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

例如, ResolveRelations 用於分析查詢用到的 Table 或 View。本例中 UnresolvedRelation (people) 與 UnresolvedRelation (score) 被解析爲 HiveTableRelation (json.people) 與 HiveTableRelation (json.score),並列出其各自包含的字段名。

經 Analyzer 分析後獲得的 Resolved Logical Plan 以下所示

== Analyzed Logical Plan ==
sum(v): bigint
Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L]
+- SubqueryAlias tmp
   +- Project [id#500, (((100 + 80) + math_score#501) + english_score#502) AS v#493]
      +- Filter ((id#496 = id#500) && (age#497 > 10))
         +- Join Inner
            :- SubqueryAlias people
            :  +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498]
            +- SubqueryAlias score
               +- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]

Analyzer 分析先後的 LogicalPlan 對好比下


Spark SQL Analyzer

由上圖可見,分析後,每張表對應的字段集,字段類型,數據存儲位置都已肯定。Project 與 Filter 操做的字段類型以及在表中的位置也已肯定。

有了這些信息,已經能夠直接將該 LogicalPlan 轉換爲 Physical Plan 進行執行。

可是因爲不一樣用戶提交的 SQL 質量不一樣,直接執行會形成不一樣用戶提交的語義相同的不一樣 SQL 執行效率差距甚遠。換句話說,若是要保證較高的執行效率,用戶須要作大量的 SQL 優化,使用體驗大大下降。

爲了儘量保證不管用戶是否熟悉 SQL 優化,提交的 SQL 質量如何, Spark SQL 都能以較高效率執行,還需在執行前進行 LogicalPlan 優化。

Optimizer

Spark SQL 目前的優化主要是基於規則的優化,即 RBO (Rule-based optimization)

  • 每一個優化以 Rule 的形式存在,每條 Rule 都是對 Analyzed Plan 的等價轉換
  • RBO 設計良好,易於擴展,新的規則能夠很是方便地嵌入進 Optimizer
  • RBO 目前已經足夠好,但仍然須要更多規則來 cover 更多的場景
  • 優化思路主要是減小參與計算的數據量以及計算自己的代價

PushdownPredicate
PushdownPredicate 是最多見的用於減小參與計算的數據量的方法。

前文中直接對兩表進行 Join 操做,而後再 進行 Filter 操做。引入 PushdownPredicate 後,可先對兩表進行 Filter 再進行 Join,以下圖所示。


Spark SQL RBO Predicate Pushdown

當 Filter 可過濾掉大部分數據時,參與 Join 的數據量大大減小,從而使得 Join 操做速度大大提升。

這裏須要說明的是,此處的優化是 LogicalPlan 的優化,從邏輯上保證了將 Filter 下推後因爲參與 Join 的數據量變少而提升了性能。另外一方面,在物理層面,Filter 下推後,對於支持 Filter 下推的 Storage,並不須要將表的全量數據掃描出來再過濾,而是直接只掃描符合 Filter 條件的數據,從而在物理層面極大減小了掃描表的開銷,提升了執行速度。

ConstantFolding
本文的 SQL 查詢中,Project 部分包含了 100 + 800 + match_score + english_score 。若是不進行優化,那若是有一億條記錄,就會計算一億次 100 + 80,很是浪費資源。所以可經過 ConstantFolding 將這些常量合併,從而減小沒必要要的計算,提升執行速度。


Spark SQL RBO Constant Folding

ColumnPruning
在上圖中,Filter 與 Join 操做會保留兩邊全部字段,而後在 Project 操做中篩選出須要的特定列。若是能將 Project 下推,在掃描表時就只篩選出知足後續操做的最小字段集,則能大大減小 Filter 與 Project 操做的中間結果集數據量,從而極大提升執行速度。

Spark SQL RBO Column Pruning

這裏須要說明的是,此處的優化是邏輯上的優化。在物理上,Project 下推後,對於列式存儲,如 Parquet 和 ORC,可在掃描表時就只掃描須要的列而跳過不須要的列,進一步減小了掃描開銷,提升了執行速度。

通過如上優化後的 LogicalPlan 以下

== Optimized Logical Plan ==
Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L]
+- Project [((180 + math_score#501) + english_score#502) AS v#493]
   +- Join Inner, (id#496 = id#500)
      :- Project [id#496]
      :  +- Filter ((isnotnull(age#497) && (age#497 > 10)) && isnotnull(id#496))
      :     +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498]
      +- Filter isnotnull(id#500)
         +- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]

SparkPlanner

獲得優化後的 LogicalPlan 後,SparkPlanner 將其轉化爲 SparkPlan 即物理計劃。

本例中因爲 score 表數據量較小,Spark 使用了 BroadcastJoin。所以 score 表通過 Filter 後直接使用 BroadcastExchangeExec 將數據廣播出去,而後結合廣播數據對 people 表使用 BroadcastHashJoinExec 進行 Join。再通過 Project 後使用 HashAggregateExec 進行分組聚合。


Spark SQL RBO Column Pruning

至此,一條 SQL 從提交到解析、分析、優化以及執行的完整過程就介紹完畢。

本文介紹的 Optimizer 屬於 RBO,實現簡單有效。它屬於 LogicalPlan 的優化,全部優化均基於 LogicalPlan 自己的特色,未考慮數據自己的特色,也未考慮算子自己的代價。下文將介紹 CBO,它充分考慮了數據自己的特色(如大小、分佈)以及操做算子的特色(中間結果集的分佈及大小)及代價,從而更好的選擇執行代價最小的物理執行計劃,即 SparkPlan。

相關文章
相關標籤/搜索