AST樹 SQL語法樹是編譯後被解析的樹狀結構,樹包括不少對象,每一個節點都有特定的數據類型,同事有孩子節點(TreeNode對象)。html
規則 等價規則轉化將規則用於語法樹。任何一個SQL優化器中,都會定義大量的Rule,SQL優化器遍歷全部節點。匹配全部給定規則,若是匹配成功進行相應轉換;失敗則繼續遍歷下一個節點。mysql
基於規則優化/Rule Based Optimizer/RBOgit
常見的三種規則優化:謂詞下推、常量累加、列剪枝github
100+80
優化爲180
避免每一條record都須要執行一次100+80
的操做基於代價優化/Cost Based Optimizer/CBOsql
許多基於規則的優化技術已經實現,但優化器自己仍然有很大的改進空間。例如,沒有關於數據分佈的詳細列統計信息,所以難以精確地估計過濾(filter)、鏈接(join)等數據庫操做符的輸出大小和基數 (cardinality)。因爲不許確的估計,它常常致使優化器產生次優的查詢執行計劃,此時就須要基於代價(io和cpu的消耗)的優化器,經過對錶數據量等進行優化。數據庫
Spark SQL會依據邏輯執行計劃生成至少一個物理執行計劃,隨後經過Cost Model對每一個物理執行計劃進行開銷評估,並選擇預估開銷最小的一個做爲最終的物理執行計劃送去作代碼生成。apache
具體步驟以下。網絡
首先採集原始表基本信息,例如:表數據量大小,行數,列信息(Max,min,非空,最大列長等),列直方圖app
再定義每種算子的基數評估規則,即一個數據集通過此算子執行以後基本信息變化規則。這兩步完成以後就能夠推導出整個執行計劃樹上全部中間結果集的數據基本信息框架
定義每種算子的執行代價,結合中間結果集的基本信息,此時能夠得出任意節點的執行代價
將給定執行路徑上全部算子的代價累加獲得整棵語法樹的代價
計算出全部可能語法樹代價,並選出一條代價最小的
jion操做優化:
配置自動廣播的閾值。spark.sql.autoBroadcastJoinThreshold
默認值10M
,-1
表明不進行廣播
使用Executor廣播減小Driver內存壓力。默認的BroadCastJoin會將小表的內容,所有收集到Driver中,所以須要適當的調大Driver的內存,當廣播任務比較頻繁的時候,Driver有可能由於OOM而異常退出。
此時,能夠開啓Executor廣播,配置Executor廣播參數spark.sql.bigdata.useExecutorBroadcast
爲true
,減小Driver內存壓力。
小表執行超時,會致使任務結束。默認狀況下,BroadCastJoin只容許小表計算5分鐘,超過5分鐘該任務會出現超時異常,而這個時候小表的broadcast任務依然在執行,形成資源浪費。
這種狀況下,有兩種方式處理:
spark.sql.broadcastTimeout
的數值,加大超時的時間限制。spark.sql.autoBroadcastJoinThreshold
的數值,不使用BroadCastJoin的優化。spark內部優化。經過謂詞下推和布隆過濾器對jion操做進行優化。
擴展數據源優化
擴展數據源是指mySql,Hdfs等數據源,sparkSql針對外部數據源,對查詢邏輯進行優化,使得儘量少的數據被掃描到spark內存中。
例如:SELECT * FROM MYSQL_TABLE WHERE id > 100
在沒有優化前,會將表裏面的全部數據先加載到spark內存中,而後進行篩選。在通過擴展數據源優化後,where 後面的過濾條件會在mysql中執行。
執行計劃查看
// 物理邏輯計劃,包括parsed logical plan,Analyzed Logical plan,Optimized logical plan spark.sql("select * from tab0 ").queryExecution // 查看物理執行計劃 spark.sql("select * from tab0 ").explain // 使用Spark WebUI進行查看
舉例:
// 定義DF scala> val df = park.read.option("header","true").csv("file:///user/iteblog/sales.csv") df: org.apache.spark.sql.DataFrame = [transactionId: string, customerId: string ... 2 more fields] // 給 amountPaid 列乘以1 scala> val multipliedDF = df.selectExpr("amountPaid * 1") multipliedDF: org.apache.spark.sql.DataFrame = [(amountPaid * 1): double] // 查看優化計劃 scala> println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString) 00 Project [(cast(amountPaid#89 as double) * 1.0) AS (amountPaid * 1)#91] 01 +- Relation[transactionId#86,customerId#87,itemId#88,amountPaid#89] csv
Spark中的全部計劃都是使用tree表明的。因此numberedTreeString
方法以樹形的方式打印出優化計劃。正如上面的輸出同樣。
全部的計劃都是從下往上讀的。下面是樹中的兩個節點: 一、01 Relation:表示從csv文件建立的DataFrame; 二、00 Project:表示投影(也就是須要查詢的列)。
從上面的輸出能夠看到,爲了獲得正確的結果,Spark經過cast
將amountPaid
轉換成double
類型。
繼承RuleExecutor
類,實現其apply
方法。不須要像UDF同樣註冊,默認碰到此場景會執行此優化規則。
Catalyst是一個單獨的模塊類庫,這個模塊是基於規則的系統。這個框架中的每一個規則都是針對某個特定的狀況來優化的。好比:ConstantFolding
規則用於移除查詢中的常量表達式。
Spark 2.0提供了API,咱們能夠基於這些API添加自定義的優化規則。實現可插拔方式的Catalyst規則添加。
object MultiplyOptimizationRule extends RuleExecutor[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case Multiply(left,right) if right.isInstanceOf[Literal] && right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 => println("optimization of one applied") left } }
參考:
CBO詳細規則:https://issues.apache.org/jira/secure/attachment/12823839/Spark_CBO_Design_Spec.pdf
華爲給spark2.2.0提供了: A cost-based optimizer framework for Spark SQL
在Spark SQL中定義查詢優化規則:https://www.iteblog.com/archives/1706.html#Catalyst
SparkSQL Catalyst簡介:http://hbasefly.com/2017/03/01/sparksql-catalyst/
Cost Based Optimizer in Apache Spark 2.2:https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html