其中用到了滑動窗口函數:窗口大小 30 秒,滑動間隔 15 秒;時間戳晚於窗口起始時間 10 秒(含)以上的數據會被丟棄。(實際業務中這三個值應爲 10 分鐘、1 分鐘、5 分鐘。)先把代碼和 SQL 記錄如下:
public static void main(String[] arg) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableSysoutLogging();//開啓Sysout打日誌 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //設置窗口的時間單位爲process time Properties props = new Properties(); props.put("bootstrap.servers", "kafkaip:9092"); props.put("group.id", "metric-group4"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); //value 反序列化 DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "im-message-topic3", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); DataStream<Message> bean3DataStream = dataStreamSource.map(new MapFunction<String, Message>() { @Override public Message map(String value) throws Exception { logger.info("receive msg:"+value); JSONObject jsonObject =JSONObject.parseObject(value); Message s= new Message( jsonObject.getString("sessionId"), jsonObject.getString("fromUid"), jsonObject.getString("toUid"), jsonObject.getString("chatType"), jsonObject.getString("type"), jsonObject.getString("msgId"), jsonObject.getString("msg"), jsonObject.getLong("timestampSend") ); return s; } }); //設置水印,並過濾數據 DataStream<Message> bean3DataStreamWithAssignTime = bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction<Message, Message,TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Message> values, Collector<Message> out) throws Exception { for (Message t: values) { logger.info("window start time:"+new Date(window.getStart()).toString()); logger.info("real time:"+new Date(t.getTimestampSend()).toString()); if(t.getTimestampSend()<window.getStart()+1000*10) { 
logger.info("yes"); out.collect(t); }else { logger.info("no"); } } } }); //bean3DataStreamWithAssignTime.addSink(new Sink()); //bean3DataStreamWithAssignTime.writeAsText("/usr/local/whk3", WriteMode.OVERWRITE); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); tableEnv.registerDataStream("myTable", bean3DataStreamWithAssignTime, "sessionId, fromUid,toUid,chatType,type,msgId,msg,timestampSend,rowtime.rowtime"); Table temp=tableEnv.scan("myTable"); System.out.println("schema is:"); temp.printSchema(); // Table tb3 = tableEnv.sqlQuery("select * from myTable"); // DataStream<Row> appendStream =tableEnv.toAppendStream(tb3, Row.class); // appendStream.addSink(new Sink()); //對過濾後的數據,使用正則匹配數據 Table tb2 = tableEnv.sqlQuery( "SELECT " + " * " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.timestampSend as answerTime, "+ "LAST(e1.timestampSend) as customer_event_time, " + "e2.fromUid as empUid, " + "e1.timestampSend as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1+ e2+?) 
" + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream<Row> appendStream2 =tableEnv.toAppendStream(tb2, Row.class); appendStream2.addSink(new Sink2()); env.execute("msg v5"); } public static class TruckTimestamp extends AscendingTimestampExtractor<Message> { private static final long serialVersionUID = 1L; @Override public long extractAscendingTimestamp(Message element) { return element.getTimestampSend(); } } public static class Sink implements SinkFunction<Row> { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"orinal time:"+value.toString()); } } public static class Sink2 implements SinkFunction<Row> { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"new time:"+value.toString()); } }