結構化處理入門-SparkSession

建立DataFrame,有三種模式,一種是sql()主要是訪問Hive表;一種是從RDD生成DataFrame,主要從ExistingRDD開始建立;還有一種是read/format格式,從json/txt/csv等數據源格式建立。sql

先看看第三種方式的建立流程。json

一、read/formatsession

def read: DataFrameReader = new DataFrameReader(self)app

SparkSession.read()方法直接建立DataFrameReader,而後再DataFrameReader的load()方法來導入外部數據源。load()方法主要邏輯以下:
 ide

def load(paths: String*): DataFrame = {

    sparkSession.baseRelationToDataFrame(

      DataSource.apply(

        sparkSession,

        paths = paths,

        userSpecifiedSchema = userSpecifiedSchema,

        className = source,

        options = extraOptions.toMap).resolveRelation())

  }

建立對應數據源類型的DataSourceDataSource解析成BaseRelation,而後經過SparkSessionbaseRelationToDataFrame方法從BaseRelation映射生成DataFrame。從BaseRelation建立LogicalRelation,而後調用Dataset.ofRows方法從LogicalRelation建立DataFrameDataFrame實際就是Datasetoop

type DataFrame = Dataset[Row]ui

baseRelationToDataFrame的定義:this

def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {

    Dataset.ofRows(self, LogicalRelation(baseRelation))

  }

Dataset.ofRows方法主要是將邏輯計劃轉換成物理計劃,而後生成新的Datasetspa

二、執行code

SparkSession的執行關鍵是如何從LogicalPlan生成物理計劃。咱們試試跟蹤這部分邏輯。

def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>

    plan.executeCollect().head.getLong(0)

  }

Datasetcount()動做觸發物理計劃的執行,調用物理計劃planexecuteCollect方法,該方法實際上會調用doExecute()方法生成Array[InternalRow]格式。executeCollect方法在SparkPlan中定義。

三、HadoopFsRelation

須要跟蹤下如何從HadoopFsRelation生成物理計劃(也就是SparkPlan)

經過FileSourceStrategy來解析。它在FileSourceScanExec上疊加Filter和Projection等操做,看看FileSourceScanExec的定義:

case class FileSourceScanExec(

    @transient relation: HadoopFsRelation,

    output: Seq[Attribute],

    requiredSchema: StructType,

    partitionFilters: Seq[Expression],

    dataFilters: Seq[Expression],

    override val metastoreTableIdentifier: Option[TableIdentifier])

  extends DataSourceScanExec with ColumnarBatchScan  {

。。。

}

它的主要執行代碼doExecute()的功能邏輯以下

protected override def doExecute(): RDD[InternalRow] = {

    if (supportsBatch) {

      // in the case of fallback, this batched scan should never fail because of:

      // 1) only primitive types are supported

      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields

      WholeStageCodegenExec(this).execute()

    } else {

      val unsafeRows = {

        val scan = inputRDD

        if (needsUnsafeRowConversion) {

          scan.mapPartitionsWithIndexInternal { (index, iter) =>

            val proj = UnsafeProjection.create(schema)

            proj.initialize(index)

            iter.map(proj)

          }

        } else {

          scan

        }

      }

      val numOutputRows = longMetric("numOutputRows")

      unsafeRows.map { r =>

        numOutputRows += 1

        r

      }

    }

  }

inputRDD有兩種方式建立,一是createBucketedReadRDD,二是createNonBucketedReadRDD。二者沒有本質的區別,僅僅是文件分區規則的不一樣。

private lazy val inputRDD: RDD[InternalRow] = {

    val readFile: (PartitionedFile) => Iterator[InternalRow] =

      relation.fileFormat.buildReaderWithPartitionValues(

        sparkSession = relation.sparkSession,

        dataSchema = relation.dataSchema,

        partitionSchema = relation.partitionSchema,

        requiredSchema = requiredSchema,

        filters = pushedDownFilters,

        options = relation.options,

        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))



    relation.bucketSpec match {

      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>

        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)

      case _ =>

        createNonBucketedReadRDD(readFile, selectedPartitions, relation)

    }

  }

createNonBucketedReadRDD調用FileScanRDD :

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
相關文章
相關標籤/搜索