Spark SQL

Dataset: a set of wrapper interfaces around DataFrame creation and a unified entry point for driving it. A DataFrame can be produced from a DataSource or from an RDD.

SparkSession: creates DataFrames through Dataset and triggers their execution.

SparkPlan: the physical execution plan.

QueryExecution: runs the query and produces the SparkPlan.

SQLExecution: manages the list of QueryExecutions and schedules their execution.

ExistingRDD: a DataFrame backed by an RDD.

DataSource: an external storage format such as csv, jdbc, txt, or parquet. The format() call on SparkSession resolves to either ExistingRDD or a DataSource.

    1. SparkSession

There are three ways to create a DataFrame: sql(), which mainly queries Hive tables; building a DataFrame from an RDD, which starts from ExistingRDD; and the read/format path, which builds a DataFrame from data sources such as json/txt/csv.

Let's first walk through the creation flow of the third approach.
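For orientation, a minimal sketch of the three entry points (the Hive table people and the file people.json are hypothetical; enableHiveSupport is only needed for the Hive path):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").enableHiveSupport().getOrCreate()
import spark.implicits._

// 1) sql(): query a Hive table
val fromSql = spark.sql("SELECT name, age FROM people")

// 2) from an RDD (internally goes through the ExistingRDD / LocalRelation path)
val fromRdd = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2))).toDF("name", "age")

// 3) read/format: load an external data source
val fromFile = spark.read.format("json").load("people.json")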

      1.1 read/format

def read: DataFrameReader = new DataFrameReader(self)

SparkSession.read() simply creates a DataFrameReader, and DataFrameReader.load() then imports the external data source. The main logic of load() is as follows:

/**
   * Loads input in as a `DataFrame`, for data sources that support multiple paths.
   * Only works if the source is a HadoopFsRelationProvider.
   *
   * @since 1.6.0
   */
  @scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

It creates a DataSource for the given source type and resolves it into a BaseRelation, then maps the BaseRelation to a DataFrame through SparkSession's baseRelationToDataFrame method. That method wraps the BaseRelation in a LogicalRelation and calls Dataset.ofRows to build the DataFrame from it. A DataFrame is in fact just a Dataset:

type DataFrame = Dataset[Row]

The definition of baseRelationToDataFrame:

def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
    Dataset.ofRows(self, LogicalRelation(baseRelation))
  }

Dataset.ofRows mainly wraps the logical plan into a QueryExecution (which later drives the conversion to a physical plan) and then creates a new Dataset:

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

      1.2 Execution

The key question for execution is how the physical plan is generated from the LogicalPlan. Let's trace this part of the logic.

def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
    plan.executeCollect().head.getLong(0)
  }

The count() action on a Dataset triggers execution of the physical plan by calling the plan's executeCollect method, which in turn calls doExecute() and gathers the result as an Array[InternalRow]. executeCollect is defined in SparkPlan:

def executeCollect(): Array[InternalRow] = {
    val byteArrayRdd = getByteArrayRdd()

    val results = ArrayBuffer[InternalRow]()
    byteArrayRdd.collect().foreach { bytes =>
      decodeUnsafeRows(bytes).foreach(results.+=)
    }
    results.toArray
  }

      1.3 HadoopFsRelation

Next we trace how a physical plan (i.e. a SparkPlan) is generated from a HadoopFsRelation.

This is handled by FileSourceStrategy, which stacks Filter and Projection operators on top of a FileSourceScanExec. The definition of FileSourceScanExec:

/**
 * Physical plan node for scanning data from HadoopFsRelations.
 *
 * @param relation The file-based relation to scan.
 * @param output Output attributes of the scan, including data attributes and partition attributes.
 * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
 * @param partitionFilters Predicates to use for partition pruning.
 * @param dataFilters Filters on non-partition columns.
 * @param metastoreTableIdentifier identifier for the table in the metastore.
 */
case class FileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression],
    override val metastoreTableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec with ColumnarBatchScan {
  ...
}

Its main execution code, doExecute(), works as follows:

protected override def doExecute(): RDD[InternalRow] = {
    if (supportsBatch) {
      // in the case of fallback, this batched scan should never fail because of:
      // 1) only primitive types are supported
      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
      WholeStageCodegenExec(this).execute()
    } else {
      val unsafeRows = {
        val scan = inputRDD
        if (needsUnsafeRowConversion) {
          scan.mapPartitionsWithIndexInternal { (index, iter) =>
            val proj = UnsafeProjection.create(schema)
            proj.initialize(index)
            iter.map(proj)
          }
        } else {
          scan
        }
      }
      val numOutputRows = longMetric("numOutputRows")
      unsafeRows.map { r =>
        numOutputRows += 1
        r
      }
    }
  }

inputRDD is created in one of two ways: createBucketedReadRDD or createNonBucketedReadRDD. There is no fundamental difference between them; they differ only in how files are grouped into partitions.

private lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    relation.bucketSpec match {
      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
      case _ =>
        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
    }
  }

createNonBucketedReadRDD ends up constructing a FileScanRDD:

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
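As an aside, the bucketed path (createBucketedReadRDD) is taken when the relation carries a bucket spec and spark.sql.sources.bucketing.enabled is true (the default). A hedged sketch of writing such a table; the table and column names are made up:

spark.range(0, 1000)
  .withColumnRenamed("id", "user_id")
  .write
  .bucketBy(8, "user_id")         // 8 buckets hashed on user_id
  .sortBy("user_id")
  .saveAsTable("users_bucketed")  // bucket spec is recorded in the metastore

// Scanning the table now plans a bucketed read instead of the non-bucketed one.
spark.table("users_bucketed").groupBy("user_id").count().explain()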

 

    2. SparkPlan

SparkPlan is the base class of physical execution plans. A SQL statement is analyzed and optimized into a LogicalPlan, and the LogicalPlan then yields a concrete SparkPlan according to the type of data source.

def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }

The SQL parser's parsePlan method parses the SQL text into a LogicalPlan, and Dataset.ofRows then turns that LogicalPlan into a DataFrame.
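A small illustrative sketch of poking at the parser directly; sessionState and sqlParser are internal/developer APIs, and the temp view t is made up:

spark.range(10).createOrReplaceTempView("t")

// Unresolved logical plan straight from the parser.
val parsed = spark.sessionState.sqlParser.parsePlan("SELECT id FROM t WHERE id > 5")
println(parsed.treeString)

// The same statement through the normal entry point, which also runs Dataset.ofRows.
val df = spark.sql("SELECT id FROM t WHERE id > 5")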

There are two kinds of SessionState: a Hive one and an in-memory one. Here we look at the in-memory version; the file is spark.sql.internal.SessionState.scala.

The main method in SessionState is:

def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)

It builds a QueryExecution from the logical plan. SparkPlan is a subclass of QueryPlan; roughly, QueryPlan describes the plan itself while QueryExecution drives its actual execution. Note that createQueryExecution is passed into SessionState as a constructor argument. Its default implementation is:

protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
    new QueryExecution(session, plan)
  }

It simply constructs a QueryExecution object from the LogicalPlan.

The key methods of SparkPlan are:

doExecute: builds the RDD[InternalRow]; implemented concretely in each SparkPlan subclass. Broadly, it creates an iterator over the various data sources (or Hive) and wraps it in the corresponding RDD.

executeCollect(): returns an Array[InternalRow]; this actually launches a job.

executeToIterator: also actually launches a job; its implementation:

def executeToIterator(): Iterator[InternalRow] = {
    getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
  }

      2.1 QueryExecution

A QueryExecution is created from the LogicalPlan and represents the actual execution of that logical plan.

Its main members:

analyzed: analyzes the LogicalPlan and returns the analyzed LogicalPlan.

assertAnalyzed(): forces analysis and checks it (sparkSession.sessionState.analyzer.checkAnalysis(analyzed)).

optimizedPlan: optimizes the LogicalPlan and returns the optimized LogicalPlan.

sparkPlan: plans the best SparkPlan from optimizedPlan.

executedPlan: produces the SparkPlan that is ready to execute by calling prepareForExecution on sparkPlan; the input and return types are the same.

toRdd: produces the RDD[InternalRow]; the implementation simply calls executedPlan.execute().
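These members can be inspected from user code through queryExecution (a developer API, so the exact output varies by version); a quick sketch:

val df = spark.range(100).filter("id > 10").groupBy().count()
val qe = df.queryExecution

println(qe.analyzed)       // analyzed logical plan
println(qe.optimizedPlan)  // optimized logical plan
println(qe.sparkPlan)      // physical plan chosen by the planner
println(qe.executedPlan)   // physical plan after prepareForExecution

val rdd = qe.toRdd         // RDD[InternalRow]; running it launches the job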

 

 

 

 

      2.2 Summary

The flow of SparkSession.sql() is roughly as follows:

First, the SQL statement is parsed via SessionState into a LogicalPlan;

then QueryExecution applies the various preprocessing steps to the LogicalPlan, including analysis and optimization, producing a SparkPlan;

finally, execute() on the prepared SparkPlan yields an RDD[InternalRow], and Dataset.ofRows builds the DataFrame.
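The whole pipeline is visible with explain(true), which prints the parsed, analyzed, optimized, and physical plans:

spark.range(1000).createOrReplaceTempView("t")
spark.sql("SELECT id FROM t WHERE id % 2 = 0").explain(true)
// Prints == Parsed Logical Plan ==, == Analyzed Logical Plan ==,
// == Optimized Logical Plan == and == Physical Plan == sections.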

    3. LogicalPlan

LogicalPlan is a subclass of QueryPlan.

 

    4. LocalRelation

A local relation, itself a kind of LogicalPlan; it represents dataset data held locally in memory.

 

    5. LogicalRelation

A logical relation whose constructor argument is a BaseRelation. Reading csv, json, txt, and similar files builds the logical plan through LogicalRelation; in those cases csv, json, txt, etc. are DataSources that construct the BaseRelation.

LogicalRelation extends LeafNode, which in turn extends LogicalPlan; beyond that there is little extra logic.

Note that UnaryNode also extends LogicalPlan.

    6. basicLogicalOperators

The basic logical-plan operators; a series of them are applied while resolving a LogicalPlan. Each class here turns a logical plan into a new LogicalPlan. They include UnaryNode and LeafNode types (all subclasses of LogicalPlan).

The main operators:

Generate: produces new columns, with a flag for whether the new columns are appended to the original ones.

Filter: filtering. Its main logic:

override protected def validConstraints: Set[Expression] = {
    val predicates = splitConjunctivePredicates(condition)
      .filterNot(SubqueryExpression.hasCorrelatedSubquery)
    child.constraints.union(predicates.toSet)
  }

It splits the condition into predicates and adds them to the child's constraints.

Subquery: a subquery; there does not seem to be much more to it: it simply sets its output to the child's output.

Project: mainly handles aliases, adding them to the child's constraints.

Intersect: combines the left and right LogicalPlans; their constraints are combined as well.

Except: excludes the right LogicalPlan; the constraints are set to the left plan's constraints.

Union: unions several LogicalPlans.

Sort: sorting; it carries a Seq[SortOrder].

Range: a range over an id field.

Aggregate: the aggregation operator, defined as:

case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan)
  extends UnaryNode

    7. analyzer

The SQL analyzer. It is initialized in BaseSessionStateBuilder and builds on Catalyst's Analyzer.

protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
      new FindDataSourceTable(session) +:
        new ResolveSQLOnFile(session) +:
        customResolutionRules

    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
      PreprocessTableCreation(session) +:
        PreprocessTableInsertion(conf) +:
        DataSourceAnalysis(conf) +:
        customPostHocResolutionRules

    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
      PreWriteCheck +:
        HiveOnlyCheck +:
        customCheckRules
  }

The analyzer defines a series of Rules; each Rule transforms the LogicalPlan into a new LogicalPlan. The Rules defined in execution/datasources/rules.scala are used as examples below.
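As a side note, user code can hook additional rules into this pipeline through SparkSessionExtensions; injected resolution rules end up in the customResolutionRules slot shown above. A minimal sketch with a do-nothing placeholder rule:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: logs the plan and returns it unchanged.
case class NoopRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"analyzing plan:\n${plan.treeString}")
    plan
  }
}

val spark = SparkSession.builder()
  .appName("extensions-demo")
  .withExtensions(_.injectResolutionRule(NoopRule)) // becomes one of the customResolutionRules
  .getOrCreate()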

      7.1 ResolveSQLOnFile

When creating a table or appending data to an existing one, this stage checks whether the table definition and the SQL match and whether the SQL contains errors (for example a nonexistent table partition).

A partial code excerpt illustrates the mechanism:

    // When we append data to an existing table, check if the given provider, partition columns,
    // bucket spec, etc. match the existing table, and adjust the columns order of the given query
    // if necessary.
    case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
        if query.resolved && catalog.tableExists(tableDesc.identifier) =>
      // This is guaranteed by the parser and `DataFrameWriter`
      assert(tableDesc.provider.isDefined)

      val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
      val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
      val tableName = tableIdentWithDB.unquotedString
      val existingTable = catalog.getTableMetadata(tableIdentWithDB)

      if (existingTable.tableType == CatalogTableType.VIEW) {
        throw new AnalysisException("Saving data into a view is not allowed.")
      }

      // Check if the specified data source match the data source of the existing table.
      val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
      val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
      // TODO: Check that options from the resolved relation match the relation that we are
      // inserting into (i.e. using the same compression).
      if (existingProvider != specifiedProvider) {
        throw new AnalysisException(s"The format of the existing table $tableName is " +
          s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
          s"`${specifiedProvider.getSimpleName}`.")
      }

      if (query.schema.length != existingTable.schema.length) {
        throw new AnalysisException(
          s"The column number of the existing table $tableName" +
            s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
            s"(${query.schema.catalogString})")
      }

      val resolver = sparkSession.sessionState.conf.resolver
      val tableCols = existingTable.schema.map(_.name)

      // As we are inserting into an existing table, we should respect the existing schema, preserve
      // the case and adjust the column order of the given DataFrame according to it, or throw
      // an exception if the column names do not match.
      val adjustedColumns = tableCols.map { col =>
        query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
          val inputColumns = query.schema.map(_.name).mkString(", ")
          throw new AnalysisException(
            s"cannot resolve '$col' given input columns: [$inputColumns]")
        }
      }

If the SQL is wrong, an exception is thrown at this stage and execution stops there.

      7.2 PreprocessTableInsertion

Analyzes INSERT INTO statements, checking that the referenced column names exist and that the number of columns matches the number of values, and throws an exception on error.
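For example, an INSERT whose value count does not match the table's column count is rejected at analysis time; a hedged sketch (the table name is made up, and the exact error message varies by version):

spark.sql("CREATE TABLE target (a INT, b STRING) USING parquet")

// Two columns expected, three values supplied: the analyzer throws an
// AnalysisException here, before any job is launched.
spark.sql("INSERT INTO target VALUES (1, 'x', 'extra')")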

 

    8. optimizer

The SQL optimizer. It is initialized in BaseSessionStateBuilder and builds on Catalyst's Optimizer.

protected def optimizer: Optimizer = {
    new SparkOptimizer(catalog, conf, experimentalMethods) {
      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
    }
  }

      8.1 Partition optimization

If a DISTINCT-style operation (distinct, max, min, etc.) in a SQL statement acts on a partition column, the statement can be optimized so that the query is restricted to the relevant partitions.

This relies on the spark-catalyst classes.

For example, select col1, max(col2) from tbl1 group by col1 would be rewritten into something like select col1.partition, max(col2) from tbl1[col1 partition]; this is only a sketch of the idea.
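A related effect that is easy to observe is partition pruning on a file-based table: filters on partition columns appear as PartitionFilters on the FileSourceScanExec and limit which directories are scanned. A hedged sketch (the path and column names are made up):

// Hypothetical dataset written to /tmp/events, partitioned by `day`.
spark.range(0, 100)
  .selectExpr("id", "id % 7 as day")
  .write.partitionBy("day").mode("overwrite").parquet("/tmp/events")

val events = spark.read.parquet("/tmp/events")
events.filter("day = 3").explain()
// The FileScan node lists PartitionFilters such as [isnotnull(day), (day = 3)],
// so only the day=3 directories are read.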

 

 

    9. planner

The physical planner: it generates multiple candidate SparkPlans from the LogicalPlan. SparkPlanner extends SparkStrategies, which extends QueryPlanner.

It is created by the following method:

protected def planner: SparkPlanner = {
    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
      override def extraPlanningStrategies: Seq[Strategy] =
        super.extraPlanningStrategies ++ customPlanningStrategies
    }
  }

The plan() method, defined in QueryPlanner, applies each strategy to the logical plan in turn and concatenates the candidate physical plans they produce (recursively planning any placeholders left by planLater).
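Extra strategies can also be supplied from user code through experimentalMethods (the same experimentalMethods passed to SparkPlanner above); a minimal do-nothing sketch:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Placeholder strategy: matches nothing, so planning falls through to the built-in strategies.
object NoopStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().appName("strategy-demo").getOrCreate()
spark.experimental.extraStrategies = Seq(NoopStrategy)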

      9.1 SparkStrategies

The strategies that combine various conditions to decide which SparkPlan subclass to generate.

Depending on the type of LogicalPlan, different UnaryExecNode subclasses are produced, for example:

SortExec: overrides doExecute(): RDD[InternalRow] using a sorting iterator (UnsafeExternalRowSorter). UnsafeExternalRowSorter in turn uses UnsafeExternalSorter, a Spark Core class for sorting iterators (org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter).

UnsafeExternalRowSorter: the sorting-iterator implementation behind SortExec.

FilterExec: the physical execution of filtering. Its doExecute() (shown again in the FilterExec section below) evaluates a predicate over every row:

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val predicate = newPredicate(condition, child.output)
      predicate.initialize(0)
      iter.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }
    }
  }

 

    10. basicPhysicalOperators

Physical execution plans of various kinds: ProjectExec, SortExec, FilterExec, and so on. They all override SparkPlan's doExecute method.

ProjectExec: computes the output columns (projection).

FilterExec: the physical plan for filtering.

SampleExec: sampling.

RangeExec: a range.

UnionExec: a union of multiple queries.

CoalesceExec: reduces the number of partitions (despite the name, not the SQL coalesce that returns the first non-null value; see its doExecute below).

SubqueryExec: its main job is to execute the child plan.

 

      10.1 ProjectExec

The overridden method:

protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val project = UnsafeProjection.create(projectList, child.output,
        subexpressionEliminationEnabled)
      project.initialize(index)
      iter.map(project)
    }
  }

It uses UnsafeProjection to map and transform each InternalRow.

For example: select id, func(name) from table1 where id > 1;

Evaluating id and func(name) here is the projection: id and func(name) can be seen as Expressions, and calling expression.eval(internalRow) on each Expression is exactly what the projection does. It evaluates the expression against the current row and emits the final result.
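A tiny sketch of that idea using Catalyst expressions directly (internal API, purely illustrative): bind an expression to a row layout, then evaluate it against an InternalRow.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, Literal, UnsafeProjection}
import org.apache.spark.sql.types.IntegerType

// Expression equivalent to `col0 + 1`, where col0 is an IntegerType column.
val expr = Add(BoundReference(0, IntegerType, nullable = false), Literal(1))

val row = InternalRow(41)
println(expr.eval(row)) // 42: one expression evaluated against one row

// A projection evaluates a list of expressions and produces a new row.
val proj = UnsafeProjection.create(Seq(expr))
println(proj(row).getInt(0)) // 42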

 

 

      10.2 FilterExec

Constructor arguments: a condition Expression and a child SparkPlan.

protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val predicate = newPredicate(condition, child.output)
      predicate.initialize(0)
      iter.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }
    }
  }

newPredicate builds the predicate used to filter each InternalRow; whenever a row satisfies the condition, the numOutputRows accumulator is incremented by 1.

      10.3 SampleExec

Samples the data, taking a subset of each partition's rows.

      10.4 SubqueryExec

A subquery; it mainly executes its child plan's execute method.

      10.5 RangeExec

A range query.

      10.6 UnionExec

A union that merges multiple child queries:

protected override def doExecute(): RDD[InternalRow] =
    sparkContext.union(children.map(_.execute()))

 

      10.7 CoalesceExec

protected override def doExecute(): RDD[InternalRow] = {
    child.execute().coalesce(numPartitions, shuffle = false)
  }
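At the DataFrame level this corresponds to Dataset.coalesce; because shuffle = false it only merges existing partitions, whereas repartition performs a full shuffle. A quick comparison:

val df = spark.range(0, 1000, 1, numPartitions = 100)

df.coalesce(10).rdd.getNumPartitions    // 10 - partitions merged, no shuffle
df.repartition(10).rdd.getNumPartitions // 10 - but via a full shuffle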
