Spark SQL原理解析前言:html
Spark SQL源碼剖析(一)SQL解析框架Catalyst流程概述java
Spark SQL源碼解析(二)Antlr4解析Sql並生成樹sql
Spark SQL源碼解析(三)Analysis階段分析數據庫
Spark SQL源碼解析(四)Optimization和Physical Planning階段解析express
前面通過千辛萬苦,終於生成可實際執行的SparkPlan(即PhysicalPlan)。但在真正執行前,還須要作一些準備工做,包括在必要的地方插入一些shuffle做業,在須要的地方進行數據格式轉換等等。apache
這部份內容都在org.apache.spark.sql.execution.QueryExecution類中。咱們看看代碼session
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其餘代碼 lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //調用下面的preparations,而後使用foldLeft遍歷preparations中的Rule並應用到SparkPlan protected def prepareForExecution(plan: SparkPlan): SparkPlan = { preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } } /** A sequence of rules that will be applied in order to the physical plan before execution. */ //定義各個Rule protected def preparations: Seq[Rule[SparkPlan]] = Seq( PlanSubqueries(sparkSession), EnsureRequirements(sparkSession.sessionState.conf), CollapseCodegenStages(sparkSession.sessionState.conf), ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) ......其餘代碼 }
準備階段是去調用prepareForExecution方法,而prepareForExecution也簡單,仍是咱們早先看過的Rule那一套東西。定義一系列的Rule,而後讓Rule去匹配SparkPlan而後轉換一遍。app
這裏在於每條Rule都是幹嗎用的,這裏介紹一下吧。框架
生成子查詢,在比較早的版本,Spark SQL仍是不支持子查詢的,不過如今加上了,這條Rule實際上是對子查詢的SQL新生成一個QueryExecution(就是咱們一直分析的這個流程),還記得QueryExecution裏面的變量基本都是懶加載的吧,這些不會當即執行,都是到最後一併執行的,說白了就有點遞歸的意思。ide
這條是比較重要的,代碼量也多。主要就是驗證輸出的分區(partition)和咱們要的分區是否是同樣,不同那天然須要加入shuffle處理重分區,若是有排序需求還會排序。
這個是和一個優化相關的,先介紹下相關背景。Whole stage Codegen在一些MPP數據庫被用來提升性能,主要就是將一串的算子,轉換成一段代碼(Spark sql轉換成java代碼),從而提升性能。好比下圖,一串的算子操做,能夠轉換成一個java方法,這一一來性能會有必定的提高。
這一步就是在支持Codegen的SparkPlan上添加一個WholeStageCodegenExec,不支持Codegen的SparkPlan則會添加一個InputAdapter。這一點在下面看preparations階段結果的時候能看到,還有這個優化是默認開啓的。
這兩個都是大概一樣的功能就放一塊說了。首先Exchange是對shuffle如何進行的描述,能夠理解爲就是shuffle吧。
這裏的ReuseExchange是一個優化措施,去找有重複的Exchange的地方,而後將結果替換過去,避免重複計算。
ReuseSubquery也是一樣的道理,若是一條SQL語句中有多個相同的子查詢,那麼是不會重複計算的,會將計算的結果直接替換到重複的子查詢中去,提升性能。
這裏我略過了CollapseCodegenStages,這部分比較複雜,也沒什麼時間看,就先跳過了,大概知道這個東西是一個優化措施就好了。
那再來看看這一階段後,示例代碼會變成什麼樣吧,先看示例代碼:
//生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //調用spark.sql val queryCaseWhen = sql("select key from src ")
結果生成以下:
Project [_1#2 AS key#5] +- LocalTableScan [_1#2, _2#3]
好吧這裏看仍是和以前Optimation階段同樣,不過斷點看就不大同樣了。
因爲咱們的SQL比較簡單,因此只多了兩個SparkPlan,就是WholeStageCodegenExec和InputAdapter,和上面說的是一致的!
OK,通過以上的準備以後,就要開始最後的執行階段了。
依舊是在QueryExecution裏面,
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其餘代碼 lazy val toRdd: RDD[InternalRow] = executedPlan.execute() ......其餘代碼 }
這裏其實是調用了以前生成的SparkPlan的execute()方法,這個方法最終會再調用它的doExecute()方法,而這個方法是各個子類本身實現的,也就是說,不一樣的SparkPlan執行的doExecute()是不同的。
經過上面的階段,咱們獲得了一棵4層的樹,不過其中WholeStageCodegenExec和InputAdapter是爲Codegen優化生成的,這裏就不討論了,忽略這兩個其實結果是同樣的。也就是說這裏只介紹ProjectExec和LocalTableScanExec兩個SparkPlan的doExecute()方法。
先是ProjectExec這個SparkPlan,咱們看看它的doExecute()代碼。
case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with CodegenSupport { ......其餘代碼 protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsWithIndexInternal { (index, iter) => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) project.initialize(index) iter.map(project) } } ......其餘代碼 }
能夠看到它是先遞歸去調用child(也就是LocalTableScanExec)的doExecute()方法,仍是得先去看看LocalTableScanExec生成什麼東西呀。
case class LocalTableScanExec( output: Seq[Attribute], @transient rows: Seq[InternalRow]) extends LeafExecNode { ......其餘代碼 private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism) protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") rdd.map { r => numOutputRows += 1 r } } ......其餘代碼
能夠看到最底層的rdd就是在這裏實現的,LocalTableScanExec一開始就會生成一個lazy的rdd,在須要的時候返回。而在doExecute()方法中的numOutputRows能夠理解爲僅是一個測量值,暫時不用理會。總之這裏咱們就發現LocalTableScanExec的doExecute()其實就是返回一個parallelize生成的rdd。而後再回到ProjectExec去。
它調用child.execute().mapPartitionsWithIndexInternal {......},這裏的mapPartitionsWithIndexInternal和rdd的mapPartitionsWithIndex是相似的,區別只在於mapPartitionsWithIndexInternal只會在內部模塊使用,若是有童鞋不明白mapPartitionsWithIndex這個API,能夠百度查查看。而後重點看mapPartitionsWithIndexInternal的內部邏輯。
child.execute().mapPartitionsWithIndexInternal { (index, iter) => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) project.initialize(index) iter.map(project) }
這裏最後一行iter.map(project),其實仍是scala的語法糖,實際大概是這樣iter.map(i => project.apply(i))。就是調用project的apply方法,對每行數據處理。而後經過追蹤,能夠發現project的實例是InterpretedUnsafeProjection,咱們看看它的apply方法。
class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { ......其餘代碼 override def apply(row: InternalRow): UnsafeRow = { // Put the expression results in the intermediate row. var i = 0 while (i < numFields) { values(i) = expressions(i).eval(row) i += 1 } // Write the intermediate row to an unsafe row. rowWriter.reset() writer(intermediate) rowWriter.getRow() } ......其餘代碼
這裏其實重點在最後三行,就是將結果寫入到result row,再返回回去。當執行完畢的時候,就會獲得最終的RDD[InternalRow],再剩下的,就交給spark core去處理了。
OK,那到這裏基本就把Spark整個流程給講完了,回顧一下整個流程。
其實能夠發現流程是挺簡單的,不少其餘SQL解析框架(好比calcite)也是相似的流程,只是在設計上在某些方面的取捨會有誤差。然後深刻到代碼的時候容易陷入一些細節中,固然這幾篇也省略了不少細節,不少時候細節纔是真正精髓的地方,之後有若是涉及到的時候再寫文章討論吧(/偷笑)。若是在開放過程當中涉及到SQL解析這方面的開放,應該都會是在優化方面,也就是Optimization階段增長或處理Rule,這塊就須要對代數優化理論和代碼有一些瞭解了。
限於本人水平,介紹spark sql的這幾篇文章不免有疏漏和不足的地方,歡迎在評論區評論,先謝過了~~
以上~