網站通常都須要根據廣告點擊量來制定對應的訂價策略和調整市場推廣的方式,通常也會收集用戶的一些偏好和其餘信息,這裏實現一個統計不一樣省份/或者市用戶對不一樣廣告的點擊狀況,有助於市場部對於廣告的更精準投放,而且要防止有人惡意點擊,不停的點同一個廣告(固然同一個ip一直點不一樣的廣告也是同樣)java
準備的日誌文件ClickLog.csv:sql
543462,1715,beijing,beijing,1512652431 543461,1713,shanghai,shanghai,1512652433 543464,1715,shanxi,xian,1512652435 543464,1715,shanxi,weinan,1512652441 543464,1715,shanxi,weinan,1512652442 543464,1715,shanxi,weinan,1512652443 543464,1715,shanxi,weinan,1512652444 543464,1715,shanxi,weinan,1512652445 543464,1715,shanxi,weinan,1512652446 543464,1715,shanxi,weinan,1512652447 543464,1715,shanxi,weinan,1512652451 543464,1715,shanxi,weinan,1512652452 543464,1715,shanxi,weinan,1512652453 543464,1715,shanxi,weinan,1512652454 543464,1715,shanxi,weinan,1512652455 543464,1715,shanxi,weinan,1512652456 543464,1715,shanxi,weinan,1512652457 543464,1715,shanxi,hanzhong,1512652461 543464,1715,shanxi,yanan,1512652561
代碼:apache
/* * * @author mafei * @date 2021/1/10 */ package com.mafei.market_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp /** * 定義輸入的樣例類 * 543464,1715,shanxi,weinan,1512652459 */ case class AdClickLog(userId: Long,adId: Long,province: String, city: String,timestamp:Long) /** * 定義輸出的樣例類 * 統計每一個省對每一個廣告的點擊量 */ case class AdClickCountByProvince(windowEnd: String,province: String, count: Long) object AdClickAnalysis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件時間爲窗口和watermark的時間 env.setParallelism(1) //從文件中讀取數據 val resource = getClass.getResource("/ClickLog.csv") val inputStream = env.readTextFile(resource.getPath) // 轉換成樣例類,並提取時間戳watermark val adLogStream = inputStream .map(d=>{ val arr = d.split(",") AdClickLog(arr(0).toLong,arr(1).toLong,arr(2),arr(3),arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 定義窗口,聚合統計 val adCountResultStream = adLogStream .keyBy(_.province) .timeWindow(Time.days(1),Time.seconds(50)) .aggregate(new AdCountAgg(), new AdCountWindowResult()) adCountResultStream.print() env.execute("統計廣告點擊狀況") } } class AdCountAgg() extends AggregateFunction[AdClickLog, Long,Long]{ override def createAccumulator(): Long = 0L override def add(in: AdClickLog, acc: Long): Long = acc+1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc + acc1 } class AdCountWindowResult() extends WindowFunction[Long,AdClickCountByProvince,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = { out.collect(AdClickCountByProvince(windowEnd = new Timestamp(window.getEnd).toString, province = key, count = input.head)) } }
上面代碼中,同一個用戶的重複點擊是會疊加計算的,在實際生產場景中,同一個用戶可能會重複點開某一個廣告,可是若是用戶在一段時間內很是頻繁的點擊廣告,這明顯不是個正常行爲了,在刷點擊量,因此能夠作個限制,好比同一個廣告,同一我的天天最多點100次,超過了就把這個用戶加到黑名單裏頭並告警,後邊的點擊行爲就再也不統計了
那來個改進的版本:windows
/* * * @author mafei * @date 2021/1/10 */ package com.mafei.market_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp /** * 定義輸入的樣例類 * 543464,1715,shanxi,weinan,1512652459 */ case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long) /** * 定義輸出的樣例類 * 統計每一個省對每一個廣告的點擊量 */ case class AdClickCountByProvince(windowEnd: String, province: String, count: Long) /** * 黑名單預警輸出的樣例類 */ case class UserBlackListWarning(userId: String, adId: String, msg: String) object AdClickAnalysis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件時間爲窗口和watermark的時間 env.setParallelism(1) //從文件中讀取數據 val resource = getClass.getResource("/ClickLog.csv") val inputStream = env.readTextFile(resource.getPath) // 轉換成樣例類,並提取時間戳watermark val adLogStream = inputStream .map(d => { val arr = d.split(",") AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 插入一步操做,把有刷單行爲的用戶信息輸出到黑名單(側輸出流中)並作過濾 val userBlackListFilterStream: DataStream[AdClickLog] = adLogStream .keyBy(data => { (data.userId, data.adId) }) .process(new FilterUserBlackListResult(10L)) // 定義窗口,聚合統計 val adCountResultStream = userBlackListFilterStream .keyBy(_.province) .timeWindow(Time.days(1), Time.seconds(50)) .aggregate(new AdCountAgg(), new AdCountWindowResult()) adCountResultStream.print() //打印測輸出流 userBlackListFilterStream.getSideOutput(new OutputTag[UserBlackListWarning]("warning")).print("測輸出流") env.execute("統計廣告點擊狀況") } } class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] { override def createAccumulator(): Long = 0L override def add(in: AdClickLog, acc: Long): Long = acc + 1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc + acc1 } class AdCountWindowResult() extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow] { override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = { out.collect(AdClickCountByProvince(windowEnd = new Timestamp(window.getEnd).toString, province = key, count = input.head)) } } /** * key是上面定義的二元組 * 輸入和輸出不變,只是作過濾 */ class FilterUserBlackListResult(macCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] { /** * 定義狀態,保存每個用戶對每一個廣告的點擊量 */ lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long])) /** * 定義天天0點定時清空狀態的時間戳 */ lazy val resetTimeTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("resetTs", classOf[Long])) /** * 定義用戶有沒有進入黑名單 */ lazy val isBlackList: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isBlackList", classOf[Boolean])) override def processElement(i: AdClickLog, context: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, collector: Collector[AdClickLog]): Unit = { val curCount = countState.value() //初始狀態 if(curCount == 0){ /** * 獲取明天0點的時間戳,用來註冊定時器,明天0點把狀態所有置空 * * * 獲取明天的天數: context.timerService().currentProcessingTime()/(1000*60*60*24)+1 * * (24*60*60*1000) 是轉換成明天0點的時間戳 * - 8*60*60*1000 是從倫敦時間轉爲東8區 * */ val ts = (context.timerService().currentProcessingTime()/(1000*60*60*24)+1) * (24*60*60*1000) - 8*60*60*1000 context.timerService().registerProcessingTimeTimer(ts) resetTimeTsState.update(ts) //定義重置的時間點 } //判斷次數是否是超過了定義的閾值,若是超過了那就輸出到側輸出流 if(curCount > macCount){ // println("超出閾值了,curCount:"+curCount + " isBlackList:"+isBlackList.value()) //判斷下,是否是在黑名單裏頭,沒有的話才輸出到側輸出流,不然就會重複輸出 if(!isBlackList.value()){ isBlackList.update(true) context.output(new OutputTag[UserBlackListWarning]("warning"),UserBlackListWarning(i.userId.toString,i.adId.toString,curCount+"超過了出現的次數"+macCount)) } return } //正常狀況,每次都計數加1,而後把數據原樣輸出,畢竟這裏只是爲了裹一層 countState.update(curCount +1) collector.collect(i) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = { if(timestamp == resetTimeTsState.value()){ isBlackList.clear() countState.clear() } } }