版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。算法
頁面轉化率的求解思路是經過UserAction表獲取一個session的全部UserAction,根據時間順序排序後獲取所有PageId 而後將PageId組合成PageFlow,即1,2,3,4,5的形式(按照時間順序排列),以後,組合爲1_2, 2_3, 3_4, ...的形式 而後篩選出出如今targetFlow中的全部A_Bsql
每一個A_B進行數量統計,而後統計startPage的PV,以後根據targetFlow的A_B順序,計算每一層的轉化率數據庫
// 任務的執行ID,用戶惟一標示運行後的結果,用在MySQL數據庫中
val taskUUID = UUID.randomUUID().toString
// 構建Spark上下文
val sparkConf = new SparkConf().setAppName("SessionAnalyzer").setMaster("local[*]")
// 建立Spark客戶端
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
複製代碼
查詢指定日期範圍內的用戶訪問行爲數據
val actionRDD = this.getActionRDDByDateRange(spark, taskParam)
def getActionRDDByDateRange(spark:SparkSession, taskParam:JSONObject): RDD[UserVisitAction] = {
val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
import spark.implicits._
spark.sql("select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'")
.as[UserVisitAction].rdd
}
複製代碼
將用戶行爲信息轉換爲 K-V 結構
val sessionid2actionRDD = actionRDD.map(item => (item.session_id, item))
將數據進行內存緩存
sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY)
對<sessionid,訪問行爲> RDD,作一次groupByKey操做,生成頁面切片
val sessionid2actionsRDD = sessionid2actionRDD.groupByKey()
複製代碼
val pageSplitRDD = generateAndMatchPageSplit(sc, sessionid2actionsRDD, taskParam)
def generateAndMatchPageSplit(sc:SparkContext, sessionid2actionsRDD:RDD[(String, Iterable[UserVisitAction])], taskParam:JSONObject ):RDD[(String, Int)] = {
/* 對目標PageFlow進行解析 */
//1,2,3,4,5,6,7
val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
//將字符串轉換成爲了List[String]
val targetPages = targetPageFlow.split(",").toList
//targetPages.slice(0, targetPages.length-1) :[1,2,3,4,5,6]
//targetPages.tail :[2,3,4,5,6,7]
//targetPages.slice(0, targetPages.length-1).zip(targetPages.tail):(1,2)(2,3)(3,4)(4,5)(5,6)(6,7)
//map(item => item._1 + "_" + item._2):(1_2,2_3,3_4,4_5,5_6,6_7)
val targetPagePairs = targetPages.slice(0, targetPages.length-1).zip(targetPages.tail).map(item => item._1 + "_" + item._2)
//將結果轉換爲廣播變量
//targetPagePairs類型爲List[String]
val targetPageFlowBroadcast = sc.broadcast(targetPagePairs)
/* 對全部PageFlow進行解析 */
// 對所有數據進行處理
sessionid2actionsRDD.flatMap{ case (sessionid, userVisitActions) =>
// 獲取使用者指定的頁面流
// 使用者指定的頁面流,1,2,3,4,5,6,7
// 1->2的轉化率是多少?2->3的轉化率是多少?
// 這裏,咱們拿到的session的訪問行爲,默認狀況下是亂序的
// 好比說,正常狀況下,咱們但願拿到的數據,是按照時間順序排序的
// 可是問題是,默認是不排序的
// 因此,咱們第一件事情,對session的訪問行爲數據按照時間進行排序
// 舉例,反例
// 好比,3->5->4->10->7
// 3->4->5->7->10
// userVisitActions是Iterable[UserAction],toList.sortWith將Iterable中的全部UserAction按照時間進行排序
// 按照時間排序
val sortedUVAs = userVisitActions.toList.sortWith((uva1, uva2) => DateUtils.parseTime(uva1.action_time).getTime() < DateUtils.parseTime(uva2.action_time).getTime())
// 提取全部UserAction中的PageId信息
val soredPages = sortedUVAs.map(item => if(item.page_id != null) item.page_id)
//【注意】頁面的PageFlow是將session的全部UserAction按照時間順序排序後提取PageId,再將PageId進行鏈接獲得的
// 按照已經排好的順序對PageId信息進行整合,生成全部頁面切片:(1_2,2_3,3_4,4_5,5_6,6_7)
val sessionPagePairs = soredPages.slice(0, soredPages.length-1).zip(soredPages.tail).map(item => item._1 + "_" + item._2)
/* 由此,獲得了當前session的PageFlow */
// 只要是當前session的PageFlow有一個切片與targetPageFlow中任一切片重合,那麼就保留下來
// 目標:(1_2,2_3,3_4,4_5,5_6,6_7) 當前:(1_2,2_5,5_6,6_7,7_8)
// 最後保留:(1_2,5_6,6_7)
// 輸出:(1_2, 1) (5_6, 1) (6_7, 1)
sessionPagePairs.filter(targetPageFlowBroadcast.value.contains(_)).map((_,1))
}
複製代碼
}緩存
// 返回:(1_2, 1),(3_4, 1), ..., (100_101, 1)
// 統計每一個跳轉切片的總個數
// pageSplitPvMap:(1_2, 102320), (3_4, 90021), ..., (100_101, 45789)
val pageSplitPvMap = pageSplitRDD.countByKey
複製代碼
// 使用者指定的頁面流是3,2,5,8,6
// 我們如今拿到的這個pageSplitPvMap,3->2,2->5,5->8,8->6
// 首先計算首頁PV的數量
val startPagePv = getStartPagePv(taskParam, sessionid2actionsRDD)
def getStartPagePv(taskParam:JSONObject, sessionid2actionsRDD:RDD[(String, Iterable[UserVisitAction])]) :Long = {
// 獲取配置文件中的targetPageFlow
val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
// 獲取起始頁面ID
val startPageId = targetPageFlow.split(",")(0).toLong
// sessionid2actionsRDD是聚合後的用戶行爲數據
// userVisitAction中記錄的是在一個頁面中的用戶行爲數據
val startPageRDD = sessionid2actionsRDD.flatMap{ case (sessionid, userVisitActions) =>
// 過濾出全部PageId爲startPageId的用戶行爲數據
userVisitActions.filter(_.page_id == startPageId).map(_.page_id)
}
// 對PageId等於startPageId的用戶行爲數據進行技術
startPageRDD.count()
複製代碼
}session
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。dom
計算目標頁面流的各個頁面切片的轉化率
val convertRateMap = computePageSplitConvertRate(taskParam, pageSplitPvMap, startPagePv)
def computePageSplitConvertRate(taskParam:JSONObject, pageSplitPvMap:collection.Map[String, Long], startPagePv:Long):collection.Map[String, Double] = {
val convertRateMap = new mutable.HashMap[String, Double]()
//1,2,3,4,5,6,7
val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
val targetPages = targetPageFlow.split(",").toList
//(1_2,2_3,3_4,4_5,5_6,6_7)
val targetPagePairs = targetPages.slice(0, targetPages.length-1).zip(targetPages.tail).map(item => item._1 + "_" + item._2)
// lastPageSplitPv:存儲最新一次的頁面PV數量
var lastPageSplitPv = startPagePv.toDouble
// 3,5,2,4,6
// 3_5
// 3_5 pv / 3 pv
// 5_2 rate = 5_2 pv / 3_5 pv
// 經過for循環,獲取目標頁面流中的各個頁面切片(pv)
for(targetPage <- targetPagePairs){
// 先獲取pageSplitPvMap中記錄的當前targetPage的數量
val targetPageSplitPv = pageSplitPvMap.get(targetPage).get.toDouble
println((targetPageSplitPv, lastPageSplitPv))
// 用當前targetPage的數量除以上一次lastPageSplit的數量,獲得轉化率
val convertRate = NumberUtils.formatDouble(targetPageSplitPv / lastPageSplitPv, 2)
// 對targetPage和轉化率進行存儲
convertRateMap.put(targetPage, convertRate)
// 將本次的targetPage做爲下一次的lastPageSplitPv
lastPageSplitPv = targetPageSplitPv
}
convertRateMap
}
複製代碼
persistConvertRate(spark, taskUUID, convertRateMap)
def persistConvertRate(spark:SparkSession, taskid:String, convertRateMap:collection.Map[String, Double]) {
val convertRate = convertRateMap.map(item => item._1 + "=" + item._2).mkString("|")
val pageSplitConvertRateRDD = spark.sparkContext.makeRDD(Array(PageSplitConvertRate(taskid,convertRate)))
import spark.implicits._
pageSplitConvertRateRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "page_split_convert_rate")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
複製代碼
}ui
溫故而知新,本文爲了綜合複習,進行代碼總結,內容粗鄙,勿怪this
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。url
秦凱新 於深圳spa