Flink learning series -- fundamentals (3)

Preface

In the second installment we said we would cover Flink's watermark and trigger concepts. Let's now get acquainted with watermarks, triggers, and the lifecycle of late data. These concepts are a bit abstract and take some thought to work through.

- Event time, ingestion time, processing time

Before we can understand watermarks, we need to introduce Flink's three notions of time: event time, ingestion time, and processing time.
First, a diagram:
[Figure: the three notions of time in a Flink pipeline]
Here I'll do a quick self-Q&A:
Q: Why do we need both event time and processing time?
A: After a producer emits a message, network latency or other factors mean the time we (Flink) actually receive the data is always later than the time the producer created it. That gap needs some bound. Say we (Flink) wait 10 s or so: data arriving within those 10 seconds is called early or on-time, and data arriving after is called late. Early and on-time data can be processed normally, but what about late data? Drop it? Park it somewhere and process it later in one batch? Flink has thought all of this through for us and provides the corresponding classes and methods; the wheel is already built, you just have to set sail... ha, I digress.
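The on-time-versus-late distinction above can be sketched in plain Java (no Flink dependency). This is a simplification for illustration: here "late" just means an event timestamp at or behind the current watermark; whether Flink actually drops such an event also depends on the window and allowedLateness. The class and method names are my own, and the 2 s bound mirrors the one used in the example further down.

```java
// Minimal sketch: bounded-out-of-orderness watermark and lateness classification.
public class LatenessSketch {
    static final long MAX_OUT_OF_ORDERNESS = 2000L; // 2 s, as in the example below

    // Watermark = highest timestamp seen so far, minus the allowed out-of-orderness.
    static long watermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    // An event counts as "late" once its timestamp is at or behind the watermark.
    static boolean isLate(long eventTimestamp, long currentMaxTimestamp) {
        return eventTimestamp <= watermark(currentMaxTimestamp);
    }

    public static void main(String[] args) {
        long maxSeen = 10_000L;                      // latest event time observed: t = 10 s
        System.out.println(watermark(maxSeen));      // 8000 -> the watermark sits 2 s behind
        System.out.println(isLate(8_500L, maxSeen)); // false: still within the 2 s bound
        System.out.println(isLate(7_000L, maxSeen)); // true: arrived too far behind
    }
}
```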

As the figure shows, taking reading from a queue as an example: the event time is attached when the producer creates the data; the ingestion time is when we pull the producer's message from the data source; and the processing time is when we actually process the record. Compared with event time, a program using ingestion time cannot handle out-of-order or late events, but it also does not need to define how watermarks are generated. Internally, ingestion time behaves much like event time, except that timestamps are assigned and watermarks generated automatically.


Below we'll walk through a concrete example of how watermarks and triggers are used.

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.19.141.60:31090");
        properties.setProperty("group.id", "crm_stream_window");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        DataStream<String> stream =
                env.addSource(new FlinkKafkaConsumer011<>("test-demo12", new SimpleStringSchema(), properties));
        env.setParallelism(1);
        DataStream<Tuple3<String, Long, Integer>> inputMap = stream.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
            private static final long serialVersionUID = -8812094804806854937L;

            @Override
            public Tuple3<String, Long, Integer> map(String value) throws Exception {
                KafkaEntity kafkaEntity = JSON.parseObject(value, KafkaEntity.class);
                return new Tuple3(kafkaEntity.getName(), kafkaEntity.getCreate_time(), kafkaEntity.getId());
            }
        });
        DataStream<Tuple3<String, Long, Integer>> watermark =
                inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {

                    private static final long serialVersionUID = 8252616297345284790L;
                    Long currentMaxTimestamp = 0L;
                    Long maxOutOfOrderness = 2000L; // maximum allowed out-of-orderness: 2 s
                    Watermark watermark = new Watermark(Long.MIN_VALUE); // initialized so the log line in extractTimestamp cannot NPE before the first periodic call
                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                        return watermark;
                    }

                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
                        Long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        System.out.println("timestamp : " + element.f1 + "|" + format.format(element.f1) + " currentMaxTimestamp : " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + " watermark : " + watermark.getTimestamp() + "|" + format.format(watermark.getTimestamp()));
                        return timestamp;
                    }
                });

        OutputTag<Tuple3<String, Long, Integer>> lateOutputTag = new OutputTag<Tuple3<String, Long, Integer>>("late-data") {
            private static final long serialVersionUID = -1552769100986888698L;
        };

        SingleOutputStreamOperator<String> resultStream = watermark
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .trigger(new Trigger<Tuple3<String, Long, Integer>, TimeWindow>() {
                    private static final long serialVersionUID = 2742133264310093792L;
                    ValueStateDescriptor<Integer> sumStateDescriptor = new ValueStateDescriptor<Integer>("sum", Integer.class);

                    @Override
                    public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                        ValueState<Integer> sumState = ctx.getPartitionedState(sumStateDescriptor);
                        if (null == sumState.value()) {
                            sumState.update(0);
                        }
                        sumState.update(element.f2 + sumState.value());
                        System.out.println(sumState.value());
//                        if (sumState.value() >= 2) {
                            // here you could choose to manage the state manually
                            // the default trigger returns TriggerResult.FIRE, which does not purge the window contents
//                            return TriggerResult.FIRE_AND_PURGE;
                        return TriggerResult.FIRE_AND_PURGE;
//                        }
//                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("clearing window state | value held in window: " + ctx.getPartitionedState(sumStateDescriptor).value());
                        ctx.getPartitionedState(sumStateDescriptor).clear();
                    }
                })
                //using allowedLateness can cause the same window to be computed more than once
                //with the default trigger:
                // the window state is cleared once the watermark passes window_end_time + allowedLateness
                // data for the window that arrives before the state is cleared triggers a re-computation
                //
                //with a custom trigger:
                //the same window can keep firing its contents downstream as long as the condition is met
                //the window state is still cleared once the watermark passes window_end_time + allowedLateness
                //data for the window that arrives before the state is cleared triggers a re-computation
                //
                //when the window state is cleared depends only on time, not on whether a custom trigger is used
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(lateOutputTag)
                .apply(new WindowFunction<Tuple3<String, Long, Integer>, String, Tuple, TimeWindow>() {
                    private static final long serialVersionUID = 7813420265419629362L;

                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Long, Integer>> input, Collector<String> out) throws Exception {
                        for (Tuple3<String, Long, Integer> stringLongTuple2 : input) {
                            System.out.println(stringLongTuple2.f1);
                        }
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        out.collect("window  " + format.format(window.getStart()) + "   window  " + format.format(window.getEnd()));
                        System.out.println("-------------------------");
                    }
                });

        resultStream.print();
        resultStream.getSideOutput(lateOutputTag).print();
        env.execute("window test");

    }
}
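The watermark values printed in the run below follow directly from the assigner above: the watermark always sits maxOutOfOrderness (2000 ms) behind the largest timestamp seen so far. A plain-Java check (no Flink needed; class name is my own), using the first event's timestamp from the log:

```java
// Reproduces the periodic-watermark arithmetic of the assigner above.
public class WatermarkArithmetic {
    static final long MAX_OUT_OF_ORDERNESS = 2000L;

    static long watermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        // First event of the run below: timestamp 1564038394140 (15:06:34.140).
        // Once the periodic watermark fires, it trails that by exactly 2 s:
        System.out.println(watermark(1564038394140L)); // 1564038392140 (15:06:32.140)
    }
}
```

Note that the log line printed in extractTimestamp shows the watermark computed from the *previous* element's maximum, because the periodic getCurrentWatermark call runs on an interval; that is why each printed watermark lags the printed currentMaxTimestamp by one element plus 2 s.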

package cn.crawler.mft_seconed.demo4;

import cn.crawler.mft_seconed.KafkaEntity;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SendDataToKafkaDemo4 {

    public static void main(String[] args){
        SendDataToKafkaDemo4 sendDataToKafkaDemo4 = new SendDataToKafkaDemo4();
        for(int i=0;i<40;i++){
            KafkaEntity build = KafkaEntity.builder().id(1).message("message"+i).create_time(System.currentTimeMillis()).name(""+1).build();
            System.out.println(build.toString());
            sendDataToKafkaDemo4.send("test-demo12", "123", JSON.toJSONString(build)); // topic matches the consumer in WatermarkTest
        }
    }

    public void send(String topic,String key,String data){
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.141.60:31090");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i < 2; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(new ProducerRecord<String, String>(topic, key + i, data));
        }
        producer.close();
    }
}

Let's take a look at the output:

timestamp : 1564038394140|2019-07-25 15:06:34.140 currentMaxTimestamp : 1564038394140|2019-07-25 15:06:34.140, watermark : -2000|1970-01-01 07:59:58.000
1
1564038394140
window  2019-07-25 15:06:33.000   window  2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038395056|2019-07-25 15:06:35.056 currentMaxTimestamp : 1564038395056|2019-07-25 15:06:35.056, watermark : 1564038392140|2019-07-25 15:06:32.140
2
1564038395056
window  2019-07-25 15:06:33.000   window  2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038395363|2019-07-25 15:06:35.363 currentMaxTimestamp : 1564038395363|2019-07-25 15:06:35.363, watermark : 1564038393056|2019-07-25 15:06:33.056
3
1564038395363
window  2019-07-25 15:06:33.000   window  2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038395786|2019-07-25 15:06:35.786 currentMaxTimestamp : 1564038395786|2019-07-25 15:06:35.786, watermark : 1564038393363|2019-07-25 15:06:33.363
4
1564038395786
window  2019-07-25 15:06:33.000   window  2019-07-25 15:06:36.000
-------------------------
timestamp : 1564038396216|2019-07-25 15:06:36.216 currentMaxTimestamp : 1564038396216|2019-07-25 15:06:36.216, watermark : 1564038393786|2019-07-25 15:06:33.786
1
1564038396216
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038396504|2019-07-25 15:06:36.504 currentMaxTimestamp : 1564038396504|2019-07-25 15:06:36.504, watermark : 1564038394216|2019-07-25 15:06:34.216
2
1564038396504
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038396960|2019-07-25 15:06:36.960 currentMaxTimestamp : 1564038396960|2019-07-25 15:06:36.960, watermark : 1564038394504|2019-07-25 15:06:34.504
3
1564038396960
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038397376|2019-07-25 15:06:37.376 currentMaxTimestamp : 1564038397376|2019-07-25 15:06:37.376, watermark : 1564038394960|2019-07-25 15:06:34.960
4
1564038397376
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038397755|2019-07-25 15:06:37.755 currentMaxTimestamp : 1564038397755|2019-07-25 15:06:37.755, watermark : 1564038395376|2019-07-25 15:06:35.376
5
1564038397755
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038398077|2019-07-25 15:06:38.077 currentMaxTimestamp : 1564038398077|2019-07-25 15:06:38.077, watermark : 1564038395755|2019-07-25 15:06:35.755
6
1564038398077
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038398511|2019-07-25 15:06:38.511 currentMaxTimestamp : 1564038398511|2019-07-25 15:06:38.511, watermark : 1564038396077|2019-07-25 15:06:36.077
7
1564038398511
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038398904|2019-07-25 15:06:38.904 currentMaxTimestamp : 1564038398904|2019-07-25 15:06:38.904, watermark : 1564038396511|2019-07-25 15:06:36.511
8
1564038398904
window  2019-07-25 15:06:36.000   window  2019-07-25 15:06:39.000
-------------------------
timestamp : 1564038399218|2019-07-25 15:06:39.218 currentMaxTimestamp : 1564038399218|2019-07-25 15:06:39.218, watermark : 1564038396904|2019-07-25 15:06:36.904
1
1564038399218
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038399635|2019-07-25 15:06:39.635 currentMaxTimestamp : 1564038399635|2019-07-25 15:06:39.635, watermark : 1564038397218|2019-07-25 15:06:37.218
2
1564038399635
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038399874|2019-07-25 15:06:39.874 currentMaxTimestamp : 1564038399874|2019-07-25 15:06:39.874, watermark : 1564038397635|2019-07-25 15:06:37.635
3
1564038399874
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038400261|2019-07-25 15:06:40.261 currentMaxTimestamp : 1564038400261|2019-07-25 15:06:40.261, watermark : 1564038397874|2019-07-25 15:06:37.874
4
1564038400261
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038400614|2019-07-25 15:06:40.614 currentMaxTimestamp : 1564038400614|2019-07-25 15:06:40.614, watermark : 1564038398261|2019-07-25 15:06:38.261
5
1564038400614
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038400935|2019-07-25 15:06:40.935 currentMaxTimestamp : 1564038400935|2019-07-25 15:06:40.935, watermark : 1564038398614|2019-07-25 15:06:38.614
6
1564038400935
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038401351|2019-07-25 15:06:41.351 currentMaxTimestamp : 1564038401351|2019-07-25 15:06:41.351, watermark : 1564038398935|2019-07-25 15:06:38.935
7
1564038401351
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
clearing window state | value held in window: 4 // here!!! the trigger's clear() fired
timestamp : 1564038401856|2019-07-25 15:06:41.856 currentMaxTimestamp : 1564038401856|2019-07-25 15:06:41.856, watermark : 1564038399351|2019-07-25 15:06:39.351
8
1564038401856
window  2019-07-25 15:06:39.000   window  2019-07-25 15:06:42.000
-------------------------
timestamp : 1564038402142|2019-07-25 15:06:42.142 currentMaxTimestamp : 1564038402142|2019-07-25 15:06:42.142, watermark : 1564038399856|2019-07-25 15:06:39.856
1
1564038402142
window  2019-07-25 15:06:42.000   window  2019-07-25 15:06:45.000
-------------------------
timestamp : 1564038402501|2019-07-25 15:06:42.501 currentMaxTimestamp : 1564038402501|2019-07-25 15:06:42.501, watermark : 1564038400142|2019-07-25 15:06:40.142
2
1564038402501
window  2019-07-25 15:06:42.000   window  2019-07-25 15:06:45.000
-------------------------
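The window bounds printed above can be reproduced with plain arithmetic. A minimal sketch (plain Java, no Flink dependency; it mirrors how a tumbling event-time window with zero offset assigns a timestamp to a window, and the class name is my own):

```java
// How a timestamp maps onto a 3 s tumbling window (offset 0 assumed).
public class WindowAssignment {
    static final long WINDOW_SIZE = 3000L; // 3 s, as in the example

    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE);
    }

    public static void main(String[] args) {
        long ts = 1564038394140L; // first event of the run, 15:06:34.140
        System.out.println(windowStart(ts));               // 1564038393000 -> 15:06:33.000
        System.out.println(windowStart(ts) + WINDOW_SIZE); // 1564038396000 -> 15:06:36.000
    }
}
```

These are exactly the bounds printed for the first window, "window 2019-07-25 15:06:33.000 window 2019-07-25 15:06:36.000".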

Let's analyze the code above:
SendDataToKafkaDemo4 sends 40 messages into Kafka. WatermarkTest receives them, converts each one into a Java entity, then assigns timestamps and watermarks (with a maximum out-of-orderness of 2 s) and groups the stream into fixed 3 s tumbling windows. After keying by the first field, the trigger sets (updates) a state value for each keyed window. When the watermark reaches window end time + 3 s (the allowedLateness), the trigger's clear method fires and the window's state is purged. Note also that the trigger exposes several methods to override, so we can hook in wherever our logic requires.
For example:
First time window: 15:06:33.000 - 15:06:36.000
Final value in the first window: 4
Take the first window's end time: 36 s + 3 s (the allowedLateness) = 39 s. Once the watermark reaches 15:06:39.000, the window trigger's clear method runs. Sure enough, by the event with timestamp : 1564038401856|2019-07-25 15:06:41.856, the watermark had already passed the 39 s mark:
watermark : 1564038399351|2019-07-25 15:06:39.351 is that point, and that is when the clear fired.
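The cleanup moment worked out above can also be checked with plain arithmetic (a sketch, no Flink; class and method names are my own): the state is cleared once the watermark reaches windowEnd + allowedLateness.

```java
// When does the first window's state get cleared?
public class CleanupTime {
    static boolean shouldClear(long watermark, long windowEnd, long allowedLateness) {
        return watermark >= windowEnd + allowedLateness;
    }

    public static void main(String[] args) {
        long windowEnd = 1564038396000L; // 15:06:36.000, end of the first window
        long lateness  = 3000L;          // allowedLateness: 3 s
        long wmBefore  = 1564038398935L; // 15:06:38.935 -> window state still alive
        long wmAfter   = 1564038399351L; // 15:06:39.351 -> past 15:06:39.000, cleanup fires
        System.out.println(shouldClear(wmBefore, windowEnd, lateness)); // false
        System.out.println(shouldClear(wmAfter,  windowEnd, lateness)); // true
    }
}
```

This matches the log: the "clearing window state" line appears exactly when the watermark first passes 15:06:39.000.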
