Preface
In the second installment we said we would introduce Flink's watermarks and triggers. Now let's first get familiar with the concepts of watermarks, triggers, and the lifecycle of late data. These concepts are a bit abstract and take some thinking to digest.
- Event time, ingestion time, processing time
Before we can understand watermarks, we first need to introduce Flink's three notions of time: event time, ingestion time, and processing time.
Let's start with a diagram:
Let me ask and answer a question myself:

Q: Why do we need both event time and processing time?

A: After a producer generates a message, network delays and other factors mean that the time we (Flink) receive the data is always later than the time the producer created it. There should be some bound on that gap, right? Say we (Flink) wait 10 seconds or longer: data arriving within those 10 seconds counts as early or on time, and data arriving after 10 seconds counts as late. Early and on-time data can be processed normally, but what about the late data? Should it be discarded? Or stashed somewhere for later, unified processing? ... Flink has thought of all this for us and provides the corresponding classes and methods. The wheel is already built; you just need to set sail... ha, I digress.
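The early/on-time vs. late distinction above can be sketched in a few lines of plain Java (no Flink dependency; the 10-second tolerance and all names here are just illustrative, mirroring the hypothetical 10s wait from the text):

```java
public class LatenessSketch {
    static final long MAX_OUT_OF_ORDERNESS = 10_000L; // the 10s tolerance from the text

    // The watermark trails the largest event timestamp seen so far by the tolerance.
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS;
    }

    // An event counts as late once the watermark has already passed its timestamp.
    static boolean isLate(long eventTime, long currentWatermark) {
        return eventTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long maxSeen = 100_000L;                 // latest event time observed so far
        long wm = watermark(maxSeen);            // 90_000
        System.out.println(isLate(95_000L, wm)); // false: still within the tolerance
        System.out.println(isLate(85_000L, wm)); // true: arrived too late
    }
}
```

This is the same trailing-watermark idea the real example below uses, just with a 2-second tolerance instead of 10.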
As in the figure above, taking reading from a message queue as an example: the event time is stamped by the producer when the data is created; the ingestion time is when we receive the producer's message from the data source; and the processing time is when we actually process the record. Compared with event time, a program using ingestion time cannot handle out-of-order or late events, but it also does not need to specify how watermarks are generated. Internally, ingestion time is treated much like event time, only with automatic timestamp assignment and automatic watermark generation.
Next, we will walk through a concrete example of how watermarks and triggers are used.
package cn.crawler.mft_seconed.demo4;

import cn.crawler.mft_seconed.KafkaEntity;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Properties;

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.19.141.60:31090");
        properties.setProperty("group.id", "crm_stream_window");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer011<>("test-demo12", new SimpleStringSchema(), properties));
        env.setParallelism(1);

        DataStream<Tuple3<String, Long, Integer>> inputMap = stream.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
            private static final long serialVersionUID = -8812094804806854937L;

            @Override
            public Tuple3<String, Long, Integer> map(String value) throws Exception {
                KafkaEntity kafkaEntity = JSON.parseObject(value, KafkaEntity.class);
                return new Tuple3<>(kafkaEntity.getName(), kafkaEntity.getCreate_time(), kafkaEntity.getId());
            }
        });

        DataStream<Tuple3<String, Long, Integer>> watermark = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
            private static final long serialVersionUID = 8252616297345284790L;
            Long currentMaxTimestamp = 0L;
            Long maxOutOfOrderness = 2000L; // maximum tolerated out-of-orderness: 2s
            Watermark watermark = null;
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
            }

            @Override
            public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
                Long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("timestamp : " + element.f1 + "|" + format.format(element.f1)
                        + " currentMaxTimestamp : " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp)
                        + ", watermark : " + watermark.getTimestamp() + "|" + format.format(watermark.getTimestamp()));
                return timestamp;
            }
        });

        OutputTag<Tuple3<String, Long, Integer>> lateOutputTag = new OutputTag<Tuple3<String, Long, Integer>>("late-data") {
            private static final long serialVersionUID = -1552769100986888698L;
        };

        SingleOutputStreamOperator<String> resultStream = watermark
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .trigger(new Trigger<Tuple3<String, Long, Integer>, TimeWindow>() {
                    private static final long serialVersionUID = 2742133264310093792L;
                    ValueStateDescriptor<Integer> sumStateDescriptor = new ValueStateDescriptor<Integer>("sum", Integer.class);

                    @Override
                    public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                        ValueState<Integer> sumState = ctx.getPartitionedState(sumStateDescriptor);
                        if (null == sumState.value()) {
                            sumState.update(0);
                        }
                        sumState.update(element.f2 + sumState.value());
                        System.out.println(sumState.value());
                        // if (sumState.value() >= 2) {
                        //     // The state could also be handled manually here.
                        //     // The default trigger emits TriggerResult.FIRE, which does NOT purge the window contents.
                        //     return TriggerResult.FIRE_AND_PURGE;
                        // }
                        // return TriggerResult.CONTINUE;
                        return TriggerResult.FIRE_AND_PURGE;
                    }

                    @Override
                    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("clearing window state | value held in window: " + ctx.getPartitionedState(sumStateDescriptor).value());
                        ctx.getPartitionedState(sumStateDescriptor).clear();
                    }
                })
                // With allowedLateness, late data can cause a window to be recomputed (duplicate output).
                // With the default trigger:
                //   - the window is cleared once the watermark passes window_end_time + allowedLateness;
                //   - subsequent data belonging to this window with event_time > watermark - allowedLateness triggers recomputation.
                // With a custom trigger:
                //   - within the same window, results can keep firing downstream whenever the condition is met;
                //   - the window is still cleared once the watermark passes window_end_time + allowedLateness;
                //   - subsequent data belonging to this window with event_time > watermark - allowedLateness triggers recomputation.
                // When window state is cleared depends only on time, not on whether a custom trigger is used.
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(lateOutputTag)
                .apply(new WindowFunction<Tuple3<String, Long, Integer>, String, Tuple, TimeWindow>() {
                    private static final long serialVersionUID = 7813420265419629362L;

                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Long, Integer>> input, Collector<String> out) throws Exception {
                        for (Tuple3<String, Long, Integer> stringLongTuple2 : input) {
                            System.out.println(stringLongTuple2.f1);
                        }
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        out.collect("window " + format.format(window.getStart()) + " window " + format.format(window.getEnd()));
                        System.out.println("-------------------------");
                    }
                });

        resultStream.print();
        resultStream.getSideOutput(lateOutputTag).print();
        env.execute("window test");
    }
}
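The bookkeeping the custom trigger does through its `ValueState` can be reproduced in plain Java (no Flink dependency; the class and method names here are illustrative, not Flink API): one running sum per tumbling window, increased by each element's third field (the id, which is always 1 in this demo), and wiped when the window is cleared.

```java
import java.util.HashMap;
import java.util.Map;

public class TriggerStateSketch {
    static final long WINDOW_SIZE_MS = 3_000L;
    // windowStart -> running sum; stands in for the per-key, per-window ValueState
    static final Map<Long, Integer> sumPerWindow = new HashMap<>();

    // Mirrors Trigger.onElement: update the window's sum and return it.
    static int onElement(long eventTime, int id) {
        long windowStart = eventTime - (eventTime % WINDOW_SIZE_MS);
        int sum = sumPerWindow.getOrDefault(windowStart, 0) + id;
        sumPerWindow.put(windowStart, sum);
        return sum; // this is the value printed as 1, 2, 3, ... in the log below
    }

    // Mirrors Trigger.clear: drop the window's state.
    static void clear(long windowStart) {
        sumPerWindow.remove(windowStart);
    }
}
```

Feeding it the first few event timestamps from the log reproduces the printed sums: the first two events of the 15:06:33-15:06:36 window yield 1 and 2, and the first event of the next window starts again at 1.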
package cn.crawler.mft_seconed.demo4;

import cn.crawler.mft_seconed.KafkaEntity;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendDataToKafkaDemo4 {
    public static void main(String[] args) {
        SendDataToKafkaDemo4 sendDataToKafkaDemo4 = new SendDataToKafkaDemo4();
        for (int i = 0; i < 40; i++) {
            KafkaEntity build = KafkaEntity.builder()
                    .id(1)
                    .message("message" + i)
                    .create_time(System.currentTimeMillis())
                    .name("" + 1)
                    .build();
            System.out.println(build.toString());
            // NOTE: WatermarkTest subscribes to "test-demo12"; the topic here must match it
            // for the consumer to actually receive these records.
            sendDataToKafkaDemo4.send("test-demo13", "123", JSON.toJSONString(build));
        }
    }

    public void send(String topic, String key, String data) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.141.60:31090");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i < 2; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(new ProducerRecord<String, String>(topic, key + i, data));
        }
        producer.close();
    }
}
Now let's look at the output:
timestamp : 1564038394140|2019-07-25 15:06:34.140 currentMaxTimestamp : 1564038394140|2019-07-25 15:06:34.140, watermark : -2000|1970-01-01 07:59:58.000
1
1564038394140
window 2019-07-25 15:06:33.000 window 2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038395056|2019-07-25 15:06:35.056 currentMaxTimestamp : 1564038395056|2019-07-25 15:06:35.056, watermark : 1564038392140|2019-07-25 15:06:32.140
2
1564038395056
window 2019-07-25 15:06:33.000 window 2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038395363|2019-07-25 15:06:35.363 currentMaxTimestamp : 1564038395363|2019-07-25 15:06:35.363, watermark : 1564038393056|2019-07-25 15:06:33.056
3
1564038395363
window 2019-07-25 15:06:33.000 window 2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038395786|2019-07-25 15:06:35.786 currentMaxTimestamp : 1564038395786|2019-07-25 15:06:35.786, watermark : 1564038393363|2019-07-25 15:06:33.363
4
1564038395786
window 2019-07-25 15:06:33.000 window 2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038396216|2019-07-25 15:06:36.216 currentMaxTimestamp : 1564038396216|2019-07-25 15:06:36.216, watermark : 1564038393786|2019-07-25 15:06:33.786
1
1564038396216
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038396504|2019-07-25 15:06:36.504 currentMaxTimestamp : 1564038396504|2019-07-25 15:06:36.504, watermark : 1564038394216|2019-07-25 15:06:34.216
2
1564038396504
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038396960|2019-07-25 15:06:36.960 currentMaxTimestamp : 1564038396960|2019-07-25 15:06:36.960, watermark : 1564038394504|2019-07-25 15:06:34.504
3
1564038396960
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038397376|2019-07-25 15:06:37.376 currentMaxTimestamp : 1564038397376|2019-07-25 15:06:37.376, watermark : 1564038394960|2019-07-25 15:06:34.960
4
1564038397376
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038397755|2019-07-25 15:06:37.755 currentMaxTimestamp : 1564038397755|2019-07-25 15:06:37.755, watermark : 1564038395376|2019-07-25 15:06:35.376
5
1564038397755
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038398077|2019-07-25 15:06:38.077 currentMaxTimestamp : 1564038398077|2019-07-25 15:06:38.077, watermark : 1564038395755|2019-07-25 15:06:35.755
6
1564038398077
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038398511|2019-07-25 15:06:38.511 currentMaxTimestamp : 1564038398511|2019-07-25 15:06:38.511, watermark : 1564038396077|2019-07-25 15:06:36.077
7
1564038398511
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038398904|2019-07-25 15:06:38.904 currentMaxTimestamp : 1564038398904|2019-07-25 15:06:38.904, watermark : 1564038396511|2019-07-25 15:06:36.511
8
1564038398904
window 2019-07-25 15:06:36.000 window 2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038399218|2019-07-25 15:06:39.218 currentMaxTimestamp : 1564038399218|2019-07-25 15:06:39.218, watermark : 1564038396904|2019-07-25 15:06:36.904
1
1564038399218
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038399635|2019-07-25 15:06:39.635 currentMaxTimestamp : 1564038399635|2019-07-25 15:06:39.635, watermark : 1564038397218|2019-07-25 15:06:37.218
2
1564038399635
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038399874|2019-07-25 15:06:39.874 currentMaxTimestamp : 1564038399874|2019-07-25 15:06:39.874, watermark : 1564038397635|2019-07-25 15:06:37.635
3
1564038399874
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038400261|2019-07-25 15:06:40.261 currentMaxTimestamp : 1564038400261|2019-07-25 15:06:40.261, watermark : 1564038397874|2019-07-25 15:06:37.874
4
1564038400261
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038400614|2019-07-25 15:06:40.614 currentMaxTimestamp : 1564038400614|2019-07-25 15:06:40.614, watermark : 1564038398261|2019-07-25 15:06:38.261
5
1564038400614
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038400935|2019-07-25 15:06:40.935 currentMaxTimestamp : 1564038400935|2019-07-25 15:06:40.935, watermark : 1564038398614|2019-07-25 15:06:38.614
6
1564038400935
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038401351|2019-07-25 15:06:41.351 currentMaxTimestamp : 1564038401351|2019-07-25 15:06:41.351, watermark : 1564038398935|2019-07-25 15:06:38.935
7
1564038401351
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
clearing window state | value held in window: 4    // <-- here!!! the trigger's clear() was invoked
timestamp : 1564038401856|2019-07-25 15:06:41.856 currentMaxTimestamp : 1564038401856|2019-07-25 15:06:41.856, watermark : 1564038399351|2019-07-25 15:06:39.351
8
1564038401856
window 2019-07-25 15:06:39.000 window 2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038402142|2019-07-25 15:06:42.142 currentMaxTimestamp : 1564038402142|2019-07-25 15:06:42.142, watermark : 1564038399856|2019-07-25 15:06:39.856
1
1564038402142
window 2019-07-25 15:06:42.000 window 2019-07-25 15:06:45.000
-------------------------
timestamp : 1564038402501|2019-07-25 15:06:42.501 currentMaxTimestamp : 1564038402501|2019-07-25 15:06:42.501, watermark : 1564038400142|2019-07-25 15:06:40.142
2
1564038402501
window 2019-07-25 15:06:42.000 window 2019-07-25 15:06:45.000
-------------------------
Let's analyze the code above:

SendDataToKafkaDemo4 sends 40 records into Kafka. WatermarkTest receives them, converts each into a Java entity, assigns timestamps and watermarks (with a maximum tolerated out-of-orderness of 2s), and divides the stream into fixed 3-second tumbling windows. After keying by the first field, the trigger maintains (and updates) a state value for each keyed window. When the watermark reaches window end time + 3s (the allowedLateness), the trigger's clear method fires and the window state is purged. As you can also see, the trigger exposes several methods to override; we can override whichever ones we need.
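The window boundaries printed in the log can be checked with simple arithmetic; here is a quick sketch (plain Java, no Flink dependency, class name illustrative) of how a timestamp is assigned to a 3-second tumbling window by rounding down to a multiple of the window size:

```java
public class WindowAssignSketch {
    static final long WINDOW_SIZE_MS = 3_000L; // 3s tumbling windows, as in the example

    // Start of the tumbling window the given event-time timestamp falls into.
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long ts = 1564038394140L;          // 2019-07-25 15:06:34.140, the first event in the log
        long start = windowStart(ts);      // 1564038393000 = 15:06:33.000
        long end = start + WINDOW_SIZE_MS; // 1564038396000 = 15:06:36.000
        System.out.println("window [" + start + ", " + end + ")");
    }
}
```

This reproduces the `window 2019-07-25 15:06:33.000 window 2019-07-25 15:06:36.000` line the first event lands in.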
For example:

First time window: 15:06:33.000 - 15:06:36.000

Final value of the first time window: 4
Take the end of the first time window, 36s, and add 3s (the allowedLateness): that gives us the 39s mark. In other words, when the watermark reaches 15:06:39.000, the window trigger's clear method runs. Sure enough, at event time timestamp : 1564038401856|2019-07-25 15:06:41.856 the watermark has reached the 39s mark:

watermark : 1564038399351|2019-07-25 15:06:39.351 is exactly that point. And at this moment the clear fires....
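The clear point can be verified numerically. Below is a simplified sketch (plain Java, names illustrative) that assumes clear fires as soon as the watermark reaches window end + allowedLateness, which approximates what the log shows:

```java
public class WindowClearSketch {
    static final long ALLOWED_LATENESS_MS = 3_000L; // same as .allowedLateness(Time.seconds(3))

    // Window state is purged once the watermark reaches windowEnd + allowedLateness.
    static boolean shouldClear(long watermarkTs, long windowEnd) {
        return watermarkTs >= windowEnd + ALLOWED_LATENESS_MS;
    }

    public static void main(String[] args) {
        long windowEnd = 1564038396000L; // 15:06:36.000, end of the first window
        // Watermark 15:06:38.935 (one event earlier in the log): not yet past 15:06:39.000.
        System.out.println(shouldClear(1564038398935L, windowEnd));
        // Watermark 15:06:39.351: past 15:06:39.000, so clear() fires, as seen in the log.
        System.out.println(shouldClear(1564038399351L, windowEnd));
    }
}
```

Plugging in the two watermark values surrounding the "clearing window state" log line shows exactly why the clear happened between them.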