基於Spark的用戶行爲路徑分析

1、研究背景

  互聯網行業愈來愈重視自家客戶的一些行爲偏好了,不管是電商行業仍是金融行業,基於用戶行爲能夠作出不少東西,電商行業能夠概括出用戶偏好爲用戶推薦商品,金融行業能夠把用戶行爲做爲反欺詐的一個點,本文主要介紹其中一個重要的功能點,基於行爲日誌統計用戶行爲路徑,爲運營人員提供更好的運營決策。能夠實現和成熟產品如adobe analysis相似的用戶行爲路徑分析。最終效果如圖。使用的是開源大數據可視化工具。如圖所示,用戶行爲路徑的數據很是巨大,uv指標又不能提早計算好(時間段未定),若是展現5級,一個頁面的數據量就是10的5次方,若是有幾千個頁面,數據量是沒法估量的,因此只能進行實時計算,而Spark很是適合迭代計算,基於這樣的考慮,Spark是不錯的選擇。前端

2、解決方案

1.流程描述

  客戶搜索某一塊兒始頁面的行爲路徑明細數據時,RPC請求到後臺,調用spark-submit腳本啓動spark程序,Spark程序實時計算並返回數據,前端Java解析數據並展示。java

 

2.準備工做

  1.首先要有行爲數據啦,用戶行爲日誌數據必須包含必須包含如下四個字段,訪問時間、設備指紋、會話id、頁面名稱,其中頁面名稱能夠自行定義,用來標示一種或者一類頁面,每次用戶請求的時候上報此字段,服務器端收集並存儲,此頁面名稱最好不要有重複,爲後續分析打下基礎。node

  2.而後對行爲日誌進行一級清洗(基於Hive),將數據統一清洗成以下格式。設備指紋是我另外一個研究的項目,還沒時間貼出來。會話id就是能夠定義一個會話超時時間,即20分鐘用戶若是沒有任何動做,等20分鐘事後再點擊頁面就認爲這是下個一會話id,可經過cookie來控制此會話id。正則表達式

設備指紋 會話id 頁面路徑(按時間升序 時間
fpid1 sessionid1 A_B_C_D_E_F_G 2017-01-13

 

A、B、C表明頁面名稱,清洗過程採用row_number函數,concat_ws函數,具體用法能夠百度。清洗完以後落地到hive表,後續會用到。T+1清洗此數據。算法

  3.弄清楚遞歸的定義sql

  遞歸算法是一種直接或者間接調用自身函數或者方法的算法。Java遞歸算法是基於Java語言實現的遞歸算法。遞歸算法的實質是把問題分解成規模縮小的同類問題的子問題,而後遞歸調用方法來表示問題的解。遞歸算法對解決一大類問題頗有效,它可使算法簡潔和易於理解。遞歸算法,其實說白了,就是程序的自身調用。它表如今一段程序中每每會遇到調用自身的那樣一種coding策略,這樣咱們就能夠利用大道至簡的思想,把一個大的複雜的問題層層轉換爲一個小的和原問題類似的問題來求解的這樣一種策略。遞歸每每能給咱們帶來很是簡潔很是直觀的代碼形勢,從而使咱們的編碼大大簡化,然而遞歸的思惟確實很咱們的常規思惟相逆的,咱們一般都是從上而下的思惟問題, 而遞歸趨勢從下往上的進行思惟。這樣咱們就能看到咱們會用不多的語句解決了很是大的問題,因此遞歸策略的最主要體現就是小的代碼量解決了很是複雜的問題。apache

  遞歸算法解決問題的特色:   json

  1)遞歸就是方法裏調用自身。   緩存

  2)在使用遞增歸策略時,必須有一個明確的遞歸結束條件,稱爲遞歸出口。    
  3)遞歸算法解題一般顯得很簡潔,但遞歸算法解題的運行效率較低。因此通常不提倡用遞歸算法設計程序。
  4)在遞歸調用的過程中系統爲每一層的返回點、局部量等開闢了棧來存儲。遞歸次數過多容易形成棧溢出等,因此通常不提倡用遞歸算法設計程序。
服務器

      在作遞歸算法的時候,必定要把握住出口,也就是作遞歸算法必需要有一個明確的遞歸結束條件。這一點是很是重要的。其實這個出口是很是好理解的,就是一個條件,當知足了這個條件的時候咱們就再也不遞歸了。

  4.多叉樹的基本知識

3、Spark處理

流程概述:

1.構建一個多叉樹的類,類主要屬性描述,name全路徑如A_B_C,childList兒子鏈表,多叉樹的構建和遞歸參考了這裏

2.按時間範圍讀取上一步預處理的數據,遞歸計算每一級頁面的屬性指標,並根據頁面路徑插入到初始化的Node類根節點中。

3.遞歸遍歷上一步初始化的根節點對象,並替換其中的name的id爲名稱,其中藉助Spark DataFrame查詢數據。

4.將root對象轉化成json格式,返回前端。

附上代碼以下。

import java.util

import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.log4j.{Level, Logger => LG}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

/**
  * 用戶行爲路徑實時計算實現
  * Created by chouyarn on 2016/12/12.
  */

/**
  * 樹結構類
  *
  * @param name      頁面路徑
  * @param visit     訪次
  * @param pv        pv
  * @param uv        uv
  * @param childList 兒子鏈表
  */
class Node(
            var name: String,
            var path:Any,
            var visit: Any,
            var pv: Any,
            var uv: Any,
            var childList: util.ArrayList[Node]) extends Serializable {
  /**
    * 添加子節點
    *
    * @param node 子節點對象
    * @return
    */
  def addNode(node: Node) = {
    childList.add(node)
  }

  /**
    * 遍歷節點,深度優先
    */
  def traverse(): Unit = {
    if (childList.isEmpty)
      return
    //    node.
    val childNum = childList.size

    for (i <- 0 to childNum - 1) {
      val child: Node = childList.get(i)
      child.name = child.name.split("_").last//去除前邊絕對路徑
      child.traverse()
    }
  }

  /**
    * 遍歷節點,深度優先
    */
  def traverse(pages:DataFrame): Unit = {
    if (childList.isEmpty||childList.size()==0)
      return
    //    node.
    val childNum = childList.size

    for (i <- 0 to childNum - 1) {
      val child: Node = childList.get(i)
      child.name = child.name.split("_").last
      val id =pages.filter("page_id='"+child.name+"'").select("page_name").first().getString(0)//替換id爲name
      child.name = id
      child.traverse(pages)
    }
  }

  /**
    * 動態插入節點
    *
    * @param node 節點對象
    * @return
    */
  def insertNode(node: Node): Boolean = {
    val insertName = node.name
    if (insertName.stripSuffix("_" + insertName.split("_").last).equals(name)) {
      //      node.name=node.name.split("_").last
      addNode(node)
      true
    } else {
      val childList1 = childList
      val childNum = childList1.size
      var insetFlag = false
      for (i <- 0 to childNum - 1) {
        val childNode = childList1.get(i)
        insetFlag = childNode.insertNode(node)
        if (insetFlag == true)
          true
      }
      false
    }
  }
}

/**
  * 處理類
  */
class Path extends CleanDataWithRDD {
  LG.getRootLogger.setLevel(Level.ERROR)//控制spark日誌輸出級別

  val sc: SparkContext = SparkUtil.createSparkContextYarn("path")
  val hiveContext = new HiveContext(sc)

  override def handleData(conf: Map[String, String]): Unit = {

    val num = conf.getOrElse("depth", 5)//路徑深度
    val pageName = conf.getOrElse("pageName", "")//頁面名稱
    //    val pageName = "A_C"
    val src = conf.getOrElse("src", "")//標示來源pc or wap

    val pageType = conf.getOrElse("pageType", "")//向前或者向後路徑
    val startDate = conf.getOrElse("startDate", "")//開始日期
    val endDate = conf.getOrElse("endDate", "")//結束日期
    //        保存log緩存以保證後續使用
    val log = hiveContext.sql(s"select fpid,sessionid,path " +
      s"from specter.t_pagename_path_sparksource " +
      s"where day between '$startDate' and '$endDate' and path_type=$pageType and src='$src' ")
      .map(s => {
        (s.apply(0) + "_" + s.apply(1) + "_" + s.apply(2))
      }).repartition(10).persist()

    val pages=hiveContext.sql("select page_id,page_name from specter.code_pagename").persist()//緩存頁面字典表
    // 本地測試數據
    // val log = sc.parallelize(Seq("fpid1_sessionid1_A_B",
    //      "fpid2_sessionid2_A_C_D_D_B_A_D_A_F_B",
    //      "fpid1_sessionid1_A_F_A_C_D_A_B_A_V_A_N"))
    var root: Node = null
    /**
      * 遞歸將計算的節點放入樹結構
      *
      * @param pageName 頁面名稱
      */
    def compute(pageName: String): Unit = {
      val currenRegex = pageName.r //頁面的正則表達式
      val containsRdd = log.filter(_.contains(pageName)).persist() //包含頁面名稱的RDD,後續步驟用到
      val currentpv = containsRdd.map(s => {//計算pv
        currenRegex findAllIn (s)
      }).map(_.mkString(","))
        .flatMap(_.toString.split(","))
        .filter(_.size > 0)
        .count()

      val tempRdd = containsRdd.map(_.split("_")).persist() //分解後的RDD
      val currentuv = tempRdd.map(_.apply(0)).distinct().count() //頁面uv
      val currentvisit = tempRdd.map(_.apply(1)).distinct().count() //頁面訪次

      //      初始化根節點或添加節點
      if (root == null) {
        root = new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]())
      } else {
        root.insertNode(new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]()))
      }

      if (pageName.split("_").size == 5||tempRdd.isEmpty()) {//遞歸出口
        return
      } else {
        //          肯定下個頁面名稱正則表達式
        val nextRegex =
        s"""${pageName}_[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}""".r
        // 本地測試       
        // val nextRegex =s"""${pageName}_[A-Z]""".r
        val nextpvMap = containsRdd.map(s => {//下一級路徑的pv數top9
          nextRegex findAllIn (s)
        }).map(_.mkString(","))
          .flatMap(_.toString.split(","))
          .filter(_.size > 0)
          .map(s => (s.split("_").last, 1))
          .filter(!_._1.contains(pageName.split("_")(0)))
          .reduceByKey(_ + _).sortBy(_._2, false).take(9).toMap

        nextpvMap.keySet.foreach(key => {//遞歸計算
          compute(pageName + "_" + key)
        })
      }
    }
    //觸發計算
    compute(pageName)
    val gson: Gson = new Gson()

    root.traverse(pages)
    root.name=pages.filter("page_id='"+pageName+"'").select("page_name").first().getString(0)
    println(gson.toJson(root))//轉化成JSON並打印,Alibaba fsatjson不可用,仍是google得厲害。
  }

  override def stop(): Unit = {
    sc.stop()
  }
}

object Path {
  def main(args: Array[String]): Unit = {
    //    println("ss".hashCode)
    var num=5
    try {
      num=args(5).toInt
    }catch {
      case e:Exception =>
    }

    val map = Map("pageName" -> args(0),
      "pageType" -> args(1),
      "startDate" -> args(2),
      "endDate" -> args(3),
      "src" -> args(4),
      "depth" -> num.toString)
    val path = new Path()
    path.handleData(map)
  }
}

4、總結

  Spark基本是解決了實時計算行爲路徑的問題,缺點就是延遲稍微有點高,由於提交Job以後要向集羣申請資源,申請資源和啓動就耗費將近30秒,後續這塊能夠優化。聽說spark-jobserver提供一個restful接口,爲Job預啓動容器,博主沒時間研究有興趣的能夠研究下啦。

  fastjson在對複雜對象的轉換中不如Google 的Gson。

  使用遞歸要慎重,要特別注意出口條件,若出口不明確,頗有可能致使死循環。

相關文章
相關標籤/搜索