版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。java
timestamp province city userid adid
複製代碼
數據格式
(timestamp province city userid adid)
統計單用戶的統計次數
(date, userid,adid,clickCount)
閾值統計統計黑名單
複製代碼
輸入數據格式
(userid, timestamp province city userid adid)
計算後數據格式並持久化
(date,province,city,adid,clickCount)
複製代碼
輸入數據格式
(yyyyMMdd_province_city_adid,clickCount)
計算後數據格式並持久化
(date,province, adid,clickCount)
註冊成表ROW_NUMBER()實現窗聚合
tmp_daily_ad_click_count_by_prov
複製代碼
輸入數據格式
(timestamp province city userid adid)
計算後數據格式並持久化
(date,hour,minute,adid,clickCount)
複製代碼
構建Spark上下文mysql
val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")
// 建立Spark客戶端
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
// 獲取Kafka配置
val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
val topics = ConfigurationManager.config.getString("kafka.topics")
複製代碼
kafka消費者配置算法
val kafkaParam = Map(
"bootstrap.servers" -> broker_list,//用於初始化連接到集羣的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用於標識這個消費者屬於哪一個消費團體
"group.id" -> "commerce-consumer-group",
//若是沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可使用這個配置屬性
//可使用這個配置,latest自動重置偏移量爲最新的偏移量
//earilist:提交過度區,從Offset處讀取,若是沒有提交過offset,從頭讀取
//latest:提交過度區,從Offset處讀取,沒有從最新的數據開始讀取
//None:若是沒有提交offset,就會報錯,提交過offset,就從offset處讀取
"auto.offset.reset" -> "latest",
//若是是true,則這個消費者的偏移量會在後臺自動提交
"enable.auto.commit" -> (false: java.lang.Boolean)
)
複製代碼
設置檢查點目錄sql
ssc.checkpoint("./streaming_checkpoint")
複製代碼
LocationStrategies 分配分區策略數據庫
// 建立DStream,返回接收到的輸入數據
// LocationStrategies:根據給定的主題和集羣地址建立consumer
// LocationStrategies.PreferConsistent:持續的在全部Executor之間勻分配分區 (均勻分配,選中的每個Executor都會分配 partition)
// LocationStrategies.PreferBrokers: 若是executor和kafka brokers 在同一臺機器上,選擇該executor。
// LocationStrategies.PreferFixed: 若是機器不是均勻的狀況下,能夠指定特殊的hosts。固然若是不指定,採用 LocationStrategies.PreferConsistent模式
複製代碼
ConsumerStrategies 消費策略bootstrap
// ConsumerStrategies:選擇如何在Driver和Executor上建立和配置Kafka Consumer
// ConsumerStrategies.Subscribe:訂閱一系列主題
val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
複製代碼
SparkStreaming開始消費安全
var adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
複製代碼
算法過程以下 (timestamp province city userid adid) -> (userid, timestamp province city userid adid)服務器
根據userId進行過濾app
用於Kafka Stream的線程非安全問題,從新分區切斷血統
adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
val filteredAdRealTimeLogDStream = filterByBlacklist(spark,adRealTimeValueDStream)
def filterByBlacklist(spark: SparkSession, adRealTimeValueDStream:DStream[String]):DStream[(Long, String)] = {
// 剛剛接受到原始的用戶點擊行爲日誌以後
// 根據mysql中的動態黑名單,進行實時的黑名單過濾(黑名單用戶的點擊行爲,直接過濾掉,不要了)
// 使用transform算子(將dstream中的每一個batch RDD進行處理,轉換爲任意的其餘RDD,功能很強大)
val filteredAdRealTimeLogDStream = adRealTimeValueDStream.transform{ consumerRecordRDD =>
//格式 :timestamp province city userid adid
//某個時間點 某個省份 某個城市 某個用戶 某個廣告
// 首先,從mysql中查詢全部黑名單用戶,將其轉換爲一個rdd
val adBlacklists = AdBlacklistDAO.findAll()
// (userid, timestamp province city userid adid)
val blacklistRDD = spark.sparkContext.makeRDD(adBlacklists.map(item => (item.userid, true)))
//格式 :timestamp province city userid adid
val mappedRDD = consumerRecordRDD.map(consumerRecord => {
val userid = consumerRecord.split(" ")(3).toLong
(userid,consumerRecord)
})
// 將原始日誌數據rdd,與黑名單rdd,進行左外鏈接
// 若是說原始日誌的userid,沒有在對應的黑名單中,join不到,左外鏈接
// 用inner join,內鏈接,會致使數據丟失
val joinedRDD = mappedRDD.leftOuterJoin(blacklistRDD)
val filteredRDD = joinedRDD.filter{ case (userid,(log, black)) =>
// 若是這個值存在,那麼說明原始日誌中的userid,join到了某個黑名單用戶
if(black.isDefined && black.get) false else true
}
filteredRDD.map{ case (userid,(log, black)) => (userid, log)}
}
filteredAdRealTimeLogDStream
}
複製代碼
轉化爲用戶粒度進行過濾,拋棄 province city ,格式爲:(date, userid,adid,clickCount)函數
過濾次數大於閾值的userId,持久化到磁盤。
generateDynamicBlacklist(filteredAdRealTimeLogDStream)
def generateDynamicBlacklist(filteredAdRealTimeLogDStream: DStream[(Long, String)]) {
// (timestamp province city userid adid)
// 計算出每5個秒內的數據中,天天每一個用戶每一個廣告的點擊量
// 經過對原始實時日誌的處理
// 將日誌的格式處理成<yyyyMMdd_userid_adid, 1L>格式
val dailyUserAdClickDStream = filteredAdRealTimeLogDStream.map{ case (userid,log) =>
// 從tuple中獲取到每一條原始的實時日誌
val logSplited = log.split(" ")
// 提取出日期(yyyyMMdd)、userid、adid
val timestamp = logSplited(0)
val date = new Date(timestamp.toLong)
val datekey = DateUtils.formatDateKey(date)
val userid = logSplited(3).toLong
val adid = logSplited(4)
// 拼接key
val key = datekey + "_" + userid + "_" + adid
(key, 1L)
}
// 針對處理後的日誌格式,執行reduceByKey算子便可,(每一個batch中)天天每一個用戶對每一個廣告的點擊量
val dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(_ + _)
// 源源不斷的,每一個5s的batch中,當天每一個用戶對每支廣告的點擊次數
// <yyyyMMdd_userid_adid, clickCount>
dailyUserAdClickCountDStream.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
// 對每一個分區的數據就去獲取一次鏈接對象
// 每次都是從鏈接池中獲取,而不是每次都建立
// 寫數據庫操做,性能已經提到最高了
val adUserClickCounts = ArrayBuffer[AdUserClickCount]()
for(item <- items){
val keySplited = item._1.split("_")
val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
// yyyy-MM-dd
val userid = keySplited(1).toLong
val adid = keySplited(2).toLong
val clickCount = item._2
//批量插入
adUserClickCounts += AdUserClickCount(date, userid,adid,clickCount)
}
AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)
}
}
// 如今咱們在mysql裏面,已經有了累計的天天各用戶對各廣告的點擊量
// 遍歷每一個batch中的全部記錄,對每條記錄都要去查詢一下,這一天這個用戶對這個廣告的累計點擊量是多少
// 從mysql中查詢
// 查詢出來的結果,若是是100,若是你發現某個用戶某天對某個廣告的點擊量已經大於等於100了
// 那麼就斷定這個用戶就是黑名單用戶,就寫入mysql的表中,持久化
val blacklistDStream = dailyUserAdClickCountDStream.filter{ case (key, count) =>
val keySplited = key.split("_")
// yyyyMMdd -> yyyy-MM-dd
val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
val userid = keySplited(1).toLong
val adid = keySplited(2).toLong
// 從mysql中查詢指定日期指定用戶對指定廣告的點擊量
val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
// 判斷,若是點擊量大於等於100,ok,那麼很差意思,你就是黑名單用戶
// 那麼就拉入黑名單,返回true
if(clickCount >= 100) {
true
}else{
// 反之,若是點擊量小於100的,那麼就暫時不要管它了
false
}
}
複製代碼
轉化爲省城市粒度進行過濾,拋棄userId,格式爲:(yyyyMMdd_province_city_adid,clickCount)
val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)
def calculateRealTimeStat(filteredAdRealTimeLogDStream:DStream[(Long, String)]):DStream[(String, Long)] = {
// 計算天天各省各城市各廣告的點擊量
// 設計出來幾個維度:日期、省份、城市、廣告
// 2015-12-01,當天,能夠看到當天全部的實時數據(動態改變),好比江蘇省南京市
// 廣告能夠進行選擇(廣告主、廣告名稱、廣告類型來篩選一個出來)
// 拿着date、province、city、adid,去mysql中查詢最新的數據
// 等等,基於這幾個維度,以及這份動態改變的數據,是能夠實現比較靈活的廣告點擊流量查看的功能的
// date province city userid adid
// date_province_city_adid,做爲key;1做爲value
// 經過spark,直接統計出來全局的點擊次數,在spark集羣中保留一份;在mysql中,也保留一份
// 咱們要對原始數據進行map,映射成<date_province_city_adid,1>格式
// 而後呢,對上述格式的數據,執行updateStateByKey算子
// spark streaming特有的一種算子,在spark集羣內存中,維護一份key的全局狀態
//(userid, timestamp province city userid adid)
val mappedDStream = filteredAdRealTimeLogDStream.map{ case (userid, log) =>
val logSplited = log.split(" ")
val timestamp = logSplited(0)
val date = new Date(timestamp.toLong)
val datekey = DateUtils.formatDateKey(date)
val province = logSplited(1)
val city = logSplited(2)
val adid = logSplited(4).toLong
val key = datekey + "_" + province + "_" + city + "_" + adid
(key, 1L)
}
// 在這個dstream中,就至關於,有每一個batch rdd累加的各個key(各天各省份各城市各廣告的點擊次數)
// 每次計算出最新的值,就在aggregatedDStream中的每一個batch rdd中反應出來
val aggregatedDStream = mappedDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>
// 舉例來講
// 對於每一個key,都會調用一次這個方法
// 好比key是<20151201_Jiangsu_Nanjing_10001,1>,就會來調用一次這個方法7
// 10個
// values,(1,1,1,1,1,1,1,1,1,1)
// 首先根據optional判斷,以前這個key,是否有對應的狀態
var clickCount = 0L
// 若是說,以前是存在這個狀態的,那麼就以以前的狀態做爲起點,進行值的累加
if(old.isDefined) {
clickCount = old.get
}
// values,表明了,batch rdd中,每一個key對應的全部的值
for(value <- values) {
clickCount += value
}
Some(clickCount)
}
// 將計算出來的最新結果,同步一份到mysql中,以便於j2ee系統使用
aggregatedDStream.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
//批量保存到數據庫
val adStats = ArrayBuffer[AdStat]()
for(item <- items){
val keySplited = item._1.split("_")
val date = keySplited(0)
val province = keySplited(1)
val city = keySplited(2)
val adid = keySplited(3).toLong
val clickCount = item._2
adStats += AdStat(date,province,city,adid,clickCount)
}
AdStatDAO.updateBatch(adStats.toArray)
}
}
aggregatedDStream
}
複製代碼
轉化爲省粒度進行過濾,拋棄userId ,cityid,格式爲:(yyyyMMdd_province_adid,clickCount)
註冊成表,基於ROW_NUMBER()實現窗聚合,按照province分區,實現top3排序,
tmp_daily_ad_click_count_by_prov
calculateProvinceTop3Ad(spark,adRealTimeStatDStream)
def calculateProvinceTop3Ad(spark:SparkSession, adRealTimeStatDStream:DStream[(String, Long)]) {
// 每個batch rdd,都表明了最新的全量的天天各省份各城市各廣告的點擊量
//(yyyyMMdd_province_city_adid,clickCount)
val rowsDStream = adRealTimeStatDStream.transform{ rdd =>
// <yyyyMMdd_province_city_adid, clickCount>
// <yyyyMMdd_province_adid, clickCount>
// 計算出天天各省份各廣告的點擊量
val mappedRDD = rdd.map{ case (keyString, count) =>
val keySplited = keyString.split("_")
val date = keySplited(0)
val province = keySplited(1)
val adid = keySplited(3).toLong
val clickCount = count
val key = date + "_" + province + "_" + adid
(key, clickCount)
}
val dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey( _ + _ )
// 將dailyAdClickCountByProvinceRDD轉換爲DataFrame
// 註冊爲一張臨時表
// 使用Spark SQL,經過開窗函數,獲取到各省份的top3熱門廣告
val rowsRDD = dailyAdClickCountByProvinceRDD.map{ case (keyString, count) =>
val keySplited = keyString.split("_")
val datekey = keySplited(0)
val province = keySplited(1)
val adid = keySplited(2).toLong
val clickCount = count
val date = DateUtils.formatDate(DateUtils.parseDateKey(datekey))
(date, province, adid, clickCount)
}
import spark.implicits._
val dailyAdClickCountByProvinceDF = rowsRDD.toDF("date","province","ad_id","click_count")
// 將dailyAdClickCountByProvinceDF,註冊成一張臨時表
dailyAdClickCountByProvinceDF.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")
// 使用Spark SQL執行SQL語句,配合開窗函數,統計出各身份top3熱門的廣告
val provinceTop3AdDF = spark.sql(
"SELECT "
+ "date,"
+ "province,"
+ "ad_id,"
+ "click_count "
+ "FROM ( "
+ "SELECT "
+ "date,"
+ "province,"
+ "ad_id,"
+ "click_count,"
+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank "
+ "FROM tmp_daily_ad_click_count_by_prov "
+ ") t "
+ "WHERE rank<=3"
)
provinceTop3AdDF.rdd
}
// 每次都是刷新出來各個省份最熱門的top3廣告,將其中的數據批量更新到MySQL中
rowsDStream.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
// 插入數據庫
val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()
for (item <- items){
val date = item.getString(0)
val province = item.getString(1)
val adid = item.getLong(2)
val clickCount = item.getLong(3)
adProvinceTop3s += AdProvinceTop3(date,province,adid,clickCount)
}
AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)
}
}
複製代碼
}
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。
轉化爲時間粒度進行過濾,拋棄province,userId ,cityid,格式爲: <yyyyMMddHHMM_adid,1L>,基於reduceByKeyAndWindow進行聚合
最終結果展開 (date,hour,minute,adid,clickCount)
calculateAdClickCountByWindow(adRealTimeValueDStream)
def calculateAdClickCountByWindow(adRealTimeValueDStream:DStream[String]) {
// 映射成<yyyyMMddHHMM_adid,1L>格式
//(timestamp province city userid adid)
val pairDStream = adRealTimeValueDStream.map{ case consumerRecord =>
val logSplited = consumerRecord.split(" ")
val timeMinute = DateUtils.formatTimeMinute(new Date(logSplited(0).toLong))
val adid = logSplited(4).toLong
(timeMinute + "_" + adid, 1L)
}
// 計算窗口函數,1小時滑動窗口內的廣告點擊趨勢
val aggrRDD = pairDStream.reduceByKeyAndWindow((a:Long,b:Long) => (a + b),Minutes(60L), Seconds(10L))
// 最近1小時內,各分鐘的點擊量,並保存到數據庫
aggrRDD.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
//保存到數據庫
val adClickTrends = ArrayBuffer[AdClickTrend]()
for (item <- items){
val keySplited = item._1.split("_")
// yyyyMMddHHmm
val dateMinute = keySplited(0)
val adid = keySplited(1).toLong
val clickCount = item._2
val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))
val hour = dateMinute.substring(8, 10)
val minute = dateMinute.substring(10)
adClickTrends += AdClickTrend(date,hour,minute,adid,clickCount)
}
AdClickTrendDAO.updateBatch(adClickTrends.toArray)
}
}
}
複製代碼
溫故而知新,本文爲了綜合複習,進行代碼總結,內容粗鄙,勿怪
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。
秦凱新 於深圳
來源:掘金
著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。