Dataset: a series of wrapper interfaces for producing DataFrames and the unified dispatch interface. DataFrames can be generated from a DataSource or from an RDD.
SparkSession: creates DataFrames through Dataset, triggers DataFrame execution, and so on.
SparkPlan: the physical execution plan.
QueryExecution: executes the query and produces the SparkPlan.
SQLExecution: manages the list of QueryExecutions, schedules their execution, and so on.
ExistingRDD: a DataFrame backed by an RDD.
DataSource: an external storage format such as csv, jdbc, txt or parquet. In SparkSession, the format() call ends up referencing either ExistingRDD or a DataSource.
There are three ways to create a DataFrame: sql(), which mainly accesses Hive tables; building a DataFrame from an RDD, which starts from ExistingRDD; and the read/format path, which builds it from a data source format such as json/txt/csv.
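A hedged sketch of the three entry points (assuming a SparkSession named spark and a SparkContext sc; the table people and the file people.json are hypothetical):
// 1) sql(): mainly for tables known to the catalog (e.g. Hive tables)
val df1 = spark.sql("SELECT name, age FROM people")
// 2) from an RDD: goes through the ExistingRDD path
import spark.implicits._
val df2 = sc.parallelize(Seq(("Alice", 30), ("Bob", 25))).toDF("name", "age")
// 3) read/format: goes through DataSource
val df3 = spark.read.format("json").load("people.json")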
Let's look at the creation flow of the third way first.
def read: DataFrameReader = new DataFrameReader(self)
SparkSession.read creates a DataFrameReader directly, and the DataFrameReader's load() method then imports the external data source. The main logic of load() is as follows:
/**
* Loads input in as a `DataFrame`, for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
*
* @since 1.6.0
*/
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}
A DataSource of the corresponding source type is created and resolved into a BaseRelation; SparkSession's baseRelationToDataFrame method then maps the BaseRelation to a DataFrame. A LogicalRelation is built from the BaseRelation, and Dataset.ofRows is called on that LogicalRelation to create the DataFrame. A DataFrame is in fact just a Dataset:
type DataFrame = Dataset[Row]
The definition of baseRelationToDataFrame:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}
Dataset.ofRows wraps the logical plan in a QueryExecution (which later produces the physical plan) and creates a new Dataset from it.
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
The key to SparkSession's execution is how the physical plan is generated from the LogicalPlan. Let's trace that part of the logic.
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}
The Dataset count() action triggers execution of the physical plan: it calls the plan's executeCollect method, which in turn calls doExecute() and gathers the result as Array[InternalRow]. executeCollect is defined in SparkPlan:
def executeCollect(): Array[InternalRow] = {
val byteArrayRdd = getByteArrayRdd()
val results = ArrayBuffer[InternalRow]()
byteArrayRdd.collect().foreach { bytes =>
decodeUnsafeRows(bytes).foreach(results.+=)
}
results.toArray
}
Next, let's trace how the physical plan (i.e. the SparkPlan) is generated from a HadoopFsRelation.
This is handled by FileSourceStrategy, which stacks Filter and Projection operations on top of a FileSourceScanExec. Here is the definition of FileSourceScanExec:
/**
* Physical plan node for scanning data from HadoopFsRelations.
*
* @param relation The file-based relation to scan.
* @param output Output attributes of the scan, including data attributes and partition attributes.
* @param requiredSchema Required schema of the underlying relation, excluding partition columns.
* @param partitionFilters Predicates to use for partition pruning.
* @param dataFilters Filters on non-partition columns.
* @param metastoreTableIdentifier identifier for the table in the metastore.
*/
case class FileSourceScanExec(
@transient relation: HadoopFsRelation,
output: Seq[Attribute],
requiredSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {
...
}
The core of its execution is doExecute(), whose logic is as follows:
protected override def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
// in the case of fallback, this batched scan should never fail because of:
// 1) only primitive types are supported
// 2) the number of columns should be smaller than spark.sql.codegen.maxFields
WholeStageCodegenExec(this).execute()
} else {
val unsafeRows = {
val scan = inputRDD
if (needsUnsafeRowConversion) {
scan.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map(proj)
}
} else {
scan
}
}
val numOutputRows = longMetric("numOutputRows")
unsafeRows.map { r =>
numOutputRows += 1
r
}
}
}
inputRDD is created in one of two ways: createBucketedReadRDD or createNonBucketedReadRDD. There is no fundamental difference between the two; they only differ in how files are grouped into partitions.
private lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = requiredSchema,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
}
}
createNonBucketedReadRDD ends up constructing a FileScanRDD:
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
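The splitting itself amounts to cutting each selected file into byte ranges of at most maxSplitBytes and packing those ranges into partitions. A simplified, hedged sketch of the per-file slicing (not the actual Spark code; FileSlice and sliceFiles are illustrative names):
case class FileSlice(path: String, start: Long, length: Long)
def sliceFiles(files: Seq[(String, Long)], maxSplitBytes: Long): Seq[FileSlice] =
  files.flatMap { case (path, size) =>
    // one slice per maxSplitBytes-sized chunk; the last slice gets the remainder
    (0L until size by maxSplitBytes).map { start =>
      FileSlice(path, start, math.min(maxSplitBytes, size - start))
    }
  }
// sliceFiles(Seq(("part-00000", 300L)), 128L)
// => slices (0,128), (128,128), (256,44) of part-00000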
SparkPlan is the base class of physical execution plans. A SQL statement is analyzed and optimized into a LogicalPlan, and a concrete SparkPlan is then generated from the LogicalPlan according to the type of data source.
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
The SQL statement is parsed into a LogicalPlan by the parser's parsePlan method, and Dataset.ofRows finally turns that LogicalPlan into a DataFrame.
There are two kinds of SessionState: the Hive one and the in-memory one. Here we look at the in-memory one, defined in spark.sql.internal.SessionState.scala.
The main method in SessionState is:
def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
It produces a QueryExecution from the logical plan. Note that SparkPlan is a subclass of QueryPlan (QueryPlan is the common base of LogicalPlan and SparkPlan), while QueryExecution drives the actual execution of the plan. One thing to note is that the createQueryExecution function in SessionState is passed in as a constructor argument. The default implementation is:
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
new QueryExecution(session, plan)
}
It constructs a QueryExecution object directly from the LogicalPlan. The main methods of SparkPlan are:
Method | Description
doExecute | Builds the RDD[InternalRow]; implemented concretely in the SparkPlan subclasses. In general, it obtains iterators from the various data sources or from Hive and builds the corresponding RDD.
executeCollect() | Returns an Array[InternalRow]; this actually launches a job.
executeToIterator | def executeToIterator(): Iterator[InternalRow] = { getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows) }; this also actually launches job execution.
A QueryExecution is built from the LogicalPlan and represents the actual execution of the logical plan. Its main members are:
Method | Description
analyzed | Analyzes the LogicalPlan and returns the analyzed LogicalPlan.
assertAnalyzed() | Triggers analyzed and checks the analysis (sparkSession.sessionState.analyzer.checkAnalysis(analyzed)).
optimizedPlan | Optimizes the LogicalPlan and returns the optimized LogicalPlan.
sparkPlan | Plans the best SparkPlan from the optimizedPlan.
executedPlan | Produces the SparkPlan that is ready to execute from sparkPlan by calling prepareForExecution; argument and return types are the same.
toRdd | Produces the RDD[InternalRow]; the implementation simply calls executedPlan.execute().
The flow of SparkSession.sql() is roughly as follows:
First, SessionState parses the SQL statement into a LogicalPlan;
then QueryExecution runs the LogicalPlan through the various preprocessing steps, including analysis and optimization, producing a SparkPlan;
finally, calling execute() on the prepared SparkPlan produces the RDD[InternalRow]; the DataFrame itself is created from the plan by Dataset.ofRows.
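These stages can be inspected directly on any Dataset through its queryExecution field. A hedged usage sketch (assuming a SparkSession named spark and a registered table tbl1):
val df = spark.sql("select col1, max(col2) from tbl1 group by col1")
val qe = df.queryExecution
println(qe.analyzed)      // analyzed LogicalPlan
println(qe.optimizedPlan) // optimized LogicalPlan
println(qe.sparkPlan)     // SparkPlan chosen by the planner
println(qe.executedPlan)  // SparkPlan after prepareForExecution
val rows = qe.toRdd       // RDD[InternalRow] backed by executedPlan.execute()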
LogicalPlan is a subclass of QueryPlan.
LocalRelation is a local relation, which also counts as a kind of local LogicalPlan; it represents dataset data held locally in memory.
LogicalRelation takes a BaseRelation as its argument. Reading csv, json, txt and similar files builds the logical plan through LogicalRelation; in those cases csv, json, txt etc. are DataSources that construct the BaseRelation.
LogicalRelation extends LeafNode, and LeafNode extends LogicalPlan; there is not much extra logic beyond that.
Note that UnaryNode also extends LogicalPlan.
The basic logical plan operators: resolving a LogicalPlan calls into the series of logic defined here. These classes all rewrite a logical plan into a new LogicalPlan, and include node types such as UnaryNode and LeafNode (all subclasses of LogicalPlan). The main ones are listed below, followed by a small sketch of how they show up in a query plan.
Operator | Description
Generate | Generates new columns, optionally retaining the original columns alongside them.
Filter | Filtering. Main logic: override protected def validConstraints: Set[Expression] = { val predicates = splitConjunctivePredicates(condition).filterNot(SubqueryExpression.hasCorrelatedSubquery); child.constraints.union(predicates.toSet) } ; the condition is split into predicates, which are added to the child's constraints.
Subquery | A subquery; no extra behavior beyond setting its output to the child's output.
Project | Mainly handles aliases, adding them to the child's constraints.
Intersect | Intersects the left and right LogicalPlans; their constraints are combined as well.
Except | Excludes the right LogicalPlan; the constraints are set to the left plan's constraints.
Union | Unions multiple LogicalPlans.
Sort | Sorting; carries a Seq[SortOrder].
Range | Produces a range of id values (e.g. what spark.range builds).
Aggregate | Aggregation. Definition: case class Aggregate(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode
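A minimal sketch of how these operators show up in practice (assuming a SparkSession named spark):
import spark.implicits._
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
// filter/select build Filter and Project nodes on top of a LocalRelation
val logical = df.filter($"id" > 1).select($"name").queryExecution.logical
println(logical) // prints roughly: Project over Filter over LocalRelation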
The SQL analyzer. It is initialized in BaseSessionStateBuilder and builds on Catalyst's Analyzer:
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
PreprocessTableCreation(session) +:
PreprocessTableInsertion(conf) +:
DataSourceAnalysis(conf) +:
customPostHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
HiveOnlyCheck +:
customCheckRules
}
The analyzer defines a series of Rules; each Rule processes the LogicalPlan and rewrites it into a new LogicalPlan. The rules defined in execution/datasources/rules.scala serve as examples of this.
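A hedged sketch of the Rule shape (NoopFilterRule is an illustrative name, not a Spark rule): a rule pattern matches on plan nodes via transform and returns a (possibly rewritten) LogicalPlan.
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
object NoopFilterRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // inspect or rewrite Filter nodes here; returning f unchanged makes this a no-op
    case f: Filter => f
  }
}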
When creating a table or appending data to an existing one, these rules check whether the table schema and the SQL match and whether there are SQL errors (such as a non-existent table partition).
An excerpt of the code briefly illustrates the mechanism:
// When we append data to an existing table, check if the given provider, partition columns,
// bucket spec, etc. match the existing table, and adjust the columns order of the given query
// if necessary.
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
if query.resolved && catalog.tableExists(tableDesc.identifier) =>
// This is guaranteed by the parser and `DataFrameWriter`
assert(tableDesc.provider.isDefined)
val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
val existingTable = catalog.getTableMetadata(tableIdentWithDB)
if (existingTable.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("Saving data into a view is not allowed.")
}
// Check if the specified data source match the data source of the existing table.
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
if (existingProvider != specifiedProvider) {
throw new AnalysisException(s"The format of the existing table $tableName is " +
s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
s"`${specifiedProvider.getSimpleName}`.")
}
if (query.schema.length != existingTable.schema.length) {
throw new AnalysisException(
s"The column number of the existing table $tableName" +
s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
s"(${query.schema.catalogString})")
}
val resolver = sparkSession.sessionState.conf.resolver
val tableCols = existingTable.schema.map(_.name)
// As we are inserting into an existing table, we should respect the existing schema, preserve
// the case and adjust the column order of the given DataFrame according to it, or throw
// an exception if the column names do not match.
val adjustedColumns = tableCols.map { col =>
query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
val inputColumns = query.schema.map(_.name).mkString(", ")
throw new AnalysisException(
s"cannot resolve '$col' given input columns: [$inputColumns]")
}
}
If the SQL is wrong, an exception is thrown at this stage and nothing further is executed.
The insert into syntax is analyzed as well: whether the column names exist, whether the number of columns matches the number of values, and so on; an exception is thrown on error.
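A hedged illustration (t is a hypothetical two-column table):
spark.sql("create table t (a int, b string) using parquet")
// fails during analysis: the number of values does not match the number of columns
spark.sql("insert into t values (1)")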
The SQL optimizer. It is initialized in BaseSessionStateBuilder and builds on Catalyst's Optimizer:
protected def optimizer: Optimizer = {
new SparkOptimizer(catalog, conf, experimentalMethods) {
override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
}
}
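Besides the extension-based customOperatorOptimizationRules above, SparkOptimizer also picks up user rules from the public (experimental) hook on SparkSession. A hedged usage sketch, reusing the NoopFilterRule sketched earlier:
spark.experimental.extraOptimizations = Seq(NoopFilterRule)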
If a DISTINCT-style operation in a SQL statement (distinct, max, min, etc.) acts on a partition column, the query is optimized so that it only runs over the relevant partitions.
This uses the spark-catalyst classes.
For example, select col1, max(col2) from tbl1 group by col1 would conceptually be optimized into something like select col1.partition, max(col2) from tbl1[col1 partition]; this is only an illustration.
The physical planner generates several candidate SparkPlans from the LogicalPlan. SparkPlanner extends SparkStrategies, which extends QueryPlanner.
It is created by the following method:
protected def planner: SparkPlanner = {
new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies
}
}
The plan method applies a combination of strategies that decide which SparkPlan subclasses get generated.
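A hedged sketch of the Strategy shape (PassThroughStrategy is an illustrative name): a strategy returns candidate SparkPlans for the logical plans it recognizes, or Nil to let the other strategies handle them; extra strategies can be plugged in through spark.experimental.extraStrategies.
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
object PassThroughStrategy extends Strategy {
  // return Nil so that the built-in strategies plan everything
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}
// spark.experimental.extraStrategies = Seq(PassThroughStrategy)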
Depending on the type of the LogicalPlan, different UnaryExecNode subclasses are generated, including:
Name | Description
SortExec | Overrides doExecute(): RDD[InternalRow] using a sorter iterator (UnsafeExternalRowSorter), which in turn uses UnsafeExternalSorter, a Spark Core class for sorting iterators (org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter).
FilterExec | Performs filtering.
SortExec implements the sort with the sorter iterator described above. FilterExec overrides doExecute() as follows:
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val predicate = newPredicate(condition, child.output)
predicate.initialize(0)
iter.filter { row =>
val r = predicate.eval(row)
if (r) numOutputRows += 1
r
}
}
}
This is the physical execution of the filter operation.
The various concrete physical plans: ProjectExec, SortExec, FilterExec and so on. All of them override SparkPlan's doExecute method.
Name | Description
ProjectExec | Outputs the projected columns.
FilterExec | Physical plan for filtering.
SampleExec | Sampling.
RangeExec | Range.
UnionExec | Union of multiple queries.
CoalesceExec | Reduces the number of partitions (an RDD coalesce; see its doExecute below).
SubqueryExec | Mainly executes its child plan.
ProjectExec overrides doExecute as follows:
protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val project = UnsafeProjection.create(projectList, child.output,
subexpressionEliminationEnabled)
project.initialize(index)
iter.map(project)
}
}
It uses UnsafeProjection to map each InternalRow to the projected output.
For example: select id, func(name) from table1 where id > 1;
Here, resolving id and func(name) is the projection: id and func(name) are Expressions, and calling expression.eval(internalRow) on each Expression for each row is the projection itself, evaluating the expressions against that row's data to produce the final output.
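A minimal DataFrame-level illustration (assuming spark and a table table1 with columns id and name; upper stands in for func):
import org.apache.spark.sql.functions.{col, upper}
val projected = spark.table("table1")
  .filter(col("id") > 1)
  .select(col("id"), upper(col("name")))
// each select column corresponds to one Expression that ProjectExec evaluates
// against every InternalRow that passes the filter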
FilterExec takes a condition Expression and a child SparkPlan as its parameters.
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val predicate = newPredicate(condition, child.output)
predicate.initialize(0)
iter.filter { row =>
val r = predicate.eval(row)
if (r) numOutputRows += 1
r
}
}
}
newPredicate is used to filter each InternalRow; when a row satisfies the condition, the output-row counter is incremented by 1.
SampleExec samples the data, taking a portion of each partition's data.
SubqueryExec executes its child plan's execute method.
RangeExec handles range queries.
UnionExec merges the results of multiple queries:
protected override def doExecute(): RDD[InternalRow] =
sparkContext.union(children.map(_.execute()))
CoalesceExec simply coalesces the child's partitions:
protected override def doExecute(): RDD[InternalRow] = {
child.execute().coalesce(numPartitions, shuffle = false)
}