sparkSql catalyst優化器

相關概念

  • AST樹 SQL語法樹是編譯後被解析的樹狀結構,樹包括不少對象,每一個節點都有特定的數據類型,同事有孩子節點(TreeNode對象)。html

  • 規則 等價規則轉化將規則用於語法樹。任何一個SQL優化器中,都會定義大量的Rule,SQL優化器遍歷全部節點。匹配全部給定規則,若是匹配成功進行相應轉換;失敗則繼續遍歷下一個節點。mysql

Catalyst工做流程

  • Parser,利用ANTLR將sparkSql字符串解析爲抽象語法樹AST,稱爲unresolved logical plan/ULP
  • Analyzer,藉助於數據元數據catalog將ULP解析爲logical plan/LP
  • Optimizer,根據各類RBO,CBO優化策略獲得optimized logical plan/OLP,主要是對Logical Plan進行剪枝,合併等操做,進而刪除掉一些無用計算,或對一些計算的多個步驟進行合併

優化分類

基於規則優化/Rule Based Optimizer/RBOgit

  • 一種經驗式、啓發式優化思路,基於已知的優化方法定義具體的規則進行優化。
  • 對於核心優化算子join有點力不從心,如兩張表執行join,到底使用broadcaseHashJoin仍是sortMergeJoin,目前sparkSql是經過手工設定參數來肯定的,若是一個表的數據量小於某個閾值(默認10M)就使用broadcastHashJoin

常見的三種規則優化:謂詞下推、常量累加、列剪枝github

  • 謂詞下推:掃描數據量過濾,好比合並兩個過濾條件爲一個減小一次過濾掃描
  • 常量累加:減小常量操做,eg:從100+80優化爲180避免每一條record都須要執行一次100+80的操做
  • 列剪枝(去掉不須要的列):對列式數據庫提升掃描效率,減小網絡、內存數據量消耗

基於代價優化/Cost Based Optimizer/CBOsql

許多基於規則的優化技術已經實現,但優化器自己仍然有很大的改進空間。例如,沒有關於數據分佈的詳細列統計信息,所以難以精確地估計過濾(filter)、鏈接(join)等數據庫操做符的輸出大小和基數 (cardinality)。因爲不許確的估計,它常常致使優化器產生次優的查詢執行計劃,此時就須要基於代價(io和cpu的消耗)的優化器,經過對錶數據量等進行優化。數據庫

Spark SQL會依據邏輯執行計劃生成至少一個物理執行計劃,隨後經過Cost Model對每一個物理執行計劃進行開銷評估,並選擇預估開銷最小的一個做爲最終的物理執行計劃送去作代碼生成。apache

具體步驟以下。網絡

  1. 首先採集原始表基本信息,例如:表數據量大小,行數,列信息(Max,min,非空,最大列長等),列直方圖app

  2. 再定義每種算子的基數評估規則,即一個數據集通過此算子執行以後基本信息變化規則。這兩步完成以後就能夠推導出整個執行計劃樹上全部中間結果集的數據基本信息框架

  3. 定義每種算子的執行代價,結合中間結果集的基本信息,此時能夠得出任意節點的執行代價

  4. 將給定執行路徑上全部算子的代價累加獲得整棵語法樹的代價

  5. 計算出全部可能語法樹代價,並選出一條代價最小的

jion操做優化

  1. 配置自動廣播的閾值。spark.sql.autoBroadcastJoinThreshold默認值10M,-1表明不進行廣播

  2. 使用Executor廣播減小Driver內存壓力。默認的BroadCastJoin會將小表的內容,所有收集到Driver中,所以須要適當的調大Driver的內存,當廣播任務比較頻繁的時候,Driver有可能由於OOM而異常退出。

    此時,能夠開啓Executor廣播,配置Executor廣播參數spark.sql.bigdata.useExecutorBroadcasttrue,減小Driver內存壓力。

  3. 小表執行超時,會致使任務結束。默認狀況下,BroadCastJoin只容許小表計算5分鐘,超過5分鐘該任務會出現超時異常,而這個時候小表的broadcast任務依然在執行,形成資源浪費。

    這種狀況下,有兩種方式處理:

    • 調整spark.sql.broadcastTimeout的數值,加大超時的時間限制。
    • 下降spark.sql.autoBroadcastJoinThreshold的數值,不使用BroadCastJoin的優化。
  4. spark內部優化。經過謂詞下推和布隆過濾器對jion操做進行優化。

    • 針對每一個join評估當前兩張表使用每種join策略的代價,根據代價估算肯定一種代價最小的方案

    • 不一樣physical plans輸入到代價模型(目前是統計),調整join順序,減小中間shuffle數據集大小,達到最優輸出

    • 詳見...

擴展數據源優化

擴展數據源是指mySql,Hdfs等數據源,sparkSql針對外部數據源,對查詢邏輯進行優化,使得儘量少的數據被掃描到spark內存中。

例如:SELECT * FROM MYSQL_TABLE WHERE id > 100在沒有優化前,會將表裏面的全部數據先加載到spark內存中,而後進行篩選。在通過擴展數據源優化後,where 後面的過濾條件會在mysql中執行。

執行計劃查看

// 物理邏輯計劃,包括parsed logical plan,Analyzed Logical plan,Optimized logical plan
spark.sql("select * from tab0 ").queryExecution
// 查看物理執行計劃
spark.sql("select * from tab0 ").explain
// 使用Spark WebUI進行查看

舉例:

// 定義DF
scala> val df = park.read.option("header","true").csv("file:///user/iteblog/sales.csv")
df: org.apache.spark.sql.DataFrame = [transactionId: string, customerId: string ... 2 more fields]
// 給 amountPaid 列乘以1
scala> val multipliedDF = df.selectExpr("amountPaid * 1")
multipliedDF: org.apache.spark.sql.DataFrame = [(amountPaid * 1): double]
// 查看優化計劃 
scala> println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)
00 Project [(cast(amountPaid#89 as double) * 1.0) AS (amountPaid * 1)#91]
01 +- Relation[transactionId#86,customerId#87,itemId#88,amountPaid#89] csv

Spark中的全部計劃都是使用tree表明的。因此numberedTreeString方法以樹形的方式打印出優化計劃。正如上面的輸出同樣。

  全部的計劃都是從下往上讀的。下面是樹中的兩個節點:   一、01 Relation:表示從csv文件建立的DataFrame;   二、00 Project:表示投影(也就是須要查詢的列)。

從上面的輸出能夠看到,爲了獲得正確的結果,Spark經過castamountPaid轉換成double類型。

自定義優化器

繼承RuleExecutor類,實現其apply方法。不須要像UDF同樣註冊,默認碰到此場景會執行此優化規則。

Catalyst是一個單獨的模塊類庫,這個模塊是基於規則的系統。這個框架中的每一個規則都是針對某個特定的狀況來優化的。好比:ConstantFolding規則用於移除查詢中的常量表達式。

Spark 2.0提供了API,咱們能夠基於這些API添加自定義的優化規則。實現可插拔方式的Catalyst規則添加。

object MultiplyOptimizationRule extends RuleExecutor[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       case Multiply(left,right) if right.isInstanceOf[Literal] &&
           right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
           println("optimization of one applied")
           left
       }
}

參考:

CBO詳細規則:https://issues.apache.org/jira/secure/attachment/12823839/Spark_CBO_Design_Spec.pdf

華爲給spark2.2.0提供了: A cost-based optimizer framework for Spark SQL

在Spark SQL中定義查詢優化規則:https://www.iteblog.com/archives/1706.html#Catalyst

SparkSQL Catalyst簡介:http://hbasefly.com/2017/03/01/sparksql-catalyst/

Cost Based Optimizer in Apache Spark 2.2:https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html

相關文章
相關標籤/搜索