window操做和groupBy操做

window操做

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

 

window操做是連續流特有的操做,設置時間窗口大小,根據窗口大小來執行groupBy操做等。session

看看dataset上的groupBy操做。this

groupBy操做

定義:spa

 def groupBy(cols: Column*): RelationalGroupedDataset = {code

    RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)對象

  }ci

生成新的RelationalGroupedDataset對象。該對象最重要得方法:it

 private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {

    val aggregates = if (df.sparkSession.sessionState.conf.dataFrameRetainGroupColumns) {

      groupingExprs ++ aggExprs

    } else {

      aggExprs

    }

    val aliasedAgg = aggregates.map(alias)

    groupType match {

      case RelationalGroupedDataset.GroupByType =>

        Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))

      case RelationalGroupedDataset.RollupType =>

        Dataset.ofRows(

          df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))

      case RelationalGroupedDataset.CubeType =>

        Dataset.ofRows(

          df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))

      case RelationalGroupedDataset.PivotType(pivotCol, values) =>

        val aliasedGrps = groupingExprs.map(alias)

        Dataset.ofRows(

          df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.logicalPlan))

    }

  }

咱們就看一個吧:spark

Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))io

看看它的實現機制是怎樣得?import

這裏得Aggregate是一種LogicPlan,咱們只要看看Aggregate的實現機制就能夠了。

Aggregate的實現機制就要涉及到catalyst包裏的相關類了。

相關文章
相關標籤/搜索