Spark聚合開窗與自定義累加器的高級應用-Spark商業應用實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark商業應用實戰指導,請持續關注本套博客。版權聲明:本套Spark商業應用實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。git

1 Spark開窗函數與聚合函數

1.1 Spark開窗函數與聚合函數的區別

開窗函數與聚合函數同樣,都是對行的集合組進行聚合計算。可是二者卻有本質區別,待我細細給你道來,絕對讓你震撼無窮。github

  • 開窗函數用於爲行定義一個窗口(這裏的窗口是指運算將要操做的行的集合),它是對一組值進行操做,不須要使用 GROUP BY 子句對數據進行分組,可以在同一行中同時返回基礎行的列和聚合列。極端點說:能夠返回全部行的同時外帶開窗聚合的列。可是 基於GROUP BY 進行聚合是不行的,由於select中不容許出現非GROUP BY 字段。sql

  • 聚合函數則不一樣:不容許同時返回全部列的同時外帶聚合(sum,max 等)多行的列。json

2 Spark聚合開窗函數使用技巧

開窗函數的調用格式爲: 函數名(列) OVER(選項)函數

  • 第一大類:[聚合開窗函數 -> 聚合函數(列) ] OVER (選項),這裏的選項能夠是 PARTITION BY 子句,但不但是 ORDER BY 子句,選項能夠爲空,表示聚合函數向開窗函數的轉換。

2.1 Spark開窗函數使用技巧

  • 聚合開窗函數 OVER 關鍵字 : 表示把聚合函數當成聚合開窗函數而不是聚合函數。post

  • (1)SQL 標準容許將全部聚合函數用作聚合開窗函數。經過over()進行轉換學習

    sparkSession.sql("select name, class, score, count(name) over() name_count from score")
    複製代碼

  • (2)開窗函數的 OVER 關鍵字後括號中的可使用 PARTITION BY 子句來定義行 的分區來供進行聚合計算。經過over(partition by 列 ) 進行分組開窗,此處與 GROUP BY 子句不一樣測試

    sparkSession.sql("select name, class, score, count(name) over(partition by class) name_count from score").show()
    複製代碼

能夠看到:over只是實現了聚合函數到窗函數的轉換。且不用group by。開窗函數的 OVER 關鍵字後括號中的可使用 PARTITION BY 子句來定義行的分區來供進行聚合計算。與 GROUP BY 子句不一樣,由於GROUP BY不容許同時返回全部列的同時外帶聚合(sum,max 等)多行的的列。

3 Spark排序開窗函數使用技巧

第二大類:[排序開窗函數 -> 排序函數(列) ] OVER(選項),這裏的選項能夠是ORDER BY 子句,也能夠是OVER(PARTITION BY 子句 ORDER BY 子句),但不能夠是 PARTITION BY 子句。大數據

  • 對於排序開窗函數來說,它支持的開窗函數分別爲: ROW_NUMBER(行號)、 RANK(排名)、 DENSE_RANK(密集排名)和 NTILE(分組排名)。spa

    sparkSession.sql("select name, class, score, row_number() over(order by score) rank from
      score").show()
    複製代碼

sparkSession.sql("select name, class, score, rank() over(order by score) rank from
    score").show()
複製代碼

sparkSession.sql("select name, class, score, dense_rank() over(order by score) rank from
    score").show()
複製代碼

sparkSession.sql("select name, class, score, ntile(6) over(order by score) rank from
    score").show()
複製代碼

4:用戶自定義聚合函數(UDAF)

4.1 弱類型 UDAF 函數經過繼承 UserDefinedAggregateFunction 來實現用戶自定義聚合函數。

4.2 強類型 UDAF 函數經過繼承 Aggregator 來實現強類型自定義聚合函數。

4:用戶自定義聚合函數(UDF)

4.1 註冊自定義函數

字符串拼接:

spark.udf.register("concat_long_string", 
     (v1: Long, v2: String, split: String) => v1.toString + split + v2)
複製代碼

Json抽取字段值:

spark.udf.register("get_json_object", (json: String, field: String) => {
  val jsonObject = JSONObject.fromObject(json);
  jsonObject.getString(field)})
複製代碼

udaf全數據拼接:

spark.udf.register("group_concat_distinct", new GroupConcatDistinctUDAF())
複製代碼

5 使用自定義函數

6 累加器高級用法

6.1自定義累加器

6.2如何使用累加器

7 總結

本節內容主要探討了開船函數和自定義累加器的高階高級使用案例,可能部分截圖來自github公開源碼,部分是個人測試案例,若有雷同某位大神私有內容,請直接留言於我,我來從新修正案例。

秦凱新 於深圳

相關文章
相關標籤/搜索