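In Spark, accumulating history (running totals) relies mainly on window functions, while computing overall totals requires the rollup function.
1 Application scenarios:
3. The table header to display looks like: product, 2015-04, 2015-05, 2015-06
2 Raw data: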
product_code |event_date |duration |
-------------|-----------|---------|
1438         |2016-05-13 |165      |
1438         |2016-05-14 |595      |
1438         |2016-05-15 |105      |
1629         |2016-05-13 |12340    |
1629         |2016-05-14 |13850    |
1629         |2016-05-15 |227      |
3 Implementing the business scenarios
3.1 Business scenario 1: accumulating history
3.1.1 Spark SQL implementation
// Spark SQL: accumulate historical data with a window function
sqlContext.sql(
"""
  select pcode, event_date,
         sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
""").show

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.1.2 DataFrame implementation
// Use the over function provided by Column, passing in a window specification
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
df_userlogs_date.select(
  $"pcode",
  $"event_date",
  sum($"duration").over(first_2_now_window).as("sum_duration")
).show

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
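3.1.3 Extension: accumulating over a time range
Real-world accumulation logic is far more complex than the example above: for instance, accumulating the previous N days, or everything from N days before to N days after. Let's implement these below.
Accumulate all history: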
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
The forms above are exactly equivalent.
Accumulate the previous N days, assuming N=3:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)
Accumulate the previous N days and the following M days, assuming N=3 and M=5:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)
Accumulate all rows in the partition:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
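preceding: accumulates the N rows before the current row (within the partition). To start from the first row of the partition, use unbounded. N is the offset backward from the current row.
following: the opposite of preceding; accumulates the N rows after the current row (within the partition). To accumulate through the end of the partition, use unbounded. N is the offset forward from the current row.
current row: as the name suggests, the current row, with offset 0.
Note: the N preceding, M following, and current row bounds are all inclusive, so the row at each offset is itself accumulated. Because a rows frame counts rows rather than calendar days, "N days" holds only when each partition has exactly one row per day, as in the sample data.
Measured results: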
Accumulate history (the current day and everything before it within the partition), form 1:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
Accumulate history (the current day and everything before it within the partition), form 2:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
Accumulate today and yesterday:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
Accumulate today, yesterday, and tomorrow:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
Accumulate everything in the partition (the current day plus all days before and after it):
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
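The inclusive frame bounds described above can be cross-checked without a Spark cluster. The following is a minimal plain-Scala sketch, not Spark code: `Row`, `rows`, and `frameSum` are hypothetical names introduced here for illustration, and the data is the raw table from section 2. It assumes one row per product per day, and unlike SQL it returns 0 rather than null for an empty frame.

```scala
// Plain-Scala emulation of "rows between <start> and <end>" (no Spark needed).
// Row and frameSum are illustrative names, not part of any Spark API.
object FrameSketch {
  final case class Row(pcode: Int, eventDate: String, duration: Long)

  // The raw data from section 2.
  val rows: Seq[Row] = Seq(
    Row(1438, "2016-05-13", 165L),
    Row(1438, "2016-05-14", 595L),
    Row(1438, "2016-05-15", 105L),
    Row(1629, "2016-05-13", 12340L),
    Row(1629, "2016-05-14", 13850L),
    Row(1629, "2016-05-15", 227L)
  )

  // start and end are row offsets relative to the current row, both inclusive.
  // None means "unbounded" on that side of the frame.
  def frameSum(data: Seq[Row], start: Option[Int], end: Option[Int]): Seq[(Int, String, Long)] =
    data.groupBy(_.pcode).toSeq.sortBy(_._1).flatMap { case (pcode, part) =>
      val ordered = part.sortBy(_.eventDate) // order by event_date asc (ISO dates sort as strings)
      ordered.indices.map { i =>
        // Clamp the frame to the partition boundaries.
        val lo = start.map(s => math.max(0, i + s)).getOrElse(0)
        val hi = end.map(e => math.min(ordered.size - 1, i + e)).getOrElse(ordered.size - 1)
        val total = (lo to hi).map(ordered(_).duration).sum
        (pcode, ordered(i).eventDate, total)
      }
    }
}
```

For example, `FrameSketch.frameSum(FrameSketch.rows, Some(-1), Some(0))` corresponds to `rows between 1 preceding and current row`, and `FrameSketch.frameSum(FrameSketch.rows, None, None)` corresponds to the fully unbounded frame.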
3.2 Business scenario 2: overall totals
3.2.1 Spark SQL implementation
// Spark SQL: use rollup to add "all" summary rows
sqlContext.sql(
"""
  select pcode, event_date, sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode, event_date with rollup
  order by pcode, event_date
""").show()

+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
+-----+----------+------------+
3.2.2 DataFrame implementation
// Use the rollup function provided by DataFrame to compute "all" summaries across multiple dimensions
df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderBy($"pcode", $"event_date").show

+-----+----------+-------------+
|pcode|event_date|sum(duration)|
+-----+----------+-------------+
| null|      null|        27282|
| 1438|      null|          865|
| 1438|2016-05-13|          165|
| 1438|2016-05-14|          595|
| 1438|2016-05-15|          105|
| 1629|      null|        26417|
| 1629|2016-05-13|        12340|
| 1629|2016-05-14|        13850|
| 1629|2016-05-15|          227|
+-----+----------+-------------+
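To make the null rows in the output above easier to read, here is a minimal plain-Scala sketch of what a two-column rollup computes: one aggregation per (pcode, event_date), one per pcode, and one grand total. It is an illustration of the semantics, not how Spark implements rollup; `Row`, `rows`, and `rollup` are hypothetical names, and `None` stands in for Spark's null.

```scala
// Plain-Scala emulation of rollup(pcode, event_date) with sum(duration).
// Illustrative only; None plays the role of null in the Spark output.
object RollupSketch {
  final case class Row(pcode: Int, eventDate: String, duration: Long)

  // The raw data from section 2.
  val rows: Seq[Row] = Seq(
    Row(1438, "2016-05-13", 165L),
    Row(1438, "2016-05-14", 595L),
    Row(1438, "2016-05-15", 105L),
    Row(1629, "2016-05-13", 12340L),
    Row(1629, "2016-05-14", 13850L),
    Row(1629, "2016-05-15", 227L)
  )

  def rollup(data: Seq[Row]): Seq[(Option[Int], Option[String], Long)] = {
    // Level 1: group by (pcode, event_date).
    val detail = data.groupBy(r => (r.pcode, r.eventDate)).toSeq.map {
      case ((p, d), rs) => (Option(p), Option(d), rs.map(_.duration).sum)
    }
    // Level 2: group by pcode only; event_date is rolled up.
    val perProduct = data.groupBy(_.pcode).toSeq.map {
      case (p, rs) => (Option(p), Option.empty[String], rs.map(_.duration).sum)
    }
    // Level 3: grand total; both columns are rolled up.
    val grandTotal = Seq((Option.empty[Int], Option.empty[String], data.map(_.duration).sum))
    // None sorts before Some, mirroring nulls-first ascending order.
    (grandTotal ++ perProduct ++ detail).sortBy { case (p, d, _) => (p, d) }
  }
}
```

Calling `RollupSketch.rollup(RollupSketch.rows)` yields nine rows in the same order as the table above, starting with the grand total.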
3.3 Rows to columns -> pivot
// Aggregate per dcode, pcode, and day, then pivot the distinct dates into columns
val userlogs_date_all = sqlContext.sql("select dcode, pcode,event_date,sum(duration) as duration from userlogs group by dcode, pcode,event_date ")
userlogs_date_all.registerTempTable("userlogs_date_all")
val dates = userlogs_date_all.select($"event_date").map(row => row.getAs[String]("event_date")).distinct().collect().toList
userlogs_date_all.groupBy($"dcode", $"pcode").pivot("event_date", dates).sum("duration").na.fill(0).show

+-----------------+-----+----------+----------+----------+----------+
|            dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|
+-----------------+-----+----------+----------+----------+----------+
|         F2429186| 1438|         0|         0|       227|         0|
|        AI2342441| 1438|         0|         0|         0|       345|
|       A320018711| 1438|         0|       939|         0|         0|
|         H2635817| 1438|         0|       522|         0|         0|
|         D0288196| 1438|         0|       101|         0|         0|
|         Y0242218| 1438|         0|      1036|         0|         0|
|         H2392574| 1438|         0|         0|       689|         0|
|         D2245588| 1438|         0|         0|         1|         0|
|         Y2514906| 1438|         0|         0|       118|         4|
|         H2540419| 1438|         0|       465|       242|         5|
|         R2231926| 1438|         0|         0|       305|         0|
|         H2684591| 1438|         0|       136|         0|         0|
|         A2548470| 1438|         0|       412|         0|         0|
|         GH000309| 1438|         0|         0|         0|         4|
|         H2293216| 1438|         0|         0|         0|       534|
|         R2170601| 1438|         0|         0|         0|         0|
|B2365238;B2559538| 1438|         0|         0|         0|         0|
|         BQ005465| 1438|         0|         0|       642|        78|
|        AH2180324| 1438|         0|       608|       146|        36|
|         H0279306| 1438|         0|       490|         0|         0|
+-----------------+-----+----------+----------+----------+----------+
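The pivot step above can be pictured as: collect the distinct dates, group by the key columns, then emit one summed cell per date, filling missing cells with 0. Below is a minimal plain-Scala sketch of that idea. `Log`, `logs`, and `pivot` are hypothetical names, and the five sample rows are taken from two lines of the output above. Unlike the original, the sketch sorts the dates, which is an added assumption to get a deterministic column order.

```scala
// Plain-Scala emulation of groupBy(dcode, pcode).pivot(event_date).sum(duration).na.fill(0).
// Illustrative only; not Spark code.
object PivotSketch {
  final case class Log(dcode: String, pcode: Int, eventDate: String, duration: Long)

  // A few (dcode, date, duration) cells copied from the pivot output above.
  val logs: Seq[Log] = Seq(
    Log("H2540419", 1438, "2016-05-13", 465L),
    Log("H2540419", 1438, "2016-05-14", 242L),
    Log("H2540419", 1438, "2016-05-15", 5L),
    Log("Y2514906", 1438, "2016-05-14", 118L),
    Log("Y2514906", 1438, "2016-05-15", 4L)
  )

  // Returns the pivoted column names and, per (dcode, pcode) key, one sum per date column.
  def pivot(data: Seq[Log]): (Seq[String], Map[(String, Int), Seq[Long]]) = {
    // Distinct pivot values; sorted here for a stable column order (an assumption).
    val dates = data.map(_.eventDate).distinct.sorted
    val table = data.groupBy(l => (l.dcode, l.pcode)).map { case (key, group) =>
      val byDate = group.groupBy(_.eventDate).map { case (d, ls) => d -> ls.map(_.duration).sum }
      // A missing (key, date) cell becomes 0, like na.fill(0).
      key -> dates.map(d => byDate.getOrElse(d, 0L))
    }
    (dates, table)
  }
}
```

Passing an explicit list of values to `pivot`, as the original code does with `dates`, spares Spark from computing the distinct values itself; sorting that list first is one way to keep the columns in chronological order.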
def rollup(col1: String, cols: String*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns rolluped by department and group.
df.rollup("department", "group").avg()
// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def rollup(cols: Column*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
df.rollup($"department", $"group").avg()
// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))

def over(window: WindowSpec): Column
Define a windowing column.
val w = Window.partitionBy("name").orderBy("id")
df.select(
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),
  avg("price").over(w.rowsBetween(0, 4))
)