SQL做爲一門標準的、通用的、簡單的DSL,在大數據分析中有着愈來愈重要的地位;Spark在批處理引擎領域當前也是處於絕對的地位,而Spark2.0中的SparkSQL也支持ANSI-SQL 2003標準。所以SparkSQL在大數據分析中的地位不言而喻。
本文將經過分析一條SQL在Spark中的解析執行過程來梳理SparkSQL執行的一個流程。sql
val spark = SparkSession.builder().appName("TestSql").master("local[*]").enableHiveSupport().getOrCreate() val df = spark.sql("select sepal_length,class from origin_csvload.csv_iris_qx order by sepal_length limit 10 ") df.show(3)
咱們在數倉中新建了一張表origin_csvload.csv_iris_qx
,而後經過SparkSQL執行了一條SQL,因爲整個過程因爲是懶加載的,須要經過Terminal方法觸發,此處咱們選擇show方法來觸發。緩存
sql
方法會執行如下3個重點:session
sessionState.sqlParser.parsePlan(sqlText)
:將SQL字符串經過ANTLR解析成邏輯計劃(Parsed Logical Plan)sparkSession.sessionState.executePlan(logicalPlan)
:執行邏輯計劃,此處爲懶加載,只新建QueryExecution
實例,並不會觸發實際動做。須要注意的是QueryExecution
實際上是包含了SQL解析執行的4個階段計劃(解析、分析、優化、執行)QueryExecution.assertAnalyzed()
:觸發語法分析,獲得分析計劃(Analyzed Logical Plan)def sql(sqlText: String): DataFrame = { //1:Parsed Logical Plan Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText)) } def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = { val qe = sparkSession.sessionState.executePlan(logicalPlan)//d-1 qe.assertAnalyzed()//d-2 new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) } //d-1 def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan) //2:Analyzed Logical Plan lazy val analyzed: LogicalPlansparkSession.sessionState.analyzer.executeAndCheck(logical)
sql解析後計劃以下:app
== Parsed Logical Plan == 'GlobalLimit 10 +- 'LocalLimit 10 +- 'Sort ['sepal_length ASC NULLS FIRST], true +- 'Project ['sepal_length, 'class] +- 'UnresolvedRelation `origin_csvload`.`csv_iris_qx`
主要是將SQL一一對應地翻譯成了catalyst的操做,此時數據表並無被解析,只是簡單地識別爲表。而分析後的計劃則包含了字段的位置、類型,表的具體類型(parquet)等信息。源碼分析
== Analyzed Logical Plan == sepal_length: double, class: string GlobalLimit 10 +- LocalLimit 10 +- Sort [sepal_length#0 ASC NULLS FIRST], true +- Project [sepal_length#0, class#4] +- SubqueryAlias `origin_csvload`.`csv_iris_qx` +- Relation[sepal_length#0,sepal_width#1,petal_length#2,petal_width#3,class#4] parquet
此處有個比較有意思的點,UnresolvedRelation origin_csvload.csv_iris_qx
被翻譯成了一個子查詢別名,讀取文件出來的數據註冊成了一個表,這個是沒必要要的,後續的優化會消除這個子查詢別名。大數據
以DataSet的show
方法爲例,show
的方法調用鏈爲showString->getRows->take->head->withAction,咱們先來看看withAction
方法:優化
def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan) private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { val result= SQLExecution.withNewExecutionId(sparkSession, qe) { action(qe.executedPlan) } result }
withAction
方法主要執行以下邏輯:
1. 拿到緩存的解析計劃,使用遍歷優化器執行解析計劃,獲得若干優化計劃。
2. 獲取第一個優化計劃,遍歷執行前優化得到物理執行計劃,這是已經能夠執行的計劃了。
3. 執行物理計劃,返回實際結果。至此,這條SQL之旅就結束了。ui
//3:Optimized Logical Plan,withCachedData爲Analyzed Logical Plan,即緩存的變量analyzed lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData) lazy val sparkPlan: SparkPlan = planner.plan(ReturnAnswer(optimizedPlan)).next() //4:Physical Plan lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
優化後的計劃以下,能夠看到SubqueryAliases已經沒有了。spa
== Optimized Logical Plan == GlobalLimit 10 +- LocalLimit 10 +- Sort [sepal_length#0 ASC NULLS FIRST], true +- Project [sepal_length#0, class#4] +- Relation[sepal_length#0,sepal_width#1,petal_length#2,petal_width#3,class#4] parquet
具體的優化點以下圖所示,行首有!
表示優化的地方。
scala
其中"=== Result of Batch Finish Analysis ==="表示"Finish Analysis"的規則簇(參見附錄一)被應用成功,能夠看到該規則簇中有一個消除子查詢別名的規則EliminateSubqueryAliases
Batch("Finish Analysis", Once, EliminateSubqueryAliases, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase(sessionCatalog), RewriteDistinctAggregates)
最後根據物理計劃生成規則(附錄二)能夠獲得物理計劃,這就是已經能夠執行的計劃了。具體以下:
== Physical Plan == TakeOrderedAndProject(limit=10, orderBy=[sepal_length#0 ASC NULLS FIRST], output=[sepal_length#0,class#4]) +- *(1) Project [sepal_length#0, class#4] +- *(1) FileScan parquet origin_csvload.csv_iris_qx[sepal_length#0,class#4] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://di124:8020/user/hive/warehouse/origin_csvload.db/csv_iris_qx], PartitionCount: 1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<sepal_length:double,class:string>
本文簡述了一條SQL是如何從字符串通過詞法解析、語法解析、規則優化等步驟轉化成可執行的物理計劃,最後以一個Terminal方法觸發邏輯返回結果。本文可爲後續SQL優化提供必定思路,以後可再詳述具體的SQL優化原則。
分析計劃會依次應用以下優化:
experimentalMethods.extraOptimizations
,當前也沒有。生成物理執行計劃的規則以下:
本文由博客一文多發平臺 OpenWrite 發佈!