Many of the follow-up operations in Spark SQL, such as select() and filter(), are defined on Dataset. For example, select() produces a new Project logical plan, and filter() produces a Filter logical plan. A Dataset is fed from two kinds of sources: one path is format(), which reads data through a DataSource subclass (csv, json, txt, and so on); the other is sql(), which parses a SQL statement and plans it into a SparkPlan. In the end both paths retrieve data through an iterator in RDD[InternalRow] form.
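As a quick orientation, the two entry points look like this from user code (a minimal sketch, assuming an existing SparkSession named spark and placeholder paths/table names):

val jsonDF = spark.read.format("json").load("/path/to/data.json") // DataSource path
val sqlDF  = spark.sql("SELECT * FROM some_table")                // SQL path
// Both become logical plans that are ultimately planned into RDD[InternalRow] iterators.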
format() returns a DataFrameReader, and load() then returns a DataFrame. The core implementation of load() is:
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
In SparkSession:
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}
BaseRelation is an abstract class that represents a basic data source relation; its subclasses usually return data in RDD[Row] form through a scan method. Data sources such as json, csv, and txt all have corresponding BaseRelation subclasses.
A few examples of the scan traits that subclasses can mix in, defined in sql/sources/interfaces.scala:
/**
* A BaseRelation that can produce all of its tuples as an RDD of Row objects.
*
* @since 1.3.0
*/
@InterfaceStability.Stable
trait TableScan {
def buildScan(): RDD[Row]
}
/**
* A BaseRelation that can eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects.
*
* @since 1.3.0
*/
@InterfaceStability.Stable
trait PrunedScan {
def buildScan(requiredColumns: Array[String]): RDD[Row]
}
/**
* A BaseRelation that can eliminate unneeded columns and filter using selected
* predicates before producing an RDD containing all matching tuples as Row objects.
*
* The actual filter should be the conjunction of all `filters`,
* i.e. they should be "and" together.
*
* The pushed down filters are currently purely an optimization as they will all be evaluated
* again. This means it is safe to use them with methods that produce false positives such
* as filtering partitions based on a bloom filter.
*
* @since 1.3.0
*/
@InterfaceStability.Stable
trait PrunedFilteredScan {
def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
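For illustration, a minimal custom relation that mixes in the TableScan trait above might look like the following (a toy sketch, not one of Spark's built-in relations):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types._

// Produces the integers 0 until n as a single-column table.
class RangeRelation(override val sqlContext: SQLContext, n: Int)
    extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(StructField("id", IntegerType)))
  // The TableScan contract: produce all tuples as an RDD of Row objects.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until n).map(Row(_))
}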
LogicalRelation is the logical query relation, a basic LeafNode; it is built from a BaseRelation.
The LogicalRelation produced from a BaseRelation, together with the rest of the LogicalPlan, is then planned into a SparkPlan. SparkStrategies defines a series of strategies that decide how the different kinds of data sources are handled.
The most central and basic strategy is BasicOperators, which defines the execution mapping for the various LogicalPlan nodes, as follows:
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
case logical.Distinct(child) =>
throw new IllegalStateException(
"logical distinct operator should have been replaced by aggregate in the optimizer")
case logical.Intersect(left, right) =>
throw new IllegalStateException(
"logical intersect operator should have been replaced by semi-join in the optimizer")
case logical.Except(left, right) =>
throw new IllegalStateException(
"logical except operator should have been replaced by anti-join in the optimizer")
case logical.DeserializeToObject(deserializer, objAttr, child) =>
execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)) :: Nil
case logical.SerializeFromObject(serializer, child) =>
execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil
case logical.MapPartitions(f, objAttr, child) =>
execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil
case logical.MapPartitionsInR(f, p, b, is, os, objAttr, child) =>
execution.MapPartitionsExec(
execution.r.MapPartitionsRWrapper(f, p, b, is, os), objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>
execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping,
data, objAttr, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil
case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsWithState(
f, key, value, grouping, data, output, _, _, _, timeout, child) =>
execution.MapGroupsExec(
f, key, value, grouping, data, output, timeout, planLater(child)) :: Nil
case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) =>
execution.CoGroupExec(
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
planLater(left), planLater(right)) :: Nil
case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
case logical.Sort(sortExprs, global, child) =>
execution.SortExec(sortExprs, global, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
execution.ProjectExec(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.FilterExec(condition, planLater(child)) :: Nil
case f: logical.TypedFilter =>
execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data) =>
LocalTableScanExec(output, data) :: Nil
case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil
case logical.Union(unionChildren) =>
execution.UnionExec(unionChildren.map(planLater)) :: Nil
case g @ logical.Generate(generator, join, outer, _, _, child) =>
execution.GenerateExec(
generator, join = join, outer = outer, g.qualifiedGeneratorOutput,
planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
case r: logical.Range =>
execution.RangeExec(r) :: Nil
case logical.RepartitionByExpression(expressions, child, numPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
expressions, numPartitions), planLater(child)) :: Nil
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
case h: ResolvedHint => planLater(h.child) :: Nil
case _ => Nil
}
}
DataSource is the base class for data sources; csv, json, txt, and the like are all extension subclasses of it, sharing a common set of methods.
On the physical side, the scan execution plans define how a DataSource is actually executed to produce InternalRow data.
There are a number of subclasses / implementation classes:
Name | Description
FileSourceScanExec | Reads files and produces an RDD[InternalRow]. It calls the readFile method of the concrete DataSource / BaseRelation to iterate over the data source and produce an InternalRow iterator, and internally uses FileScanRDD to read the data partition by partition.
RowDataSourceScanExec | Performs operations on rows, such as Project, i.e. selecting the specified output columns.
These two are the main implementation classes.
For JDBC, JDBCRelation extends BaseRelation and overrides buildScan; that method returns an RDD[Row] by creating a JDBCRDD. JDBCRDD assembles a SQL query string, including the translation of pushed-down filter conditions, and runs it against the database through the JDBC driver.
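To give a feel for that SQL assembly, here is a simplified sketch of turning pushed-down filters into a WHERE clause (the real logic in JDBCRDD handles many more predicate types plus proper quoting and escaping; the helper name is illustrative):

import org.apache.spark.sql.sources._

def toWhereClause(filters: Array[Filter]): String = {
  val parts = filters.flatMap {
    case EqualTo(attr, value)     => Some(s"$attr = '$value'")
    case GreaterThan(attr, value) => Some(s"$attr > '$value'")
    case IsNotNull(attr)          => Some(s"$attr IS NOT NULL")
    case _                        => None // unsupported filters are simply not pushed down
  }
  if (parts.isEmpty) "" else parts.mkString("WHERE ", " AND ", "")
}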
JDBCRDD defines a compute() method:
override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow]
The final part of it is:
CompletionIterator[InternalRow, Iterator[InternalRow]](
new InterruptibleIterator(context, rowsIterator), close())
Its purpose is to wrap the iterator produced by the JDBC driver in a new iterator that invokes close() after the last row has been processed; close() closes the JDBC connection and releases the connection resources.
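The idea is easy to mimic: wrap an iterator so that a cleanup callback runs once the underlying iterator is exhausted. The following is only an illustrative stand-in for Spark's internal CompletionIterator, not its actual code:

// Runs onCompletion exactly once, when the wrapped iterator reports no more elements.
class CloseOnCompletionIterator[A](underlying: Iterator[A], onCompletion: () => Unit)
    extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) { completed = true; onCompletion() }
    more
  }
  override def next(): A = underlying.next()
}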
UnsafeRow is a subclass of InternalRow: one row of data represented as raw bytes. If a field is a numeric type such as an integer or a floating-point number, the corresponding slot in the UnsafeRow stores the value directly; if it is a string, an array, or other large variable-length data, the slot stores the location of the data.
An excerpt from the UnsafeRow class documentation:
/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
*
* Each tuple has three parts: [null bit set] [values] [variable length portion]
*
* The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores
* one bit per field.
*
* In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length
* primitive types, such as long, double, or int, we store the value directly in the word. For
* fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
* base address of the row) that points to the beginning of the variable-length field, and length
* (they are combined into a long).
*
* Instances of `UnsafeRow` act as pointers to row data stored in this format.
*/
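A small sketch of producing that layout through Spark's internal catalyst API (internal and therefore subject to change; the schema and values below are arbitrary):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val toUnsafe  = UnsafeProjection.create(schema)                 // code-generated projection
val row       = InternalRow(1L, UTF8String.fromString("spark")) // generic, object-backed row
val unsafeRow = toUnsafe(row)                                   // fixed 8-byte slots + variable-length region
println(unsafeRow.getSizeInBytes)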
All of the file-based DataSource reads go through HadoopFsRelation, which extends BaseRelation.
Most SQL operations work on a single row at a time, so it is enough to nest operators layer by layer on top of the iterators. Simple aggregations such as sum and average are also easy: adding a global accumulator variable on top of the iterator does the job. Some aggregation operations are not that simple, though, for example sorting and joins. It is worth looking closely at the data chain of such operations to see how they are implemented internally.
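As a toy illustration of the "operators as nested iterators" idea (purely illustrative, not Spark code):

// Each "physical operator" wraps its child's row iterator.
def filterExec(child: Iterator[Int])(pred: Int => Boolean): Iterator[Int] = child.filter(pred)
def projectExec(child: Iterator[Int])(f: Int => Int): Iterator[Int] = child.map(f)

val source = Iterator(1, 2, 3, 4, 5)                            // stands in for a table scan
val plan   = projectExec(filterExec(source)(_ % 2 == 1))(_ * 10)
println(plan.sum)                                               // a simple global aggregate: prints 90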
Deduplication removes rows that are duplicates with respect to the specified columns, keeping only one copy.
The Deduplicate logical plan specifies which columns (keys) are used to decide whether a row is a duplicate:
case class Deduplicate(
keys: Seq[Attribute],
child: LogicalPlan,
streaming: Boolean) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
The corresponding physical plan is chosen in SparkStrategies:
/**
* Used to plan the streaming deduplicate operator.
*/
object StreamingDeduplicationStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case Deduplicate(keys, child, true) =>
StreamingDeduplicateExec(keys, planLater(child)) :: Nil
case _ => Nil
}
}
Following the thread, let's look at the key method of StreamingDeduplicateExec; the code below lives in sql/execution/streaming/statefulOperators.scala:
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
getStateId.operatorId,
getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
val numOutputRows = longMetric("numOutputRows")
val numTotalStateRows = longMetric("numTotalStateRows")
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
val baseIterator = watermarkPredicateForData match {
case Some(predicate) => iter.filter(row => !predicate.eval(row))
case None => iter
}
val result = baseIterator.filter { r =>
val row = r.asInstanceOf[UnsafeRow]
val key = getKey(row)
val value = store.get(key)
if (value.isEmpty) {
store.put(key.copy(), StreamingDeduplicateExec.EMPTY_ROW)
numUpdatedStateRows += 1
numOutputRows += 1
true
} else {
// Drop duplicated rows
false
}
}
CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))
store.commit()
numTotalStateRows += store.numKeys()
})
  }
}
Let's look at the key classes and methods used here:
(1) mapPartitionsWithStateStore
This brings in StateStore. Let's follow the code to see what StateStore is for.
sql/execution/streaming/state/package.scala defines the StateStoreOps class, whose mapPartitionsWithStateStore method is shown here:
/** Map each partition of an RDD along with data in a [[StateStore]]. */
def mapPartitionsWithStateStore[U: ClassTag](
sqlContext: SQLContext,
checkpointLocation: String,
operatorId: Long,
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType)(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
mapPartitionsWithStateStore(
checkpointLocation,
operatorId,
storeVersion,
keySchema,
valueSchema,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator))(
storeUpdateFunction)
}
/** Map each partition of an RDD along with data in a [[StateStore]]. */
private[streaming] def mapPartitionsWithStateStore[U: ClassTag](
checkpointLocation: String,
operatorId: Long,
storeVersion: Long,
keySchema: StructType,
valueSchema: StructType,
sessionState: SessionState,
storeCoordinator: Option[StateStoreCoordinatorRef])(
storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
val wrappedF = (store: StateStore, iter: Iterator[T]) => {
// Abort the state store in case of error
TaskContext.get().addTaskCompletionListener(_ => {
if (!store.hasCommitted) store.abort()
})
cleanedF(store, iter)
}
new StateStoreRDD(
dataRDD,
wrappedF,
checkpointLocation,
operatorId,
storeVersion,
keySchema,
valueSchema,
sessionState,
storeCoordinator)
}
}
sql/execution/streaming/state/StateStore.scala defines StateStore. It is used to maintain state while processing streaming iterators, for example for dropping duplicate rows and similar operations.
On top of a concrete StateStore implementation, the deduplication operation is built. I have not read through all of it yet, so let me first describe the programming idea.
A StateStore object is defined to manage intermediate storage for the data source. As each row is saved into the StateStore it can be deduplicated, and afterwards an Iterator is generated from the StateStore as the result data, which is how Distinct-like aggregation operations are completed. Performance optimization and scaling are handled inside the StateStore, which keeps the framework stable.
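A toy, in-memory stand-in for the get/put/commit pattern that StreamingDeduplicateExec uses above (real StateStore implementations are versioned and checkpointed; this one is not, and all names here are illustrative):

import scala.collection.mutable

class ToyStateStore {
  private val kv = mutable.HashMap.empty[String, Array[Byte]]
  def get(key: String): Option[Array[Byte]] = kv.get(key)
  def put(key: String, value: Array[Byte]): Unit = kv.put(key, value)
  def commit(): Long = kv.size.toLong // pretend-commit: just report how many keys were kept
}

// Keep the first occurrence of each key, drop later duplicates.
def dedup(rows: Iterator[String], store: ToyStateStore): Iterator[String] =
  rows.filter { key =>
    if (store.get(key).isEmpty) { store.put(key, Array.emptyByteArray); true }
    else false
  }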
Next, a mechanism for keeping <Key, Value> data in memory, operating at the byte-array level. From its class documentation:
/**
* An append-only hash map where keys and values are contiguous regions of bytes.
*
* This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
* which is guaranteed to exhaust the space.
*
* The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
* probably be using sorting instead of hashing for better cache locality.
*
* The key and values under the hood are stored together, in the following format:
* Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
* Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair
*
* This means that the first four bytes store the entire record (key + value) length. This format
* is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
* so we can pass records from this map directly into the sorter to sort records in place.
*/
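The documented record layout is easy to restate as arithmetic (a sketch of the byte accounting only, with no real off-heap memory involved):

// The first four bytes store len(k) + len(v) + 4, i.e. the total record length.
def recordLength(keyLen: Int, valueLen: Int): Int = keyLen + valueLen + 4
// Full footprint of one entry: 4 (total len) + 4 (key len) + key + value + 8 (next-pair pointer).
def entryFootprint(keyLen: Int, valueLen: Int): Int = 4 + 4 + keyLen + valueLen + 8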
UnsafeExternalSorter is the engine's external sorter for data held in managed memory; it is called in sorting scenarios.
You hand it a comparator, and it maintains a memory region holding the data to be sorted. The insertRecord method processes one record at a time, inserting it into memory; UnsafeExternalSorter sorts as data is inserted, and in the end it returns an Iterator representing the already-sorted data. In memory the data is kept as <Key, Value> byte arrays.
Its own job is mostly memory management; the actual sorting reuses the logic in Sorter.java and TimSort.java.
The in-memory sorting algorithm works on key prefixes; it first makes use of a SortComparator.
It uses the memoryManager to access the two objects being compared: each object's base object and offset are passed to the sorter for comparison, and a result of 1, 0, or -1 means greater than, equal to, or less than.
Each object being compared consists of a memory address and a prefix. If the prefixes differ, the prefix comparison directly decides the result; if they are equal, the base address of the memory block and the object's offset inside it are looked up through the memoryManager, and the two memory blocks themselves are compared. The concrete record format inside a memory block is defined by the caller, and how to interpret it is supplied through the recordComparator parameter.
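A toy sketch of that prefix-first comparison (names are illustrative, and real prefix comparators may be signed or unsigned depending on the key type):

case class PointerAndPrefix(recordPointer: Long, prefix: Long)

def compare(a: PointerAndPrefix, b: PointerAndPrefix,
            recordComparator: (Long, Long) => Int): Int = {
  // Cheap comparison on the 8-byte prefix first.
  val byPrefix = java.lang.Long.compareUnsigned(a.prefix, b.prefix)
  if (byPrefix != 0) byPrefix
  // Only when prefixes tie do we dereference the record pointers and compare full records.
  else recordComparator(a.recordPointer, b.recordPointer)
}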
Now to the implementation of the family of aggregation operations.
Dataset.agg is implemented mostly by calling dataset.groupBy().agg().
First, the groupByKey method:
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
val executed = sparkSession.sessionState.executePlan(withGroupingKey)
new KeyValueGroupedDataset(
encoderFor[K],
encoderFor[T],
executed,
inputPlan.output,
withGroupingKey.newColumns)
}
It appends a column computed from the grouping function to the plan (AppendColumns) and then builds a KeyValueGroupedDataset, which represents the grouped data (note that it is its own class rather than a Dataset subclass).
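A minimal usage sketch (assuming an existing SparkSession named spark with its implicits imported):

import spark.implicits._

val ds      = Seq("a", "bb", "bb", "ccc").toDS()
val grouped = ds.groupByKey(s => s.length)   // KeyValueGroupedDataset[Int, String]
grouped.count().show()                       // one (length, count) row per group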
The grouping method:
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
val colNames: Seq[String] = col1 +: cols
RelationalGroupedDataset(
toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.GroupByType)
}
All groupBy operations convert the Dataset into a RelationalGroupedDataset and manipulate the data through it, so let's look at RelationalGroupedDataset's key definitions and methods.
For a GroupByType groupBy, RelationalGroupedDataset turns the operation into an Aggregate logical plan for execution. The conversion code is as follows:
private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {
groupingExprs ++ aggExprs
} else {
aggExprs
}
val aliasedAgg = aggregates.map(alias)
groupType match {
case RelationalGroupedDataset.GroupByType =>
Dataset.ofRows(
df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.RollupType =>
Dataset.ofRows(
df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.CubeType =>
Dataset.ofRows(
df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
Dataset.ofRows(
df.sparkSession, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
}
}
Next, the implementation of one of its key methods, agg:
/**
* (Scala-specific) Compute aggregates by specifying the column names and
* aggregate methods. The resulting `DataFrame` will also contain the grouping columns.
*
* The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
* {{{
* // Selects the age of the oldest employee and the aggregate expense for each department
* df.groupBy("department").agg(
* "age" -> "max",
* "expense" -> "sum"
* )
* }}}
*
* @since 1.3.0
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
strToExpr(expr)(df(colName).expr)
})
}
agg converts the current DataFrame into the target DataFrame via toDF, passing the aggregate expressions into that conversion. The other aggregations such as count, avg, min, and max are likewise executed through toDF, and toDF describes the logical plan with an Aggregate node. Studying the aggregate functions therefore really comes down to studying how Aggregate is implemented.
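To see this from the outside (reusing the hypothetical df from the scaladoc example above), the resulting plan contains an Aggregate node:

val result = df.groupBy("department").agg("age" -> "max", "expense" -> "sum")
result.explain(true)   // the analyzed/optimized logical plan shows an Aggregate over df's plan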
Let's look at Aggregate's main structure and logic. Its class name is org.apache.spark.sql.catalyst.plans.logical.Aggregate, defined in catalyst/plans/logical/basicLogicalOperators.scala.
The definition is as follows:
case class Aggregate(
groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan)
extends UnaryNode {
  ...
}
Next, how the corresponding physical plan for Aggregate is defined:
object Aggregation extends Strategy
It is defined in SparkStrategies and decides how Aggregate-type logical plans are handled.
val aggregateOperator =
if (functionsWithDistinct.isEmpty) {
aggregate.AggUtils.planAggregateWithoutDistinct(
groupingExpressions,
aggregateExpressions,
resultExpressions,
planLater(child))
} else {
aggregate.AggUtils.planAggregateWithOneDistinct(
groupingExpressions,
functionsWithDistinct,
functionsWithoutDistinct,
resultExpressions,
planLater(child))
}
aggregateOperator
The corresponding SparkPlan physical plan is built through AggUtils.planAggregateWithoutDistinct (or planAggregateWithOneDistinct). The main job of these methods is to decide which kind of aggregation operator to use: HashAggregateExec, ObjectHashAggregateExec, or SortAggregateExec.
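A rough, simplified sketch of that choice (the precise conditions live in AggUtils and the operators themselves; the function and parameter names here are only illustrative):

def chooseAggregateExec(bufferFitsUnsafeRow: Boolean,   // every agg buffer field has a mutable UnsafeRow-compatible type
                        objectHashEnabled: Boolean,     // whether the object-hash fallback is enabled
                        objectBasedFunctions: Boolean): String =
  if (bufferFitsUnsafeRow) "HashAggregateExec"
  else if (objectHashEnabled && objectBasedFunctions) "ObjectHashAggregateExec"
  else "SortAggregateExec"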
We only need to look at HashAggregateExec as an example; the others are very similar.
In its definition and main methods, HashAggregateExec makes use of TungstenAggregationIterator.
An excerpt from its source documentation:
/**
* An iterator used to evaluate aggregate functions. It operates on [[UnsafeRow]]s.
*
* This iterator first uses hash-based aggregation to process input rows. It uses
* a hash map to store groups and their corresponding aggregation buffers. If
* this map cannot allocate memory from memory manager, it spills the map into disk
* and creates a new one. After processed all the input, then merge all the spills
* together using external sorter, and do sort-based aggregation.
*
* The process has the following step:
* - Step 0: Do hash-based aggregation.
* - Step 1: Sort all entries of the hash map based on values of grouping expressions and
* spill them to disk.
* - Step 2: Create an external sorter based on the spilled sorted map entries and reset the map.
* - Step 3: Get a sorted [[KVIterator]] from the external sorter.
* - Step 4: Repeat step 0 until no more input.
* - Step 5: Initialize sort-based aggregation on the sorted iterator.
* Then, this iterator works in the way of sort-based aggregation.
*
* The code of this class is organized as follows:
* - Part 1: Initializing aggregate functions.
* - Part 2: Methods and fields used by setting aggregation buffer values,
* processing input rows from inputIter, and generating output
* rows.
* - Part 3: Methods and fields used by hash-based aggregation.
* - Part 4: Methods and fields used when we switch to sort-based aggregation.
* - Part 5: Methods and fields used by sort-based aggregation.
* - Part 6: Loads input and process input rows.
* - Part 7: Public methods of this iterator.
* - Part 8: A utility function used to generate a result when there is no
* input and there is no grouping expression.
*
* @param groupingExpressions
* expressions for grouping keys
* @param aggregateExpressions
* [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Partial]],
* [[PartialMerge]], or [[Final]].
* @param aggregateAttributes the attributes of the aggregateExpressions'
* outputs when they are stored in the final aggregation buffer.
* @param resultExpressions
* expressions for generating output rows.
* @param newMutableProjection
* the function used to create mutable projections.
* @param originalInputAttributes
* attributes of representing input rows from `inputIter`.
* @param inputIter
* the iterator containing input [[UnsafeRow]]s.
*/
In the end, execution always comes down to a subclass either producing an RDD[Row] through buildScan, or producing an iterator of InternalRow through readFile.