Similar to the scenario in http://www.javashuo.com/article/p-xszcshqv-ny.html: read data from the logs of a web server such as Nginx or Apache and compute, in real time, the top-N most visited URLs, while handling out-of-order data and keeping the numbers accurate even under lag.
Read the data from a log file (it could also come from Kafka, as in the previous post), keep the HTTP requests whose method is GET, filter out requests for static files, and compute the statistics in real time. Implementation:

1. Read the file.
2. Filter: method is GET and the URL is not a static resource.
3. Build a sliding window, 10 minutes wide, sliding every 5 s, with a 5 s watermark (and, as a safety margin for late data, allowedLateness of 1 minute).
4. Aggregate, sort, and emit the results.
Create a nginx.log under the resources directory with this content:

```
1.1.1.1 - - 23/03/2020:05:06:03 GET /mapengfei/2580330
1.1.1.1 - - 23/03/2020:05:06:05 GET /mapengfei/2572888
1.1.1.3 - - 24/03/2020:05:06:05 GET /mapengfei/2572888
```
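Before wiring this into Flink, it is worth sanity-checking the timestamp conversion the job relies on; a minimal standalone sketch (the object name `TsParseCheck` is made up for the demo):

```scala
import java.text.SimpleDateFormat

// Quick standalone check, not part of the job: the "dd/MM/yyyy:HH:mm:ss" pattern
// turns the fourth space-separated field of a sample log line into the 13-digit
// millisecond timestamp that Flink event time expects.
object TsParseCheck extends App {
  val line = "1.1.1.1 - - 23/03/2020:05:06:03 GET /mapengfei/2580330"
  val fmt  = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  val ts   = fmt.parse(line.split(" ")(3)).getTime
  println(ts) // a 13-digit millisecond value; the exact number depends on the JVM's default time zone
}
```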
Create a new object file named HotUrlAnalysis.scala:
```scala
/**
  * @author mafei
  * @date 2021/1/3
  */
package com.mafei.hotUrlAnalysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

// data format extracted from each log line
case class NginxLog(clientIp: String, userId: String, ts: Long, method: String, url: String)

// case class for the per-window aggregation result
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

object HotUrlAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event time
    env.setParallelism(1) // single parallelism to keep the output order deterministic

    // 1. read the data from the file
    val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log")

    val dataStream = inputStream
      .map(data => {
        val splitResult = data.split(" ") // the log fields are space-separated
        // the log carries the time as a string; we need a 13-digit millisecond timestamp, so convert it
        val dateFormatConvert = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") // format: day/month/year:hour:minute:second
        val ts = dateFormatConvert.parse(splitResult(3)).getTime
        NginxLog(splitResult(0), splitResult(1), ts, splitResult(4), splitResult(5))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[NginxLog](Time.seconds(5)) { // watermark delayed by 5 seconds
        override def extractTimestamp(t: NginxLog): Long = t.ts // extract the event-time timestamp
      })

    // window, aggregate, then sort and emit
    val aggStream = dataStream
      .filter(_.method == "GET") // keep only GET requests
      .filter(data => {
        val pattern = "^((?!\\.(css|js)$).)*$".r
        (pattern findFirstIn data.url).nonEmpty
      }) // drop static resources such as .css/.js
      // .keyBy("url") // this form would key by a tuple type
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1)) // keep the watermark small so results fire on time, but leave the window open until allowedLateness expires
      .sideOutputLateData(new OutputTag[NginxLog]("late")) // side output for events more than 1 minute out of order
      .aggregate(new PageCountAgg(), new PageViewCountResult())

    val resultStream = aggStream
      .keyBy(_.windowEnd) // group by window end so each firing collects one window's results
      .process(new TopUrl(10))

    resultStream.print()
    aggStream.getSideOutput(new OutputTag[NginxLog]("late")).print("late data beyond the 1-minute lateness....")
    env.execute("hot url access statistics")
  }
}

class PageCountAgg() extends AggregateFunction[NginxLog, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: NginxLog, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class PageViewCountResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
    out.collect(UrlViewCount(key, window.getEnd, input.iterator.next()))
  }
}

/**
  * Type parameters:
  * K: type of the sort key, here the windowEnd timestamp, so Long
  * I: input type, the output of PageViewCountResult, so UrlViewCount
  * O: output type, chosen per use case; here the result is printed, so String
  */
class TopUrl(topN: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
  lazy val urlViewCountListState: ListState[UrlViewCount] =
    getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("urlViewCountList", classOf[UrlViewCount]))

  override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
    urlViewCountListState.add(i) // buffer every result in the list state so it can be sorted later
    context.timerService().registerEventTimeTimer(i.windowEnd) // register a timer that fires when the window closes
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // copy all ListState entries into a ListBuffer so they can be sorted
    val allPageListBuffer: ListBuffer[UrlViewCount] = ListBuffer()
    val iter = urlViewCountListState.get().iterator()
    while (iter.hasNext) {
      allPageListBuffer += iter.next()
    }
    // clear the ListState: the data is already in the buffer, and the next firing should start fresh
    urlViewCountListState.clear()

    // sort by count, descending, then take the top N
    val sortItemViewCounts = allPageListBuffer.sortBy(_.count)(Ordering.Long.reverse).take(topN)

    // format the output
    val result: StringBuilder = new StringBuilder
    result.append("window end time:\t").append(new Timestamp(timestamp)).append("\n")
    // one line per ItemViewCount in the sorted result
    for (i <- sortItemViewCounts.indices) {
      val currentItemViewCount = sortItemViewCounts(i)
      result.append("Top").append(i + 1).append("\t")
        .append("URL = ").append(currentItemViewCount.url).append("\t")
        .append("visits: ").append(currentItemViewCount.count).append("\n")
    }
    result.append("---------------------------------------\n\n\n")
    Thread.sleep(1000) // slow the output down so successive rankings are readable
    out.collect(result.toString())
  }
}
```
With out-of-order data, the code above does emit every result, but it has two problems.

First, TopUrl buffers the window results in a list: when the sliding window fires on time and the late data arrives a moment later, the result is emitted twice, so the same URL shows up two times.

Second, the late firing should build on the counts that came before it, but it does not; it starts counting again from zero.
The resulting output:
| URL | Count | Why it was emitted |
|---|---|---|
| /a | 3 | from the regular firing once the 5-second watermark passed the window end |
| /a | 1 | produced again when late data arrived within allowedLateness |
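The failure mode can be reproduced without Flink at all; a minimal sketch of the clear-on-every-firing bookkeeping (all names here are hypothetical):

```scala
import scala.collection.mutable.ListBuffer

// A toy reproduction of the list-based failure mode, no Flink involved.
// `state` stands in for the ListState, `fire()` for onTimer.
object ListStateProblemSketch extends App {
  val state = ListBuffer[(String, Long)]()

  def fire(): Unit = {
    println(state.sortBy(_._2)(Ordering.Long.reverse).take(10).mkString(", "))
    state.clear() // cleared on every firing, exactly as the first TopUrl version does
  }

  state += (("/a", 3L)); fire() // on-time firing prints (/a,3)
  state += (("/a", 1L)); fire() // late firing prints (/a,1): a second line for /a, restarted from scratch
}
```

Because the list was already cleared after the first firing, the late firing can neither deduplicate nor accumulate, which is exactly the pair of symptoms in the table above.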
從list改成map,而且由於以前每次都會清空list,能夠改成等真正的窗口結束後再清空就能夠了web
The main changes are in two methods: processElement and onTimer.
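Extracted for readability, the two changed methods look like this (same names and state as in the full listing that follows):

```scala
override def processElement(i: UrlViewCount2, context: KeyedProcessFunction[Long, UrlViewCount2, String]#Context, collector: Collector[String]): Unit = {
  UrlViewCount2MapState.put(i.url, i.count) // keyed by url: a late update overwrites instead of appending a duplicate
  context.timerService().registerEventTimeTimer(i.windowEnd) // output timer at window end
  context.timerService().registerEventTimeTimer(i.windowEnd + 60000L) // cleanup timer once allowedLateness has expired
}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount2, String]#OnTimerContext, out: Collector[String]): Unit = {
  if (timestamp == ctx.getCurrentKey + 60000L) { // the cleanup timer: the window is fully closed now
    UrlViewCount2MapState.clear()
    return
  }
  // ...otherwise sort the map entries and emit the top N, as in the full listing below
}
```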
```scala
/**
  * @author mafei
  * @date 2021/1/3
  */
package com.mafei.hotUrlAnalysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

// data format extracted from each log line
case class NginxLog2(clientIp: String, userId: String, ts: Long, method: String, url: String)

// case class for the per-window aggregation result
case class UrlViewCount2(url: String, windowEnd: Long, count: Long)

object HotUrlAnalysis2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event time
    env.setParallelism(1) // single parallelism to keep the output order deterministic

    // 1. read the data from the file
    val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log")

    val dataStream = inputStream
      .map(data => {
        val splitResult = data.split(" ") // the log fields are space-separated
        // the log carries the time as a string; we need a 13-digit millisecond timestamp, so convert it
        val dateFormatConvert = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") // format: day/month/year:hour:minute:second
        val ts = dateFormatConvert.parse(splitResult(3)).getTime
        NginxLog2(splitResult(0), splitResult(1), ts, splitResult(4), splitResult(5))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[NginxLog2](Time.seconds(5)) { // watermark delayed by 5 seconds
        override def extractTimestamp(t: NginxLog2): Long = t.ts // extract the event-time timestamp
      })

    // window, aggregate, then sort and emit
    val aggStream = dataStream
      .filter(_.method == "GET") // keep only GET requests
      .filter(data => {
        val pattern = "^((?!\\.(css|js)$).)*$".r
        (pattern findFirstIn data.url).nonEmpty
      }) // drop static resources such as .css/.js
      // .keyBy("url") // this form would key by a tuple type
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1)) // keep the watermark small so results fire on time, but leave the window open until allowedLateness expires
      .sideOutputLateData(new OutputTag[NginxLog2]("late")) // side output for events more than 1 minute out of order
      .aggregate(new PageCountAgg2(), new PageViewCountResult2())

    val resultStream = aggStream
      .keyBy(_.windowEnd) // group by window end so each firing collects one window's results
      .process(new TopUrl2(10))

    resultStream.print()
    aggStream.getSideOutput(new OutputTag[NginxLog2]("late")).print("late data beyond the 1-minute lateness....")
    env.execute("hot url access statistics")
  }
}

class PageCountAgg2() extends AggregateFunction[NginxLog2, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: NginxLog2, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class PageViewCountResult2() extends WindowFunction[Long, UrlViewCount2, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount2]): Unit = {
    out.collect(UrlViewCount2(key, window.getEnd, input.iterator.next()))
  }
}

/**
  * Type parameters:
  * K: type of the sort key, here the windowEnd timestamp, so Long
  * I: input type, the output of PageViewCountResult2, so UrlViewCount2
  * O: output type, chosen per use case; here the result is printed, so String
  */
class TopUrl2(topN: Int) extends KeyedProcessFunction[Long, UrlViewCount2, String] {
  lazy val UrlViewCount2MapState: MapState[String, Long] =
    getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("UrlViewCount2Map", classOf[String], classOf[Long]))

  override def processElement(i: UrlViewCount2, context: KeyedProcessFunction[Long, UrlViewCount2, String]#Context, collector: Collector[String]): Unit = {
    UrlViewCount2MapState.put(i.url, i.count) // keyed by url, so a late update overwrites the old count instead of appending a duplicate
    context.timerService().registerEventTimeTimer(i.windowEnd) // register a timer that fires when the window closes
    // register a second timer 1 minute later; by then the window is fully closed,
    // no more aggregated results will arrive, and the state can be cleared
    context.timerService().registerEventTimeTimer(i.windowEnd + 60000L)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount2, String]#OnTimerContext, out: Collector[String]): Unit = {
    // MapState-based variant
    // check which timer fired: if we are already 1 minute past the window end, just clear the state
    if (timestamp == ctx.getCurrentKey + 60000L) {
      UrlViewCount2MapState.clear()
      return
    }

    val allUrlViewCount2sBuffer: ListBuffer[(String, Long)] = ListBuffer()
    val iter = UrlViewCount2MapState.entries().iterator()
    while (iter.hasNext) {
      val entry = iter.next()
      allUrlViewCount2sBuffer += ((entry.getKey, entry.getValue))
    }

    // sort by count, descending, then take the top N
    val sortItemViewCounts = allUrlViewCount2sBuffer.sortBy(_._2)(Ordering.Long.reverse).take(topN)

    // format the output
    val result: StringBuilder = new StringBuilder
    result.append("window end time:\t").append(new Timestamp(timestamp)).append("\n")
    // one line per entry in the sorted result
    for (i <- sortItemViewCounts.indices) {
      val currentItemViewCount = sortItemViewCounts(i)
      result.append("Top").append(i + 1).append("\t")
        .append("URL = ").append(currentItemViewCount._1).append("\t")
        .append("visits: ").append(currentItemViewCount._2).append("\n")
    }
    result.append("---------------------------------------\n\n\n")
    Thread.sleep(1000) // slow the output down so successive rankings are readable
    out.collect(result.toString())
  }
}
```
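A note on the design: the key of TopUrl2 is windowEnd, so `ctx.getCurrentKey + 60000L` identifies exactly the cleanup timer registered in processElement, and the 60 000 ms matches the window's `allowedLateness(Time.minutes(1))`. The keyed state therefore lives precisely as long as the window contents it mirrors; if the lateness setting were changed, both constants would have to change together.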