There are three ways to build a DataFrame: sql(), which is mainly used to access Hive tables; building a DataFrame from an RDD, which goes through ExistingRDD; and the read/format path, which loads external data sources such as json/txt/csv.
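A rough, purely illustrative sketch of the three entry points (the table name and file path below are hypothetical):

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("df-demo").master("local[*]").getOrCreate()

// 1) sql(): mainly for querying Hive (or temporary) tables
val fromSql: DataFrame = spark.sql("SELECT * FROM some_table")

// 2) from an RDD (internally goes through ExistingRDD)
val rdd = spark.sparkContext.parallelize(Seq(Row("alice"), Row("bob")))
val schema = StructType(Seq(StructField("name", StringType)))
val fromRdd: DataFrame = spark.createDataFrame(rdd, schema)

// 3) read/format: load an external data source such as json/txt/csv
val fromFile: DataFrame = spark.read.format("json").load("/path/to/people.json")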
Let's first walk through how the third approach builds a DataFrame.
1. read/format
def read: DataFrameReader = new DataFrameReader(self)
The read method of SparkSession directly creates a DataFrameReader, and the DataFrameReader's load() method then imports the external data source. The main logic of load() is as follows:
def load(paths: String*): DataFrame = {
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
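As a quick usage example of this path (reusing the spark session from the sketch above; the format and path are hypothetical):

// spark.read creates the DataFrameReader; load() builds the DataSource,
// resolves it to a BaseRelation and hands it to baseRelationToDataFrame.
val df: DataFrame = spark.read
  .format("json")
  .load("/path/to/people.json")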
A DataSource of the corresponding data-source type is created and resolved into a BaseRelation, and SparkSession's baseRelationToDataFrame method then maps that BaseRelation to a DataFrame. Concretely, a LogicalRelation is created from the BaseRelation, and Dataset.ofRows builds the DataFrame from that LogicalRelation. A DataFrame is in fact just a Dataset:
type DataFrame = Dataset[Row]
The definition of baseRelationToDataFrame:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
  Dataset.ofRows(self, LogicalRelation(baseRelation))
}
Dataset.ofRows mainly turns the logical plan into an executable physical plan and then creates a new Dataset on top of it.
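This can be observed from the user side: every DataFrame carries a QueryExecution from which both the logical plan (for a file source, a LogicalRelation wrapping the BaseRelation) and the prepared physical plan can be inspected. A small illustrative example (hypothetical path):

val people = spark.read.format("json").load("/path/to/people.json")
// logical side: a LogicalRelation over the resolved BaseRelation
println(people.queryExecution.logical)
// physical side: for a file source this typically contains a FileSourceScanExec
println(people.queryExecution.executedPlan)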
2. Execution
The key to execution in SparkSession is how a physical plan is generated from the LogicalPlan. Let's try to trace this part of the logic, taking Dataset.count() as an example:
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0)
}
The count() action on a Dataset triggers execution of the physical plan: it calls executeCollect() on the physical plan, which in turn ends up calling doExecute() and collecting the result as Array[InternalRow]. executeCollect() is defined in SparkPlan.
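A small illustration of the same call chain from the user side (hypothetical path; InternalRow is Spark's internal row format, so application code normally never touches it directly):

import org.apache.spark.sql.catalyst.InternalRow

val people = spark.read.format("json").load("/path/to/people.json")
val n: Long = people.count()                          // action: triggers the physical plan

val plan = people.queryExecution.executedPlan         // the prepared SparkPlan
val rows: Array[InternalRow] = plan.executeCollect()  // executeCollect() -> execute() -> doExecute()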
3. HadoopFsRelation
Next we need to trace how a physical plan (that is, a SparkPlan) is generated from a HadoopFsRelation.
This is resolved by FileSourceStrategy, which stacks operators such as Filter and Projection on top of a FileSourceScanExec. Let's look at the definition of FileSourceScanExec (a small example of observing this layering follows right after it):
case class FileSourceScanExec(
    @transient relation: HadoopFsRelation,
    output: Seq[Attribute],
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression],
    override val metastoreTableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec with ColumnarBatchScan {
  ...
}
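One way to see this layering in practice is explain(): the physical plan shows the Project and Filter operators sitting on top of the file scan, together with the filters handed down to the source. The path is hypothetical and the output shown is only indicative; it varies by Spark version and data:

val people = spark.read.format("json").load("/path/to/people.json")
people.select("name").where("age > 21").explain()
// the physical plan typically looks something like:
//   *Project [name#1]
//   +- *Filter (isnotnull(age#0) && (age#0 > 21))
//      +- *FileScan json [age#0,name#1] ... PushedFilters: [IsNotNull(age), GreaterThan(age,21)]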
The main execution logic of its doExecute() method is as follows:
protected override def doExecute(): RDD[InternalRow] = {
  if (supportsBatch) {
    // in the case of fallback, this batched scan should never fail because of:
    // 1) only primitive types are supported
    // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
    WholeStageCodegenExec(this).execute()
  } else {
    val unsafeRows = {
      val scan = inputRDD
      if (needsUnsafeRowConversion) {
        scan.mapPartitionsWithIndexInternal { (index, iter) =>
          val proj = UnsafeProjection.create(schema)
          proj.initialize(index)
          iter.map(proj)
        }
      } else {
        scan
      }
    }
    val numOutputRows = longMetric("numOutputRows")
    unsafeRows.map { r =>
      numOutputRows += 1
      r
    }
  }
}
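Whether the batched branch (supportsBatch, i.e. WholeStageCodegenExec over a columnar scan) is taken depends on the file format, the schema (primitive columns only, fewer than spark.sql.codegen.maxFields) and configuration. A hypothetical way to poke at it, using configuration keys that exist in Spark 2.x (values shown are the defaults, path is illustrative):

// whole-stage code generation and the vectorized Parquet reader feed the batched path
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

val orders = spark.read.parquet("/path/to/orders")
orders.explain()   // a WholeStageCodegen marker indicates the batched branch of doExecute()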
inputRDD is created in one of two ways: createBucketedReadRDD or createNonBucketedReadRDD. There is no essential difference between the two; they only differ in how files are assigned to partitions.
private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}

createNonBucketedReadRDD ultimately builds a FileScanRDD:

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
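In the non-bucketed case, files are sliced and packed into FilePartitions whose target size is derived (roughly, in Spark 2.x) from spark.sql.files.maxPartitionBytes, spark.sql.files.openCostInBytes and the default parallelism. A small illustrative example of the knobs involved (the values shown are just the defaults, the path is hypothetical):

spark.conf.set("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024)  // max bytes per partition
spark.conf.set("spark.sql.files.openCostInBytes", 4L * 1024 * 1024)      // cost added per opened file

val logs = spark.read.format("json").load("/path/to/logs")
println(logs.rdd.getNumPartitions)   // reflects how FileScanRDD packed the input files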