Flink統計當日的UV、PV

  Flink 統計當日的UV、PV

  測試環境:

    flink 1.7.2

  一、數據流程

    

    a.模擬數據生成,發送到kafka(json 格式)

    b.flink 讀取數據,count

    c. 輸出數據到kafka(爲了方便查看,輸出了一份到控制檯)

  二、模擬數據生成器

    數據格式如下 : {"id" : 1, "createTime" : "2019-05-24 10:36:43.707"}

    id 爲數據生成的序號(累加),時間爲數據時間(默認爲數據生成時間)

  模擬數據生成器代碼如下:

  

/**
  * Test data maker.
  *
  * Publishes an endless stream of JSON records with an increasing `id` and a
  * synthetic `createTime` to the Kafka topic `current_day`, one record per second.
  */
object CurrentDayMaker {

  // Only referenced by a commented-out line in getCreateTime; kept so the object's members are unchanged.
  var minute : Int = 1
  val calendar: Calendar = Calendar.getInstance()
  // Declared ahead of getCreateTime so that the formatter is defined before its first use.
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  /**
    * A real day is too long to observe, so time is simulated: every call moves the
    * shared calendar forward by 10 minutes, which makes one simulated day take
    * 144 calls (144 seconds at one record per second).
    *
    * @return the advanced calendar time formatted as `yyyy-MM-dd HH:mm:ss.SSS`
    */
  def getCreateTime(): String = {
    //    minute = minute + 1
    calendar.add(Calendar.MINUTE, 10)
    sdf.format(calendar.getTime)
  }

  /**
    * Starts the generator; this method never returns.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](Common.getProp)
    // Start the simulated clock at the current time.
    calendar.setTime(new Date())
    println(sdf.format(calendar.getTime))

    // Infinite sequence of ids starting at 0.
    Iterator.from(0).foreach { id =>
      // val map = Map("id"-> id, "createTime"-> sdf.format(System.currentTimeMillis()))
      // getCreateTime advances the clock before formatting, so the first record is
      // stamped 10 minutes after the start time printed above.
      val map = Map("id"-> id, "createTime"-> getCreateTime())
      val jsonObject: JSONObject = new JSONObject(map)
      println(jsonObject.toString())
      // topic current_day
      val msg = new ProducerRecord[String, String]("current_day", jsonObject.toString())
      producer.send(msg)
      producer.flush()
      // Throttle the output rate to one record per second.
      Thread.sleep(1000)
    }
  }
}

  生成數據如下:  

{"id" : 0, "createTime" : "2019-05-24 18:02:26.292"}
{"id" : 1, "createTime" : "2019-05-24 18:12:26.292"}
{"id" : 2, "createTime" : "2019-05-24 18:22:26.292"}
{"id" : 3, "createTime" : "2019-05-24 18:32:26.292"}
{"id" : 4, "createTime" : "2019-05-24 18:42:26.292"}

  三、flink 程序 

  

package com.venn.stream.api.dayWindow

import java.io.File
import java.text.SimpleDateFormat

import com.venn.common.Common
import com.venn.source.TumblingEventTimeWindows
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousEventTimeTrigger}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

/**
  * Created by venn on 19-5-23.
  *
  * Counts the events (PV) of the current day with a tumbling event-time window.
  * For quick tests the day window can be replaced by a minute window:
  *
  * .windowAll(TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(0)))
  *
  * A tumbling event-time window counts exactly the events of one interval and
  * is aligned to the clock (e.g. 00:00:00 to 00:00:59), not to the job start.
  */
object CurrentDayPvCount {

  /**
    * Builds and runs the job: read JSON events from Kafka topic `current_day`,
    * assign event-time timestamps from `createTime`, reduce each day window to a
    * single [[Event]] and print it.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Prints the window-start formula (timestamp - (timestamp - offset + size) % size)
    // evaluated for one sample timestamp; debugging aid only.
    println(1558886400000L - (1558886400000L - 8 + 86400000) % 86400000)
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // File.separator is the path-component separator ("\\" on Windows). The previous
    // check used File.pathSeparator (";" or ":"), which can never equal "\\", so the
    // state backend and checkpointing were never enabled.
    // NOTE(review): assumes the branch is meant for Windows, as the literal suggests - confirm.
    if ("\\".equals(File.separator)) {
      val rock = new RocksDBStateBackend(Common.CHECK_POINT_DATA_DIR)
      env.setStateBackend(rock)
      // checkpoint interval (milliseconds)
      env.enableCheckpointing(10000)
    }

    val topic = "current_day"
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    val kafkaSource = new FlinkKafkaConsumer[ObjectNode](topic, new JsonNodeDeserializationSchema(), Common.getProp)
    // The Kafka sink is configured here but is not attached to the stream below;
    // results are only printed to the console.
    val sink = new FlinkKafkaProducer[String](topic + "_out", new SimpleStringSchema(), Common.getProp)
    sink.setWriteTimestampToKafka(true)

    val stream = env.addSource(kafkaSource)
      .map(node => {
        Event(node.get("id").asText(), node.get("createTime").asText())
      })
      //            .assignAscendingTimestamps(event => sdf.parse(event.createTime).getTime)
      // Event time is parsed from createTime; the extractor is configured with a 60 second bound.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(60)) {
      override def extractTimestamp(element: Event): Long = {
        sdf.parse(element.createTime).getTime
      }
    })
      // window is one minute, start at 0 second
      //.windowAll(TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(0)))
      // window is one hour, start at 0 second. Event time only advances when events
      // arrive: if no data arrives at the end of a window, the window fires only once
      // data of the next window shows up.
      //      .windowAll(TumblingEventTimeWindows.of(Time.hours(1), Time.seconds(0)))
      // window is one day, start at 0 second. Flink 1.7.x rejects a negative offset
      // (FLINK-11326, fixed in 1.8), hence the patched TumblingEventTimeWindows import.
      //      .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
      .windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
      // fire periodically on event time
      //      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(3800)))
      // fire periodically on processing time
      //      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
      // fire on every event and export the current value
      //      .trigger(CountTrigger.of(1))
      .reduce(new ReduceFunction[Event] {
      override def reduce(event1: Event, event2: Event): Event = {

        // Carry both ids in the result: the id of the accumulated event goes into `id`,
        // the id of the newest event into `createTime`; the counts are summed.
        new Event(event1.id, event2.id, event1.count + event2.count)
      }
    })
    // format output event: join the two ids and add the current timestamp
    //      .map(event => Event(event.id + "-" + event.createTime, sdf.format(System.currentTimeMillis()), event.count))
    stream.print("result : ")
    // execute job
    env.execute("CurrentDayCount")
  }

}

/**
  * One record of the stream.
  *
  * @param id         sequence number of the record, as text
  * @param createTime event time formatted as `yyyy-MM-dd HH:mm:ss.SSS`; the reduce step
  *                   in CurrentDayPvCount reuses this field to carry a second id
  * @param count      number of records this event stands for, 1 for a freshly parsed record
  */
case class Event(id: String, createTime: String, count: Int = 1)

 

  四、運行結果

  測試數據如下:    

{"id" : 0, "createTime" : "2019-05-24 20:29:49.102"}
{"id" : 1, "createTime" : "2019-05-24 20:39:49.102"}
...
{"id" : 20, "createTime" : "2019-05-24 23:49:49.102"}
{"id" : 21, "createTime" : "2019-05-24 23:59:49.102"}
{"id" : 22, "createTime" : "2019-05-25 00:09:49.102"}
{"id" : 23, "createTime" : "2019-05-25 00:19:49.102"}
...
{"id" : 163, "createTime" : "2019-05-25 23:39:49.102"}
{"id" : 164, "createTime" : "2019-05-25 23:49:49.102"}
{"id" : 165, "createTime" : "2019-05-25 23:59:49.102"}
{"id" : 166, "createTime" : "2019-05-26 00:09:49.102"}
...
{"id" : 308, "createTime" : "2019-05-26 23:49:49.102"}
{"id" : 309, "createTime" : "2019-05-26 23:59:49.102"}
{"id" : 310, "createTime" : "2019-05-27 00:09:49.102"}

0 - 21 是 24號

22 -  165 是 25 號

166 - 309 是 26 號

輸出結果(程序中reduce 方法,將窗口中第一條和最後一條數據的id,都放到 Event中 )如下:

  

與測試數據對應

五、說明

  很多人會錯誤地以爲,窗口的開始時間會是程序啓動(初始化)的時間。事實上,窗口(以TumblingEventTimeWindows爲例)的 of 方法有兩個重載:一個只接收窗口的長度,另一個接收兩個參數,即窗口的長度和窗口的offset(默認爲0) 

源碼:org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows : 


/**
 * A {@link WindowAssigner} that places each element into exactly one fixed-size,
 * non-overlapping window chosen by the element timestamp. This copy additionally
 * prints the boundaries of every assigned window to standard output for debugging.
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Window length in milliseconds. */
    private final long size;

    /** Shift of the window start in milliseconds; may be negative. */
    private final long offset;

    /**
     * @param size window length in milliseconds
     * @param offset shift of the window start in milliseconds
     * @throws IllegalArgumentException if {@code abs(offset) >= size}
     */
    protected TumblingEventTimeWindows(long size, long offset) {
        if (Math.abs(offset) >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
        }
        this.size = size;
        this.offset = offset;
    }

    /**
     * Assigns the element to the single window that contains {@code timestamp}.
     * Invoked for every element, so it prints three debug lines per record.
     *
     * @param element the element being assigned (not inspected)
     * @param timestamp the element's event-time timestamp
     * @param context the assigner context (not used)
     * @return a singleton list with the window {@code [start, start + size)}
     * @throws RuntimeException if the element carries no timestamp ({@code Long.MIN_VALUE})
     */
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            long end = start + size;
            System.out.println("start : " + start + ", end : " + end);
            // One formatter per call serves both debug lines.
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            System.out.println("window start: " + format.format(start));
            System.out.println("window end: " + format.format(end));
            return Collections.singletonList(new TimeWindow(start, end));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp and offset.
     *
     * <p>For example, if you want to window a stream by hour, but the window begins at the
     * 15th minute of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))}, then
     * you will get time windows that start at 0:15:00, 1:15:00, 2:15:00, etc.
     *
     * <p>Rather than that, if you are living somewhere which is not using UTC±00:00 time,
     * such as China which is using UTC+08:00, and you want a time window with a size of one day,
     * and the window begins at every 00:00:00 of local time, you may use
     * {@code of(Time.days(1),Time.hours(-8))}. The parameter of offset is {@code Time.hours(-8)}
     * since UTC+08:00 is 8 hours earlier than UTC time.
     *
     * @param size The size of the generated windows.
     * @param offset The offset which window start would be shifted by.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }
}

每條數據都會觸發: assignWindows 方法

計算函數如下:

/**
 * Computes the start of the window that contains {@code timestamp}.
 *
 * <p>Window starts lie at {@code offset + k * windowSize} for integer {@code k}; the
 * result is the largest such value that is not greater than {@code timestamp}.
 *
 * @param timestamp epoch milliseconds of the element
 * @param offset shift of the window start, in milliseconds
 * @param windowSize window length in milliseconds, must be positive
 * @return the window start, always {@code <= timestamp}
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    final long remainder = (timestamp - offset) % windowSize;
    // Java's % keeps the sign of the dividend. A negative remainder means the
    // timestamp lies before the offset, so step back one full window; otherwise the
    // computed start would be later than the timestamp itself.
    if (remainder < 0) {
        return timestamp - (remainder + windowSize);
    }
    return timestamp - remainder;
}

debug 如下:

 

  六、特別說明

    Flink 1.6.3/1.7.1/1.7.2 在 TumblingEventTimeWindows 構造器上有個bug:offset 不能小於0, 但是 of 方法的註釋中又說明,可以使用: of(Time.days(1),Time.hours(-8)) 表示在中國的 0 點開始的一天窗口。

 JIRA : FLINK-11326 ,jira 上註明1.8.0 修復。(我原本準備提個bug的,有人先下手了)

 

這個bug 可以通過自己創建一個相同包名的同名類,將對應代碼修改即可。

flink 1.7.2 源碼:

// Constructor as quoted from Flink 1.7.2, kept verbatim: the guard below throws for any
// negative offset, so a call such as of(Time.days(1), Time.hours(-8)) fails with an
// IllegalArgumentException.
protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }

        this.size = size;
        this.offset = offset;
    }

最新版源碼:

protected TumblingEventTimeWindows(long size, long offset) {
        if (Math.abs(offset) >= size) { throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
        }

        this.size = size;
        this.offset = offset;
    }

修改:

    

七、上面的案例主要講Flink 的窗口,pv、uv核心代碼如下:

.keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
      .evictor(TimeEvictor.of(Time.seconds(0), true))
      .process(new ProcessWindowFunction[(String, String), (String, String, Long), Tuple, TimeWindow] {
        /*
        這裏使用state是因爲,窗口默認只會在窗口結束的時候觸發一次計算,然後輸出結果,
        如果是長時間的窗口,比如:一天的窗口,要是等到一天結束再輸出結果,那還不如跑批。
        所以大窗口會添加trigger,以一定的頻率輸出中間結果。
        加evictor 是因爲,每次trigger觸發計算時,窗口中的所有數據都會參與,所以數據會被重複計算很多次,比較浪費;加evictor 驅逐已經計算過的數據,就不會重複計算了。
        驅逐了已經計算過的數據,導致窗口數據不完整,所以需要state 存儲我們需要的中間結果
         */
        var wordState: MapState[String, String] = _
        var pvCount: ValueState[Long] = _

        /**
          * Obtains the two state handles used by `process`: a map state named "word"
          * and a value state named "pvCount".
          *
          * @param parameters configuration passed in by the runtime (not used)
          */
        override def open(parameters: Configuration): Unit = {
          val wordDescriptor = new MapStateDescriptor[String, String]("word", classOf[String], classOf[String])
          val pvDescriptor = new ValueStateDescriptor[Long]("pvCount", classOf[Long])
          val runtime = getRuntimeContext
          wordState = runtime.getMapState(wordDescriptor)
          pvCount = runtime.getState[Long](pvDescriptor)
        }

        /**
          * Folds the records handed over for this firing into state and emits the
          * accumulated figures for the key.
          *
          * @param key      the key tuple; field 0 is emitted as the first output column
          * @param context  window context (not used)
          * @param elements records of this firing as (key, word) pairs
          * @param out      receives (key, "uv", distinct words) and (key, "pv", total records)
          */
        override def process(key: Tuple, context: Context, elements: Iterable[(String, String)], out: Collector[(String, String, Long)]): Unit = {

          // The evictor drops records once they have been processed, so `elements`
          // only covers this firing and the totals must come from state.
          var batchPv = 0L
          elements.foreach { case (_, word) =>
            batchPv += 1
            // The map state is used as a set: only the keys matter.
            wordState.put(word, null)
          }

          // Add this firing's count to the running total.
          // NOTE(review): relies on an unset ValueState[Long] reading as 0 - confirm.
          val totalPv = pvCount.value() + batchPv
          pvCount.update(totalPv)

          // uv = number of distinct words stored so far
          var uv = 0L
          val words = wordState.keys().iterator()
          while (words.hasNext) {
            words.next()
            uv += 1
          }

          // NOTE(review): neither state is cleared here, so the totals presumably keep
          // growing across windows of the same key - verify against the complete job.
          out.collect((key.getField[String](0), "uv", uv))
          // Emit the accumulated total; the count of this firing alone under-reports pv.
          out.collect((key.getField[String](0), "pv", totalPv))

        }
      })

完整代碼見:  https://github.com/springMoon/flink-rookie/blob/master/src/main/scala/com/venn/demo/WordCountDistinct.scala

相關文章
相關標籤/搜索