Spark SQL源碼解析(三)Analysis階段分析

Spark SQL原理解析前言:
Spark SQL源碼剖析(一)SQL解析框架Catalyst流程概述html

Spark SQL源碼解析(二)Antlr4解析Sql並生成樹sql

Analysis階段概述

首先,這裏須要引入一個新概念,前面介紹SQL parse階段,會使用antlr4,將一條SQL語句解析成語法樹,而後使用antlr4的訪問者模式遍歷生成語法樹,也就是Logical Plan。但其實,SQL parse這一階段生成的Logical Plan是被稱爲Unresolved Logical Plan。所謂unresolved,就是說SQL語句中的對象都是未解釋的。數據庫

好比說一條語句SELECT col FROM sales,當咱們不知道col的具體類型(Int,String,仍是其餘),甚至是否在sales表中有col這一個列的時候,就稱之爲是Unresolved的。express

而在analysis階段,主要就是解決這個問題,也就是將Unresolved的變成Resolved的。Spark SQL經過使用Catalyst rule和Catalog來跟蹤數據源的table信息。並對Unresolved應用以下的rules(rule能夠理解爲一條一條的規則,當匹配到樹某些節點的時候就會被應用)。apache

  • 從Catalog中,查詢Unresolved Logical Plan中對應的關係(relations)
  • 根據輸入屬性名(好比上述的col列),映射到具體的屬性
  • 肯定哪些屬性引用相同的值並賦予它們惟一的ID(這個是論文中的內容,看不是很明白,不過主要是方便後面優化器實現的)
  • Propagating and coercing types through expressions,這個看着也是有點迷,大概是對數據進行強制轉換,方便後續對1 + col 這樣的數據進行處理。

而處理事後,就會真正生成一棵Resolved Logical Plan,接下來就去看看源碼裏面是怎麼實現的吧。app

Analysis階段詳細解析

經過跟蹤調用代碼,在調用完SQL parse的內容後,就會跑去org.apache.spark.sql.execution.QueryExecution這個類中執行,後面包括Logical Optimization階段,Physical Planning階段,生成RDD任務階段都是在這個類中進行調度的。不過這次只介紹Analysis。框架

在QueryExecution中,會去調用org.apache.spark.sql.catalyst.Analyzer這個類,這個類是繼承自org.apache.spark.sql.catalyst.rules.RuleExecutor,記住這個,後面還有不少個階段都是經過繼承這個類實現的,實現原理也和Analysis階段類似。機器學習

繼承自RuleExecutor的類,包括這裏的Analyzer類,都是在自身實現大量的rule,而後註冊到batch變量中,這裏大概貼點代碼瞅瞅。ide

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {
  
  ......其餘代碼
    lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.ResolveCoalesceHints,
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
      
    ......其餘代碼

先大概說下batches這個變量吧,batches是由Batch的列表構成。而Batch的具體簽名以下:函數

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  ......其餘代碼
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  ......其餘代碼
}

一個Batch由策略Strategy,和一組Rule構成,其中策略Strategy主要是區分迭代次數用的,按個人理解,某些rule能夠迭代屢次,越屢次效果會越好,相似機器學習的學習過程。而策略Strategy會規定迭代一次仍是固定次數。而rule就是具體的應用規則了,這個先略過。

在Analyzer這個類中,你會發現很大篇幅的代碼都是各類各樣rule的實現。而後最終,Analyzer會去調用super.execute()方法,也就是調用父類(RuleExecutor)的方法執行具體邏輯。而父類又會去調用這個batches變量,循環來與Sql Parse階段生成的Unresolved Logical Plan作匹配,匹配到了就執行具體的驗證。仍是貼下代碼看看吧。

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
    //遍歷Analyzer中定義的batchs變量
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
	  //這裏的continue決定是否再次循環,由batch的策略(固定次數或單次),以及該batch對plan的做用效果這二者控制
      while (continue) {
	    //調用foldLeft讓batch中每條rule應用於plan,而後就是執行對應rule規則邏輯了
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime

            if (!result.fastEquals(plan)) {
              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
            queryExecutionMetrics.incNumExecution(rule.ruleName)

            // Run the structural integrity checker against the plan after each rule.
            if (!isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }

            result
        }
        iteration += 1
		//策略的生效地方
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
            if (Utils.isTesting) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
            |=== Result of Batch ${batch.name} ===
            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
          """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }

}

其實這個類的邏輯不難懂,就是遍歷batchs變量,而每一個batch又會去使用scala的foldLeft函數,遍歷應用裏面的每條rule。而後根據Batch的策略以及將新生成的Plan與舊的Plan比較,決定是否要再次遍歷。而後最後將新生成的Plan輸出。

若是不清楚scala的foldLeft函數內容,能夠百度下看看,不難懂的。而後跟RuleExecutor有關的基本都是這個套路,區別只在於rule的不一樣。

接下來咱們來看看具體是若是應用一條rule,將Unresolved LogicalPlan轉換成Resolved LogicalPlan吧。

Rule介紹

前面說到,在Analyzer中重寫了Batchs變量,Batchs包含多個Batch,每一個Batch又有多個Rule,因此不可能所有看過來,慶幸的是,要了解Unresolved LogicalPlan轉換成Resolved LogicalPlan,只須要看一個Rule就行,那就是ResolveRelations這個Rule,咱們就只介紹這個Rule來管中窺豹。

各自Rule基本都是object類型,也就是靜態的,且繼承自Rule這個抽象類,Rule很簡單,就一個ruleName變量喝一個apply方法用以實現邏輯,而後就沒了。因此重點仍是在繼承後的實現邏輯。

前面提到,從Unresolved到Resolved的過程,能夠理解爲就是將SQL語句中的類型和字段,映射到實體表中的字段信息。而存儲實體表元數據信息的,是Catalog,到具體的類,是org.apache.spark.sql.catalyst.catalog.SessionCatalog。

咱們來看看具體的邏輯代碼:

object ResolveRelations extends Rule[LogicalPlan] {
    ......其餘代碼
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)
    }
	......其餘代碼
}

邏輯其實也蠻簡單的,就是匹配UnresolvedRelation(就是Unresolved的節點),而後遞歸去Catlog中獲取對應的元數據信息,遞歸將它及子節點變成Resoulved。不過還有個要提的是,SQL中對應的,有多是文件數據,或是數據庫中的表,抑或是視圖(view),針對文件數據是不會轉換的,轉換成Resolved會在後面進行。而表和視圖則會當即轉換。

最後,接上一篇的例子,接着來看看,通過Analysis階段後,LogicalPlan變成什麼樣吧,上一篇SQL parse使用的示例代碼:

//生成DataFrame
    val df = Seq((1, 1)).toDF("key", "value")
    df.createOrReplaceTempView("src")
    //調用spark.sql
    val queryCaseWhen = sql("select key from src ")

通過上次介紹的SQL parse後是變成這樣:

'Project ['key]
+- 'UnresolvedRelation `src`

這裏的涵義上篇已介紹,再也不贅述,而通過本次的Analysis後,則會變成這樣

Project [key#5]
+- SubqueryAlias `src`
   +- Project [_1#2 AS key#5, _2#3 AS value#6]
      +- LocalRelation [_1#2, _2#3]

能夠發現,主要就是對UnresolvedRelation進行展開,如今咱們能夠發現src有兩個字段,分別是key和value及其對應的別名(1#2,2#3)。這裏還有一個SubqueryAlias,這個我也不是很明白,按源碼裏面的說法,這裏的subquery僅用來提供屬性的做用域信息,Analysis階段事後就就能夠將其刪除,因此在Optimization階段後會發現SubqueryAlias消失了。

小結

OK,那今天就先介紹到這裏吧,主要綜述了Analysis的內容,而後介紹RuleExecution的邏輯,最後簡單看了個Rule的具體內容以及承接SQL parse階段的例子。有興趣的童鞋能夠本身去順着思路翻源碼看看。

以上~

相關文章
相關標籤/搜索