spark 累加歷史 + 統計所有 + 行轉列

spark 累加歷史主要用到了窗口函數,而進行所有統計,則須要用到rollup函數sql

1  應用場景:express

  一、咱們須要統計用戶的總使用時長(累加歷史)apache

  二、前臺展示頁面須要對多個維度進行查詢,如:產品、地區等等api

  三、須要展示的表格頭如: 產品、2015-0四、2015-0五、2015-06ruby

2 原始數據:函數

product_code |event_date |duration |
-------------|-----------|---------|
1438         |2016-05-13 |165      |
1438         |2016-05-14 |595      |
1438         |2016-05-15 |105      |
1629         |2016-05-13 |12340    |
1629         |2016-05-14 |13850    |
1629         |2016-05-15 |227      |

3 業務場景實現spa

3.1 業務場景1:累加歷史:scala

如數據源所示:咱們已經有當天用戶的使用時長,咱們指望在進行統計的時候,14號能累加13號的,15號能累加1四、13號的,以此類推code

3.1.1 spark-sql實現blog

//spark sql 使用窗口函數累加歷史數據
sqlContext.sql(
"""
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
""").show
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.1.2 dataframe實現

 

//使用Column提供的over 函數,傳入窗口操做
import org.apache.spark.sql.expressions._
val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
df_userlogs_date.select(
    $"pcode",
    $"event_date",
    sum($"duration").over(first_2_now_window).as("sum_duration")
).show

+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

 3.1.3 擴展 累加一段時間範圍內

實際業務中的累加邏輯遠比上面複雜,好比,累加以前N天,累加前N天到後N天等等。如下咱們來實現:

 3.1.3.1 累加歷史全部:

select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
Window.partitionBy("pcode").orderBy("event_date")
上邊四種寫法徹底相等

3.1.3.2 累加N天以前,假設N=3
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0

  3.1.3.3 累加前N天,後M天: 假設N=3 M=5 

select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)
3.1.3.4 累加該分區內全部行
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,Long.MaxValue)

 總結以下:

preceding:用於累加前N行(分區以內)。如果從分區第一行頭開始,則爲 unbounded。 N爲:相對當前行向前的偏移量
following :與preceding相反,累加後N行(分區以內)。如果累加到該分區結束,則爲 unbounded。N爲:相對當前行向後的偏移量
current row:顧名思義,當前行,偏移量爲0
說明:上邊的前N,後M,以及current row均會累加該偏移量所在行

3.1.3.4 實測結果
累加歷史:分區內當天及以前全部 寫法1:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
 
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
累加歷史:分區內當天及以前全部 寫法2:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
累加當日和昨天:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
累加當日、昨日、明日:select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
累加分區內全部:當天和以前以後全部select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.2 業務場景2:統計所有

3.2.1 spark sql實現

//spark sql 使用rollup添加all統計
sqlContext.sql(
"""
  select pcode,event_date,sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode,event_date with rollup
  order by pcode,event_date
""").show()

+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
+-----+----------+------------+

3.2.2 dataframe函數實現

//使用dataframe提供的rollup函數,進行多維度all統計
df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderBy($"pcode", $"event_date")

+-----+----------+-------------+                                                
|pcode|event_date|sum(duration)|
+-----+----------+-------------+
| null|      null|        27282|
| 1438|      null|          865|
| 1438|2016-05-13|          165|
| 1438|2016-05-14|          595|
| 1438|2016-05-15|          105|
| 1629|      null|        26417|
| 1629|2016-05-13|        12340|
| 1629|2016-05-14|        13850|
| 1629|2016-05-15|          227|
+-----+----------+-------------+

  3.3 行轉列 ->pivot

 

 

pivot目前尚未sql語法,先用df語法吧
val userlogs_date_all = sqlContext.sql("select dcode, pcode,event_date,sum(duration) as duration from userlogs group by dognum, pcode,event_date ")
userlogs_date_all.registerTempTable("userlogs_date_all")
val dates = userlogs_date_all.select($"event_date").map(row => row.getAs[String]("event_date")).distinct().collect().toList
userlogs_date_all.groupBy($"dcode", $"pcode").pivot("event_date", dates).sum("duration").na.fill(0).show

+-----------------+-----+----------+----------+----------+----------+
|            dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|
+-----------------+-----+----------+----------+----------+----------+
|         F2429186| 1438|         0|         0|       227|         0|
|        AI2342441| 1438|         0|         0|         0|       345|
|       A320018711| 1438|         0|       939|         0|         0|
|         H2635817| 1438|         0|       522|         0|         0|
|         D0288196| 1438|         0|       101|         0|         0|
|         Y0242218| 1438|         0|      1036|         0|         0|
|         H2392574| 1438|         0|         0|       689|         0|
|         D2245588| 1438|         0|         0|         1|         0|
|         Y2514906| 1438|         0|         0|       118|         4|
|         H2540419| 1438|         0|       465|       242|         5|
|         R2231926| 1438|         0|         0|       305|         0|
|         H2684591| 1438|         0|       136|         0|         0|
|         A2548470| 1438|         0|       412|         0|         0|
|         GH000309| 1438|         0|         0|         0|         4|
|         H2293216| 1438|         0|         0|         0|       534|
|         R2170601| 1438|         0|         0|         0|         0|
|B2365238;B2559538| 1438|         0|         0|         0|         0|
|         BQ005465| 1438|         0|         0|       642|        78|
|        AH2180324| 1438|         0|       608|       146|        36|
|         H0279306| 1438|         0|       490|         0|         0|
+-----------------+-----+----------+----------+----------+----------+

 

 

附錄

下面是這兩個函數的官方api說明:

org.apache.spark.sql.scala
def rollup(col1: String, cols: String*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).

// Compute the average for all numeric columns rolluped by department and group.
df.rollup("department", "group").avg()

// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
def rollup(cols: Column*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
df.rollup($"department", $"group").avg()

// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
org.apache.spark.sql.Column.scala
def over(window: WindowSpec): Column
Define a windowing column.

val w = Window.partitionBy("name").orderBy("id")
df.select(
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),
  avg("price").over(w.rowsBetween(0, 4))
)
相關文章
相關標籤/搜索