DataFrame | DataSet
where | filter
Here we recommend the first of the two where/filter overloads, the one that takes a Column, because it skips an extra round of SQL parsing and the code is clearer and less error-prone (as the demo below shows, the Column form still lets you combine conditions with `and`/`or`).
```scala
def where(condition: Column): Dataset[T] = filter(condition)

def where(conditionExpr: String): Dataset[T] = {
  filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))
}
```
Looking at the two where APIs, we can see that both simply delegate to filter. Usage is demonstrated below:
df.where("sequence==6410 or sequence=6411").show() df.where($"sequence" === 6411 or $"sequence" === 6410).show() df.filter($"sequence" !== 6411).show()
select | selectExpr
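A quick sketch of the two forms, assuming a DataFrame `df` with `sequence` and `age` columns (names are illustrative):

```scala
import spark.implicits._

// select takes Column expressions
df.select($"sequence", $"age" + 1).show()
// selectExpr takes SQL expression strings instead
df.selectExpr("sequence", "age + 1 as age_plus_1").show()
```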
drop

Note how drop is implemented:
```scala
def drop(colNames: String*): DataFrame = {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val allColumns = queryExecution.analyzed.output
  val remainingCols = allColumns.filter { attribute =>
    colNames.forall(n => !resolver(attribute.name, n))
  }.map(attribute => Column(attribute))
  if (remainingCols.size == allColumns.size) {
    toDF()
  } else {
    this.select(remainingCols: _*)
  }
}
```
Analyzing the source of drop above, we can see that it is really just a call to the select API.
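A couple of usage examples (column names assumed); note that, per the source above, dropping a column that does not exist is simply a no-op rather than an error:

```scala
df.drop("gender").show()
df.drop("gender", "age").show()
df.drop("no_such_column").show()  // no-op: nothing matches, the original DataFrame is returned
```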
orderBy | sort

We recommend using sort:
```scala
def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*)

def sort(sortCol: String, sortCols: String*): Dataset[T] = {
  sort((sortCol +: sortCols).map(Column(_)) : _*)
}

@scala.annotation.varargs
def sort(sortExprs: Column*): Dataset[T] = {
  sortInternal(global = true, sortExprs)
}

private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
  val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
    col.expr match {
      case expr: SortOrder => expr
      case expr: Expression => SortOrder(expr, Ascending)
    }
  }
}
```
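Some equivalent ways to call it (column names assumed; `$` requires `import spark.implicits._`):

```scala
import spark.implicits._

df.sort($"sequence".desc, $"age".asc).show()
df.orderBy($"sequence".desc).show()   // orderBy simply forwards to sort
df.sort("sequence", "age").show()     // string columns sort ascending by default
```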
groupBy | cube | rollup

The three implementations look almost identical, differing only in the group type they pass, but their results are quite different! See below:

```scala
def groupBy(col1: String, cols: String*): RelationalGroupedDataset = {
  val colNames: Seq[String] = col1 +: cols
  RelationalGroupedDataset(
    toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.GroupByType)
}

def cube(col1: String, cols: String*): RelationalGroupedDataset = {
  val colNames: Seq[String] = col1 +: cols
  RelationalGroupedDataset(
    toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.CubeType)
}

def rollup(col1: String, cols: String*): RelationalGroupedDataset = {
  val colNames: Seq[String] = col1 +: cols
  RelationalGroupedDataset(
    toDF(), colNames.map(colName => resolve(colName)), RelationalGroupedDataset.RollupType)
}
```
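In short: groupBy aggregates only the exact (sequence, gender) combinations; cube additionally produces subtotals for every combination of the grouping columns (the rows containing null); rollup produces only the hierarchical subtotals (per sequence, plus the grand total). A sketch of how the three result tables below could be produced (the tables simply label the count column "group", "cube" and "rollup"):

```scala
df.groupBy("sequence", "gender").count().show()   // the "group" table
df.cube("sequence", "gender").count().show()      // the "cube" table
df.rollup("sequence", "gender").count().show()    // the "rollup" table
```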
groupBy:
sequence | gender | group |
---|---|---|
6412 | M | 3 |
6411 | M | 3 |
6410 | M | 3 |
6412 | F | 4 |
6410 | F | 2 |
6411 | F | 3 |
cube:
sequence | gender | cube |
---|---|---|
6412 | null | 7 |
6410 | null | 5 |
null | F | 9 |
null | null | 18 |
6410 | F | 2 |
6410 | M | 3 |
6412 | M | 3 |
6411 | F | 3 |
null | M | 9 |
6412 | F | 4 |
6411 | null | 6 |
6411 | M | 3 |
rollup:
sequence | gender | rollup |
---|---|---|
6412 | null | 7 |
6410 | null | 5 |
null | null | 18 |
6410 | F | 2 |
6410 | M | 3 |
6412 | M | 3 |
6411 | F | 3 |
6412 | F | 4 |
6411 | null | 6 |
6411 | M | 3 |
Next, let's look at a slightly more complex problem: computing the sample variance within each group.
The conventional approaches:
```scala
import org.apache.spark.sql.functions.variance

// method 1: the built-in variance aggregate
df.groupBy("sequence").agg(variance("age")).show()

// method 2: Spark SQL
df.createOrReplaceTempView("people")
val variances = spark.sql(
  "select sequence, VARIANCE(age) as var, max(age) as max, min(age) as min, count(*) as count " +
    "from people group by sequence")
```
Next, let's implement the same thing with a user-defined aggregate function (UDAF).
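A minimal sketch of such a UDAF using Spark's classic UserDefinedAggregateFunction API, computing the sample variance from a running count, sum and sum of squares (the class name, the buffer layout and the usage at the end are illustrative, not taken from the original):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sample variance: var = (sumSq - sum * sum / n) / (n - 1)
class SampleVarianceUDAF extends UserDefinedAggregateFunction {
  // one double-typed input column
  override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)

  // aggregation buffer: count, sum, sum of squares
  override def bufferSchema: StructType = StructType(
    StructField("count", LongType) ::
      StructField("sum", DoubleType) ::
      StructField("sumSq", DoubleType) :: Nil)

  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0.0
    buffer(2) = 0.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val x = input.getDouble(0)
      buffer(0) = buffer.getLong(0) + 1L
      buffer(1) = buffer.getDouble(1) + x
      buffer(2) = buffer.getDouble(2) + x * x
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getDouble(1) + buffer2.getDouble(1)
    buffer1(2) = buffer1.getDouble(2) + buffer2.getDouble(2)
  }

  override def evaluate(buffer: Row): Any = {
    val n = buffer.getLong(0)
    if (n < 2) null
    else (buffer.getDouble(2) - buffer.getDouble(1) * buffer.getDouble(1) / n) / (n - 1)
  }
}

// usage: apply the UDAF inside agg, casting the input column to double
import spark.implicits._
val sampleVar = new SampleVarianceUDAF
df.groupBy("sequence").agg(sampleVar($"age".cast("double")).as("var")).show()
```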
join is straightforward and works the same way as join in SQL; the default type is an inner join (join = inner join). There are four usages in all, sketched below.
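A minimal sketch of the four usages, with made-up table and column names (`left`, `right`, `sequence`); `crossJoin` is available from Spark 2.1 onwards:

```scala
import spark.implicits._

val left  = Seq((6410, "M"), (6411, "F")).toDF("sequence", "gender")
val right = Seq((6410, 23), (6412, 30)).toDF("sequence", "age")

left.join(right, "sequence").show()                                             // inner join (the default)
left.join(right, Seq("sequence"), "left_outer").show()                          // explicit join type
left.join(right, left("sequence") === right("sequence"), "full_outer").show()   // arbitrary join condition
left.crossJoin(right).show()                                                    // Cartesian product
```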
pivot

```scala
import spark.implicits._

val df1 = spark.createDataFrame(Seq(
  ("a", "foo", 1), ("a", "foo", 3), ("a", "bar", 2), ("a", "car", 4),
  ("b", "foo", 3), ("b", "car", 8), ("b", "bar", 5), ("b", "bar", 1)
)).toDF("xb", "y", "z")

df1.groupBy('xb).pivot("y").max("z").show()
df1.groupBy('xb).pivot("y", List("foo", "car")).max("z").show()
df1.groupBy('xb).pivot("y", List("foo", "car")).agg("z" -> "max").show()
df1.groupBy('xb, 'y).agg("z" -> "max").show()
```
The above demonstrates its usage; now let's look at pivot's API:
```scala
def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
```
Note that pivot must be called after groupBy.
The agg function is used a lot: when you cannot call a custom aggregate function, or a function like variance, directly on the grouped data, you can invoke it through agg, as sketched below.
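A few equivalent ways to go through agg (column names assumed; `max`/`avg` come from org.apache.spark.sql.functions):

```scala
import org.apache.spark.sql.functions.{avg, max}

df.groupBy("sequence").agg(max("age"), avg("age")).show()
df.groupBy("sequence").agg("age" -> "max", "age" -> "avg").show()
df.groupBy("sequence").agg(Map("age" -> "max", "gender" -> "count")).show()
```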
```scala
import spark.implicits._

// intersection
df1.intersect(df2).show()
// difference
df1.except(df2).show()
// union
df1.union(df2).show()

df3.withColumnRenamed("_1", "col1").show()
df3.withColumn("newcolum", $"_1").show()
```
**Note:** a handy detail: if a DataFrame is created directly from a Seq without passing column names to toDF, its columns default to names like "_1", "_2", and so on.
```scala
import org.apache.spark.sql.functions.{explode, split}

val df4 = spark.createDataFrame(Seq(
  ("a", "65-66-67-68"), ("b", "35-68-37-98"), ("c", "5-60-77-28"))).toDF("stu", "scores")

df4.select($"stu", explode(split($"scores", "-"))).show()
```
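If you want the exploded column to have a readable name and a numeric type, you can alias and cast it (a small optional variation on the example above):

```scala
df4.select($"stu", explode(split($"scores", "-")).as("score"))
  .withColumn("score", $"score".cast("int"))
  .show()
```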