【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

在使用eventTime的時候如何處理亂序數據?咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡延遲等緣由,致使亂序的產生,特別是使用kafka的話,多個分區的數據沒法保證有序。因此在進行window計算的時候,咱們又不能無限期的等下去,必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark。Watermark是用於處理亂序事件的,用於衡量Event Time進展的機制。watermark能夠翻譯爲水位線。java

1、Watermark的核心原理

Watermark的核心本質能夠理解成一個延遲觸發機制。
在 Flink 的窗口處理過程當中,若是肯定所有數據到達,就能夠對 Window 的全部數據作 窗口計算操做(如彙總、分組等),若是數據沒有所有到達,則繼續等待該窗口中的數據全 部到達纔開始處理。這種狀況下就須要用到水位線(WaterMarks)機制,它可以衡量數據處 理進度(表達數據到達的完整性),保證事件數據(所有)到達 Flink 系統,或者在亂序及 延遲到達時,也可以像預期同樣計算出正確而且連續的結果。當任何 Event 進入到 Flink 系統時,會根據當前最大事件時間產生 Watermarks 時間戳。apache

那麼 Flink 是怎麼計算 Watermak 的值呢?編程

Watermark =進入Flink 的最大的事件時間(mxtEventTime)-指定的延遲時間(t)windows

那麼有 Watermark 的 Window 是怎麼觸發窗口函數的呢?
若是有窗口的中止時間等於或者小於 maxEventTime - t(當時的warkmark),那麼這個窗口被觸發執行。api

其核心處理流程以下圖所示。網絡

【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

2、Watermark的三種使用狀況

一、原本有序的Stream中的 Watermark

若是數據元素的事件時間是有序的,Watermark 時間戳會隨着數據元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(由於既然是有序的時間,就不須要設置延遲了,那麼t就是 0。因此 watermark=maxtime-0 = maxtime),也就是理想狀態下的水位 線。當 Watermark 時間大於 Windows 結束時間就會觸發對 Windows 的數據計算,以此類推, 下一個 Window 也是同樣。這種狀況實際上是亂序數據的一種特殊狀況。app

二、亂序事件中的Watermark

現實狀況下數據元素每每並非按照其產生順序接入到 Flink 系統中進行處理,而頻繁 出現亂序或遲到的狀況,這種狀況就須要使用 Watermarks 來應對。好比下圖,設置延遲時間t爲2。socket

三、並行數據流中的Watermark

在多並行度的狀況下,Watermark 會有一個對齊機制,這個對齊機制會取全部 Channel 中最小的 Watermark。ide

3、設置Watermark的核心代碼

一、首先,正確設置事件處理的時間語義,通常都是採用Event Time。

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

二、其次,指定生成Watermark的機制,包括:延時處理的時間和EventTime對應的字段。以下:

【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

注意:不論是數據是否有序,均可以使用上面的代碼。有序的數據只是無序數據的一種特殊狀況。函數

4、Watermark編程案例

測試數據:基站的手機通話數據,以下:

【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

需求:按基站,每5秒統計通話時間最長的記錄。

  • StationLog用於封裝基站數據
package watermark;

//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
    private String stationID;   //基站ID
    private String from;        //呼叫放
    private String to;          //被叫方
    private long duration;      //通話的持續時間
    private long callTime;      //通話的呼叫時間
    public StationLog(String stationID, String from, 
                      String to, long duration, 
                      long callTime) {
        this.stationID = stationID;
        this.from = from;
        this.to = to;
        this.duration = duration;
        this.callTime = callTime;
    }
    public String getStationID() {
        return stationID;
    }
    public void setStationID(String stationID) {
        this.stationID = stationID;
    }
    public long getCallTime() {
        return callTime;
    }
    public void setCallTime(long callTime) {
        this.callTime = callTime;
    }
    public String getFrom() {
        return from;
    }
    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
    public long getDuration() {
        return duration;
    }
    public void setDuration(long duration) {
        this.duration = duration;
    }
}
  • 代碼實現:WaterMarkDemo用於完成計算(注意:爲了方便我們測試設置任務的並行度爲1)
package watermark;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

//每隔五秒,將過去是10秒內,通話時間最長的通話日誌輸出。
public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        //獲得Flink流式處理的運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //設置週期性的產生水位線的時間間隔。當數據流很大的時候,若是每一個事件都產生水位線,會影響性能。
        env.getConfig().setAutoWatermarkInterval(100);//默認100毫秒

        //獲得輸入流
        DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
        stream.flatMap(new FlatMapFunction<String, StationLog>() {

            public void flatMap(String data, Collector<StationLog> output) throws Exception {
                String[] words = data.split(",");
                //                           基站ID            from    to        通話時長                                                    callTime
                output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
            }
        }).filter(new FilterFunction<StationLog>() {

            @Override
            public boolean filter(StationLog value) throws Exception {
                return value.getDuration() > 0?true:false;
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); //指定EventTime對應的字段
                    }
                })
        ).keyBy(new KeySelector<StationLog, String>(){
            @Override
            public String getKey(StationLog value) throws Exception {
                return value.getStationID();  //按照基站分組
            }}
        ).timeWindow(Time.seconds(5)) //設置時間窗口
        .reduce(new MyReduceFunction(),new MyProcessWindows()).print();

        env.execute();
    }
}
//用於如何處理窗口中的數據,即:找到窗口內通話時間最長的記錄。
class MyReduceFunction implements ReduceFunction<StationLog> {
    @Override
    public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
        // 找到通話時間最長的通話記錄
        return value1.getDuration() >= value2.getDuration() ? value1 : value2;
    }
}
//窗口處理完成後,輸出的結果是什麼
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
    @Override
    public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
            Iterable<StationLog> elements, Collector<String> out) throws Exception {
        StationLog maxLog = elements.iterator().next();

        StringBuffer sb = new StringBuffer();
        sb.append("窗口範圍是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
        sb.append("基站ID:").append(maxLog.getStationID()).append("\t")
          .append("呼叫時間:").append(maxLog.getCallTime()).append("\t")
          .append("主叫號碼:").append(maxLog.getFrom()).append("\t")
          .append("被叫號碼:")  .append(maxLog.getTo()).append("\t")
          .append("通話時長:").append(maxLog.getDuration()).append("\n");
        out.collect(sb.toString());
    }
}

【趙強老師】Flink的Watermark機制(基於Flink 1.11.0實現)

相關文章
相關標籤/搜索