版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。程序員
cache()和 persist()的區別在於, cache()是 persist()的一種簡化方式, cache()的底 層就是調用的 persist()的無參版本,同時就是調用 persist(MEMORY_ONLY),將輸 入持久化到內存中。若是須要從內存中清除緩存,那麼可使用 unpersist()方法。算法
廣播變量容許程序員在每一個機器上保留緩存的只讀變量,而不是給每一個任務發 送一個副本。 例如,可使用它們以有效的方式爲每一個節點提供一個大型輸入數據 集的副本。 Spark 還嘗試使用高效的廣播算法分發廣播變量,以下降通訊成本。sql
Spark 提供的 Broadcast Variable 是隻讀的,而且在每一個節點上只會有一個副本, 而不會爲每一個 task 都拷貝一份副本,所以, 它的最大做用,就是減小變量到各個節 點的網絡傳輸消耗,以及在各個節點上的內存消耗。此外, Spark 內部也使用了高效 的廣播算法來減小網絡消耗。數據庫
累加器(accumulator): Accumulator 是僅僅被相關操做累加的變量,所以能夠 在並行中被有效地支持。它們可用於實現計數器(如 MapReduce)或總和計數。 Accumulator 是存在於 Driver 端的,從節點不斷把值發到 Driver 端,在 Driver 端計數(Spark UI 在 SparkContext 建立時被建立,即在 Driver 端被建立,所以它可 以讀取 Accumulator 的數值), 累加器是存在於 Driver 端的一個值,從節點是讀取不到的。apache
Spark 提供的 Accumulator 主要用於多個節點對一個變量進行共享性的操做。 Accumulator 只提供了累加的功能,可是卻給咱們提供了多個 task 對於同一個變量 並行操做的功能,可是 task 只能對 Accumulator 進行累加操做,不能讀取它的值, 只有 Driver 程序能夠讀取 Accumulator 的值。編程
DataFrame保存到Hive表中json
// 1:ArrayBuffer[ProductInfo]生成
private def mockProductInfo(): Array[ProductInfo] = {
val rows = ArrayBuffer[ProductInfo]()
val random = new Random()
val productStatus = Array(0, 1)
for (i <- 0 to 100) {
val productId = i
val productName = "product" + i
val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}"
rows += ProductInfo(productId, productName, extendInfo)
}
rows.toArray
}
// 2:模擬數據
val userVisitActionData = this.mockUserVisitActionData()
val userInfoData = this.mockUserInfo()
val productInfoData = this.mockProductInfo()
// 3:將模擬數據裝換爲RDD
val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)
// 4:加載SparkSQL的隱式轉換支持
import spark.implicits._
// 5:將用戶訪問數據裝換爲DF保存到Hive表中
val userVisitActionDF = userVisitActionRdd.toDF()
insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)
// 6:將用戶信息數據轉換爲DF保存到Hive表中
val userInfoDF = userInfoRdd.toDF()
insertHive(spark, USER_INFO_TABLE, userInfoDF)
// 7:將產品信息數據轉換爲DF保存到Hive表中
val productInfoDF = productInfoRdd.toDF()
insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)
// 8:將DataFrame插入到Hive表中
private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
spark.sql("DROP TABLE IF EXISTS " + tableName)
dataDF.write.saveAsTable(tableName)
}
複製代碼
DataSet 與 RDD 互操做數組
1.經過編程獲取 Schema:經過 spark 內部的 StructType 方式,將普通的 RDD 轉換成 DataFrame。
object SparkRDDtoDF {
def rddToDF(sparkSession:SparkSession):DataFrame = {
//設置 schema 結構
val schema = StructType(
Seq(
StructField("name",StringType,true),
StructField("age",IntegerType,true)
)
)
val rowRDD = sparkSession.sparkContext
.textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
.map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))
sparkSession.createDataFrame(rowRDD,schema)
}
2.經過反射獲取 Schema:使用 case class 的方式,不過在 scala 2.10 中最大支持 22 個字段的 case class,這點須要注意;
case class Person(name:String,age:Int)
def rddToDFCase(sparkSession : SparkSession):DataFrame = {
//導入隱飾操做,不然 RDD 沒法調用 toDF 方法
import sparkSession.implicits._
val peopleRDD = sparkSession.sparkContext
.textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
.map( x => x.split(",")).map( x => Person(x(0),x(1).trim().toInt)).toDF()
peopleRDD
}
3 Main函數
def main(agrs : Array[String]):Unit = {
val conf = new SparkConf().setMaster("local[2]")
conf.set("spark.sql.warehouse.dir","file:/E:/scala_workspace/z_spark_study/")
conf.set("spark.sql.shuffle.partitions","20")
val sparkSession = SparkSession.builder().appName("RDD to DataFrame")
.config(conf).getOrCreate()
// 經過代碼的方式,設置 Spark log4j 的級別
sparkSession.sparkContext.setLogLevel("WARN")
import sparkSession.implicits._
//使用 case class 的方式
//val peopleDF = rddToDFCase(sparkSession)
// 經過編程的方式完成 RDD 向
val peopleDF = rddToDF(sparkSession)
peopleDF.show()
peopleDF.select($"name",$"age").filter($"age">20).show()
}
}
複製代碼
4 DataFrame/DataSet 轉 RDD緩存
val rdd1=testDF.rdd
val rdd2=testDS.rdd
複製代碼
5 RDD 轉 DataFrame網絡
import spark.implicits._
val testDF = rdd.map {line=>
(line._1,line._2)
}.toDF("col1","col2")
複製代碼
6 DataSet 轉 DataFrame
import spark.implicits._
val testDF = testDS.toDF
複製代碼
7 DataFrame 轉 DataSet
import spark.implicits._
//定義字段名和類型
case class Coltest(col1:String, col2:Int) extends Serializable
val testDS = testDF.as[Coltest]
複製代碼
弱類型 UDAF 函數
/**
* 用戶自定義聚合函數
*/
class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {
/**
* 聚合函數輸入參數的數據類型
*/
override def inputSchema: StructType = StructType(StructField("cityInfo", StringType) ::
Nil)
/**
* 聚合緩衝區中值的類型
* 中間進行聚合時所處理的數據類型
*/
override def bufferSchema: StructType = StructType(StructField("bufferCityInfo",
StringType) :: Nil)
/**
* 函數返回值的數據類型
*/
override def dataType: DataType = StringType
/**
* 一致性檢驗,若是爲 true,那麼輸入不變的狀況下計算的結果也是不變的
*/
override def deterministic: Boolean = true
/**
* 設置聚合中間 buffer 的初始值
* 須要保證這個語義:兩個初始 buffer 調用下面實現的 merge 方法後也應該爲初始 buffer 即若是你初始值是
1,而後你 merge 是執行一個相加的動做,兩個初始 buffer 合併以後等於 2,不會等於初始 buffer 了。這樣的初始
值就是有問題的,因此初始值也叫"zero value"
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)= ""
}
/**
* 用輸入數據 input 更新 buffer 值,相似於 combineByKey
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 緩衝中的已經拼接過的城市信息串
var bufferCityInfo = buffer.getString(0)
// 剛剛傳遞進來的某個城市信息
val cityInfo = input.getString(0)
// 在這裏要實現去重的邏輯
// 判斷:以前沒有拼接過某個城市信息,那麼這裏才能夠接下去拼接新的城市信息
if(!bufferCityInfo.contains(cityInfo)) {
if("".equals(bufferCityInfo))
bufferCityInfo += cityInfo
else {
// 好比 1:北京
// 1:北京,2:上海
bufferCityInfo += "," + cityInfo
}
buffer.update(0, bufferCityInfo)
}
}
/**
* 合併兩個 buffer,將 buffer2 合併到 buffer1.在合併兩個分區聚合結果的時候會被用到,相似於
reduceByKey
* 這裏要注意該方法沒有返回值,在實現的時候是把 buffer2 合併到 buffer1 中去,你須要實現這個合併細節
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
var bufferCityInfo1 = buffer1.getString(0);
val bufferCityInfo2 = buffer2.getString(0);
for(cityInfo <- bufferCityInfo2.split(",")) {
if(!bufferCityInfo1.contains(cityInfo)) {
if("".equals(bufferCityInfo1)) {
bufferCityInfo1 += cityInfo;
} else {
bufferCityInfo1 += "," + cityInfo;
}
}
}
buffer1.update(0, bufferCityInfo1);
}
/**
* 計算並返回最終的聚合結果
*/
override def evaluate(buffer: Row): Any = {
buffer.getString(0)
}
}
複製代碼
強類型 UDAF 函數
// 定義 case 類
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
/**
* 計算並返回最終的聚合結果
*/
def zero: Average = Average(0L, 0L)
/**
* 根據傳入的參數值更新 buffer 值
*/
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
/**
* 合併兩個 buffer 值,將 buffer2 的值合併到 buffer1
*/
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
/**
* 計算輸出
*/
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
/**
* 設定中間值類型的編碼器,要轉換成 case 類
* Encoders.product 是進行 scala 元組和 case 類轉換的編碼器
*/
def bufferEncoder: Encoder[Average] = Encoders.product
/**
* 設定最終輸出值的編碼器
*/
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
複製代碼
開窗用於爲行定義一個窗口(這裏的窗口是指運算將要操做的行的集合), 它 對一組值進行操做,不須要使用 GROUP BY 子句對數據進行分組,可以在同一行中 同時返回基礎行的列和聚合列。
開窗函數的調用格式爲: 函數名(列) OVER(選項)
第一大類: 聚合開窗函數 -> 聚合函數(列) OVER (選項),這裏的選項能夠是
PARTITION BY 子句,但不但是 ORDER BY 子句。
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("score").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
val scoreDF = sparkSession.sparkContext.makeRDD(Array(Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78))).toDF("name", "class
", "score")
scoreDF.createOrReplaceTempView("score")
scoreDF.show()
}
OVER 關鍵字表示把聚合函數當成聚合開窗函數而不是聚合函數
sparkSession.sql("select name, class, score, count(name) over() name_count from score")
PARTITION BY 子句建立的分區是獨立於結果集的,建立的分區只是供進行聚合計算的,並且不一樣的開窗函數所建立的分區也不互相影響。
sparkSession.sql("select name, class, score, count(name) over(partition by class) name_count from score").show()
|name|class|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 3|
| a2| 1| 78| 3|
| a3| 1| 95| 3|
| a6| 3| 99| 5|
| a7| 3| 99| 5|
| a8| 3| 45| 5|
| a9| 3| 55| 5|
| a10| 3| 78| 5|
| a4| 2| 74| 2|
| a5| 2| 92| 2|
+----+-----+-----+----------+
第二大類: 排序開窗函數 -> 排序函數(列) OVER(選項),這裏的選項能夠是
ORDER BY 子句,也能夠是 OVER(PARTITION BY 子句 ORDER BY 子句),
但不能夠是 PARTITION BY 子句。
對於排序開窗函數來說,它支持的開窗函數分別爲: ROW_NUMBER(行號)、
RANK(排名)、 DENSE_RANK(密集排名)和 NTILE(分組排名)。
sparkSession.sql("select name, class, score, row_number() over(order by score) rank from
score").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 5|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 10|
+----+-----+-----+----+
sparkSession.sql("select name, class, score, rank() over(order by score) rank from
score").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 4|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 9|
+----+-----+-----+----+
sparkSession.sql("select name, class, score, dense_rank() over(order by score) rank from
score").show()
----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 4|
| a1| 1| 80| 5|
| a5| 2| 92| 6|
| a3| 1| 95| 7|
| a6| 3| 99| 8|
| a7| 3| 99| 8|
+----+-----+-----+----+
sparkSession.sql("select name, class, score, ntile(6) over(order by score) rank from
score").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 1|
| a4| 2| 74| 2|
| a2| 1| 78| 2|
| a10| 3| 78| 3|
| a1| 1| 80| 3|
| a5| 2| 92| 4|
| a3| 1| 95| 4|
| a6| 3| 99| 5|
| a7| 3| 99| 6|
+----+-----+-----+----+
複製代碼
object updateStateByKeyWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Wordcount")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs://s100:8020/wordcount_checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCount = pairs.updateStateByKey((values:Seq[Int], state:Option[Int]) =>{
var newValue = state.getOrElse(0)
for(value <- values){
newValue += value
}
Option(newValue)
})
wordCount.print()
ssc.start()
ssc.awaitTermination()
}
}
複製代碼
1. 點擊Session
2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:04:42,null,37,17,null,null,null,null,7
2. 搜索Session
2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,3,2018-02-11 17:29:50,重慶小面,-1,-1,null,null,null,null,1
3. 下單Session
2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,6,2018-02-11 17:50:10,null,-1,-1,61,71,null,null,2
4. 付款Session
2018-02-11,81,af18373e1dbc47a397e87f186ffd9555,4,2018-02-11 17:18:24,null,-1,-1,null,null,83,17,1
複製代碼
用戶訪問行爲模型(每個 Session_Id對應一個用戶,從而能夠聚合一個用戶的全部操做行爲)
一個 Session_Id 對應多個action_time,從而能夠得出每個Session的訪問週期Visit_Length。
一個 Session_Id 對應多個page_id,能夠進一步統計出Step_Length 以及轉化率等指標。
Session_Id | Search_Keywords | Click_Category_Id | Visit_Length | Step_Length | Start_Time
複製代碼
初步統計出每個 Session_Id對應的Visit_Length和Step_Length
累加器在Driver端維護了一個Map,用於集中存儲全部Sesson中(如:1s_3s或1_3_ratio等)的訪問步長和訪問時長佔比累積數。
每個Sesson 包含了一種(如:1s_3s或1_3_ratio)特徵。
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
/**
* 自定義累加器
*/
class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {
// 保存全部聚合數據
private val aggrStatMap = mutable.HashMap[String, Int]()
override def isZero: Boolean = {
aggrStatMap.isEmpty
}
override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
val newAcc = new SessionAggrStatAccumulator
aggrStatMap.synchronized{
newAcc.aggrStatMap ++= this.aggrStatMap
}
newAcc
}
override def reset(): Unit = {
aggrStatMap.clear()
}
mutable.HashMap[String, Int]()的更新操做
override def add(v: String): Unit = {
if (!aggrStatMap.contains(v))
aggrStatMap += (v -> 0)
aggrStatMap.update(v, aggrStatMap(v) + 1)
}
override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
other match {
case acc:SessionAggrStatAccumulator => {
(this.aggrStatMap /: acc.value){ case (map, (k,v)) => map += ( k -> (v + map.getOrElse(k, 0)) )}
}
}
}
override def value: mutable.HashMap[String, Int] = {
this.aggrStatMap
}
}
複製代碼
獲取統計任務參數【爲了方便,直接從配置文件中獲取,企業中會從一個調度平臺獲取】
task.params.json={startDate:"2018-02-01", \
endDate:"2018-02-28", \
startAge: 20, \
endAge: 50, \
professionals: "", \
cities: "", \
sex:"", \
keywords:"", \
categoryIds:"", \
targetPageFlow:"1,2,3,4,5,6,7"}
val taskParam = JSONObject.fromObject(jsonStr)
複製代碼
建立Spark客戶端
// 構建Spark上下文
val sparkConf = new SparkConf().setAppName("SessionAnalyzer").setMaster("local[*]")
// 建立Spark客戶端
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
複製代碼
設置自定義累加器,實現全部數據的統計功能,注意累加器也是懶執行的
val sessionAggrStatAccumulator = new SessionAggrStatAccumulator
複製代碼
註冊自定義累加器
sc.register(sessionAggrStatAccumulator, "sessionAggrStatAccumulator")
複製代碼
首先要從user_visit_action的Hive表中,查詢出來指定日期範圍內的行爲數據
def getParam(jsonObject:JSONObject, field:String):String = {
jsonObject.getString(field)
}
def getActionRDDByDateRange(spark: SparkSession, taskParam: JSONObject): RDD[UserVisitAction] = {
val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
import spark.implicits._
spark.sql("select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'")
.as[UserVisitAction].rdd
}
rdd仍然具備表頭信息
val actionRDD = this.getActionRDDByDateRange(spark, taskParam)
將用戶行爲信息轉換爲 K-V 結構
val sessionid2actionRDD = actionRDD.map(item => (item.session_id, item))
複製代碼
將數據進行內存緩存
sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY)
複製代碼
將數據轉換爲Session粒度(對數據聚合變換,獲得過濾,搜索列表數組,點擊類別數組,訪問起始時間及訪問步長,訪問時長等)
格式爲<sessionid,(sessionid,searchKeywords,clickCategoryIds,age,professional,city,sex)>
def aggregateBySession(spark: SparkSession, sessinoid2actionRDD: RDD[(String, UserVisitAction)]): RDD[(String, String)] = {
// 對行爲數據按session粒度進行分組
val sessionid2ActionsRDD = sessinoid2actionRDD.groupByKey()
// 對每個session分組進行聚合,將session中全部的搜索詞和點擊品類都聚合起來,<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
val userid2PartAggrInfoRDD = sessionid2ActionsRDD.map { case (sessionid, userVisitActions) =>
val searchKeywordsBuffer = new StringBuffer("")
val clickCategoryIdsBuffer = new StringBuffer("")
var userid = -1L
// session的起始和結束時間
var startTime: Date = null
var endTime: Date = null
// session的訪問步長
var stepLength = 0
// 遍歷session全部的訪問行爲
userVisitActions.foreach { userVisitAction =>
if (userid == -1L) {
userid = userVisitAction.user_id
}
val searchKeyword = userVisitAction.search_keyword
val clickCategoryId = userVisitAction.click_category_id
// 實際上這裏要對數聽說明一下
// 並非每一行訪問行爲都有searchKeyword何clickCategoryId兩個字段的
// 其實,只有搜索行爲,是有searchKeyword字段的
// 只有點擊品類的行爲,是有clickCategoryId字段的
// 因此,任何一行行爲數據,都不可能兩個字段都有,因此數據是可能出現null值的
// 咱們決定是否將搜索詞或點擊品類id拼接到字符串中去
// 首先要知足:不能是null值
// 其次,以前的字符串中尚未搜索詞或者點擊品類id
if (StringUtils.isNotEmpty(searchKeyword)) {
if (!searchKeywordsBuffer.toString.contains(searchKeyword)) {
searchKeywordsBuffer.append(searchKeyword + ",")
}
}
if (clickCategoryId != null && clickCategoryId != -1L) {
if (!clickCategoryIdsBuffer.toString.contains(clickCategoryId.toString)) {
clickCategoryIdsBuffer.append(clickCategoryId + ",")
}
}
// 計算session開始和結束時間
val actionTime = DateUtils.parseTime(userVisitAction.action_time)
if (startTime == null) {
startTime = actionTime
}
if (endTime == null) {
endTime = actionTime
}
if (actionTime.before(startTime)) {
startTime = actionTime
}
if (actionTime.after(endTime)) {
endTime = actionTime
}
// 計算session訪問步長
stepLength += 1
}
val searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString)
val clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString)
// 計算session訪問時長(秒)
val visitLength = (endTime.getTime() - startTime.getTime()) / 1000
// 聚合數據,使用key=value|key=value
val partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|" +
Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|" +
Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" +
Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)
(userid, partAggrInfo);
}
// 查詢全部用戶數據,並映射成<userid,Row>的格式
import spark.implicits._
val userid2InfoRDD = spark.sql("select * from user_info").as[UserInfo].rdd.map(item => (item.user_id, item))
// 將session粒度聚合數據,與用戶信息進行join
val userid2FullInfoRDD = userid2PartAggrInfoRDD.join(userid2InfoRDD);
// 對join起來的數據進行拼接,而且返回<sessionid,fullAggrInfo>格式的數據
val sessionid2FullAggrInfoRDD = userid2FullInfoRDD.map { case (uid, (partAggrInfo, userInfo)) =>
val sessionid = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
val fullAggrInfo = partAggrInfo + "|" +
Constants.FIELD_AGE + "=" + userInfo.age + "|" +
Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +
Constants.FIELD_CITY + "=" + userInfo.city + "|" +
Constants.FIELD_SEX + "=" + userInfo.sex
(sessionid, fullAggrInfo)
}
sessionid2FullAggrInfoRDD
}
複製代碼
根據查詢任務的配置,過濾用戶的行爲數據,同時在過濾的過程當中,對累加器中的數據進行統計
按照年齡、職業、城市範圍、性別、搜索詞、點擊品類這些條件過濾後的最終結果
def filterSessionAndAggrStat(sessionid2AggrInfoRDD: RDD[(String, String)],
taskParam: JSONObject,
sessionAggrStatAccumulator: AccumulatorV2[String, mutable.HashMap[String, Int]]): RDD[(String, String)] = {
// 獲取查詢任務中的配置
val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)
var _parameter = (if (startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +
(if (endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +
(if (professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else "") +
(if (cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else "") +
(if (sex != null) Constants.PARAM_SEX + "=" + sex + "|" else "") +
(if (keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else "") +
(if (categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")
if (_parameter.endsWith("\\|")) {
_parameter = _parameter.substring(0, _parameter.length() - 1)
}
val parameter = _parameter
// 根據篩選參數進行過濾
val filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter { case (sessionid, aggrInfo) =>
// 接着,依次按照篩選條件進行過濾
// 按照年齡範圍進行過濾(startAge、endAge)
var success = true
if (!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE))
success = false
// 按照職業範圍進行過濾(professionals)
// 互聯網,IT,軟件
// 互聯網
if (!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, parameter, Constants.PARAM_PROFESSIONALS))
success = false
// 按照城市範圍進行過濾(cities)
// 北京,上海,廣州,深圳
// 成都
if (!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, parameter, Constants.PARAM_CITIES))
success = false
// 按照性別進行過濾
// 男/女
// 男,女
if (!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, parameter, Constants.PARAM_SEX))
success = false
// 按照搜索詞進行過濾
// 咱們的session可能搜索了 火鍋,蛋糕,燒烤
// 咱們的篩選條件多是 火鍋,串串香,iphone手機
// 那麼,in這個校驗方法,主要斷定session搜索的詞中,有任何一個,與篩選條件中
// 任何一個搜索詞至關,即經過
if (!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, parameter, Constants.PARAM_KEYWORDS))
success = false
// 按照點擊品類id進行過濾
if (!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, parameter, Constants.PARAM_CATEGORY_IDS))
success = false
// 若是符合任務搜索需求
if (success) {
sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
// 計算訪問時長範圍
def calculateVisitLength(visitLength: Long) {
if (visitLength >= 1 && visitLength <= 3) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
} else if (visitLength >= 4 && visitLength <= 6) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
} else if (visitLength >= 7 && visitLength <= 9) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
} else if (visitLength >= 10 && visitLength <= 30) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
} else if (visitLength > 30 && visitLength <= 60) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
} else if (visitLength > 60 && visitLength <= 180) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
} else if (visitLength > 180 && visitLength <= 600) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
} else if (visitLength > 600 && visitLength <= 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
} else if (visitLength > 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
}
}
// 計算訪問步長範圍
def calculateStepLength(stepLength: Long) {
if (stepLength >= 1 && stepLength <= 3) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
} else if (stepLength >= 4 && stepLength <= 6) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
} else if (stepLength >= 7 && stepLength <= 9) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
} else if (stepLength >= 10 && stepLength <= 30) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
} else if (stepLength > 30 && stepLength <= 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
} else if (stepLength > 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
}
// 計算出session的訪問時長和訪問步長的範圍,並進行相應的累加
val visitLength = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
val stepLength = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
calculateVisitLength(visitLength)
calculateStepLength(stepLength)
}
success
}
filteredSessionid2AggrInfoRDD
}
複製代碼
持久化辛苦聚合過濾統計值,對數據進行內存緩存
filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY)
複製代碼
獲得篩選的session對應的訪問明細數據(獲得過濾後的原始數據)
def getSessionid2detailRDD(sessionid2aggrInfoRDD: RDD[(String, String)], sessionid2actionRDD: RDD[(String, UserVisitAction)]): RDD[(String, UserVisitAction)] = {
sessionid2aggrInfoRDD.join(sessionid2actionRDD).map(item => (item._1, item._2._2))
}
sessionid2detailRDD.persist(StorageLevel.MEMORY_ONLY)
複製代碼
利用累積器開發業務功能一:統計各個範圍的session佔比,並寫入MySQL
calculateAndPersistAggrStat(spark, sessionAggrStatAccumulator.value, taskUUID)
def calculateAndPersistAggrStat(spark: SparkSession, value: mutable.HashMap[String, Int], taskUUID: String) {
// 從Accumulator統計串中獲取值
val session_count = value(Constants.SESSION_COUNT).toDouble
val visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)
val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)
val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)
val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)
val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)
val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)
val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)
val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)
val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m, 0)
val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3, 0)
val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6, 0)
val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9, 0)
val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30, 0)
val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60, 0)
val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60, 0)
// 計算各個訪問時長和訪問步長的範圍
val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s / session_count, 2)
val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s / session_count, 2)
val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s / session_count, 2)
val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30s / session_count, 2)
val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60s / session_count, 2)
val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m / session_count, 2)
val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10m / session_count, 2)
val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30m / session_count, 2)
val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m / session_count, 2)
val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count, 2)
val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count, 2)
val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count, 2)
val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count, 2)
val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count, 2)
val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count, 2)
// 將統計結果封裝爲Domain對象
val sessionAggrStat = SessionAggrStat(taskUUID,
session_count.toInt, visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,
visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,
visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,
step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,
step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)
import spark.implicits._
val sessionAggrStatRDD = spark.sparkContext.makeRDD(Array(sessionAggrStat))
sessionAggrStatRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "session_aggr_stat")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
}
複製代碼
按照Session粒度(注意每個session可能有多條action記錄。)隨機均勻獲取Session。
randomExtractSession(spark, taskUUID, filteredSessionid2AggrInfoRDD, sessionid2detailRDD)
def randomExtractSession(spark: SparkSession, taskUUID: String, sessionid2AggrInfoRDD: RDD[(String, String)], sessionid2actionRDD: RDD[(String, UserVisitAction)]) {
// 第一步,計算出天天每小時的session數量,獲取<yyyy-MM-dd_HH,aggrInfo>格式的RDD
val time2sessionidRDD = sessionid2AggrInfoRDD.map { case (sessionid, aggrInfo) =>
val startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME)
// 將key改成yyyy-MM-dd_HH的形式(小時粒度)
val dateHour = DateUtils.getDateHour(startTime)
(dateHour, aggrInfo)
}
// 獲得天天每小時的session數量
// countByKey()計算每一個不一樣的key有多少個數據
// countMap<yyyy-MM-dd_HH, count>
val countMap = time2sessionidRDD.countByKey()
// 第二步,使用按時間比例隨機抽取算法,計算出天天每小時要抽取session的索引,將<yyyy-MM-dd_HH,count>格式的map,轉換成<yyyy-MM-dd,<HH,count>>的格式
// dateHourCountMap <yyyy-MM-dd,<HH,count>>
val dateHourCountMap = mutable.HashMap[String, mutable.HashMap[String, Long]]()
for ((dateHour, count) <- countMap) {
val date = dateHour.split("_")(0)
val hour = dateHour.split("_")(1)
// 經過模式匹配實現了if的功能
dateHourCountMap.get(date) match {
// 對應日期的數據不存在,則新增
case None => dateHourCountMap(date) = new mutable.HashMap[String, Long](); dateHourCountMap(date) += (hour -> count)
// 對應日期的數據存在,則更新
// 若是有值,Some(hourCountMap)將值取到了hourCountMap中
case Some(hourCountMap) => hourCountMap += (hour -> count)
}
}
// 按時間比例隨機抽取算法,總共要抽取100個session,先按照天數,進行平分
// 獲取每一天要抽取的數量
val extractNumberPerDay = 100 / dateHourCountMap.size
// dateHourExtractMap[天,[小時,index列表]]
val dateHourExtractMap = mutable.HashMap[String, mutable.HashMap[String, mutable.ListBuffer[Int]]]()
val random = new Random()
/**
* 根據每一個小時應該抽取的數量,來產生隨機值
* 遍歷每一個小時,填充Map<date,<hour,(3,5,20,102)>>
* @param hourExtractMap 主要用來存放生成的隨機值
* @param hourCountMap 每一個小時的session總數
* @param sessionCount 當天全部的seesion總數
*/
def hourExtractMapFunc(hourExtractMap: mutable.HashMap[String, mutable.ListBuffer[Int]], hourCountMap: mutable.HashMap[String, Long], sessionCount: Long) {
for ((hour, count) <- hourCountMap) {
// 計算每一個小時的session數量,佔據當天總session數量的比例,直接乘以天天要抽取的數量
// 就能夠計算出,當前小時須要抽取的session數量
var hourExtractNumber = ((count / sessionCount.toDouble) * extractNumberPerDay).toInt
if (hourExtractNumber > count) {
hourExtractNumber = count.toInt
}
// 仍然經過模式匹配實現有則追加,無則新建
hourExtractMap.get(hour) match {
case None => hourExtractMap(hour) = new mutable.ListBuffer[Int]();
// 根據數量隨機生成下標
for (i <- 0 to hourExtractNumber) {
var extractIndex = random.nextInt(count.toInt);
// 一旦隨機生成的index已經存在,從新獲取,直到獲取到以前沒有的index
while (hourExtractMap(hour).contains(extractIndex)) {
extractIndex = random.nextInt(count.toInt);
}
hourExtractMap(hour) += (extractIndex)
}
case Some(extractIndexList) =>
for (i <- 0 to hourExtractNumber) {
var extractIndex = random.nextInt(count.toInt);
// 一旦隨機生成的index已經存在,從新獲取,直到獲取到以前沒有的index
while (hourExtractMap(hour).contains(extractIndex)) {
extractIndex = random.nextInt(count.toInt);
}
hourExtractMap(hour) += (extractIndex)
}
}
}
}
// session隨機抽取功能
for ((date, hourCountMap) <- dateHourCountMap) {
// 計算出這一天的session總數
val sessionCount = hourCountMap.values.sum
// dateHourExtractMap[天,[小時,小時列表]]
dateHourExtractMap.get(date) match {
case None => dateHourExtractMap(date) = new mutable.HashMap[String, mutable.ListBuffer[Int]]();
// 更新index
hourExtractMapFunc(dateHourExtractMap(date), hourCountMap, sessionCount)
case Some(hourExtractMap) => hourExtractMapFunc(hourExtractMap, hourCountMap, sessionCount)
}
}
/* 至此,index獲取完畢 */
//將Map進行廣播
val dateHourExtractMapBroadcast = spark.sparkContext.broadcast(dateHourExtractMap)
// time2sessionidRDD <yyyy-MM-dd_HH,aggrInfo>
// 執行groupByKey算子,獲得<yyyy-MM-dd_HH,(session aggrInfo)>
val time2sessionsRDD = time2sessionidRDD.groupByKey()
// 第三步:遍歷天天每小時的session,而後根據隨機索引進行抽取,咱們用flatMap算子,遍歷全部的<dateHour,(session aggrInfo)>格式的數據
val sessionRandomExtract = time2sessionsRDD.flatMap { case (dateHour, items) =>
val date = dateHour.split("_")(0)
val hour = dateHour.split("_")(1)
// 從廣播變量中提取出數據
val dateHourExtractMap = dateHourExtractMapBroadcast.value
// 獲取指定天對應的指定小時的indexList
// 當前小時須要的index集合
val extractIndexList = dateHourExtractMap.get(date).get(hour)
// index是在外部進行維護
var index = 0
val sessionRandomExtractArray = new ArrayBuffer[SessionRandomExtract]()
// 開始遍歷全部的aggrInfo
for (sessionAggrInfo <- items) {
// 若是篩選List中包含當前的index,則提取此sessionAggrInfo中的數據
if (extractIndexList.contains(index)) {
val sessionid = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
val starttime = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_START_TIME)
val searchKeywords = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
val clickCategoryIds = StringUtils.getFieldFromConcatString(sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)
sessionRandomExtractArray += SessionRandomExtract(taskUUID, sessionid, starttime, searchKeywords, clickCategoryIds)
}
// index自增
index += 1
}
sessionRandomExtractArray
}
/* 將抽取後的數據保存到MySQL */
// 引入隱式轉換,準備進行RDD向Dataframe的轉換
import spark.implicits._
// 爲了方便地將數據保存到MySQL數據庫,將RDD數據轉換爲Dataframe
sessionRandomExtract.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "session_random_extract")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
// 提取抽取出來的數據中的sessionId
val extractSessionidsRDD = sessionRandomExtract.map(item => (item.sessionid, item.sessionid))
// 第四步:獲取抽取出來的session的明細數據
// 根據sessionId與詳細數據進行聚合
val extractSessionDetailRDD = extractSessionidsRDD.join(sessionid2actionRDD)
// 對extractSessionDetailRDD中的數據進行聚合,提煉有價值的明細數據
val sessionDetailRDD = extractSessionDetailRDD.map { case (sid, (sessionid, userVisitAction)) =>
SessionDetail(taskUUID, userVisitAction.user_id, userVisitAction.session_id,
userVisitAction.page_id, userVisitAction.action_time, userVisitAction.search_keyword,
userVisitAction.click_category_id, userVisitAction.click_product_id, userVisitAction.order_category_ids,
userVisitAction.order_product_ids, userVisitAction.pay_category_ids, userVisitAction.pay_product_ids)
}
// 將明細數據保存到MySQL中
sessionDetailRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "session_detail")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
}
複製代碼
獲取top10熱門品類
排序
case class CategorySortKey(val clickCount: Long, val orderCount: Long, val payCount: Long) extends Ordered[CategorySortKey] {
override def compare(that: CategorySortKey): Int = {
if (this.clickCount - that.clickCount != 0) {
return (this.clickCount - that.clickCount).toInt
} else if (this.orderCount - that.orderCount != 0) {
return (this.orderCount - that.orderCount).toInt
} else if (this.payCount - that.payCount != 0) {
return (this.payCount - that.payCount).toInt
}
0
}
}
獲取各個品類的點擊次數RDD
def getClickCategoryId2CountRDD(sessionid2detailRDD: RDD[(String, UserVisitAction)]): RDD[(Long, Long)] = {
// 只將點擊行爲過濾出來
val clickActionRDD = sessionid2detailRDD.filter { case (sessionid, userVisitAction) => userVisitAction.click_category_id != null }
// 獲取每種類別的點擊次數
// map階段:(品類ID,1L)
val clickCategoryIdRDD = clickActionRDD.map { case (sessionid, userVisitAction) => (userVisitAction.click_category_id, 1L) }
// 計算各個品類的點擊次數
// reduce階段:對map階段的數據進行彙總
// (品類ID1,次數) (品類ID2,次數) (品類ID3,次數) ... ... (品類ID4,次數)
clickCategoryIdRDD.reduceByKey(_ + _)
}
鏈接品類RDD與數據RDD
def joinCategoryAndData(categoryidRDD: RDD[(Long, Long)], clickCategoryId2CountRDD: RDD[(Long, Long)], orderCategoryId2CountRDD: RDD[(Long, Long)], payCategoryId2CountRDD: RDD[(Long, Long)]): RDD[(Long, String)] = {
// 將全部品類信息與點擊次數信息結合【左鏈接】
val clickJoinRDD = categoryidRDD.leftOuterJoin(clickCategoryId2CountRDD).map { case (categoryid, (cid, optionValue)) =>
val clickCount = if (optionValue.isDefined) optionValue.get else 0L
val value = Constants.FIELD_CATEGORY_ID + "=" + categoryid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount
(categoryid, value)
}
// 將全部品類信息與訂單次數信息結合【左鏈接】
val orderJoinRDD = clickJoinRDD.leftOuterJoin(orderCategoryId2CountRDD).map { case (categoryid, (ovalue, optionValue)) =>
val orderCount = if (optionValue.isDefined) optionValue.get else 0L
val value = ovalue + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount
(categoryid, value)
}
// 將全部品類信息與付款次數信息結合【左鏈接】
val payJoinRDD = orderJoinRDD.leftOuterJoin(payCategoryId2CountRDD).map { case (categoryid, (ovalue, optionValue)) =>
val payCount = if (optionValue.isDefined) optionValue.get else 0L
val value = ovalue + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount
(categoryid, value)
}
payJoinRDD
}
def getTop10Category(spark: SparkSession, taskid: String, sessionid2detailRDD: RDD[(String, UserVisitAction)]): Array[(CategorySortKey, String)] = {
// 第一步:獲取每個Sessionid 點擊過、下單過、支付過的數量
// 獲取全部產生過點擊、下單、支付中任意行爲的商品類別
val categoryidRDD = sessionid2detailRDD.flatMap { case (sessionid, userVisitAction) =>
val list = ArrayBuffer[(Long, Long)]()
// 一個session中點擊的商品ID
if (userVisitAction.click_category_id != null) {
list += ((userVisitAction.click_category_id, userVisitAction.click_category_id))
}
// 一個session中下單的商品ID集合
if (userVisitAction.order_category_ids != null) {
for (orderCategoryId <- userVisitAction.order_category_ids.split(","))
list += ((orderCategoryId.toLong, orderCategoryId.toLong))
}
// 一個session中支付的商品ID集合
if (userVisitAction.pay_category_ids != null) {
for (payCategoryId <- userVisitAction.pay_category_ids.split(","))
list += ((payCategoryId.toLong, payCategoryId.toLong))
}
list
}
// 對重複的categoryid進行去重
// 獲得了全部被點擊、下單、支付的商品的品類
val distinctCategoryIdRDD = categoryidRDD.distinct
// 第二步:計算各品類的點擊、下單和支付的次數
// 計算各個品類的點擊次數
val clickCategoryId2CountRDD = getClickCategoryId2CountRDD(sessionid2detailRDD)
// 計算各個品類的下單次數
val orderCategoryId2CountRDD = getOrderCategoryId2CountRDD(sessionid2detailRDD)
// 計算各個品類的支付次數
val payCategoryId2CountRDD = getPayCategoryId2CountRDD(sessionid2detailRDD)
// 第三步:join各品類與它的點擊、下單和支付的次數
// distinctCategoryIdRDD中是全部產生過點擊、下單、支付行爲的商品類別
// 經過distinctCategoryIdRDD與各個統計數據的LeftJoin保證數據的完整性
val categoryid2countRDD = joinCategoryAndData(distinctCategoryIdRDD, clickCategoryId2CountRDD, orderCategoryId2CountRDD, payCategoryId2CountRDD);
// 第四步:自定義二次排序key
// 第五步:將數據映射成<CategorySortKey,info>格式的RDD,而後進行二次排序(降序)
// 建立用於二次排序的聯合key —— (CategorySortKey(clickCount, orderCount, payCount), line)
// 按照:點擊次數 -> 下單次數 -> 支付次數 這一順序進行二次排序
val sortKey2countRDD = categoryid2countRDD.map { case (categoryid, line) =>
val clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLong
val orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLong
val payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong
(CategorySortKey(clickCount, orderCount, payCount), line)
}
// 降序排序
val sortedCategoryCountRDD = sortKey2countRDD.sortByKey(false)
// 第六步:用take(10)取出top10熱門品類,並寫入MySQL
val top10CategoryList = sortedCategoryCountRDD.take(10)
val top10Category = top10CategoryList.map { case (categorySortKey, line) =>
val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID).toLong
val clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLong
val orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLong
val payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong
Top10Category(taskid, categoryid, clickCount, orderCount, payCount)
}
// 將Map結構轉化爲RDD
val top10CategoryRDD = spark.sparkContext.makeRDD(top10Category)
// 寫入MySQL以前,將RDD轉化爲Dataframe
import spark.implicits._
top10CategoryRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "top10_category")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
top10CategoryList
}
複製代碼
獲取top10熱門品類的活躍session(先join熱門品類獲得熱門的session,再迭代計算每一種品類對應的session中點擊次數排名,取前10)
1 sessionid2detailRDD 數據結構重組和計算全部品類出現的次數累加值count
(一個SessionId對應的多條action記錄:sessionid-iter(userVisitAction))
val sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey()
數據結構重組後輸出
(categoryid, sessionid + "," + count)
2 獲取到top10熱門品類,被各個session點擊的次數【將數據集縮小】,包含大量的重複key
val top10CategorySessionCountRDD = top10CategoryIdRDD.join(categoryid2sessionCountRDD).map { case (cid, (ccid, value)) => (cid, value) }
3 整合大量重複的key,按照品類分組,獲取品類下的全部(sessionid + "," + count)迭代器。
val top10CategorySessionCountsRDD = top10CategorySessionCountRDD.groupByKey()
4 每一種品類對應的session中點擊次數進行排序,取前10
val top10Sessions = clicks.toList.sortWith(_.split(",")(1) > _.split(",")(1)).take(10)
複製代碼
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。
def getTop10Session(spark: SparkSession, taskid: String, top10CategoryList: Array[(CategorySortKey, String)], sessionid2ActionRDD: RDD[(String, UserVisitAction)]) {
// 第一步:將top10熱門品類的id,生成一份RDD
// 得到全部須要求的category集合
val top10CategoryIdRDD = spark.sparkContext.makeRDD(top10CategoryList.map { case (categorySortKey, line) =>
val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID).toLong;
(categoryid, categoryid)
})
// 第二步:計算top10品類被各session點擊的次數
// sessionid2ActionRDD是符合過濾(職業、年齡等)條件的完整數據
// sessionid2detailRDD ( sessionId, userAction )
val sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey()
// 獲取每一個品類被每個Session點擊的次數
val categoryid2sessionCountRDD = sessionid2ActionsRDD.flatMap { case (sessionid, userVisitActions) =>
val categoryCountMap = new mutable.HashMap[Long, Long]()
// userVisitActions中聚合了一個session的全部用戶行爲數據
// 遍歷userVisitActions是提取session中的每個用戶行爲,並對每個用戶行爲中的點擊事件進行計數
for (userVisitAction <- userVisitActions) {
// 若是categoryCountMap中尚不存在此點擊品類,則新增品類
if (!categoryCountMap.contains(userVisitAction.click_category_id))
categoryCountMap.put(userVisitAction.click_category_id, 0)
// 若是categoryCountMap中已經存在此點擊品類,則進行累加
if (userVisitAction.click_category_id != null && userVisitAction.click_category_id != -1L) {
categoryCountMap.update(userVisitAction.click_category_id, categoryCountMap(userVisitAction.click_category_id) + 1)
}
}
// 對categoryCountMap中的數據進行格式轉化
for ((categoryid, count) <- categoryCountMap)
yield (categoryid, sessionid + "," + count)
}
// 經過top10熱門品類top10CategoryIdRDD與完整品類點擊統計categoryid2sessionCountRDD進行join,僅獲取熱門品類的數據信息
// 獲取到to10熱門品類,被各個session點擊的次數【將數據集縮小】
val top10CategorySessionCountRDD = top10CategoryIdRDD.join(categoryid2sessionCountRDD).map { case (cid, (ccid, value)) => (cid, value) }
// 第三步:分組取TopN算法實現,獲取每一個品類的top10活躍用戶
// 先按照品類分組
val top10CategorySessionCountsRDD = top10CategorySessionCountRDD.groupByKey()
// 將每個品類的全部點擊排序,取前十個,並轉換爲對象
//(categoryid, sessionId=1213,sessionId=908)
val top10SessionObjectRDD = top10CategorySessionCountsRDD.flatMap { case (categoryid, clicks) =>
// 先排序,而後取前10
val top10Sessions = clicks.toList.sortWith(_.split(",")(1) > _.split(",")(1)).take(10)
// 從新整理數據
top10Sessions.map { case line =>
val sessionid = line.split(",")(0)
val count = line.split(",")(1).toLong
Top10Session(taskid, categoryid, sessionid, count)
}
}
// 將結果以追加方式寫入到MySQL中
import spark.implicits._
top10SessionObjectRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "top10_session")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
val top10SessionRDD = top10SessionObjectRDD.map(item => (item.sessionid, item.sessionid))
// 第四步:獲取top10活躍session的明細數據
val sessionDetailRDD = top10SessionRDD.join(sessionid2ActionRDD).map { case (sid, (sessionid, userVisitAction)) =>
SessionDetail(taskid, userVisitAction.user_id, userVisitAction.session_id,
userVisitAction.page_id, userVisitAction.action_time, userVisitAction.search_keyword,
userVisitAction.click_category_id, userVisitAction.click_product_id, userVisitAction.order_category_ids,
userVisitAction.order_product_ids, userVisitAction.pay_category_ids, userVisitAction.pay_product_ids)
}
// 將活躍Session的明細數據,寫入到MySQL
sessionDetailRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "session_detail")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
}
複製代碼
溫故而知新,本文爲了綜合複習,進行代碼總結,內容粗鄙,勿怪
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。
秦凱新 於深圳