初學Flink,對Watermarks的一些理解和感悟(透徹2)

官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
翻譯:https://www.jianshu.com/p/68ab40c7f347html

1. 幾個重要的概念簡述:

  • Window:Window是處理無界流的關鍵,Windows將流拆分爲一個個有限大小的buckets,能夠能夠在每個buckets中進行計算java

  • start_time,end_time:當Window時時間窗口的時候,每一個window都會有一個開始時間和結束時間(前開後閉),這個時間是系統時間apache

  • event-time: 事件發生時間,是事件發生所在設備的當地時間,好比一個點擊事件的時間發生時間,是用戶點擊操做所在的手機或電腦的時間json

  • Watermarks:能夠把他理解爲一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大於了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的。api

2.如何使用Watermarks處理亂序的數據流

什麼是亂序呢?能夠理解爲數據到達的順序和他的event-time排序不一致。致使這的緣由有不少,好比延遲,消息積壓,重試等等ide

由於Watermarks是用來觸發window窗口計算的,咱們能夠根據事件的event-time,計算出Watermarks,而且設置一些延遲,給遲到的數據一些機會。測試

假如咱們設置10s的時間窗口(window),那麼0~10s,10~20s都是一個窗口,以0~10s爲例,0位start-time,10爲end-time。假若有4個數據的event-time分別是8(A),12.5(B),9(C),13.5(D),咱們設置Watermarks爲當前全部到達數據event-time的最大值減去延遲值3.5秒this

當A到達的時候,Watermarks爲max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發計算
當B到達的時候,Watermarks爲max(12.8,5)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當C到達的時候,Watermarks爲max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當D到達的時候,Watermarks爲max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發計算
觸發計算的時候,會將AC(由於他們都小於10)都計算進去spa

經過上面這種方式,咱們就將遲到的C計算進去了翻譯

這裏的延遲3.5s是咱們假設一個數據到達的時候,比他早3.5s的數據確定也都到達了,這個是須要根據經驗推算的,加入D到達之後有到達了一個E,event-time=6,可是因爲0~10的時間窗口已經開始計算了,因此E就丟了。

3.看一個代碼的實際例子

下面代碼中的BoundedOutOfOrdernessGenerator就是一個典型的Watermarks實例

package xuwei.tech;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.meituan.flink.common.conf.FlinkConf;
import com.meituan.flink.common.kafka.MTKafkaConsumer08;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by smile on 14/11/2017. 統計每 10 秒內每種操做有多少個
 */
public class EventTimeWindowCount {
	private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class);

	public static void main(String[] args) throws Exception { // 獲取做業名
		String jobName = FlinkConf.getJobName(args); // 獲取執行環境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置使用 EventTime
																										// 做爲時間戳(默認是
																										// ProcessingTime)
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 開啓 Checkpoint(每 10 秒保存一次檢查點,模式爲 Exactly Once)
		env.enableCheckpointing(10000);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 設置從 Kafka 的 topic
																						// "log.orderlog" 中讀取數據
		MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName);
		DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema())); // 默認接上次開始消費,如下的寫法(setStartFromLatest)能夠從最新開始消費,相應的還有(setStartFromEarliest
																													// 從最舊開始消費)
		// DataStream<String> stream =
		// env.addSource(consumer.getInstance("log.orderlog", new
		// SimpleStringSchema()).setStartFromLatest());

		DataStream<String> orderAmount = // 將讀入的字符串轉化爲 OrderRecord 對象
				stream.map(new ParseOrderRecord()) // 設置從 OrderRecord 對象中提取時間戳的方式,下文 BoundedOutOfOrdernessGenerator
													// 類中具體實現該方法
						.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) // 用 OrderRecord 對象的 action
																								// 字段進行分流(相同 action
																								// 的進入相同流,不一樣 action
																								// 的進入不一樣流)
						.keyBy("action") // 觸發 10s 的滾動窗口,即每十秒的數據進入同一個窗口
						.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 將同一窗口的每一個 OrderRecord 對象的 count
																				// 字段加起來(其他字段只保留第一個進入該窗口的,後進去的丟棄)
						.sum("count") // 將結果從 OrderRecord 對象轉換爲 String,每十萬條輸出一條
						.flatMap(new ParseResult()); // 若是想每條都輸出來,那就輸得慢一點,每 10 秒輸出一條數據(請將上一行的 flatMap 換成下一行的 map)
		// .map(new ParseResultSleep());

		// 輸出結果(而後就能夠去 Task Manage 的 Stdout 裏面看)
		// 小數據量測試的時候能夠這麼寫,正式上線的時候不要這麼寫!數據量大建議仍是寫到 Kafka Topic 或者其餘的下游裏面去
		orderAmount.print();
		env.execute(jobName);
	}

	public static class ParseOrderRecord implements MapFunction<String, OrderRecord> {
		@Override
		public OrderRecord map(String s) throws Exception {
			JSONObject jsonObject = JSON.parseObject(s);
			long id = jsonObject.getLong("id");
			int dealId = jsonObject.getInteger("dealid");
			String action = jsonObject.getString("_mt_action");
			double amount = jsonObject.getDouble("amount");
			String timestampString = jsonObject.getString("_mt_datetime"); // 將字符串格式的時間戳解析爲 long 類型,單位毫秒
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			Date timestampDate = simpleDateFormat.parse(timestampString);
			long timestamp = timestampDate.getTime();
			return new OrderRecord(id, dealId, action, amount, timestamp);
		}
	}

	public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {
		private final long maxOutOfOrderness = 3500; // 3.5 seconds

		private long currentMaxTimestamp;

		@Override
		public long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 將數據中的時間戳字段(long 類型,精確到毫秒)賦給
																							// timestamp 變量,此處是
																							// OrderRecord 的 timestamp
																							// 字段
			long timestamp = record.timestamp;
			currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
			return timestamp;
		}

		@Override
		public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the
													// out-of-orderness bound
			return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
		}
	}

	public static class ParseResult implements FlatMapFunction<OrderRecord, String> {
		private static long msgCount = 0;

		@Override
		public void flatMap(OrderRecord record, Collector<String> out) throws Exception { // 每十萬條輸出一條,防止輸出太多在 Task
																							// Manage 的 Stdout 裏面刷新不出來
			if (msgCount == 0) {
				out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp)
						+ " action: " + record.action + " count = " + record.count);
				msgCount = 0;
			}
			msgCount++;
			msgCount %= 100000;
		}
	}

	public static class ParseResultSleep implements MapFunction<OrderRecord, String> {
		@Override
		public String map(OrderRecord record) throws Exception { // 每 10 秒輸出一條數據,防止輸出太多在 Task Manage 的 Stdout 裏面刷新不出來
			// 正式上線的時候不要這麼寫!
			Thread.sleep(10000);
			return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: "
					+ record.action + " count = " + record.count;
		}
	}

	public static class OrderRecord {        public long id;        public int dealId;        public String action;        public double amount;        public long timestamp;        public long count;        public OrderRecord() {
        }        public OrderRecord(long id, int dealId, String action, double amount, long timestamp) {            this.id = id;            this.dealId = dealId;            this.action = action;            this.amount = amount;            this.timestamp = timestamp;            this.count = 1;
        }
    }
}
相關文章
相關標籤/搜索