官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
翻譯:https://www.jianshu.com/p/68ab40c7f347html
Window:Window是處理無界流的關鍵,Windows將流拆分爲一個個有限大小的buckets
,能夠能夠在每個buckets
中進行計算java
start_time,end_time:當Window時時間窗口的時候,每一個window都會有一個開始時間和結束時間(前開後閉),這個時間是系統時間apache
event-time: 事件發生時間,是事件發生所在設備的當地時間,好比一個點擊事件的時間發生時間,是用戶點擊操做所在的手機或電腦的時間json
Watermarks:能夠把他理解爲一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大於了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的。api
什麼是亂序呢?能夠理解爲數據到達的順序和他的event-time排序不一致。致使這的緣由有不少,好比延遲,消息積壓,重試等等ide
由於Watermarks是用來觸發window窗口計算的,咱們能夠根據事件的event-time,計算出Watermarks,而且設置一些延遲,給遲到的數據一些機會。測試
假如咱們設置10s的時間窗口(window),那麼0~10s,10~20s都是一個窗口,以0~10s爲例,0位start-time,10爲end-time。假若有4個數據的event-time分別是8(A),12.5(B),9(C),13.5(D),咱們設置Watermarks爲當前全部到達數據event-time的最大值減去延遲值3.5秒this
當A到達的時候,Watermarks爲max{8}-3.5=8-3.5 = 4.5 < 10
,不會觸發計算
當B到達的時候,Watermarks爲max(12.8,5)-3.5=12.5-3.5 = 9 < 10
,不會觸發計算
當C到達的時候,Watermarks爲max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10
,不會觸發計算
當D到達的時候,Watermarks爲max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10
,觸發計算
觸發計算的時候,會將AC(由於他們都小於10)都計算進去spa
經過上面這種方式,咱們就將遲到的C計算進去了翻譯
這裏的延遲3.5s是咱們假設一個數據到達的時候,比他早3.5s的數據確定也都到達了,這個是須要根據經驗推算的,加入D到達之後有到達了一個E,event-time=6,可是因爲0~10的時間窗口已經開始計算了,因此E就丟了。
下面代碼中的BoundedOutOfOrdernessGenerator就是一個典型的Watermarks實例
package xuwei.tech; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.meituan.flink.common.conf.FlinkConf; import com.meituan.flink.common.kafka.MTKafkaConsumer08; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; /** * Created by smile on 14/11/2017. 統計每 10 秒內每種操做有多少個 */ public class EventTimeWindowCount { private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class); public static void main(String[] args) throws Exception { // 獲取做業名 String jobName = FlinkConf.getJobName(args); // 獲取執行環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置使用 EventTime // 做爲時間戳(默認是 // ProcessingTime) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 開啓 Checkpoint(每 10 秒保存一次檢查點,模式爲 Exactly Once) env.enableCheckpointing(10000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 設置從 Kafka 的 topic // "log.orderlog" 中讀取數據 MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName); DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema())); // 默認接上次開始消費,如下的寫法(setStartFromLatest)能夠從最新開始消費,相應的還有(setStartFromEarliest // 從最舊開始消費) // DataStream<String> stream = // env.addSource(consumer.getInstance("log.orderlog", new // SimpleStringSchema()).setStartFromLatest()); DataStream<String> orderAmount = // 將讀入的字符串轉化爲 OrderRecord 對象 stream.map(new ParseOrderRecord()) // 設置從 OrderRecord 對象中提取時間戳的方式,下文 BoundedOutOfOrdernessGenerator // 類中具體實現該方法 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) // 用 OrderRecord 對象的 action // 字段進行分流(相同 action // 的進入相同流,不一樣 action // 的進入不一樣流) .keyBy("action") // 觸發 10s 的滾動窗口,即每十秒的數據進入同一個窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 將同一窗口的每一個 OrderRecord 對象的 count // 字段加起來(其他字段只保留第一個進入該窗口的,後進去的丟棄) .sum("count") // 將結果從 OrderRecord 對象轉換爲 String,每十萬條輸出一條 .flatMap(new ParseResult()); // 若是想每條都輸出來,那就輸得慢一點,每 10 秒輸出一條數據(請將上一行的 flatMap 換成下一行的 map) // .map(new ParseResultSleep()); // 輸出結果(而後就能夠去 Task Manage 的 Stdout 裏面看) // 小數據量測試的時候能夠這麼寫,正式上線的時候不要這麼寫!數據量大建議仍是寫到 Kafka Topic 或者其餘的下游裏面去 orderAmount.print(); env.execute(jobName); } public static class ParseOrderRecord implements MapFunction<String, OrderRecord> { @Override public OrderRecord map(String s) throws Exception { JSONObject jsonObject = JSON.parseObject(s); long id = jsonObject.getLong("id"); int dealId = jsonObject.getInteger("dealid"); String action = jsonObject.getString("_mt_action"); double amount = jsonObject.getDouble("amount"); String timestampString = jsonObject.getString("_mt_datetime"); // 將字符串格式的時間戳解析爲 long 類型,單位毫秒 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date timestampDate = simpleDateFormat.parse(timestampString); long timestamp = timestampDate.getTime(); return new OrderRecord(id, dealId, action, amount, timestamp); } } public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> { private final long maxOutOfOrderness = 3500; // 3.5 seconds private long currentMaxTimestamp; @Override public long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 將數據中的時間戳字段(long 類型,精確到毫秒)賦給 // timestamp 變量,此處是 // OrderRecord 的 timestamp // 字段 long timestamp = record.timestamp; currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the // out-of-orderness bound return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } } public static class ParseResult implements FlatMapFunction<OrderRecord, String> { private static long msgCount = 0; @Override public void flatMap(OrderRecord record, Collector<String> out) throws Exception { // 每十萬條輸出一條,防止輸出太多在 Task // Manage 的 Stdout 裏面刷新不出來 if (msgCount == 0) { out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: " + record.action + " count = " + record.count); msgCount = 0; } msgCount++; msgCount %= 100000; } } public static class ParseResultSleep implements MapFunction<OrderRecord, String> { @Override public String map(OrderRecord record) throws Exception { // 每 10 秒輸出一條數據,防止輸出太多在 Task Manage 的 Stdout 裏面刷新不出來 // 正式上線的時候不要這麼寫! Thread.sleep(10000); return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: " + record.action + " count = " + record.count; } } public static class OrderRecord { public long id; public int dealId; public String action; public double amount; public long timestamp; public long count; public OrderRecord() { } public OrderRecord(long id, int dealId, String action, double amount, long timestamp) { this.id = id; this.dealId = dealId; this.action = action; this.amount = amount; this.timestamp = timestamp; this.count = 1; } } }