Spark SQL源碼解析(五)SparkPlan準備和執行階段

Spark SQL原理解析前言:html

Spark SQL源碼剖析(一)SQL解析框架Catalyst流程概述java

Spark SQL源碼解析(二)Antlr4解析Sql並生成樹sql

Spark SQL源碼解析(三)Analysis階段分析數據庫

Spark SQL源碼解析(四)Optimization和Physical Planning階段解析express

SparkPlan準備階段介紹

前面通過千辛萬苦,終於生成可實際執行的SparkPlan(即PhysicalPlan)。但在真正執行前,還須要作一些準備工做,包括在必要的地方插入一些shuffle做業,在須要的地方進行數據格式轉換等等。apache

這部份內容都在org.apache.spark.sql.execution.QueryExecution類中。咱們看看代碼session

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
  ......其餘代碼
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
  
  //調用下面的preparations,而後使用foldLeft遍歷preparations中的Rule並應用到SparkPlan
  protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
  }

  /** A sequence of rules that will be applied in order to the physical plan before execution. */
  //定義各個Rule
  protected def preparations: Seq[Rule[SparkPlan]] = Seq(
    PlanSubqueries(sparkSession),
    EnsureRequirements(sparkSession.sessionState.conf),
    CollapseCodegenStages(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf))
  ......其餘代碼
}

準備階段是去調用prepareForExecution方法,而prepareForExecution也簡單,仍是咱們早先看過的Rule那一套東西。定義一系列的Rule,而後讓Rule去匹配SparkPlan而後轉換一遍。app

這裏在於每條Rule都是幹嗎用的,這裏介紹一下吧。框架

PlanSubqueries(sparkSession)

生成子查詢,在比較早的版本,Spark SQL仍是不支持子查詢的,不過如今加上了,這條Rule實際上是對子查詢的SQL新生成一個QueryExecution(就是咱們一直分析的這個流程),還記得QueryExecution裏面的變量基本都是懶加載的吧,這些不會當即執行,都是到最後一併執行的,說白了就有點遞歸的意思。ide

EnsureRequirements(sparkSession.sessionState.conf)

這條是比較重要的,代碼量也多。主要就是驗證輸出的分區(partition)和咱們要的分區是否是同樣,不同那天然須要加入shuffle處理重分區,若是有排序需求還會排序。

CollapseCodegenStages

這個是和一個優化相關的,先介紹下相關背景。Whole stage Codegen在一些MPP數據庫被用來提升性能,主要就是將一串的算子,轉換成一段代碼(Spark sql轉換成java代碼),從而提升性能。好比下圖,一串的算子操做,能夠轉換成一個java方法,這一一來性能會有必定的提高。

這一步就是在支持Codegen的SparkPlan上添加一個WholeStageCodegenExec,不支持Codegen的SparkPlan則會添加一個InputAdapter。這一點在下面看preparations階段結果的時候能看到,還有這個優化是默認開啓的。

ReuseExchange和ReuseSubquery

這兩個都是大概一樣的功能就放一塊說了。首先Exchange是對shuffle如何進行的描述,能夠理解爲就是shuffle吧。

這裏的ReuseExchange是一個優化措施,去找有重複的Exchange的地方,而後將結果替換過去,避免重複計算。

ReuseSubquery也是一樣的道理,若是一條SQL語句中有多個相同的子查詢,那麼是不會重複計算的,會將計算的結果直接替換到重複的子查詢中去,提升性能。

這裏我略過了CollapseCodegenStages,這部分比較複雜,也沒什麼時間看,就先跳過了,大概知道這個東西是一個優化措施就好了。

那再來看看這一階段後,示例代碼會變成什麼樣吧,先看示例代碼:

//生成DataFrame
    val df = Seq((1, 1)).toDF("key", "value")
    df.createOrReplaceTempView("src")
    //調用spark.sql
    val queryCaseWhen = sql("select key from src ")

結果生成以下:

Project [_1#2 AS key#5]
+- LocalTableScan [_1#2, _2#3]

好吧這裏看仍是和以前Optimation階段同樣,不過斷點看就不大同樣了。

因爲咱們的SQL比較簡單,因此只多了兩個SparkPlan,就是WholeStageCodegenExec和InputAdapter,和上面說的是一致的!

OK,通過以上的準備以後,就要開始最後的執行階段了。

SparkPlan執行生成RDD階段

依舊是在QueryExecution裏面,

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
  ......其餘代碼
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
  ......其餘代碼
}

這裏其實是調用了以前生成的SparkPlan的execute()方法,這個方法最終會再調用它的doExecute()方法,而這個方法是各個子類本身實現的,也就是說,不一樣的SparkPlan執行的doExecute()是不同的。

經過上面的階段,咱們獲得了一棵4層的樹,不過其中WholeStageCodegenExec和InputAdapter是爲Codegen優化生成的,這裏就不討論了,忽略這兩個其實結果是同樣的。也就是說這裏只介紹ProjectExec和LocalTableScanExec兩個SparkPlan的doExecute()方法。

先是ProjectExec這個SparkPlan,咱們看看它的doExecute()代碼。

case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
  extends UnaryExecNode with CodegenSupport {
   ......其餘代碼
  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val project = UnsafeProjection.create(projectList, child.output,
        subexpressionEliminationEnabled)
      project.initialize(index)
      iter.map(project)
    }
  }
  ......其餘代碼
}

能夠看到它是先遞歸去調用child(也就是LocalTableScanExec)的doExecute()方法,仍是得先去看看LocalTableScanExec生成什麼東西呀。

case class LocalTableScanExec(
    output: Seq[Attribute],
    @transient rows: Seq[InternalRow]) extends LeafExecNode {
  ......其餘代碼
	
  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
  
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    rdd.map { r =>
      numOutputRows += 1
      r
    }
  }
	
  ......其餘代碼

能夠看到最底層的rdd就是在這裏實現的,LocalTableScanExec一開始就會生成一個lazy的rdd,在須要的時候返回。而在doExecute()方法中的numOutputRows能夠理解爲僅是一個測量值,暫時不用理會。總之這裏咱們就發現LocalTableScanExec的doExecute()其實就是返回一個parallelize生成的rdd。而後再回到ProjectExec去。

它調用child.execute().mapPartitionsWithIndexInternal {......},這裏的mapPartitionsWithIndexInternal和rdd的mapPartitionsWithIndex是相似的,區別只在於mapPartitionsWithIndexInternal只會在內部模塊使用,若是有童鞋不明白mapPartitionsWithIndex這個API,能夠百度查查看。而後重點看mapPartitionsWithIndexInternal的內部邏輯。

child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
  val project = UnsafeProjection.create(projectList, child.output,
    subexpressionEliminationEnabled)
  project.initialize(index)
  iter.map(project)
}

這裏最後一行iter.map(project),其實仍是scala的語法糖,實際大概是這樣iter.map(i => project.apply(i))。就是調用project的apply方法,對每行數據處理。而後經過追蹤,能夠發現project的實例是InterpretedUnsafeProjection,咱們看看它的apply方法。

class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
  ......其餘代碼
  override def apply(row: InternalRow): UnsafeRow = {
    // Put the expression results in the intermediate row.
    var i = 0
    while (i < numFields) {
      values(i) = expressions(i).eval(row)
      i += 1
    }

    // Write the intermediate row to an unsafe row.
    rowWriter.reset()
    writer(intermediate)
    rowWriter.getRow()
  }
  
  ......其餘代碼

這裏其實重點在最後三行,就是將結果寫入到result row,再返回回去。當執行完畢的時候,就會獲得最終的RDD[InternalRow],再剩下的,就交給spark core去處理了。

小結

OK,那到這裏基本就把Spark整個流程給講完了,回顧一下整個流程。
catalyst流程

其實能夠發現流程是挺簡單的,不少其餘SQL解析框架(好比calcite)也是相似的流程,只是在設計上在某些方面的取捨會有誤差。然後深刻到代碼的時候容易陷入一些細節中,固然這幾篇也省略了不少細節,不少時候細節纔是真正精髓的地方,之後有若是涉及到的時候再寫文章討論吧(/偷笑)。若是在開放過程當中涉及到SQL解析這方面的開放,應該都會是在優化方面,也就是Optimization階段增長或處理Rule,這塊就須要對代數優化理論和代碼有一些瞭解了。

限於本人水平,介紹spark sql的這幾篇文章不免有疏漏和不足的地方,歡迎在評論區評論,先謝過了~~

以上~

相關文章
相關標籤/搜索