靈活利用Spark窗口函數lead、lag進行在線時長統計

簡介

在數據統中常常須要統計一些時長數據,例如在線時長,這些數據有些比較好統計,有些稍微麻煩一點,例如,根據登陸和退出日誌統計用戶在線時長。java

咱們能夠利用窗口函數lead與lag來完成,很是方便,lead的函數是把某一列數據的後面第n行數據拼接到當前行,lag是把指定列的前面第n行數據拼接到當前行。sql

lag(column,n,default)
lead(column,n,default)

參數column是選擇要拼接的列,參數n表示要移動幾行,通常就移動1行,default是默認值,若是lag前面沒有行,lead後面沒有行就使用默認值。apache

使用這2個函數的關鍵點是:分區和排序api

select  gid, 
        lag(time,1,'0') over (partition by gid order by time) as lag_time, 
        lead(time,1,'0') over (partition by gid order by time) as lead_time
from  table_name;

lead與lag

分區就是分組,使用partition by分組多個列之間用逗號分割app

排序使用order by指定,多個排序列之間使用逗號分割ide

lead和lag組合,可以發揮超出咱們想像的能力。函數

例如,經過登陸退出日誌進行在線時長統計,若是要求不高直接:用戶id分組,時間升序,而後使用lead讓後一個退出時間拼接到當前登陸時間行就輕易能計算了。ui

可是考慮到有跨天的問題、日誌丟失,並不肯定第一個就是登陸日誌,後面的就是退出日誌。this

經過lead和lag組合起來,咱們就能輕易的過濾丟非法的數據。spa

具體代碼

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF6;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.List;

public class SparkLoginTimeTest implements Serializable {

    private SparkSession sparkSession;

    @Before
    public void setUp() {
        sparkSession = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
    }

    private static List<Info> getInfos() {
        String[] gids = {"10001","10001","10002","10002","10003","10003","10004","10004","10005","10005"};
        LocalDateTime base = LocalDateTime.of(2020, 1, 1,0,0,0);
        LinkedList<Info> infos = new LinkedList<>();
        for(int i=0;i<50;i++){
            Info info = new Info();
            info.setGid(gids[i%10]);
            info.setResult(i % 2);
            info.setDate(base.plus(i * 5, ChronoUnit.MINUTES).toInstant(ZoneOffset.UTC).toEpochMilli());
            infos.add(info);
        }
        return infos;
    }

    @Test
    public void lag(){
        List<Info> infos = getInfos();
        sparkSession.udf().register("accTimes",accTimes(), DataTypes.LongType);

        Dataset<Info> dataset = sparkSession.createDataset(infos, Encoders.bean(Info.class));
        dataset.show(100);
        dataset.createOrReplaceTempView("temp");

        String sql = "select gid,result,date," +
                "lead(date,1,-1) over(partition by gid order by date) lead_date," +
                "lead(result,1,-1) over(partition by gid order by date) lead_result," +
                "lag(result,1,-1) over(partition by gid order by date) lag_result," +
                "lag(date,1,-1) over(partition by gid order by date) lag_date" +
                " from temp";

        Dataset<Row> baseDs = sparkSession.sql(sql);

        Dataset<Row> rs = baseDs.withColumn("acc_times",
                functions.callUDF("accTimes",
                        baseDs.col("result"),
                        baseDs.col("date"),
                        baseDs.col("lead_result"),
                        baseDs.col("lead_date"),
                        baseDs.col("lag_result"),
                        baseDs.col("lag_date")
                )).groupBy("gid")
                .agg(functions.sum("acc_times").alias("accTimes")).na().fill(0)
                .select("gid", "accTimes");

        rs.show(100);
    }

    private static UDF6<Integer,Long,Integer,Long,Integer,Long,Long> accTimes(){
        return new UDF6<Integer, Long, Integer, Long, Integer, Long, Long>() {
            long dayMill = 86400000;
            @Override
            public Long call(Integer result, Long time, Integer headResult, Long headTime, Integer lagResult, Long lagTime) {
                if(lagResult == -1){//第一行
                    if(result == 1){//退出,計算退出到這一天的開始時間
                        return time - (time / dayMill) * dayMill ;
                    }
                }
                if(headResult == -1){//最後一行
                    if(result == 0){//進入,計算到這一天結束
                        return (time / dayMill + 1) * dayMill - time;
                    }
                }
                if(result == 0 && headResult == 1){//當前行是進入,而且下移行是退出
                    long rs;
                    rs = headTime - time;
                    if(rs > 0) {
                        return rs;
                    }
                }
                return 0L;
            }
        };
    }


    public static class Info implements Serializable {
        /**
         * 用戶惟一標識
         */
        private String gid;
        /**
         * 登陸、退出時間
         */
        private Long date;
        /**
         * 0-登陸、1-退出
         */
        private Integer result;

        public Integer getResult() {
            return result;
        }

        public void setResult(Integer result) {
            this.result = result;
        }

        public String getGid() {
            return gid;
        }

        public void setGid(String gid) {
            this.gid = gid;
        }

        public Long getDate() {
            return date;
        }

        public void setDate(Long date) {
            this.date = date;
        }
    }
}

結果

其餘實例

相關文章
相關標籤/搜索