Flink實戰,實時流量統計 TOPN訪問URL

http://www.javashuo.com/article/p-xszcshqv-ny.html 相似場景,來從Nginx、Apache等web服務器的日誌中讀取數據,實時統計出來訪問熱度最高的TOPN訪問URL,而且要確保數據亂序的處理,lag等狀況下,還要確認數據的準確性css

目標:

從log文件中讀取數據(也能夠參考上一篇從kakfa中),取http 的method爲get的請求,而且把靜態文件訪問過濾掉,進行實時統計
    實現:
    一、讀取文件
    二、作過濾,method=get  url不爲靜態信息
    三、生成一個滑動窗口,大小10分鐘,每次滑動5s,watermask 5s(爲了保險容許數據延遲,allowedLateness 1分鐘)
    四、進行聚合統計分析排序輸出

準備日誌文件:

在resource目錄下生成一個nginx.log裏面內容:
1.1.1.1 - - 23/03/2020:05:06:03 GET /mapengfei/2580330
1.1.1.1 - - 23/03/2020:05:06:05 GET /mapengfei/2572888
1.1.1.3 - - 24/03/2020:05:06:05 GET /mapengfei/2572888

代碼:

新建一個HotUrlAnalysis.scal的object文件java

/*
 *
 * @author mafei
 * @date 2021/1/3
*/
package com.mafei.hotUrlAnalysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

// 定義要提取的數據格式
case class NginxLog(clientIp: String, userId: String,ts:Long,method:String,url:String)
// 定義窗口聚合結果樣例類
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

object HotUrlAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定義取事件時間
    env.setParallelism(1) //防止亂序
    //一、從文件中讀取數據
    val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log")
    val dataStream = inputStream
      .map(data=>{
        val splitResult = data.split(" ") //由於日誌格式是以空格分隔的

        //由於日誌中格式是字符串的,咱們須要的是13位毫秒的時間戳,因此須要轉換下
        val dateFormatConvert = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") // 格式: 天/月/年:時:分:秒
        val ts = dateFormatConvert.parse(splitResult(3)).getTime
        NginxLog(splitResult(0), splitResult(1),ts,splitResult(4), splitResult(5))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[NginxLog](Time.seconds(5)) {  //這裏設置watermark延遲5秒
        override def extractTimestamp(t: NginxLog): Long = t.ts   //指定時間事件的時間戳
      })

    //開窗聚合,排序輸出
    var aggStream = dataStream
      .filter(_.method == "GET")//過濾下,只要method是get請求的數據
      .filter(data=>{
        val pattern= "^((?!\\.(css|js)$).)*$".r
        (pattern findFirstIn data.url).nonEmpty
      }) //再過濾下,像css/js之類的url不算
//      .keyBy("url") //這樣子寫返回的是個元組類型
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1)) //能夠watermark時間設置小一點,到時間先輸出,可是窗口先不關,等到allowedLateness的時間了再關
      .sideOutputLateData(new OutputTag[NginxLog]("late")) //加一個側輸出流,爲了防止數據的亂序超過了1分鐘
      .aggregate(new PageCountAgg(),new PageViewCountResult())
    val resultStream = aggStream
      .keyBy(_.windowEnd) //按照結束時間進行分組,收集當前窗口內的,取必定時間內的數據
      .process(new TopUrl(10))

    resultStream.print()

    aggStream.getSideOutput(new OutputTag[NginxLog]("late")).print("這是1分鐘以後的延遲數據。。。。")
    env.execute("執行熱門url訪問統計")

  }

}

class PageCountAgg() extends AggregateFunction[NginxLog,Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(in: NginxLog, acc: Long): Long = acc +1

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc+acc1
}

class PageViewCountResult() extends WindowFunction[Long,UrlViewCount,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {

    out.collect(UrlViewCount(key,window.getEnd,input.iterator.next()))
  }
}

/**
 * 輸入參數
 * K: 排序的字段類型,這裏是WindowEnd時間戳,因此是Long類型
 * I: 輸入的數據,是上一步PageViewCountResult輸出的類型,因此是UrlViewCount
 * O: 輸出的類型,這個本身定義,看實際狀況,這裏直接打印了,因此String
 */
class TopUrl(topN:Int) extends KeyedProcessFunction[Long,UrlViewCount,String]{
  lazy val urlViewCountListState: ListState[UrlViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("urlViewCountList", classOf[UrlViewCount]))
  override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
    urlViewCountListState.add(i)  //把每次的結果都加到自定義的list裏頭,方便後邊作排序
    context.timerService().registerEventTimeTimer(i.windowEnd) //註冊一個定時器,在窗口關閉的時候觸發
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    //爲了方便排序,定義另外一個ListBuffer,保存ListState的全部數據
    val allPageListBuffer: ListBuffer[UrlViewCount] = ListBuffer()
    val iter = urlViewCountListState.get().iterator()
    while (iter.hasNext){
      allPageListBuffer += iter.next()
    }

    //清空ListState的數據,已經放到urlViewCountListState 準備計算了,等下次觸發就應該是新的了
    urlViewCountListState.clear()

    // 先按照count,從大到小排序,而後再取前N個
    val sortItemViewCounts = allPageListBuffer.sortBy(_.count)(Ordering.Long.reverse).take(topN)

    //格式化輸出數據:
    val result : StringBuilder = new StringBuilder
    result.append("當前窗口的結束時間:\t").append(new Timestamp(timestamp)).append("\n")

    //遍歷結果列表中的每一個ItemViewCount , 輸出到一行
    for(i <- sortItemViewCounts.indices){
      val currentItemViewCount = sortItemViewCounts(i)
      result.append("Top").append(i+1).append("\t")
        .append("URL = ").append(currentItemViewCount.url).append("\t")
        .append("訪問量: ").append(currentItemViewCount.count).append("\n")
    }

    result.append("---------------------------------------\n\n\n")
    Thread.sleep(1000)
    out.collect(result.toString())

  }
}

代碼結構及輸出效果:

Flink實戰,實時流量統計 TOPN訪問URL

問題點

在數據亂序的狀況下,雖然能所有輸出,但有2個問題點上面的代碼,
一個是在TopUrl 中保存數據用的是list,在滑動窗口先到達,延遲數據過會兒到達的時候,數據會重複輸出,也就是url會出現2次
第二個問題是在第二次延遲輸出的時候,原本應該加上以前的數據,可是沒有,而是從新從0開始計算
最終效果:nginx

URL 出現次數 出現緣由
/a 3 在5秒內統計數據輸出的
/a 1 allowedLateness延遲數據達到產生的

解決辦法:

從list改成map,而且由於以前每次都會清空list,能夠改成等真正的窗口結束後再清空就能夠了web

主要改動的地方:
processElement 和onTimer這2個方法sql

/*
 *
 * @author mafei
 * @date 2021/1/3
*/
package com.mafei.hotUrlAnalysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import java.text.SimpleDateFormat
import scala.collection.mutable.ListBuffer

/**
 * import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import java.sql.Timestamp
import java.util.Properties
import scala.collection.mutable.ListBuffer
 * @param clientIp
 * @param userId
 * @param ts
 * @param method
 * @param url
 */
// 定義要提取的數據格式
case class NginxLog2(clientIp: String, userId: String,ts:Long,method:String,url:String)
// 定義窗口聚合結果樣例類
case class UrlViewCount2(url: String, windowEnd: Long, count: Long)

object HotUrlAnalysis2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定義取事件時間
    env.setParallelism(1) //防止亂序
    //一、從文件中讀取數據
    val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log")
    val dataStream = inputStream
      .map(data=>{
        val splitResult = data.split(" ") //由於日誌格式是以空格分隔的

        //由於日誌中格式是字符串的,咱們須要的是13位毫秒的時間戳,因此須要轉換下
        val dateFormatConvert = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") // 格式: 天/月/年:時:分:秒
        val ts = dateFormatConvert.parse(splitResult(3)).getTime
        NginxLog2(splitResult(0), splitResult(1),ts,splitResult(4), splitResult(5))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[NginxLog2](Time.seconds(5)) {  //這裏設置watermark延遲5秒
        override def extractTimestamp(t: NginxLog2): Long = t.ts   //指定時間事件的時間戳
      })

    //開窗聚合,排序輸出
    var aggStream = dataStream
      .filter(_.method == "GET")//過濾下,只要method是get請求的數據
      .filter(data=>{
        val pattern= "^((?!\\.(css|js)$).)*$".r
        (pattern findFirstIn data.url).nonEmpty
      }) //再過濾下,像css/js之類的url不算
//      .keyBy("url") //這樣子寫返回的是個元組類型
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1)) //能夠watermark時間設置小一點,到時間先輸出,可是窗口先不關,等到allowedLateness的時間了再關
      .sideOutputLateData(new OutputTag[NginxLog2]("late")) //加一個側輸出流,爲了防止數據的亂序超過了1分鐘
      .aggregate(new PageCountAgg2(),new PageViewCountResult2())
    val resultStream = aggStream
      .keyBy(_.windowEnd) //按照結束時間進行分組,收集當前窗口內的,取必定時間內的數據
      .process(new TopUrl2(10))

    resultStream.print()

    aggStream.getSideOutput(new OutputTag[NginxLog2]("late")).print("這是1分鐘以後的延遲數據。。。。")
    env.execute("執行熱門url訪問統計")

  }

}

class PageCountAgg2() extends AggregateFunction[NginxLog2,Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(in: NginxLog2, acc: Long): Long = acc +1

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc+acc1
}

class PageViewCountResult2() extends WindowFunction[Long,UrlViewCount2,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount2]): Unit = {

    out.collect(UrlViewCount2(key,window.getEnd,input.iterator.next()))
  }
}

/**
 * 輸入參數
 * K: 排序的字段類型,這裏是WindowEnd時間戳,因此是Long類型
 * I: 輸入的數據,是上一步PageViewCountResult2輸出的類型,因此是UrlViewCount2
 * O: 輸出的類型,這個本身定義,看實際狀況,這裏直接打印了,因此String
 */
class TopUrl2(topN:Int) extends KeyedProcessFunction[Long,UrlViewCount2,String]{
  lazy val UrlViewCount2MapState: MapState[String,Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String,Long]("UrlViewCount2Map",classOf[String],classOf[Long]))

  override def processElement(i: UrlViewCount2, context: KeyedProcessFunction[Long, UrlViewCount2, String]#Context, collector: Collector[String]): Unit = {
    UrlViewCount2MapState.put(i.url,i.count)
    context.timerService().registerEventTimeTimer(i.windowEnd) //註冊一個定時器,在窗口關閉的時候觸發
  //再另外註冊一個定時器,1分鐘以後觸發,這時窗口已經完全關閉,再也不有聚合結果輸出,能夠清空狀態
    context.timerService().registerEventTimeTimer(i.windowEnd + 60000L)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount2, String]#OnTimerContext, out: Collector[String]): Unit = {

    /**
     * 使用mapState的方式
     */
      //判判定時器觸發時間,若是已是窗口結束時間1分鐘以後,那麼直接清空狀態
      if(timestamp == ctx.getCurrentKey+60000L){
        UrlViewCount2MapState.clear()
        return
      }
    val allUrlViewCount2sBuffer: ListBuffer[(String,Long)] = ListBuffer()

    val iter = UrlViewCount2MapState.entries().iterator()
    while (iter.hasNext){
      val entry = iter.next()
      allUrlViewCount2sBuffer += ((entry.getKey, entry.getValue))
    }

        // 先按照count,從大到小排序,而後再取前N個
        val sortItemViewCounts = allUrlViewCount2sBuffer.sortBy(_._2)(Ordering.Long.reverse).take(topN)

        //格式化輸出數據:
        val result : StringBuilder = new StringBuilder
        result.append("當前窗口的結束時間:\t").append(new Timestamp(timestamp)).append("\n")

        //遍歷結果列表中的每一個ItemViewCount , 輸出到一行
        for(i <- sortItemViewCounts.indices){
          val currentItemViewCount = sortItemViewCounts(i)
          result.append("Top").append(i+1).append("\t")
            .append("URL = ").append(currentItemViewCount._1).append("\t")
            .append("訪問量: ").append(currentItemViewCount._2).append("\n")
        }

    result.append("---------------------------------------\n\n\n")
    Thread.sleep(1000)
    out.collect(result.toString())

  }
}
相關文章
相關標籤/搜索