目錄mysql
本文是原項目的一次重寫。主要是用DataFrame代替原來的RDD,並在一些實現上進行優化,還有就是實時流計算改用Flink進行實現。
項目分爲用戶訪問session模塊、頁面轉跳轉化率統計、熱門商品離線統計和廣告流量實時統計四部分組成。算法
用戶訪問sessionsql
該模塊主要是對用戶訪問session進行統計分析,包括session的聚合指標計算、按時間比例隨機抽取session、獲取天天點擊、下單和購買排名前10的品類、並獲取top10品類的點擊量排名前10的session。主要使用Spark DataFrame。數據庫
頁面單跳轉化率統計json
該模塊主要是計算關鍵頁面之間的單步跳轉轉化率,涉及到頁面切片算法以及頁面流匹配算法。主要使用Spark DataFrame。安全
熱門商品離線統計session
該模塊主要實現天天統計出各個區域的top3熱門商品。主要使用Spark DataFrame。數據結構
廣告流量實時統計app
通過實時黑名單過濾的天天各省各城市廣告點擊實時統計、天天各省topn熱門廣告、各廣告近1小時內每分鐘的點擊趨勢。主要使用Spark streaming或Flink。ide
輸入表
# 用戶表 user_id 用戶的ID username 用戶的名稱 name 用戶的名字 age 用戶的年齡 professional 用戶的職業 city 用戶所在的城市 sex 用戶的性別 # 商品表 product_id 商品的ID product_name 商品的名稱 extend_info 商品額外的信息 # 用戶訪問動做表 date 用戶點擊行爲的日期 user_id 用戶的ID session_id Session的ID page_id 某個頁面的ID action_time 點擊行爲的時間點 search_keyword 用戶搜索的關鍵詞 click_category_id 某一個商品品類的ID click_product_id 某一個商品的ID order_category_ids 一次訂單中全部品類的ID集合 order_product_ids 一次訂單中全部商品的ID集合 pay_category_ids 一次支付中全部品類的ID集合 pay_product_ids 一次支付中全部商品的ID集合 city_id 城市ID
輸出表
# 聚合統計表 taskid 當前計算批次的ID session_count 全部Session的總和 visit_length_1s_3s_ratio 1-3sSession訪問時長佔比 visit_length_4s_6s_ratio 4-6sSession訪問時長佔比 visit_length_7s_9s_ratio 7-9sSession訪問時長佔比 visit_length_10s_30s_ratio 10-30sSession訪問時長佔比 visit_length_30s_60s_ratio 30-60sSession訪問時長佔比 visit_length_1m_3m_ratio 1-3mSession訪問時長佔比 visit_length_3m_10m_ratio 3-10mSession訪問時長佔比 visit_length_10m_30m_ratio 10-30mSession訪問時長佔比 visit_length_30m_ratio 30mSession訪問時長佔比 step_length_1_3_ratio 1-3步長佔比 step_length_4_6_ratio 4-6步長佔比 step_length_7_9_ratio 7-9步長佔比 step_length_10_30_ratio 10-30步長佔比 step_length_30_60_ratio 30-60步長佔比 step_length_60_ratio 大於60步長佔比 # 品類Top10表 taskid categoryid clickCount orderCount payCount # Top10 Session taskid categoryid sessionid clickCount
統計出符合條件的session中,各訪問時長、步長的佔比,並將結果保存到MySQL中。符合條件的session指搜索過某些關鍵詞的用戶、訪問時間在某個時間段內的用戶、年齡在某個範圍內的用戶、職業在某個範圍內的用戶、所在某個城市的用戶,所發起的session。
除了將原rdd的實現改成DF外,本文還在兩方面進行了優化。第一是join前提早filter。原實現是先從用戶動做表中計算出訪問時長、步長後和用戶信息表進行關聯後再filter的,這無疑是對一些無關的用戶多餘地計算了訪問時長和步長,也增長了join是shuffle的數據量。第二點是原實現採用accumulator實現個訪問時長人數和各步長人數的統計,這會增長driver的負擔。而重寫後的代碼基於DF,且利用when函數對訪問時長和步長進行離散化,最後利用聚合函數得出統計結果,讓全部統計都在executors中並行執行。
// 原始數據包含「用戶訪問動做表」中的信息 // 先根據時間範圍篩選「用戶訪問動做表」,而後將它和「UserInfo表」進行inner join,補充用於進一步篩選的信息:age、professional、city、sex // 根據searchKeywords、clickCategoryIds和上面4個條件對數據進行篩選,得出所需的session。 // 利用spark sql篩選特定時間段的session spark.sql("select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'") // 下面代碼用於合成SQL語句並用於filter特定類型的session,但有必定的安全隱患,要對輸入的參數進行嚴格的校驗,防止SQL注入。 val selectClause = new StringBuilder("SELECT * FROM user_visit_action_to_user_info WHERE 1=1 ") if (ValidUtils.equal(Constants.PARAM_SEX, sex)){ selectClause append ("AND sex == '" + sex + "'") } if (ValidUtils.in(Constants.PARAM_PROFESSIONALS, professionals)){ selectClause append ("AND professional in (" + professionals + ")") } if (ValidUtils.in(Constants.PARAM_CITIES, cities)){ selectClause append ("AND cities in (" + cities + ")") } if (ValidUtils.in(Constants.PARAM_KEYWORDS, keywords)){ selectClause append ("AND search_keyword in (" + keywords + ")") } if (ValidUtils.in(Constants.PARAM_CATEGORY_IDS, categoryIds)){ selectClause append ("AND click_category_id in (" + categoryIds + ")") } if (ValidUtils.between(Constants.FIELD_AGE, startAge, endAge)){ selectClause append ("AND age BETWEEN " + startAge + " AND " + endAge) } val sqlQuery = selectClause.toString() // filter完後與「用戶表」創建鏈接 // 下面進行session聚合計算,結果獲得的信息包括sessionid、search_keyword、click_category_id、stepLength、visitLength、session開始時間start、AGE、PROFESSIONAL、CITY、SEX val timeFmt = "yyyy-MM-dd HH:mm:ss" val sessionid2ActionsRDD2 = UserVisitActionDF .withColumn("action_time", unix_timestamp($"action_time", timeFmt)) .groupBy("session_id") .agg(min("action_time") as "start", max("action_time") as "end", count("*") as "stepLength") .withColumn("visitLength", $"start" - $"end") .withColumn("discrete_VL", discretiseVisitLength) .withColumn("discrete_SL", discretiseStepLength) // 離散化 visitLength 和 stepLength val discretiseVisitLength = when($"visitLength" >= 1 && $"visitLength" <= 3 , Constants.TIME_PERIOD_1s_3s) .when($"visitLength" >= 4 && $"visitLength" <= 6 , Constants.TIME_PERIOD_4s_6s) ... .when($"visitLength" >= 1800, Constants.TIME_PERIOD_30m) // 統計信息,得到每種訪問時長的人數。將下面discrete_VL換成stepLength就是每種步長的人數了 val statisticVisitLength = sessionid2ActionsRDD2.groupBy("discrete_VL").agg(count("discrete_VL")).collect()
根據各時長、步長的比例抽樣。原實現利用rdd和scala自身數據結構和方法來實現,新的實現直接利用dataframe的統計函數sampleBy實現。
用df.stat.sampleBy("colName", fractions, seed)
,其中fractions爲Map,是每distinct key和其須要抽取的比例,如("a" -> 0.8)就是從key爲a的數據中抽80%條
val fractions = HashMap( TIME_PERIOD_1s_3s -> 0.1, TIME_PERIOD_4s_6s -> 0.1, TIME_PERIOD_7s_9s -> 0.1, TIME_PERIOD_10s_30s -> 0.1, TIME_PERIOD_30s_60s -> 0.1, TIME_PERIOD_1m_3m -> 0.1, TIME_PERIOD_3m_10m -> 0.1, TIME_PERIOD_10m_30m -> 0.1, TIME_PERIOD_30m -> 0.1 ) df.stat.sampleBy("time_period", fractions, 2L) // 若是time_period未知,用下面方式得出map df.select("time_period") .distinct .map(x=> (x, 0.8)) .collectAsMap
分別計算出各商品的點擊數、下單數、支付次數,而後將三個結果進行鏈接,並排序。排序規則是點擊數大的排前面,相同時下單數大的排前面,而後再相同時支付次數大的排前面。這裏的優化點是採用rdd的takeOrdered取前十,它的底層是每一個分區一個最小堆,取出每一個分區的前10,而後再彙總。這樣省去了原來實現當中的sortbykey+take,該方法進行了全排序,效率較低。
// 分別計算出各商品的點擊數、下單數、支付次數,而後將三個結果進行鏈接,並排序。 val clickCategoryId2CountDF = sessionid2detailDF .select("clickCategoryId") .na.drop() .groupBy("clickCategoryId") .agg(count("clickCategoryId")) .withColumnRenamed("clickCategoryId", "categoryId") val orderCategoryId2CountDF = sessionid2detailDF .select("order_category_ids") .na.drop() .withColumn("splitted_order_category_ids", split($"order_category_ids", ",")) .withColumn("single_order_category_ids", explode($"splitted_order_category_ids")) .groupBy("single_order_category_ids") .agg(count("single_order_category_ids")) .withColumnRenamed("single_order_category_ids", "categoryId") val payCategoryId2Count = sessionid2detailDF .select("pay_category_ids") .na.drop() .withColumn("splitted_pay_category_ids", split($"pay_category_ids", ",")) .withColumn("single_pay_category_ids", explode($"splitted_pay_category_ids")) .groupBy("single_pay_category_ids") .agg(count("single_pay_category_ids")) .withColumnRenamed("single_pay_category_ids", "categoryId") val top10CategoryId = clickCategoryId2CountDF.join(orderCategoryId2CountDF, Seq("categoryId"), "left") .join(payCategoryId2Count, Seq("categoryId"), "left") .na.fill(0L, Seq("")) .map(row => { (row.getAs[Int]("categoryId"), row.getAs[Int]("count(clickCategoryId)"), row.getAs[Int]("count(single_order_category_ids)"), row.getAs[Int]("count(single_pay_category_ids)")) }) .rdd .takeOrdered(10)(ordering) // 補充 implicit val ordering = new Ordering[(Int, Int, Int, Int)] { override def compare(x: (Int, Int, Int, Int), y: (Int, Int, Int, Int)): Int = { val compare2 = x._2.compareTo(y._2) if (compare2 != 0) return compare2 val compare3 = x._3.compareTo(y._3) if (compare3 != 0) return compare3 val compare4 = x._4.compareTo(y._4) if (compare4 != 0) return compare4 0 } }.reverse
對於top10的品類,每個都要獲取對它點擊次數排名前10的session。
原代碼的實現是先groupByKey,統計出每一個sessionid對各品類的點擊次數,而後再跟前10熱門品類鏈接來減小數據,而後再用groupBuKey,對每一個分組數據toList後排序取前10。這個實現並不太好,首先它一開始的groupByKey對非Top10熱門品類的數據進行了統計,這是一種浪費。更好的作法是提早filter,即先利用熱門品類這個名單進行filter。而後,原代碼在實現filter使用的是將熱門品類名單parallelise到集羣而後利用join實現過濾。這會觸發沒必要要的shuffle,更好的實現進行broadcast join,將名單廣播出去後進行join。而後groupByKey的統計也是一個問題,它沒有map side聚合,容易OOM,更好的實現是採用DF的groupby + agg。得出統計數據後利用windowfunction取得各熱門品類的前十session。即一次shuffle就能夠完成需求,windowfunction在這個並不須要shuffle,由於通過前面的shuffle聚合,df已經具備partitioner了,在原節點就能夠計算出topn。
// 把top10CategoryId的名單發到集羣 val top10CategoryIdRDD = spark.sparkContext.parallelize(top10CategoryId.map(_._1)).toDF("top10CategoryId") // 利用broadcast實現過濾,而後進行分組統計 val top10Category2SessionAndCount = filteredUserVisitActionDF.join(broadcast(top10CategoryIdRDD), $"click_category_id" === $"top10CategoryId") .groupBy("top10CategoryId", "sessionId") .agg(count("click_category_id") as "count") // 分組取前10 // windowfunction在這個並不須要shuffle,由於通過前面的shuffle聚合,df已經具備partitioner了,在原節點就能夠計算出topn。 val windowSpec = Window.partitionBy("top10CategoryId", "sessionId").orderBy(desc("count")) val top10SessionWithinTop10Category = top10Category2SessionAndCount.select(expr("*"), rank().over(windowSpec).as("rank")) .filter($"rank" <= 10)
計算關鍵頁面之間的單步跳轉轉化率。方法是先獲取目標頁面,如1,2,3,將它們拼接成1_2, 2_3得出兩個目標轉跳形式。一樣須要在df的數據中產生頁面轉跳。方法是利用windowfunction將數據按sessionid分組,訪問時間升序排序,而後利用concat_ws和window的lag函數實現當前頁面id與前一條數據的頁面id的拼接。集羣數據中產生轉跳數據後,利用filter篩選出以前的目標轉跳形式。最後按這些形式分組統計數量,便得出每種轉跳的數量,將它collect爲map。另外還須要計算起始頁1的數量,簡單的filter和count實現。接下來就能夠根據這些數據計算轉跳率了。遍歷目標轉跳形式,從map中獲取相應的數量,而後除以起始頁/上一頁的數量,進而得出結果。
// 獲取須要查詢的頁面id,結果如"3,1,4,5,2" val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW) // 對須要查詢的頁面id進行分割,結果如Array("3","1","4","5","2") val targetPages = targetPageFlow.split(",") // 構建目標轉跳頁面id,結果如Array(3_1,1_4,4_5,5_2) val targetPagePairs = targetPages .zip(targetPages.tail) .map(item => item._1 + "_" + item._2) val targetPageFlowBroadcast = spark.sparkContext.broadcast(targetPagePairs) // 設置將要用到的時間格式和window函數 val timeFmt = "yyyy-MM-dd HH:mm:ss" val windowSpec = Window .partitionBy("session_id") .orderBy($"action_time") val pagesPairFun = concat_ws("_", col("page_id"), lag("page_id", -1).over(windowSpec)) // 計算各目標轉跳id的數量 val pageSplitPvMap = df.na.drop(Seq("session_id")) .withColumn("action_time", to_timestamp($"action_time", timeFmt)) .withColumn("pagePairs", pagesPairFun) // 下面filter方式,條件少時可行,多時用broadcast jion .filter($"pagePairs" isin (targetPageFlowBroadcast.value: _*)) .groupBy("pagePairs") .agg(count("pagePairs")) .as[(String, Long)] .collect().toMap // 計算起始頁面的點擊數 val startPage = targetPages(0) val startPagePv = df.filter($"page_id" === startPage).count().toDouble var lastPageSplitPv = startPagePv // 存儲結果的map val convertRateMap = new mutable.HashMap[String, Double]() for(targetPage <- targetPagePairs){ val targetPageSplitPv = pageSplitPvMap(targetPage).toDouble val convertRate = "%.2f".format(targetPageSplitPv / lastPageSplitPv).toDouble convertRateMap.put(targetPage, convertRate) lastPageSplitPv = targetPageSplitPv }
原數據沒有地區列和城市列(有城市id),因此先廣播一個地區城市表,而後根據城市id進行join。以後按照地區和商品分組進行計數。最後利用windowfunction取各地區topn。
val cityInfo = Array((0L, "北京", "華北"), (1L, "上海", "華東"), (2L, "南京", "華東"), (3L, "廣州", "華南"), (4L, "三亞", "華南"), (5L, "武漢", "華中"), (6L, "長沙", "華中"), (7L, "西安", "西北"), (8L, "成都", "西南"), (9L, "哈爾濱", "東北")) // Row(city_id, city_name, area) val cityInfoDF = spark.sparkContext.makeRDD(cityInfo).toDF("city_id", "city_name", "area") // 提取 cityid 和 productid val cityid2clickActionDF = df.select("city_id", "product_id") .na.drop(Seq("product_id")) .filter($"product_id" =!= -1L) // (cityid, cityName, area, productid) val area_product_clickCount_cityListDF = cityid2clickActionDF.join(broadcast(cityInfoDF), Seq("city_id"), "inner") .withColumn("cityId_cityName", concat_ws(":", $"city_id", $"city_name")) .groupBy($"area", $"product_id") .agg(count("*") as "click_count", collect_set("cityId_cityName") as "city_list") // 和top10熱門session相似,利用window求topn val windowSpec = Window .partitionBy("area", "product_id") .orderBy($"click_count".desc) // 每一個地區前三熱門商品 val areaTop3ProductDF = area_product_clickCount_cityListDF.withColumn("rank", $"click_count".over(windowSpec)) .filter($"rank" <= 3) // productInfo表(對json的操做) val productInfoDF = df.select("product_id", "product_name", "extend_info") .withColumn("product_status", get_json_object($"extend_info", "$.product_status")) .withColumn("product_status", when($"product_status" === 0, "Self").otherwise("Third Party")) .drop("extend_info") // 補充信息 val areaTop3ProducFullInfoDF = areaTop3ProductDF.join(productInfoDF, Seq("product_id"), "inner")
通過實時黑名單過濾的天天各省各城市廣告點擊實時統計、天天各省topn熱門廣告、各廣告近1小時內每分鐘的點擊趨勢。這部分原代碼採用Spark Streaming實現,我將之改成基於Flink的實現。下面會首先介紹Spark Streaming的實現,而後到Flink。
流式數據的格式爲: timestamp 1450702800 province Jiangsu city Nanjing userid 100001 adid 100001
建立流,利用預先廣播的黑名單過濾信息,而後利用過濾後的信息更新黑名單、計算廣告點擊流量、統計天天每一個省份top3熱門廣告、統計一個小時窗口內每分鐘各廣告的點擊量。
// 構建Spark上下文 val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]") // 建立Spark客戶端 val spark = SparkSession.builder().config(sparkConf).getOrCreate() val sc = spark.sparkContext val ssc = new StreamingContext(sc, Seconds(5)) // 設置檢查點目錄 ssc.checkpoint("./streaming_checkpoint") // --- 此處省略Kafka配置 --- // // 建立DStream val adRealTimeLogDStream = KafkaUtils.createDirectStream[String,String](...) var adRealTimeValueDStream = adRealTimeLogDStream.map(_.value) // 用於Kafka Stream的線程非安全問題,從新分區切斷血統 adRealTimeValueDStream = adRealTimeValueDStream.repartition(400) // 根據動態黑名單過濾數據。利用findAll來查找MySQL中全部的黑名單用戶,而後經過join實現過濾。 val filteredAdRealTimeLogDStream = filterByBlacklist(spark, adRealTimeValueDStream) // 業務功能一:生成動態黑名單 generateDynamicBlacklist(filteredAdRealTimeLogDStream) // 業務功能二:計算廣告點擊流量實時統計結果(yyyyMMdd_province_city_adid,clickCount) val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream) // 業務功能三:實時統計天天每一個省份top3熱門廣告 calculateProvinceTop3Ad(spark, adRealTimeStatDStream) // 業務功能四:實時統計天天每一個廣告在最近1小時的滑動窗口內的點擊趨勢(每分鐘的點擊量) calculateAdClickCountByWindow(adRealTimeValueDStream) ssc.start() ssc.awaitTermination()
實現實時的動態黑名單機制:將天天對某個廣告點擊超過100次的用戶拉黑。提取出日期(yyyyMMdd)、userid、adid,而後reduceByKey統計這一批數據的結果,並批量插入MySQL。而後過濾出新的黑名單用戶,實現爲從MySQL中查找每條數據的用戶是否對某條廣告的點擊超過100次,即成爲了新的黑名單用戶,找到後進行distinct操做得出新增黑名單用戶,並更新到MySQL。
// 從 adRealTimeValueDStream 中提取出下面三個值並構建(key, 1L) val key = datekey + "_" + userid + "_" + adid // 而後 reduceByKey(_ + _), 獲得這batch天天每一個用戶對每一個廣告的點擊量 dailyUserAdClickCountDStream.foreachRDD{ rdd => rdd.foreachPartition{ items => // items 是 Iterator(key, count),提取key的值,構成(date, userid, adid, clickCount),批量寫入mysql ... }} // 以後filter,每條數據到 mysql 查詢更新後的(date, userid, adid)的count是否大於100,表示當天某用戶對某個廣告是否點擊超過100次,若是是就true(留下)最後得出新黑名單blacklistDStream。去重後直接批量插入mysql blacklistDStream.transform(_.distinct())
天天各省各城市各廣告的點擊流量實時統計。分組,key爲日期+省份+城市+廣告id,利用updateStateByKey實現累加。新的統計結果更新到MySQL。
// 執行updateStateByKey算子 // spark streaming特有的一種算子,在spark集羣內存中,維護一份key的全局狀態 // 和黑名單同樣,先從string中提取出信息並構建key val aggregatedDStream = dailyUserAdClickDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) => var clickCount = 0L // 若是說,以前是存在這個狀態的,那麼就以以前的狀態做爲起點,進行值的累加 if(old.isDefined) { clickCount = old.get } // values表明了,batch rdd中,每一個key對應的全部的值 for(value <- values) { clickCount += value } Some(clickCount) } // 而後和黑名單中同樣,批量更新到mysql
利用上一步獲得的結果,即key爲日期+省份+城市+廣告id,value爲累積點擊量,進行統計及分組topn。reduceByKey + windowfunction
一樣在累積數據的基礎上操做,提取出時間,而後利用固定窗口實現需求。
// 從原始流(未去除黑名單的數據)中提取出timeMinute、adid兩個值進行聚合統計 pairDStream.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(60L), Seconds(10L)) // 下面 items 就是 Iterator(timeMinute_adid, count) aggrRDD.foreachRDD { rdd => rdd.foreachPartition { items => ...}} // 從key中提取出date、hour和minute寫入mysql
Flink的思路是經過三個KeyedProcessFunction來實現的,由於他有state(累積各key的值)和timer(定時刪除state)功能。
第一個KeyedProcessFunction是記錄每一個userId-adId鍵的量,當達到閾值時對這類信息進行截流,從而實現黑名單的更新和過濾。
第二個是記錄每一個province的數據量,即每一個省的廣告點擊量
第三個是記錄一個map,裏面統計每一個省的點擊量,當進行了必定數量的更新後,就輸出一次這個map的前n個kv對(以排好序的string的形式),從而實現topn功能。
// 模塊結構 ├── Launcher.scala 啓動類 ├── bean │ └── AdLog.scala 三個case class ├── constant │ └── Constant.scala 定義了一些定值字符串 ├── function 處理函數,下面介紹。 │ ├── AccProvClick.scala │ ├── BetterGenerateTopK.scala │ └── FilterBlackListUser.scala └── schema └── AdLogDeserializationSchema.scala 用於反序列化Kafka信息
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // kafka配置 val consumerProps = new Properties() consumerProps.setProperty(KEY_BOOTSTRAP_SERVERS, args(0)) consumerProps.setProperty(KEY_GROUP_ID, args(1)) // kafka消費者 val consumer = new FlinkKafkaConsumer010( args(2), new AdLogDeserializationSchema(), consumerProps ) // 設置數據源 val adLogStream = env.addSource(consumer) // 對點擊某一廣告多於100的用戶進行截流,從而一次性完成黑名單過濾和黑名單更新。 val withSideOutput = adLogStream .keyBy(adLog => (adLog.userid, adLog.adid)) .process(new FilterBlackListUser) // (可選)新增的黑名單流。此處只輸出到控制檯,有須要能夠輸出到其餘端口。 withSideOutput.getSideOutput(realTimeBlackList) .print() // 在main函數外添加下面代碼才能取得sideoutput // val realTimeBlackList: OutputTag[String] = // new OutputTag[String]("black_list") // 實時統計廣告點擊量最多的前K個省份。一樣此處只輸出到控制檯,有須要能夠輸出到其餘端口。 withSideOutput .keyBy(_.province) // 按province進行分組累加的stateful操做 .process(new AccProvClick) // 這裏也能夠輸出到數據庫或者kafka等,從而對這些聚合好的數據進行不一樣需求的分析 .keyBy(_.dummyKey) .process(new BetterGenerateTopK(10)) .print() env.execute("TopK_Province")
廣告日誌類以及處理過程產生的一些新case class
// 從kafka獲取並實現反序列化後的數據 case class AdLog(userid: Int, adid: Int, province: String, city: String, timestamp: Long) // 通過FilterBlackListUser處理後獲得的數據,若是須要對adid、city都進行分組,也能夠在這裏加屬性 case class ProvinceWithCount(province: String, count: Int, dummyKey: Int)
class AdLogDeserializationSchema extends DeserializationSchema[AdLog]{ override def deserialize(bytes: Array[Byte]): AdLog = { val json = parse(new String(bytes)) implicit val formats = DefaultFormats json.extract[AdLog] } // 能夠根據接收的AdLog來判斷是否須要結束這個數據流。若是不須要這個功能就直接返回false。 override def isEndOfStream(t: AdLog): Boolean = false // 告訴Flink通過反序列化後獲得什麼類 override def getProducedType: TypeInformation[AdLog] = TypeInformation.of(AdLog.getClass.asInstanceOf[Class[AdLog]]) }
class FilterBlackListUser extends KeyedProcessFunction[(Int, Int), AdLog, ProvinceWithCount] { // 存儲當前userId-adId鍵值的廣告點擊量 var countState: ValueState[Int] = _ // 標記當前userId-adId鍵值是否第一次進入黑名單的flag var firstSent: ValueState[Boolean] = _ // 記錄當前userId-adId鍵值state的生成時間 var resetTime: ValueState[Long] = _ // 初始化key state override def open(parameters: Configuration): Unit = { val countDescriptor = new ValueStateDescriptor[Int]("count", classOf[Int]) countState = getRuntimeContext .getState[Int](countDescriptor) val firstSeenDescriptor = new ValueStateDescriptor[Boolean]("firstSent", classOf[Boolean]) firstSent = getRuntimeContext .getState[Boolean](firstSeenDescriptor) val resetTimeDescriptor = new ValueStateDescriptor[Long]("resetTime", classOf[Long]) resetTime = getRuntimeContext .getState[Long](resetTimeDescriptor) } override def processElement(value: AdLog, ctx: KeyedProcessFunction[(Int, Int), AdLog, ProvinceWithCount]#Context, out: Collector[ProvinceWithCount]): Unit = { val curCount = countState.value() // 第一次處理登記timer,24:00清除state if (curCount == 0) { val time = (ctx.timerService().currentProcessingTime() / 86400000 + 1) * 86400000 resetTime.update(time) ctx.timerService().registerProcessingTimeTimer(time) } // 加入黑名單,並在side output輸出,但只輸出一次 if (curCount >= 100) { // 默認初始爲false if (!firstSent.value()) { firstSent.update(true) ctx.output(Launcher.realTimeBlackList, value.userid.toString) } return } // 點擊次數+1 countState.update(curCount + 1) out.collect(ProvinceWithCount(value.province, 1,1)) } // 到達預約時間時清除state override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Int, Int), AdLog, ProvinceWithCount]#OnTimerContext, out: Collector[ProvinceWithCount]): Unit = { if (timestamp == resetTime.value()) { firstSent.clear() countState.clear() } } }
代碼形式和上面的類幾乎同樣
class AccProvClick extends KeyedProcessFunction[String, ProvinceWithCount, ProvinceWithCount] { // 存儲當前province鍵值的廣告點擊量 var countState: ValueState[Int] = _ var resetTime: ValueState[Long] = _ override def open //和上面相似 override def processElement(value: ProvinceWithCount, ctx: KeyedProcessFunction[String, ProvinceWithCount, ProvinceWithCount]#Context, out: Collector[ProvinceWithCount]): Unit = { // 和上面相似,若是countState值爲0,先設置timer val curCount = countState.value() + 1 countState.update(curCount) out.collect(ProvinceWithCount(value.province, curCount, 1)) } override def onTimer // 和上面相似 }
class BetterGenerateTopK(n: Int) extends KeyedProcessFunction[Int, ProvinceWithCount, String] { // 存儲各省的廣告點擊量 var prov2clickTable : MapState[String, Int] = _ var resetTime: ValueState[Long] = _ // 每積累到100條更新就發送一次排名結果 var sendFlag : Int = 0 override def open(parameters: Configuration): Unit = { val prov2clickDescriptor = new MapStateDescriptor[String, Int]("statistic", classOf[String], classOf[Int]) prov2clickTable = getRuntimeContext .getMapState[String, Int](prov2clickDescriptor) val resetTimeDescriptor = // 上面相似 } override def processElement(value: ProvinceWithCount, ctx: KeyedProcessFunction[Int, ProvinceWithCount, String]#Context, out: Collector[String]): Unit = { if (!prov2clickTable.iterator().hasNext) { val time = (ctx.timerService().currentProcessingTime() / 86400000 + 1) * 86400000 resetTime.update(time) ctx.timerService().registerProcessingTimeTimer(time) } prov2clickTable.put(value.province, value.count) sendFlag += 1 if (sendFlag % 100 == 0){ sendFlag = 0 val res = new StringBuilder prov2clickTable.iterator() .asScala .toArray .sortBy(_.getValue) .takeRight(n) .foreach(x => res.append(x.getKey + x.getValue)) out.collect(res.toString()) } } override def onTimer // 和上面相似 }