專一於大數據及容器雲核心技術解密,可提供全棧的大數據+雲原平生臺諮詢方案,請持續關注本套博客。若有任何學術交流,可隨時聯繫。更多內容請關注《數據雲技術社區》公衆號。 java
針對stream數據中的時間,能夠分爲如下三種:git
在水印的處理中,咱們通常取事件時間序列的最大值做爲水印的生成時間參考。github
按照信號發生的順序,時間是不斷增長的,因此在時間序列上若出現事件時間小於時間序列最大值,通常都是延時的事件,時間序列最大值不會改變。apache
每處理一條事件數據,watermark時間就生成一次,後面窗的觸發就是依據水印時間。若設置亂序延時爲10s,則生成規則就是:windows
final Long maxOutOfOrderness = 10000L;// 最大容許的亂序時間是10s
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
複製代碼
數據會按照時間進行依次Append,api
水印依賴的條件是窗,水印只是決定了窗的觸發時間,若設置最大容許的亂序時間是maxOutOfOrderness=10s,則窗的觸發時機就是:網絡
watermark 時間 >= window_end_time
複製代碼
窗觸發時,數據除了正常的時間序列,同時也包含延時到達的序列。在窗觸發前(也就水印時間),計算除了把以前的正常窗數據給觸發了,同時還包含了原本也屬於該窗的延時數據。app
事件時間的最大值,也就是當前的實際事件時間,所以須要以此爲參考點。socket
實際窗:意思就是數據就在那裏Append,窗數據已經準備好,等待觸發時機。ide
水印時間不受影響:就是每次來的數據的事件時間最大值,不受延遲數據時間影響。
下面例子中,等水印時間爲10:11:33時,知足時間窗 10:11:30 <-> 10:11:33的觸發時機,此時須要處理的數據不只包含正常數據10:11:32 ,同時還包含亂序數據10:11:31。
再次強調:窗時機到來時,會遍歷亂序數據和原窗數據。
實際窗在流動,只是暫不觸發。
水印也在標記流動
窗時機觸發也在流動。
watermark 時間 >= window_end_time時,觸發歷史窗執行。
繼續
流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡延遲等緣由,致使亂序的產生,特別是使用kafka的話,多個分區的數據沒法保證有序。因此在進行window計算的時候,咱們又不能無限期的等下去,必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark,watermark是用於處理亂序事件的。
一般,在接收到source的數據後,應該馬上生成watermark;可是,也能夠在source後,應用簡單的map或者filter操做後,再生成watermark。注意:若是指定屢次watermark,後面指定的會覆蓋前面的值。 生成方式
With Periodic Watermarks
週期性的觸發watermark的生成和發送,默認是100ms,每隔N秒自動向流裏
注入一個WATERMARK 時間間隔由ExecutionConfig.setAutoWatermarkInterval
決定. 每次調用getCurrentWatermark 方法, 若是獲得的WATERMARK
不爲空而且比以前的大就注入流中
能夠定義一個最大容許亂序的時間,這種比較經常使用
實現AssignerWithPeriodicWatermarks接口
複製代碼
With Punctuated Watermarks
基於某些事件觸發watermark的生成和發送基於事件向流裏注入一個WATERMARK,
每個元素都有機會判斷是否生成一個WATERMARK. 若是獲得的WATERMARK
不爲空而且比以前的大就注入流中實現AssignerWithPunctuatedWatermarks接口
複製代碼
多並行度流的watermarks
注意:多並行度的狀況下,watermark對齊會取全部channel最小的watermark。
public class StreamingWindowWatermark {
public static void main(String[] args) throws Exception {
//定義socket的端口號
int port = 9010;
//獲取運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置使用eventtime,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置並行度爲1,默認並行度是當前機器的cpu數量
env.setParallelism(1);
//鏈接socket獲取輸入的數據
DataStream<String> text = env.socketTextStream("SparkMaster", port, "\n");
//解析輸入的數據
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大容許的亂序時間是10s
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
long id = Thread.currentThread().getId();
System.out.println("做者:秦凱新 鍵值 :"+element.f0+",事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
sdf.format(currentMaxTimestamp)+" ],水印時間:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]");
return timestamp;
}
});
DataStream<String> window = waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果同樣
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對window內的數據進行排序,保證數據的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = "\n 做者:秦凱新 鍵值 : "+ key + "\n 觸發窗內數據個數 : " + arrarList.size() + "\n 觸發窗起始數據: " + sdf.format(arrarList.get(0)) + "\n 觸發窗最後(多是延時)數據:" + sdf.format(arrarList.get(arrarList.size() - 1))
+ "\n 實際窗起始和結束時間: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";
out.collect(result);
}
});
//測試-把結果打印到控制檯便可
window.print();
//注意:由於flink是懶加載的,因此必須調用execute方法,上面的代碼纔會執行
env.execute("eventtime-watermark");
}
}
複製代碼
執行測試數據
0001,1538359882000 2018-10-01 10:11:22
0002,1538359886000 2018-10-01 10:11:26
0003,1538359892000 2018-10-01 10:11:32
0004,1538359893000 2018-10-01 10:11:33
0005,1538359894000 2018-10-01 10:11:34
0006,1538359896000 2018-10-01 10:11:36
0007,1538359897000 2018-10-01 10:11:37
0008,1538359899000 2018-10-01 10:11:39
0009,1538359891000 2018-10-01 10:11:31
0010,1538359903000 2018-10-01 10:11:43
0011,1538359892000 2018-10-01 10:11:32
0012,1538359891000 2018-10-01 10:11:31
0010,1538359906000 2018-10-01 10:11:46
複製代碼
第一個窗觸發:2018-10-01 10:11:21.000《----》2018-10-01 10:11:24.000
第二個窗觸發:2018-10-01 10:11:24.000《----》2018-10-01 10:11:27.000
第三個窗觸發:2018-10-01 10:11:30.000《----》2018-10-01 10:11:33.000
第四個窗觸發:10:11:33.000《----》2018-10-01 10:11:36.000
在某些狀況下, 咱們但願對遲到的數據再提供一個寬容的時間。 Flink 提供了 allowedLateness 方法能夠實現對遲到的數據設置一個延遲時間, 在指定延遲時間內到達的數據仍是能夠觸發 window 執行的。
第二次(或屢次)觸發的條件是 watermark < window_end_time + allowedLateness 時間內, 這個窗口有 late 數據到達時。
舉例:當 watermark 等於 10:11:34 的時候, 咱們輸入 eventtime 爲 10:11:30、 10:11:3一、10:11:32 的數據的時候, 是能夠觸發的, 由於這些數據的 window_end_time 都是 10:11:33, 也就是10:11:34<10:11:33+2 爲 true。
舉例:可是當 watermark 等於 10:11:35 的時候,咱們再輸入 eventtime 爲 10:11:30、10:11:3一、10:11:32的數據的時候, 這些數據的 window_end_time 都是 10:11:33, 此時, 10:11:35< 10:11:33+2 爲false 了。 因此最終這些數據遲到的時間過久了, 就不會再觸發 window 執行了,預示着丟棄。
同時注意,對於延遲的數據,咱們徹底能夠把它揪出來做分析。經過設置sideOutputLateData。
public class StreamingWindowWatermark2 {
public static void main(String[] args) throws Exception {
//定義socket的端口號
int port = 9000;
//獲取運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置使用eventtime,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置並行度爲1,默認並行度是當前機器的cpu數量
env.setParallelism(1);
//鏈接socket獲取輸入的數據
DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");
//解析輸入的數據
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大容許的亂序時間是10s
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 定義生成watermark的邏輯
* 默認100ms被調用一次
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
return timestamp;
}
});
//保存被丟棄的數據
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
//注意,因爲getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,因此這裏的類型,不能使用它的父類dataStream。
SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果同樣
//.allowedLateness(Time.seconds(2))//容許數據遲到2秒
.sideOutputLateData(outputTag)
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對window內的數據進行排序,保證數據的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
}
});
//把遲到的數據暫時打印到控制檯,實際中能夠保存到其餘存儲介質中
DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
sideOutput.print();
//測試-把結果打印到控制檯便可
window.print();
//注意:由於flink是懶加載的,因此必須調用execute方法,上面的代碼纔會執行
env.execute("eventtime-watermark");
}
}
複製代碼
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class StreamingWindowWatermark2 {
public static void main(String[] args) throws Exception {
//定義socket的端口號
int port = 9010;
//獲取運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置使用eventtime,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置並行度爲1,默認並行度是當前機器的cpu數量
env.setParallelism(8);
//鏈接socket獲取輸入的數據
DataStream<String> text = env.socketTextStream("SparkMaster", port, "\n");
//解析輸入的數據
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大容許的亂序時間是10s
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 定義生成watermark的邏輯
* 默認100ms被調用一次
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
long id = Thread.currentThread().getId();
System.out.println("做者:秦凱新 鍵值 :"+element.f0+"線程驗證 :"+ id +" , 事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
sdf.format(currentMaxTimestamp)+" ],水印時間:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]"); return timestamp;
}
});
//保存被丟棄的數據
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
//注意,因爲getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,因此這裏的類型,不能使用它的父類dataStream。
SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果同樣
//.allowedLateness(Time.seconds(2))//容許數據遲到2秒
.sideOutputLateData(outputTag)
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對window內的數據進行排序,保證數據的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = "\n 做者:秦凱新 鍵值 : "+ key + "\n 觸發窗內數據個數 : " + arrarList.size() + "\n 觸發窗起始數據: " + sdf.format(arrarList.get(0)) + "\n 觸發窗最後(多是延時)數據:" + sdf.format(arrarList.get(arrarList.size() - 1))
+ "\n 實際窗起始和結束時間: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";
out.collect(result);
}
});
//把遲到的數據暫時打印到控制檯,實際中能夠保存到其餘存儲介質中
DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
sideOutput.print();
//測試-把結果打印到控制檯便可
window.print();
//注意:由於flink是懶加載的,因此必須調用execute方法,上面的代碼纔會執行
env.execute("eventtime-watermark");
}
}
複製代碼
env.setParallelism(1);
複製代碼
若是這裏不設置的話, 代碼在運行的時候會默認讀取本機 CPU 數量設置並行度。
下面咱們來驗證一下, 把代碼中的並行度調整爲 2:
env.setParallelism(2);
複製代碼
發現玄機以下:在第二條事件時,其實已經達到窗的觸發時機,可是由於並行度爲2,只有等到最小
watermark 到的時候纔會觸發窗計算。發現線程44處理的是001和003 ,線程42處理的是0002,因此只有等到線程42到達後,水印纔會起做用執行2018-10-01 10:11:33.000所在的窗。
0001,1538359890000 2018-10-01 10:11:30
0002,1538359903000 2018-10-01 10:11:43
0003,1538359908000 2018-10-01 10:11:48
複製代碼
發現 這 7 條數據都是被不一樣的線程處理的。 每一個線程都有一個 watermark。且每個線程都是基於本身接收數據的事件時間最大值。
所以,致使到最後如今還沒獲取到最小的 watermark, 因此 window 沒法被觸發執行。
只有全部的線程的最小watermark都知足watermark 時間 >= window_end_time時,觸發歷史窗纔會執行。
0001,1538359882000 2018-10-01 10:11:22
0002,1538359886000 2018-10-01 10:11:26
0003,1538359892000 2018-10-01 10:11:32
0004,1538359893000 2018-10-01 10:11:33
0005,1538359894000 2018-10-01 10:11:34
0006,1538359896000 2018-10-01 10:11:36
0007,1538359897000 2018-10-01 10:11:37
複製代碼
當持續發生事件數據時。一旦全部線程都達到最低的窗觸發時機時,就會進行窗觸發執行了。輸入數據以下:
0007,1538359897000 2018-10-01 10:11:37
0008,1538359897000 2018-10-01 10:11:37
0009,1538359897000 2018-10-01 10:11:37
0010,1538359897000 2018-10-01 10:11:37
0011,1538359897000 2018-10-01 10:11:37
0012,1538359897000 2018-10-01 10:11:37
0013,1538359897000 2018-10-01 10:11:37
0014,1538359897000 2018-10-01 10:11:37
0015,1538359897000 2018-10-01 10:11:37
複製代碼
專一於大數據及容器雲核心技術解密,可提供全棧的大數據+雲原平生臺諮詢方案,請持續關注本套博客。若有任何學術交流,可隨時聯繫。更多內容請關注《數據雲技術社區》公衆號。
秦凱新 於深圳