Introduction
In data analytics we often need to compute duration metrics, such as online time. Some of these are easy to produce, while others take a little more work, for example deriving a user's online time from login and logout logs.
We can do this very conveniently with the window functions lead and lag: lead attaches the value of a column from the nth following row to the current row, and lag attaches the value from the nth preceding row.
lag(column,n,default) lead(column,n,default)
The column parameter is the column whose value is pulled in; n is how many rows to move (usually 1); default is the value used when there is no preceding row for lag or no following row for lead.
The key to using these two functions is partitioning and ordering:
select gid,
       lag(time, 1, '0')  over (partition by gid order by time) as lag_time,
       lead(time, 1, '0') over (partition by gid order by time) as lead_time
from table_name;
Partitioning is grouping: use partition by and separate multiple columns with commas.
Ordering is specified with order by; separate multiple sort columns with commas as well.
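To make this concrete, here is a minimal, self-contained sketch that runs a query of the same shape on a small inline table. The class name LeadLagDemo, the view name table_name and the toy rows are only for illustration, not part of the original example.

import org.apache.spark.sql.SparkSession;

public class LeadLagDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("lead-lag-demo")
                .master("local")
                .getOrCreate();

        // Toy rows: (gid, time). Two users, a few events each.
        spark.sql("select * from values ('u1', 10), ('u1', 20), ('u1', 30), " +
                  "('u2', 15), ('u2', 25) as t(gid, time)")
             .createOrReplaceTempView("table_name");

        // For every row, attach the previous (lag) and next (lead) time within
        // the same gid, falling back to 0 when there is no such row.
        spark.sql("select gid, time, " +
                  "lag(time, 1, 0)  over (partition by gid order by time) as lag_time, " +
                  "lead(time, 1, 0) over (partition by gid order by time) as lead_time " +
                  "from table_name")
             .show();

        spark.stop();
    }
}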
Combined, lead and lag can do more than you might expect.
For example, to compute online time from login/logout logs: if the precision requirements are loose, simply group by user id, sort by time ascending, and use lead to attach the next logout time to the current login row; the duration then falls out of a simple subtraction.
But once sessions crossing midnight and lost log entries are taken into account, you can no longer assume that the first record is a login and the one after it is a logout.
By combining lead and lag we can easily filter out the invalid records. The idea in the code below: a row contributes time only when it is a login immediately followed by a logout; a logout at the start of a user's partition is counted from the start of that day, a login at the end of the partition is counted up to the end of that day, and any other pattern (typically caused by lost logs) contributes nothing.
Full code
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF6;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.List;

public class SparkLoginTimeTest implements Serializable {

    private SparkSession sparkSession;

    @Before
    public void setUp() {
        sparkSession = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
    }

    /**
     * Build test data: 5 users, 10 records each, one event every 5 minutes,
     * alternating login (0) and logout (1).
     */
    private static List<Info> getInfos() {
        String[] gids = {"10001", "10001", "10002", "10002", "10003",
                         "10003", "10004", "10004", "10005", "10005"};
        LocalDateTime base = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
        LinkedList<Info> infos = new LinkedList<>();
        for (int i = 0; i < 50; i++) {
            Info info = new Info();
            info.setGid(gids[i % 10]);
            info.setResult(i % 2);
            info.setDate(base.plus(i * 5, ChronoUnit.MINUTES).toInstant(ZoneOffset.UTC).toEpochMilli());
            infos.add(info);
        }
        return infos;
    }

    @Test
    public void lag() {
        List<Info> infos = getInfos();
        sparkSession.udf().register("accTimes", accTimes(), DataTypes.LongType);
        Dataset<Info> dataset = sparkSession.createDataset(infos, Encoders.bean(Info.class));
        dataset.show(100);
        dataset.createOrReplaceTempView("temp");

        // Attach the previous (lag) and next (lead) event to every row.
        String sql = "select gid,result,date," +
                "lead(date,1,-1) over(partition by gid order by date) lead_date," +
                "lead(result,1,-1) over(partition by gid order by date) lead_result," +
                "lag(result,1,-1) over(partition by gid order by date) lag_result," +
                "lag(date,1,-1) over(partition by gid order by date) lag_date" +
                " from temp";
        Dataset<Row> baseDs = sparkSession.sql(sql);

        // Compute the duration contributed by each row, then sum it per user.
        Dataset<Row> rs = baseDs.withColumn("acc_times",
                functions.callUDF("accTimes",
                        baseDs.col("result"),
                        baseDs.col("date"),
                        baseDs.col("lead_result"),
                        baseDs.col("lead_date"),
                        baseDs.col("lag_result"),
                        baseDs.col("lag_date")
                )).groupBy("gid")
                .agg(functions.sum("acc_times").alias("accTimes")).na().fill(0)
                .select("gid", "accTimes");
        rs.show(100);
    }

    private static UDF6<Integer, Long, Integer, Long, Integer, Long, Long> accTimes() {
        return new UDF6<Integer, Long, Integer, Long, Integer, Long, Long>() {
            long dayMill = 86400000;

            @Override
            public Long call(Integer result, Long time, Integer headResult, Long headTime,
                             Integer lagResult, Long lagTime) {
                if (lagResult == -1) { // first row of the partition
                    if (result == 1) { // a logout: count from the start of the day to the logout
                        return time - (time / dayMill) * dayMill;
                    }
                }
                if (headResult == -1) { // last row of the partition
                    if (result == 0) { // a login: count from the login to the end of the day
                        return (time / dayMill + 1) * dayMill - time;
                    }
                }
                if (result == 0 && headResult == 1) { // current row is a login and the next row is a logout
                    long rs = headTime - time;
                    if (rs > 0) {
                        return rs;
                    }
                }
                return 0L; // any other pattern (e.g. two logins or two logouts in a row) is ignored
            }
        };
    }

    public static class Info implements Serializable {
        /**
         * Unique user id
         */
        private String gid;
        /**
         * Login/logout time (epoch milliseconds)
         */
        private Long date;
        /**
         * 0 = login, 1 = logout
         */
        private Integer result;

        public Integer getResult() {
            return result;
        }

        public void setResult(Integer result) {
            this.result = result;
        }

        public String getGid() {
            return gid;
        }

        public void setGid(String gid) {
            this.gid = gid;
        }

        public Long getDate() {
            return date;
        }

        public void setDate(Long date) {
            this.date = date;
        }
    }
}
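The lag() method above is a plain JUnit test, and the session uses master("local"), so no cluster is needed to try it. The first show() prints the generated events, and the final show() prints one row per gid with the accumulated online time; since date and the values returned by the UDF are epoch milliseconds, the accTimes column is in milliseconds as well.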