【原創】大叔案例分享(3)用戶行爲分析--見證scala的強大

一 場景分析

用戶行爲分析應用的場景不少,像線上網站訪問統計,線下客流分析(好比圖像人臉識別、wifi探針等),比較核心的指標有幾個:session

PV | UV | SD | SC性能

指標說明:測試

PV(Page View):網站瀏覽量或者商場門店的訪問量
UV(Unique Visitor):獨立訪客數,即去重後的人數
SD(Session Duration):單次會話停留時間
SC(Session Count):會話次數網站

 

用戶行爲分析的原始數據一般是一系列時間離散數據,好比網站訪問記錄:用戶在一個時間點訪問了一個網頁,而後又在下個時間點訪問了下個網頁;this

 這些原始數據能夠抽象爲:spa

User | Timestamp | Targetscala

即用戶在什麼時間點訪問了什麼目標;code

 

統計PV、UV比較簡單,可是在時間離散數據的基礎上,要計算SD、SC這兩個指標,經常使用的方式是設置過時時間閾值,若是用戶兩次訪問的時間間隔超過閾值,則認爲是兩次Session;而後在一次Session的全部數據中取時間最先和最晚的數據來統計本次Session Duration;blog

二 統計示例

輸入數據排序

(user1, 2018-12-01 01:00:00, t1)
(user1, 2018-12-01 01:01:30, t1)
(user1, 2018-12-01 01:06:00, t1)
(user1, 2018-12-01 01:20:00, t1)
(user1, 2018-12-01 01:24:00, t1)

能夠統計出

PV=5,UV=1

過時時間閾值設置爲5分鐘,以上數據應該統計出來2次Session,分別是:

Session1: (2018-12-01 01:00:00 到 2018-12-01 01:06:00),Duration:6分鐘
Session2: (2018-12-01 01:20:00 到 2018-12-01 01:24:00),Duration:4分鐘

實際處理時還要數據亂序的問題,尤爲是在實時計算中,你想好怎樣作了嗎?

容易想到的方式是先作group而後將全部的timestamp排序後一次遍歷統計出SD和SC,不過這種方式佔用內存比較大,性能略差,並且只能用在離線計算中。

 

三 代碼實現

下面給出scala實現,來見證scala的強大:

scala核心代碼(一步foldLeft)

scala

  val expireInSecond = 300
  def mergeTimeArray(arr1 : ArrayBuffer[(Long, Long)], arr2 : ArrayBuffer[(Long, Long)]) : ArrayBuffer[(Long, Long)] = {
    if (arr1.head._1.equals(0l)) arr2
    else if (arr2.head._1.equals(0l)) arr1
    else (arr1 ++ arr2).sortBy(_._1).foldLeft(ArrayBuffer[(Long, Long)]())((result, item) => if (!result.isEmpty && result.last._2 + expireInSecond >= item._1) {result.update(result.length - 1, (result.last._1, math.max(result.last._2, item._2))); result} else result += item)
  }

spark核心代碼(2步map 1步aggregateByKey)

scala

  /**
    * @param data (user, timestamp, target)
    * @return (user, target, session_count, session_duration)
    */
  def process(data : RDD[(String, Long, String)]) : RDD[(String, String, Integer, Double)] = {
    //((user, target), timestamp)
    data.map(item => ((item._1, item._3), item._2))
      //((user, target), Array[(startTime, endTime)])
      .aggregateByKey(ArrayBuffer((0l, 0l)))((result : ArrayBuffer[(Long, Long)], timestamp: Long) => mergeTimeArray(result, ArrayBuffer((timestamp, timestamp))), (result1 : ArrayBuffer[(Long, Long)], result2 : ArrayBuffer[(Long, Long)]) => mergeTimeArray(result1, result2))
      //(user, target, session_count, session_duration)
      .map(item => (item._1._1, item._1._2, item._2.length, item._2.foldLeft(0l)((result, item) => result + (item._2 - item._1)).toDouble / item._2.length))
  }

測試運行

  def main(args : Array[String]) : Unit = {
    val conf = new SparkConf().setAppName("UserAnalysis").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val arr = Array(("user1", 1546054000l, "t1"), ("user1", 1546054090l, "t1"), ("user1", 1546054360l, "t1"), ("user1", 1546055200l, "t1"), ("user1", 1546055440l, "t1"))
    //(user, timestamp, target)
    val data : RDD[(String, Long, String)] = sc.parallelize(arr)
    this.process(data).foreach(println)
  }

輸出

(user1,t1,2,300.0)

相關文章
相關標籤/搜索