Spark SQL源碼解析(四)Optimization和Physical Planning階段解析

Spark SQL原理解析前言:html

Spark SQL源碼剖析(一)SQL解析框架Catalyst流程概述git

Spark SQL源碼解析(二)Antlr4解析Sql並生成樹sql

Spark SQL源碼解析(三)Analysis階段分析數據庫

前面已經介紹了SQL parse,將一條SQL語句使用antlr4解析成語法樹並使用訪問者模式生成Unresolved LogicalPlan,而後是Analysis階段將Unresolved LogicalPlan轉換成Resolved LogicalPlan。這一篇咱們介紹Optimization階段,和生成Physical Planning階段。express

通過這兩個階段後,就差很少要到最後轉換成Spark的RDD任務了。apache

Spark SQL Optimization階段概述

先來看看Logical Optimization階段。app

上一篇咱們討論了Analysis階段如何生成一個真正的Logical Plan樹。這一階段聽名字就知道是優化階段,Spark SQL中有兩個部分的優化,第一部分就是這裏,是rule-base階段的優化,就是根據各類關係代數的優化規則,對生成的Logical Plan適配,匹配到就進行相應的優化邏輯。這些規則大概有:投影消除,constant folding,替換null值,布爾表達式簡化等等。固然大部分規則細節我也不是很清楚,僅僅能從名字推斷一二。這框架

同時還能夠添加本身的優化rule,也比較容易實現,論文中就給出了一段自定義優化rule的代碼:ide

object DecimalAggregates extends Rule[LogicalPlan] {
  /** Maximum number of decimal digits in a Long */
  val MAX_LONG_DIGITS = 18
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan transformAllExpressions {
      case Sum(e @ DecimalType.Expression(prec , scale))
        if prec + 10 <= MAX_LONG_DIGITS =>
          MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale)
  }
}

這段代碼的大意是自定義了一個rule,若是匹配到SUM的表達式,那就執行相應的邏輯,論文裏描述這裏是找到對應的小數並將其轉換爲未縮放的64位LONG。具體邏輯看不是很明白不過不重要,重要的是編寫本身的優化rule很方便就是。源碼分析

順便點一下另外一種優化,名字叫作cost-base優化(CBO),是發生在Physical Planning階段的,這裏就先賣個關子,後面說到的時候再討論吧。

而後看到源碼的時候,會發現Optimizer這個類也是繼承自RuleExecutor,繼承這個類以後的流程基本都是同樣的。前面分析Analysis階段的時候已經有詳細介紹過這個流程,這裏就不展開說了。

其實這優化器的重點應該是各類優化規則,這裏我以爲更多的是設計到關係代數表達式優化理論方面的知識,這部分我也不甚精通,因此也就不說了。對這塊感興趣的童鞋能夠看看網上別人的文章,這裏順便列幾個可能有幫助的博客,

下面仍是來看看最開始的例子進行Optimization階段後會變成什麼樣吧,先看看以前的示例代碼:

val df = Seq((1, 1)).toDF("key", "value")
    df.createOrReplaceTempView("src")
    val queryCaseWhen = sql("select key from src ")

而後在Optimization優化階段後,變成了:

Project [_1#2 AS key#5]
+- LocalRelation [_1#2, _2#3]

好吧,看起來沒什麼變化,與Analysis階段相比,也就少了個SubqueryAlias ,符合預期。不過也對,就一條SELECT語句能優化到哪去啊。

Physical Planning生成階段概述

相比較於Logical Plan,Physical plan算是Spark能夠去執行的東西了,固然本質上它也是一棵樹。

前面說到,Spark有一種cost-based的優化。主要就在這一階段,在這一階段,會生成一個或多個Physical Plan,而後使用cost model預估各個Physical Plan的處理性能,最後選擇一個最優的Physical Plan。這裏最主要優化的是join操做,當觸發join操做的時候,會根據左右兩邊的數據集判斷,而後決定使用Broadcast join,仍是傳統的Hash join,抑或是MergeSort join,有關這幾種join的區別這裏就不詳細解釋了,有興趣童鞋能夠百度看看。

除了cost-based優化,這一階段也依舊會有rule-based優化,因此說RuleExecutor這個類是很重要的,前面提到的Analysis階段也好,Optimization階段也好,包括這裏的Physical Plan階段,只要是涉及到rule-based優化,都會跟RuleExecutor這個類扯上關係。固然這樣無疑是極大使用了面向對象的特性,不一樣的階段編寫不一樣的rule就行,一次編寫,處處複用。

Physical Planning源碼分析

首先是在QueryExecution中調度,

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
  ......其餘代碼
  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }
  ......其餘代碼
}

這裏的planner是org.apache.spark.sql.execution.SparkPlanner這個類,而這個類繼承自org.apache.spark.sql.catalyst.planning.QueryPlanner,plan()方法也是在父類QueryPlanner中實現的。和RuleExecution相似,QueryPlanner中有一個返回Seq[GenericStrategy[PhysicalPlan]]的方法:def strategies: Seq[GenericStrategy[PhysicalPlan]],這個方法會在子類(也就是SparkPlanner)重寫,而後被QueryPlanner的plan()方法調用。

咱們來看看SparkPlanner中strategies方法的重寫,再來看QueryPlanner的plan()方法吧。

class SparkPlanner(
    val sparkContext: SparkContext,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies {
  ......其餘代碼
  override def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      PythonEvals ::
      DataSourceV2Strategy ::
      FileSourceStrategy ::
      DataSourceStrategy(conf) ::
      SpecialLimits ::
      Aggregation ::
      Window ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)
	......其餘代碼

strategies()返回策略列表,是生成策略GenericStrategy,這是個具體的抽象類,位於org.apache.spark.sql.catalyst.planning包。所謂生成策略,就是決定若是根據Logical Plan生成Physical Plan的策略。好比上面介紹的join操做能夠生成Broadcast join,Hash join,抑或是MergeSort join,就是一種生成策略,具體的類就是上面代碼中的JoinSelection。每一個生成策略GenericStrategy都是object,其apply()方法返回的是Seq[SparkPlan],這裏的SparkPlan就是PhysicalPlan(注意:下文會將SparkPlan和PhysicalPlan混着用)。

明白了生成策略後,就能夠來看看QueryPlanner的plan()方法了。

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  ......其餘代碼
  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))	//迭代調用並平鋪,變成Iterator[SparkPlan]

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)	

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p.eq(placeholder) => childPlan
                }
              }
            }
        }
      }
    }

    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }
  
  ......其餘代碼
}

這裏的流程其實不難,主要工做其實就是調用各個生成策略GenericStrategy的apply()方法,生成Iterator[SparkPlan]。後面很大部分代碼是處理佔位符,按個人理解,在生成Logical Plan的時候,可能有些無心義的佔位符,這種須要使用子節點替換調它。倒數第三行prunePlans()方法按註釋說是用來去掉bad plan的,但看實際代碼只是原封不動返回。

這樣最終就獲得一個Iterator[SparkPlan],每一個SparkPlan就是可執行的物理操做了。

大體流程就是如此,固然具體到一些生成策略沒有細說,包括輸入源策略,聚合策略等等,每個都蠻複雜的,這裏就不細說,有興趣能夠自行查閱。

對了,最後還要看看示例代碼到這一步變成什麼樣了,先上示例代碼:

//生成DataFrame
    val df = Seq((1, 1)).toDF("key", "value")
    df.createOrReplaceTempView("src")
    //調用spark.sql
    val queryCaseWhen = sql("select key from src ")

通過Physical Planning階段後,變成以下:

Project [_1#2 AS key#5]
+- LocalTableScan [_1#2, _2#3]

對比上面的optimized階段,直觀看就是LocalRelation變成LocalTableScan。變得更加具體了,但實際上,Project也變了,雖然打印名字相同,但一個的類型是Project,本質上是LogicalPlan。而一個是ProjectExec,本質上是SparkPlan(也就是PhysicalPlan)。這一點經過斷點看的更清楚。

到這一步已經很解決終點了,後面再通過一個Preparations階段就能生成RDD了,剩下的部分留待下篇介紹吧。

以上~

相關文章
相關標籤/搜索