spark sql中的窗口函數

databricks博客給出的窗口函數概述html

Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions. The available ranking functions and analytic functions are summarized in the table below. For aggregate functions, users can use any existing aggregate function as a window function.python

窗口函數包含3種:sql

  1. ranking 排名類
  2. analytic 分析類
  3. aggregate 聚合類

ranking 和 analytic 見下表,全部已經存在的聚合類函數(sum、avg、max、min)均可以做爲窗口函數。shell

|Function Type| SQL| DataFrame API| |--|--|--| |Ranking |rank | rank | |Ranking |dense_rank|denseRank| |Ranking |percent_rank |percentRank| |Ranking |ntile|ntile| |Ranking |row_number|rowNumber| |Analytic |cume_dist|cumeDist| |Analytic |first_value |firstValue| |Analytic |last_value |lastValue| |Analytic |lag|lag| |Analytic |lead|lead|express

先用案例說明apache

案例數據:/root/score.json/score.json,學生名字、課程、分數json

{"name":"A","lesson":"Math","score":100}
{"name":"B","lesson":"Math","score":100}
{"name":"C","lesson":"Math","score":99}
{"name":"D","lesson":"Math","score":98}
{"name":"A","lesson":"E","score":100}
{"name":"B","lesson":"E","score":99}
{"name":"C","lesson":"E","score":99}
{"name":"D","lesson":"E","score":98}
./spark-shell --master local #本地啓動spark-shell
import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.hive.HiveContext


    sc.setLogLevel("WARN") // 日誌級別,可不改
    val hiveContext = new HiveContext(sc)

    val df = hiveContext.read.json("file:///root/score.json")
    case class Score(val name: String, val lesson: String, val score: Int)

    df.registerTempTable("score") // 註冊臨時表

    // SQL語句
    val stat = "select".
      concat(" name,lesson,score, ").
      concat(" ntile(2) over (partition by lesson order by score desc ) as ntile_2,").
      concat(" ntile(3) over (partition by lesson order by score desc ) as ntile_3,").
      concat(" row_number() over (partition by lesson order by score desc ) as row_number,").
      concat(" rank() over (partition by lesson order by score desc ) as rank, ").
      concat(" dense_rank() over (partition by lesson order by score desc ) as dense_rank, ").
      concat(" percent_rank() over (partition by lesson order by score desc ) as percent_rank ").
      concat(" from score ").
      concat(" order by lesson,name,score")

    hiveContext.sql(stat).show // 執行語句獲得的結果

/**
 * 用DataFrame API的方式完成相同的功能。
**/
    val window_spec = Window.partitionBy("lesson").orderBy(df("score").desc) // 窗口函數中公用的子句

    df.select(df("name"), df("lesson"), df("score"),
      ntile(2).over(window_spec).as("ntile_2"),
      ntile(3).over(window_spec).as("ntile_3"),
      row_number().over(window_spec).as("row_number"),
      rank().over(window_spec).as("rank"),
      dense_rank().over(window_spec).as("dense_rank"),
      percent_rank().over(window_spec).as("percent_rank")
    ).orderBy("lesson", "name", "score").show
  • 輸出結果徹底同樣,以下表所示
name lesson score ntile_2 ntile_3 row_number rank dense_rank percent_rank
A E 100 1 1 1 1 1 0.0
B E 99 1 1 2 2 2 0.3333333333333333
C E 99 2 2 3 2 2 0.3333333333333333
D E 98 2 3 4 4 3 1.0
A Math 100 1 1 1 1 1 0.0
B Math 100 1 1 2 1 1 0.0
C Math 99 2 2 3 3 2 0.6666666666666666
D Math 98 2 3 4 4 3 1.0
  • rank遇到相同的數據則rank並列,所以rank值多是不連續的
  • dense_rank遇到相同的數據則rank並列,可是rank值必定是連續的
  • row_number 很單純的行號,相似excel的行號,不會由於數據相同而rank的值重複或者有間隔
  • percent_rank = 相同的分組中 (rank -1) / ( count(score) - 1 )
  • ntile(n) 是將同一組數據 循環的往n個 桶中放,返回對應的桶的index,index從1開始。
  • 結合官方博客的python調用dataframe API的寫法可知,scala的寫法幾乎和python的同樣。官方博客的地址見最下面的參考。

上面的案例,每一個分組中全部的數據都參與到窗口函數中計算了。考慮下面一種場景:less

  1. 各科成績 與 該科成績的 最高分、最高分、平均分相差多少。每一行與此行所屬分組聚合後的值再作計算。參與窗口計算的數據是絕對的,就是此行所屬的窗口內的全部數據。
  2. 各科成績按從高到低排序後,比前一名相差多少。每一行與此行的前一行的值相關。參與窗口計算的數據是相對於當前行的。
// 各科成績和最高分、最高分、平均分差多少分
    // 各科成績按從高到低排序後,比前一名差多少分
    val window_clause = Window.partitionBy(df("lesson")).orderBy(df("score").desc)
    val window_spec2 = window_clause.rangeBetween(-Int.MaxValue, Int.MaxValue) // 絕對範圍
    val window_spec3 = window_clause.rowsBetween(-1, 0) // 相對範圍,-1:當前行的前一行,
    df.select(
      df("name"),
      df("lesson"),
      df("score"),
      // 窗口內的第一行的score-當前的行score
      (df("score") - first("score").over(window_spec3)).as("score-last_score"), 

      // 各科成績和最高分、最高分、平均分差多少分
      (min(df("score")).over(window_spec2)).as("min_score"),
      (df("score") - min(df("score")).over(window_spec2)).as("score-min"),
      (max(df("score")).over(window_spec2)).as("max_score"),
      (df("score") - max(df("score")).over(window_spec2)).as("score-max"),
      (avg(df("score")).over(window_spec2)).as("avg_score"),
      (df("score") - avg(df("score")).over(window_spec2)).as("score-avg")
    ).
      orderBy("lesson", "name", "score").
      show
name lesson score score-last_score min_score score-min max_score score-max avg_score score-avg
A E 100 0 98 2 100 0 99.0 1.0
B E 99 -1 98 1 100 -1 99.0 0.0
C E 99 0 98 1 100 -1 99.0 0.0
D E 98 -1 98 0 100 -2 99.0 -1.0
A Math 100 0 98 2 100 0 99.25 0.75
B Math 100 0 98 2 100 0 99.25 0.75
C Math 99 -1 98 1 100 -1 99.25 -0.25
D Math 98 -1 98 0 100 -2 99.25 -1.25

未完待續函數

  • Analytic functions類型的解析
  • 源碼解析

參考:spa

  1. percent_rank
  2. databricks博客
相關文章
相關標籤/搜索