Spark Dataset

Many of the downstream operations in Spark SQL, such as select() and filter(), are defined on Dataset. For example, select() produces a new Project-type logical plan, and filter() produces a Filter-type logical plan. A Dataset has two broad kinds of data sources: one comes through the format() method, which reads data via DataSource subclasses (csv, json, txt, and other formats); the other comes through the sql() method, where a SQL statement is parsed and eventually turned into a SparkPlan. In both cases, the data is ultimately retrieved through an iterator over RDD[InternalRow].
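As a quick sketch of both entry points (the csv path and view name below are illustrative), note that either way the query is eventually planned down to an RDD[InternalRow]:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dataset-entry-points").getOrCreate()

// Entry point 1: format()/load(), backed by a DataSource implementation
val fromFile = spark.read.format("csv").option("header", "true").load("/tmp/people.csv")

// Entry point 2: sql(), parsed and analyzed into a logical plan
fromFile.createOrReplaceTempView("people")
val fromSql = spark.sql("SELECT name FROM people WHERE age > 30")

// Both end up as an RDD[InternalRow] once planned
val internalRows = fromSql.queryExecution.toRdd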

format() returns a DataFrameReader, and load() then returns a DataFrame. The core implementation of load() is:

@scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

 

In SparkSession:

  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {

    Dataset.ofRows(self, LogicalRelation(baseRelation))

  }

 

    1. BaseRelation

An abstract class representing a basic data source relation. Subclasses typically return data as RDD[Row] through a Scan method. Data sources such as json, csv, and txt each have a corresponding BaseRelation subclass.

Some examples of the traits a subclass can mix in, defined in sql/sources/interfaces.scala:

/**

 * A BaseRelation that can produce all of its tuples as an RDD of Row objects.

 *

 * @since 1.3.0

 */

@InterfaceStability.Stable

trait TableScan {

  def buildScan(): RDD[Row]

}

 

/**

 * A BaseRelation that can eliminate unneeded columns before producing an RDD

 * containing all of its tuples as Row objects.

 *

 * @since 1.3.0

 */

@InterfaceStability.Stable

trait PrunedScan {

  def buildScan(requiredColumns: Array[String]): RDD[Row]

}

 

/**

 * A BaseRelation that can eliminate unneeded columns and filter using selected

 * predicates before producing an RDD containing all matching tuples as Row objects.

 *

 * The actual filter should be the conjunction of all `filters`,

 * i.e. they should be "and" together.

 *

 * The pushed down filters are currently purely an optimization as they will all be evaluated

 * again.  This means it is safe to use them with methods that produce false positives such

 * as filtering partitions based on a bloom filter.

 *

 * @since 1.3.0

 */

@InterfaceStability.Stable

trait PrunedFilteredScan {

  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

}
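For illustration, a minimal relation implementing the TableScan contract might look like the following sketch (RangeRelation is a made-up example, not a Spark class):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A toy relation producing the numbers 0..9 as single-column rows.
class RangeRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
  override def schema: StructType = StructType(StructField("id", LongType, nullable = false) :: Nil)
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.range(0, 10).map(Row(_))
}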

 

    2. LogicalRelation

The logical query relation, a basic LeafNode. A LogicalRelation is created from a BaseRelation.

 

    3. SparkStrategy

Turns a LogicalPlan (for example the LogicalRelation generated from a BaseRelation) into a SparkPlan. A series of strategies is defined here that decides how each kind of data source is handled.

The most central and fundamental one is BasicOperators, which defines the execution mapping for the various LogicalPlan nodes, as follows:

// Can we automate these 'pass through' operations?

  object BasicOperators extends Strategy {

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

 

      case MemoryPlan(sink, output) =>

        val encoder = RowEncoder(sink.schema)

        LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil

 

      case logical.Distinct(child) =>

        throw new IllegalStateException(

          "logical distinct operator should have been replaced by aggregate in the optimizer")

      case logical.Intersect(left, right) =>

        throw new IllegalStateException(

          "logical intersect operator should have been replaced by semi-join in the optimizer")

      case logical.Except(left, right) =>

        throw new IllegalStateException(

          "logical except operator should have been replaced by anti-join in the optimizer")

 

      case logical.DeserializeToObject(deserializer, objAttr, child) =>

        execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)) :: Nil

      case logical.SerializeFromObject(serializer, child) =>

        execution.SerializeFromObjectExec(serializer, planLater(child)) :: Nil

      case logical.MapPartitions(f, objAttr, child) =>

        execution.MapPartitionsExec(f, objAttr, planLater(child)) :: Nil

      case logical.MapPartitionsInR(f, p, b, is, os, objAttr, child) =>

        execution.MapPartitionsExec(

          execution.r.MapPartitionsRWrapper(f, p, b, is, os), objAttr, planLater(child)) :: Nil

      case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>

        execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping,

          data, objAttr, planLater(child)) :: Nil

      case logical.MapElements(f, _, _, objAttr, child) =>

        execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil

      case logical.AppendColumns(f, _, _, in, out, child) =>

        execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil

      case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>

        execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil

      case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>

        execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil

      case logical.FlatMapGroupsWithState(

          f, key, value, grouping, data, output, _, _, _, timeout, child) =>

        execution.MapGroupsExec(

          f, key, value, grouping, data, output, timeout, planLater(child)) :: Nil

      case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) =>

        execution.CoGroupExec(

          f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,

          planLater(left), planLater(right)) :: Nil

 

      case logical.Repartition(numPartitions, shuffle, child) =>

        if (shuffle) {

          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil

        } else {

          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil

        }

      case logical.Sort(sortExprs, global, child) =>

        execution.SortExec(sortExprs, global, planLater(child)) :: Nil

      case logical.Project(projectList, child) =>

        execution.ProjectExec(projectList, planLater(child)) :: Nil

      case logical.Filter(condition, child) =>

        execution.FilterExec(condition, planLater(child)) :: Nil

      case f: logical.TypedFilter =>

        execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil

      case e @ logical.Expand(_, _, child) =>

        execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil

      case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>

        execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil

      case logical.Sample(lb, ub, withReplacement, seed, child) =>

        execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil

      case logical.LocalRelation(output, data) =>

        LocalTableScanExec(output, data) :: Nil

      case logical.LocalLimit(IntegerLiteral(limit), child) =>

        execution.LocalLimitExec(limit, planLater(child)) :: Nil

      case logical.GlobalLimit(IntegerLiteral(limit), child) =>

        execution.GlobalLimitExec(limit, planLater(child)) :: Nil

      case logical.Union(unionChildren) =>

        execution.UnionExec(unionChildren.map(planLater)) :: Nil

      case g @ logical.Generate(generator, join, outer, _, _, child) =>

        execution.GenerateExec(

          generator, join = join, outer = outer, g.qualifiedGeneratorOutput,

          planLater(child)) :: Nil

      case logical.OneRowRelation =>

        execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil

      case r: logical.Range =>

        execution.RangeExec(r) :: Nil

      case logical.RepartitionByExpression(expressions, child, numPartitions) =>

        exchange.ShuffleExchange(HashPartitioning(

          expressions, numPartitions), planLater(child)) :: Nil

      case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil

      case r: LogicalRDD =>

        RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil

      case h: ResolvedHint => planLater(h.child) :: Nil

      case _ => Nil

    }

  }
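To see which physical operators these rules pick, it helps to inspect the plans of a small query (a quick sketch assuming a SparkSession named spark; the exact plan text depends on the Spark version):

val df = spark.range(10).filter("id > 5").select("id")
println(df.queryExecution.logical)    // logical Filter / Project nodes
println(df.queryExecution.sparkPlan)  // FilterExec / ProjectExec chosen by BasicOperators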

 

    4. DataSource

The base class for data sources; csv, json, txt, and the others are its extensions. It exposes unified methods such as resolveRelation(), which is what load() above calls to obtain the BaseRelation.

 

    5. DataSourceScanExec

Defines how a DataSource is physically executed to produce InternalRow data.

Its main subclasses and implementations are:

FileSourceScanExec: reads files and produces an RDD[InternalRow]. It calls the readFile function of the concrete DataSource / BaseRelation to iterate over the data source and build an iterator of InternalRow, and internally uses FileScanRDD to read the data partition by partition.

RowDataSourceScanExec: operates on rows, e.g. Project-like operations that select the specified output columns.

 

 

These two are the main implementations.

      5.1 Handling the JDBC data source

For JDBC, a JDBCRelation is defined that extends BaseRelation and overrides buildScan(). That method returns an RDD[Row] by creating a JDBCRDD; the JDBCRDD builds a query SQL statement, sends it through the JDBC driver, and queries the database. When generating the SQL, JDBCRDD concatenates the statement including the translated filter conditions.

JDBCRDD defines a compute() method:

override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow]

whose final part is:

  CompletionIterator[InternalRow, Iterator[InternalRow]](

      new InterruptibleIterator(context, rowsIterator), close())

Its purpose is to wrap the iterator produced by the JDBC driver into a new Iterator that invokes close() after the last row has been processed; close() closes the JDBC connection and releases its resources.
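The pattern is easy to reproduce on its own. Below is a minimal sketch (closingIterator is an illustrative helper, not Spark's CompletionIterator API) of an iterator that runs a cleanup function exactly once when the underlying iterator is exhausted:

def closingIterator[A](underlying: Iterator[A])(close: () => Unit): Iterator[A] =
  new Iterator[A] {
    private var closed = false
    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !closed) { closed = true; close() } // e.g. close the JDBC connection here
      more
    }
    override def next(): A = underlying.next()
  }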

      5.2 UnsafeRow

UnsafeRow is a subclass of InternalRow: a row of data represented as a byte array. If a field is a numeric type such as an integer or a floating-point number, the corresponding slot in the UnsafeRow holds the value directly; for larger data such as strings or arrays, the slot holds the location of the data.

An excerpt from the UnsafeRow class documentation:

/**

 * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.

 *

 * Each tuple has three parts: [null bit set] [values] [variable length portion]

 *

 * The bit set is used for null tracking and is aligned to 8-byte word boundaries.  It stores

 * one bit per field.

 *

 * In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length

 * primitive types, such as long, double, or int, we store the value directly in the word. For

 * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the

 * base address of the row) that points to the beginning of the variable-length field, and length

 * (they are combined into a long).

 *

 * Instances of `UnsafeRow` act as pointers to row data stored in this format.

 */
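As a small sketch of this format in action, an UnsafeRow can be produced from a generic InternalRow through Spark's internal catalyst classes (internal API, so imports and signatures may differ across versions):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val toUnsafe = UnsafeProjection.create(schema)
val row = toUnsafe(InternalRow(42L, UTF8String.fromString("spark")))
// The fixed-length "id" sits directly in its 8-byte word; "name" stores an
// offset-and-length pair pointing into the variable-length region.
println(row.getLong(0) + " " + row.getUTF8String(1))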

 

      5.3 HadoopFsRelation

All DataSource read implementations go through HadoopFsRelation, which extends BaseRelation.

 

    6. Aggregation operations

Ordinary SQL operations generally act on a single row, so it is enough to layer operators on top of the Iterator. Simple aggregations such as sum or average are also manageable: adding a global accumulator variable on top of the iterator is enough. Some aggregations are not that easy, however, for example sorting and joins. Here it is worth focusing on the data chain of such operations to see how they are implemented internally.
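A minimal sketch of the difference (the operator names are illustrative): per-row operators and simple global aggregates compose lazily on the iterator, while an operation like sort has to buffer its input first.

// A per-row operator is just a lazy wrapper around the child iterator.
def filterOp[A](child: Iterator[A])(p: A => Boolean): Iterator[A] = child.filter(p)

// A simple global aggregate only needs one running accumulator over the iterator.
def sumOp(child: Iterator[Long]): Iterator[Long] = Iterator.single(child.foldLeft(0L)(_ + _))

// Sorting cannot be streamed row by row: it must materialize (and possibly spill) its input.
def sortOp(child: Iterator[Long]): Iterator[Long] = child.toSeq.sorted.iterator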

      6.1 dropDuplicates

Removes duplicate rows on the specified columns, keeping only one copy.

A Deduplicate node is defined, specifying which columns (keys) are checked for duplicates:

case class Deduplicate(

    keys: Seq[Attribute],

    child: LogicalPlan,

    streaming: Boolean) extends UnaryNode {

  override def output: Seq[Attribute] = child.output

}

The corresponding physical plan, in SparkStrategies:

/**

   * Used to plan the streaming deduplicate operator.

   */

  object StreamingDeduplicationStrategy extends Strategy {

    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case Deduplicate(keys, child, true) =>

        StreamingDeduplicateExec(keys, planLater(child)) :: Nil

 

      case _ => Nil

    }

  }

Following the thread, here is the key function of StreamingDeduplicateExec; the code below lives in sql/execution/streaming/statefulOperators.scala:

override protected def doExecute(): RDD[InternalRow] = {

    metrics // force lazy init at driver

 

    child.execute().mapPartitionsWithStateStore(

      getStateId.checkpointLocation,

      getStateId.operatorId,

      getStateId.batchId,

      keyExpressions.toStructType,

      child.output.toStructType,

      sqlContext.sessionState,

      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>

      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)

      val numOutputRows = longMetric("numOutputRows")

      val numTotalStateRows = longMetric("numTotalStateRows")

      val numUpdatedStateRows = longMetric("numUpdatedStateRows")

 

      val baseIterator = watermarkPredicateForData match {

        case Some(predicate) => iter.filter(row => !predicate.eval(row))

        case None => iter

      }

 

      val result = baseIterator.filter { r =>

        val row = r.asInstanceOf[UnsafeRow]

        val key = getKey(row)

        val value = store.get(key)

        if (value.isEmpty) {

          store.put(key.copy(), StreamingDeduplicateExec.EMPTY_ROW)

          numUpdatedStateRows += 1

          numOutputRows += 1

          true

        } else {

          // Drop duplicated rows

          false

        }

      }

 

      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {

        watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))

        store.commit()

        numTotalStateRows += store.numKeys()

      })

    }
  }

Which key classes and methods does this use?

(1) mapPartitionsWithStateStore

This introduces StateStore. Let's trace into the code to see what StateStore does.

      6.2 StateStore

sql/execution/streaming/state/package.scala defines the StateStoreOps class, which contains the mapPartitionsWithStateStore method; its code:

/** Map each partition of an RDD along with data in a [[StateStore]]. */

    def mapPartitionsWithStateStore[U: ClassTag](

        sqlContext: SQLContext,

        checkpointLocation: String,

        operatorId: Long,

        storeVersion: Long,

        keySchema: StructType,

        valueSchema: StructType)(

        storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {

 

      mapPartitionsWithStateStore(

        checkpointLocation,

        operatorId,

        storeVersion,

        keySchema,

        valueSchema,

        sqlContext.sessionState,

        Some(sqlContext.streams.stateStoreCoordinator))(

        storeUpdateFunction)

    }

 

    /** Map each partition of an RDD along with data in a [[StateStore]]. */

    private[streaming] def mapPartitionsWithStateStore[U: ClassTag](

        checkpointLocation: String,

        operatorId: Long,

        storeVersion: Long,

        keySchema: StructType,

        valueSchema: StructType,

        sessionState: SessionState,

        storeCoordinator: Option[StateStoreCoordinatorRef])(

        storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {

 

      val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)

      val wrappedF = (store: StateStore, iter: Iterator[T]) => {

        // Abort the state store in case of error

        TaskContext.get().addTaskCompletionListener(_ => {

          if (!store.hasCommitted) store.abort()

        })

        cleanedF(store, iter)

      }

      new StateStoreRDD(

        dataRDD,

        wrappedF,

        checkpointLocation,

        operatorId,

        storeVersion,

        keySchema,

        valueSchema,

        sessionState,

        storeCoordinator)

    }

  }

 

StateStore itself is defined in sql/execution/streaming/state/StateStore.scala. It is used to process streaming Iterators, for example to handle duplicate rows and similar operations.

 

      6.3 HDFSBackedStateStoreProvider

An implementation of StateStore.

 

      6.4 Distinct

The implementation of the distinct operation.

I have not fully gone through it yet, so here is the programming idea first.

A StateStore object is defined to manage intermediate storage for the data source. As each row is saved into the StateStore it can be deduplicated, and then an Iterator is generated from the StateStore as the result data, completing Distinct-style aggregations. Performance optimization and extension happen inside the StateStore, which keeps the framework stable.
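As a conceptual sketch of this design (the KVStore trait below is illustrative and much simpler than Spark's StateStore API):

trait KVStore {
  def contains(key: String): Boolean
  def put(key: String): Unit
}

// Keep a row only the first time its key is seen; later occurrences are dropped.
def dedup(rows: Iterator[String], store: KVStore): Iterator[String] =
  rows.filter { key =>
    if (store.contains(key)) false else { store.put(key); true }
  }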

 

      6.5 BytesToBytesMap

A mechanism for holding <Key, Value> data in memory, operating at the byte-array level.

/**

 * An append-only hash map where keys and values are contiguous regions of bytes.

 *

 * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,

 * which is guaranteed to exhaust the space.

 *

 * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should

 * probably be using sorting instead of hashing for better cache locality.

 *

 * The key and values under the hood are stored together, in the following format:

 *   Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4

 *   Bytes 4 to 8: len(k)

 *   Bytes 8 to 8 + len(k): key data

 *   Bytes 8 + len(k) to 8 + len(k) + len(v): value data

 *   Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair

 *

 * This means that the first four bytes store the entire record (key + value) length. This format

 * is compatible with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},

 * so we can pass records from this map directly into the sorter to sort records in place.

 */

 

      6.6 UnsafeExternalSorter

The in-memory/external sorting class in the core, invoked wherever sorting is needed.

You pass it a comparator; it maintains a memory region holding the data to be sorted. insertRecord processes one record at a time, inserting it into memory, and UnsafeExternalSorter orders the data as it is inserted. In the end it returns an Iterator representing the sorted data. The data is kept in memory as <Key, Value> byte arrays.

Its main job is memory management; the sorting itself reuses the logic of Sorter.java and TimSort.java.
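Conceptually the contract looks like the sketch below (this is not the real UnsafeExternalSorter API, which stores raw <key, value> bytes in managed memory and can spill to disk):

class ExternalSorterSketch[A](ord: Ordering[A]) {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[A]

  // The real sorter copies the record's bytes into managed memory and may spill.
  def insertRecord(record: A): Unit = buffer += record

  // The real sorter merges in-memory data with any spilled runs at this point.
  def sortedIterator(): Iterator[A] = buffer.sorted(ord).iterator
}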

 

      6.7 UnsafeInMemorySorter

The in-memory sorting algorithm, which uses an alpha-prefix approach. It first relies on a SortComparator.

 

SortComparator

It uses the memoryManager to hold the two objects being compared and passes each object's base object and offset to the Sorter for comparison; a result of 1, 0, or -1 means greater than, equal to, or less than respectively.

Each compared object consists of a memory address and a prefix. If the prefixes differ, the prefix comparison directly decides the result; if they are equal, the memory address is used to fetch, from the memoryManager, the base address of the memory block and the object's offset within it, and the two memory blocks are then compared. The concrete object format inside a memory block is user-defined, and how to interpret it is specified through the recordComparator parameter.
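A sketch of that two-level comparison: compare the 8-byte prefixes first and only fall back to the full record comparator when they are equal (the names and the plain signed comparison are illustrative):

def compareRecords(prefixA: Long, prefixB: Long)(compareFullRecords: => Int): Int = {
  val byPrefix = java.lang.Long.compare(prefixA, prefixB)
  if (byPrefix != 0) byPrefix else compareFullRecords // only touch record memory when prefixes tie
}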

      6.8 Agg

The implementation of the family of aggregation operations.

Dataset.agg is mostly implemented by calling dataset.groupBy().agg.

groupByKey

First, the groupByKey method:

def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {

    val inputPlan = logicalPlan

    val withGroupingKey = AppendColumns(func, inputPlan)

    val executed = sparkSession.sessionState.executePlan(withGroupingKey)

 

    new KeyValueGroupedDataset(

      encoderFor[K],

      encoderFor[T],

      executed,

      inputPlan.output,

      withGroupingKey.newColumns)

  }

It appends a column holding the grouping key computed by func to the logical plan (AppendColumns), then builds a KeyValueGroupedDataset, which represents the Dataset grouped by that key.
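A typical use looks like the sketch below (assuming a SparkSession whose spark.implicits._ is imported; Sale is a made-up case class):

case class Sale(city: String, amount: Double)

val ds = Seq(Sale("SH", 10.0), Sale("SH", 5.0), Sale("BJ", 7.0)).toDS()
// groupByKey appends the grouping-key column and yields a KeyValueGroupedDataset[String, Sale]
val perCity = ds.groupByKey(_.city).count()  // Dataset[(String, Long)]
perCity.show()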

groupBy

The grouping function:

@scala.annotation.varargs

  def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {

    val colNames: Seq[String] = col1 +: cols

    RelationalGroupedDataset(

      toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.GroupByType)

 }

 

RelationalGroupedDataset

groupBy operations convert the Dataset into a RelationalGroupedDataset and manipulate the data through it. Let's look at the key definitions and methods of RelationalGroupedDataset.

For a GroupByType groupBy, RelationalGroupedDataset converts the operation into an Aggregate logical plan for execution. The conversion code is as follows:

private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {

    val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {

      groupingExprs ++ aggExprs

    } else {

      aggExprs

    }

 

    val aliasedAgg = aggregates.map(alias)

 

    groupType match {

      case RelationalGroupedDataset.GroupByType =>

        Dataset.ofRows(

          df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))

      case RelationalGroupedDataset.RollupType =>

        Dataset.ofRows(

          df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))

      case RelationalGroupedDataset.CubeType =>

        Dataset.ofRows(

          df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))

      case RelationalGroupedDataset.PivotType(pivotCol, values) =>

        val aliasedGrps = groupingExprs.map(alias)

        Dataset.ofRows(

          df.sparkSession, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))

    }

  }

Next, the implementation of one of its key methods, agg:

/**

   * (Scala-specific) Compute aggregates by specifying the column names and

   * aggregate methods. The resulting `DataFrame` will also contain the grouping columns.

   *

   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.

   * {{{

   *   // Selects the age of the oldest employee and the aggregate expense for each department

   *   df.groupBy("department").agg(

   *     "age" -> "max",

   *     "expense" -> "sum"

   *   )

   * }}}

   *

   * @since 1.3.0

   */

  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {

    toDF((aggExpr +: aggExprs).map { case (colName, expr) =>

      strToExpr(expr)(df(colName).expr)

    })

  }

The toDF method converts this DataFrame into the target DataFrame, with the aggregation expressions passed into the conversion. The other aggregations such as count, avg, min, and max are likewise executed through toDF. toDF describes the logical plan with an Aggregate node, so studying aggregate functions really comes down to studying the implementation of Aggregate.

Aggregate

Let's look at Aggregate's main members and logic. Its class is org.apache.spark.sql.catalyst.plans.logical.Aggregate, defined in catalyst/plans/logical/basicLogicalOperators.scala.

The definition:

case class Aggregate(

    groupingExpressions: Seq[Expression],

    aggregateExpressions: Seq[NamedExpression],

    child: LogicalPlan)

  extends UnaryNode {

  // ...

}

Now let's see how the corresponding physical plan for Aggregate is defined:

object Aggregation extends Strategy

It is defined in SparkStrategies and decides how an Aggregate-type logical plan is handled. The key fragment:

val aggregateOperator =

          if (functionsWithDistinct.isEmpty) {

            aggregate.AggUtils.planAggregateWithoutDistinct(

              groupingExpressions,

              aggregateExpressions,

              resultExpressions,

              planLater(child))

          } else {

            aggregate.AggUtils.planAggregateWithOneDistinct(

              groupingExpressions,

              functionsWithDistinct,

              functionsWithoutDistinct,

              resultExpressions,

              planLater(child))

          }

 

        aggregateOperator

 

The corresponding SparkPlan is built through AggUtils.planAggregateWithoutDistinct (or planAggregateWithOneDistinct). The main job of these methods is to decide which aggregation operator should be used: HashAggregateExec, ObjectHashAggregateExec, or SortAggregateExec.

Looking at HashAggregateExec as an example is enough; the others are very similar.

HashAggregateExec

The definition and main methods of HashAggregateExec.

HashAggregateExec relies on TungstenAggregationIterator.

TungstenAggregationIterator

An excerpt from its source documentation:

/**

 * An iterator used to evaluate aggregate functions. It operates on [[UnsafeRow]]s.

 *

 * This iterator first uses hash-based aggregation to process input rows. It uses

 * a hash map to store groups and their corresponding aggregation buffers. If

 * this map cannot allocate memory from memory manager, it spills the map into disk

 * and creates a new one. After processed all the input, then merge all the spills

 * together using external sorter, and do sort-based aggregation.

 *

 * The process has the following step:

 *  - Step 0: Do hash-based aggregation.

 *  - Step 1: Sort all entries of the hash map based on values of grouping expressions and

 *            spill them to disk.

 *  - Step 2: Create an external sorter based on the spilled sorted map entries and reset the map.

 *  - Step 3: Get a sorted [[KVIterator]] from the external sorter.

 *  - Step 4: Repeat step 0 until no more input.

 *  - Step 5: Initialize sort-based aggregation on the sorted iterator.

 * Then, this iterator works in the way of sort-based aggregation.

 *

 * The code of this class is organized as follows:

 *  - Part 1: Initializing aggregate functions.

 *  - Part 2: Methods and fields used by setting aggregation buffer values,

 *            processing input rows from inputIter, and generating output

 *            rows.

 *  - Part 3: Methods and fields used by hash-based aggregation.

 *  - Part 4: Methods and fields used when we switch to sort-based aggregation.

 *  - Part 5: Methods and fields used by sort-based aggregation.

 *  - Part 6: Loads input and process input rows.

 *  - Part 7: Public methods of this iterator.

 *  - Part 8: A utility function used to generate a result when there is no

 *            input and there is no grouping expression.

 *

 * @param groupingExpressions

 *   expressions for grouping keys

 * @param aggregateExpressions

 * [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Partial]],

 * [[PartialMerge]], or [[Final]].

 * @param aggregateAttributes the attributes of the aggregateExpressions'

 *   outputs when they are stored in the final aggregation buffer.

 * @param resultExpressions

 *   expressions for generating output rows.

 * @param newMutableProjection

 *   the function used to create mutable projections.

 * @param originalInputAttributes

 *   attributes of representing input rows from `inputIter`.

 * @param inputIter

 *   the iterator containing input [[UnsafeRow]]s.

 */

 

    7. Summary

In the end, execution always comes down to a concrete subclass either producing RDD[Row] through buildScan or producing an iterator of InternalRow through readFile.
