Spark DataFrame API

DataFrame

Filtering: note that all of these APIs ultimately return a Dataset.

  1. where | filter

    Here we recommend the first overload of filter or where, the one that takes a Column (the String form has to express conditions like and/or as SQL text), because it skips one pass through the SQL parser, the code is clearer, and it is less error-prone.

    def where(condition: Column): Dataset[T] = filter(condition)

    def where(conditionExpr: String): Dataset[T] = {
      filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
    }

Looking at the two where overloads, both delegate to the filter method. Usage is demonstrated below:

df.where("sequence==6410 or sequence=6411").show()
df.where($"sequence" === 6411 or $"sequence" === 6410).show()
df.filter($"sequence" !== 6411).show()
  2. limit: this API is simple, but note that it is a transformation-level API, whereas take and head are action-level APIs that give the same effect (see the sketch below).
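
    A minimal sketch of the difference, assuming an existing DataFrame df:

    // limit is a transformation: it just returns a new DataFrame, nothing runs yet
    val firstTen = df.limit(10)
    firstTen.show()          // show() is the action that actually triggers the job

    // take and head are actions: they run a job and return rows to the driver
    val rows = df.take(10)
    val sameRows = df.head(10)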

Selection and slicing:

  1. select | selectExpr

    Note: a combined usage sketch for select, selectExpr, and drop follows this list.

  2. drop:

    def drop(colNames: String*): DataFrame = {
        val resolver = sparkSession.sessionState.analyzer.resolver
        val allColumns = queryExecution.analyzed.output
        val remainingCols = allColumns.filter { attribute =>
          colNames.forall(n => !resolver(attribute.name, n))
        }.map(attribute => Column(attribute))
        if (remainingCols.size == allColumns.size) {
          toDF()
        } else {
          this.select(remainingCols: _*)
        }
      }

    Analyzing the drop source above, we see that it actually delegates to the select API.
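
    A minimal usage sketch for select, selectExpr, and drop, assuming a DataFrame df with sequence, gender, and age columns and import spark.implicits._ for the $ syntax:

    // select takes Columns or plain column names
    df.select($"sequence", $"age" + 1).show()
    df.select("sequence", "age").show()

    // selectExpr accepts SQL expressions as strings
    df.selectExpr("sequence", "age + 1 as age_plus_one").show()

    // drop removes columns by name and, as shown above, delegates to select
    df.drop("gender").show()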


Sorting

  1. orderBy | sort: sort is recommended (a usage sketch follows the source below).

    def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*)
    def sort(sortCol: String, sortCols: String*): Dataset[T] = {
        sort((sortCol +: sortCols).map(Column(_)) : _*)
    }
    @scala.annotation.varargs
    def sort(sortExprs: Column*): Dataset[T] = {
        sortInternal(global = true, sortExprs)
    }
    private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
        val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
          col.expr match {
            case expr: SortOrder =>
              expr
            case expr: Expression =>
              // a plain expression defaults to ascending order
              SortOrder(expr, Ascending)
          }
        }
        // wrap the current logical plan in a Sort node
        withTypedPlan {
          Sort(sortOrder, global = global, logicalPlan)
        }
    }
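
A short usage sketch, assuming a DataFrame df and import spark.implicits._ for the $ syntax; both forms end up in sortInternal above:

    df.sort($"sequence".desc).show()            // descending via Column.desc
    df.orderBy("sequence").show()               // same as sort("sequence")
    df.sort($"gender", $"sequence".desc).show() // multi-column sort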

Aggregation

  1. groupBy | cube | rollup: the three implementations are essentially the same, but their results are quite different! See the details below.
def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
    val colNames: Seq[String] = col1 +: cols
    RelationalGroupedDataset(
      toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.GroupByType)
  }
def cube(col1: String, cols: String*): RelationalGroupedDataset = {
    val colNames: Seq[String] = col1 +: cols
    RelationalGroupedDataset(
      toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.CubeType)
  }
 def rollup(col1: String, cols: String*): RelationalGroupedDataset = {
    val colNames: Seq[String] = col1 +: cols
    RelationalGroupedDataset(
      toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.RollupType)
  }
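
The three result tables below could be produced with a count aggregation; a minimal sketch, assuming a DataFrame df with sequence and gender columns:

    df.groupBy("sequence", "gender").count().show()  // one row per (sequence, gender)
    df.cube("sequence", "gender").count().show()     // adds every subtotal combination (nulls)
    df.rollup("sequence", "gender").count().show()   // adds hierarchical subtotals only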

groupBy:

| sequence | gender | count |
| -------- | ------ | ----- |
| 6412     | M      | 3     |
| 6411     | M      | 3     |
| 6410     | M      | 3     |
| 6412     | F      | 4     |
| 6410     | F      | 2     |
| 6411     | F      | 3     |

cube:

| sequence | gender | count |
| -------- | ------ | ----- |
| 6412     | null   | 7     |
| 6410     | null   | 5     |
| null     | F      | 9     |
| null     | null   | 18    |
| 6410     | F      | 2     |
| 6410     | M      | 3     |
| 6412     | M      | 3     |
| 6411     | F      | 3     |
| null     | M      | 9     |
| 6412     | F      | 4     |
| 6411     | null   | 6     |
| 6411     | M      | 3     |

rollup:

| sequence | gender | count |
| -------- | ------ | ----- |
| 6412     | null   | 7     |
| 6410     | null   | 5     |
| null     | null   | 18    |
| 6410     | F      | 2     |
| 6410     | M      | 3     |
| 6412     | M      | 3     |
| 6411     | F      | 3     |
| 6412     | F      | 4     |
| 6411     | null   | 6     |
| 6411     | M      | 3     |

Next we look at a slightly more involved problem: computing the sample variance within each group.

Conventional implementations:

// method 1: the DataFrame API (variance comes from org.apache.spark.sql.functions)
import org.apache.spark.sql.functions.variance
df.groupBy("sequence").agg(variance("age")).show()

// method 2: plain Spark SQL over a temporary view
df.createOrReplaceTempView("people")
val variances = spark.sql("select sequence, VARIANCE(age) as var, max(age) as max, min(age) as min, count(*) as count from people group by sequence")

Next we look at implementing the same thing with a custom UDAF (user-defined aggregate function):
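
A minimal sketch using the classic UserDefinedAggregateFunction API (the class name SampleVariance is illustrative, and the input is assumed to be cast to double); it accumulates count, sum, and sum of squares, then computes the sample variance:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types._

    // hypothetical UDAF: sample variance from a running count, sum, and sum of squares
    class SampleVariance extends UserDefinedAggregateFunction {
      override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
      override def bufferSchema: StructType = StructType(
        StructField("count", LongType) ::
        StructField("sum", DoubleType) ::
        StructField("sumSq", DoubleType) :: Nil)
      override def dataType: DataType = DoubleType
      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L; buffer(1) = 0.0; buffer(2) = 0.0
      }
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          val x = input.getDouble(0)
          buffer(0) = buffer.getLong(0) + 1L
          buffer(1) = buffer.getDouble(1) + x
          buffer(2) = buffer.getDouble(2) + x * x
        }
      }
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
        buffer1(2) = buffer1.getDouble(2) + buffer2.getDouble(2)
      }
      override def evaluate(buffer: Row): Any = {
        val n = buffer.getLong(0)
        if (n < 2) null
        else {
          val sum = buffer.getDouble(1)
          // sample variance = (sum of squares - n * mean^2) / (n - 1)
          (buffer.getDouble(2) - sum * sum / n) / (n - 1)
        }
      }
    }

    // use it per group just like the built-in variance
    val sampleVar = new SampleVariance
    df.groupBy("sequence").agg(sampleVar(col("age").cast("double")).as("var")).show()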


join

join is straightforward and works the same way as JOIN in SQL; the default is an inner join (join = inner join). There are four basic types:

  • inner join
  • left join
  • right join
  • full join
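
A brief sketch of the four types, assuming two DataFrames dfA and dfB that share a key column id (the names are illustrative):

    dfA.join(dfB, Seq("id")).show()                 // inner join (the default)
    dfA.join(dfB, Seq("id"), "left_outer").show()   // left join
    dfA.join(dfB, Seq("id"), "right_outer").show()  // right join
    dfA.join(dfB, Seq("id"), "full_outer").show()   // full join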

pivot & agg

val df1 = spark.createDataFrame(Seq(("a", "foo", 1), ("a", "foo", 3), ("a", "bar", 2), ("a", "car", 4), ("b", "foo", 3), ("b", "car", 8), ("b", "bar", 5), ("b", "bar", 1))).toDF("xb", "y", "z")
df1.groupBy('xb).pivot("y").max("z").show()
df1.groupBy('xb).pivot("y", List("foo", "car")).max("z").show()
df1.groupBy('xb).pivot("y", List("foo", "car")).agg("z" -> "max").show()
df1.groupBy('xb, 'y).agg("z" -> "max").show()

The usage is demonstrated above; here is the pivot API:

def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset

Note that pivot must be used after groupBy.

The agg function is used a lot: when you cannot call a custom aggregate function or a function like variance directly, you can invoke it through agg.
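
A short sketch of both agg styles, assuming a DataFrame df with sequence and age columns:

    import org.apache.spark.sql.functions.{variance, max, min, count}
    df.groupBy("sequence")
      .agg(variance("age").as("var"), max("age"), min("age"), count("age"))
      .show()

    // the (column -> aggregateName) pair form from the pivot example also works
    df.groupBy("sequence").agg("age" -> "max", "age" -> "min").show()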

Intersection, union, and difference

    // intersection
    df1.intersect(df2).show()
    // difference
    df1.except(df2).show()
    // union
    df1.union(df2).show()

    import spark.implicits._
    // renaming columns created from a Seq (default names _1, _2, ...)
    df3.withColumnRenamed("_1", "col1").show()
    df3.withColumn("newcolum", $"_1").show()

**Note:** a handy tip: if a DataFrame is created directly from a Seq, the column names default to "_1", "_2", and so on.
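
A small sketch of that tip, assuming spark.implicits._ is in scope:

    import spark.implicits._
    val unnamed = Seq((1, "a"), (2, "b")).toDF()              // columns are _1, _2
    val named   = Seq((1, "a"), (2, "b")).toDF("id", "name")  // explicit column names
    unnamed.printSchema()
    named.printSchema()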

explode expands one row into multiple rows, for example turning a List into several rows:

val df4 = spark.createDataFrame(Seq(("a", "65-66-67-68"), ("b", "35-68-37-98"), ("c", "5-60-77-28"))).toDF("stu", "scores")
df4.select($"stu", explode(split($"scores", "-"))).show()