在大多數場景下,咱們須要統計的數據流都是無界的,所以咱們沒法等待整個數據流終止後才進行統計。一般狀況下,咱們只須要對某個時間範圍或者數量範圍內的數據進行統計分析:如每隔五分鐘統計一次過去一小時內全部商品的點擊量;或者每發生1000次點擊後,都去統計一下每一個商品點擊率的佔比。在 Flink 中,咱們使用窗口 (Window) 來實現這類功能。按照統計維度的不一樣,Flink 中的窗口能夠分爲 時間窗口 (Time Windows) 和 計數窗口 (Count Windows) 。html
Time Windows 用於以時間爲維度來進行數據聚合,具體分爲如下四類:java
滾動窗口 (Tumbling Windows) 是指彼此之間沒有重疊的窗口。例如:每隔1小時統計過去1小時內的商品點擊量,那麼 1 天就只能分爲 24 個窗口,每一個窗口彼此之間是不存在重疊的,具體以下:git
這裏咱們以詞頻統計爲例,給出一個具體的用例,代碼以下:github
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收socket上的數據輸入
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001", 9999, "\n", 3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = value.split("\t");
for (String word : words) {
out.collect(new Tuple2<>(word, 1L));
}
}
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒統計一次每一個單詞出現的數量
env.execute("Flink Streaming");
複製代碼
測試結果以下:apache
滑動窗口用於滾動進行聚合分析,例如:每隔 6 分鐘統計一次過去一小時內全部商品的點擊量,那麼統計窗口彼此之間就是存在重疊的,即 1天能夠分爲 240 個窗口。圖示以下:windows
能夠看到 window 1 - 4 這四個窗口彼此之間都存在着時間相等的重疊部分。想要實現滑動窗口,只須要在使用 timeWindow 方法時額外傳遞第二個參數做爲滾動時間便可,具體以下:socket
// 每隔3秒統計一次過去1分鐘內的數據
timeWindow(Time.minutes(1),Time.seconds(3))
複製代碼
當用戶在進行持續瀏覽時,可能每時每刻都會有點擊數據,例如在活動區間內,用戶可能頻繁的將某類商品加入和移除購物車,而你只想知道用戶本次瀏覽最終的購物車狀況,此時就能夠在用戶持有的會話結束後再進行統計。想要實現這類統計,能夠經過 Session Windows 來進行實現。ide
具體的實現代碼以下:oop
// 以處理時間爲衡量標準,若是10秒內沒有任何數據輸入,就認爲會話已經關閉,此時觸發統計
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件時間爲衡量標準
window(EventTimeSessionWindows.withGap(Time.seconds(10)))
複製代碼
最後一個窗口是全局窗口, 全局窗口會將全部 key 相同的元素分配到同一個窗口中,其一般配合觸發器 (trigger) 進行使用。若是沒有相應觸發器,則計算將不會被執行。測試
這裏繼續以上面詞頻統計的案例爲例,示例代碼以下:
// 當單詞累計出現的次數每達到10次時,則觸發計算,計算整個窗口內該單詞出現的總數
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();
複製代碼
Count Windows 用於以數量爲維度來進行數據聚合,一樣也分爲滾動窗口和滑動窗口,實現方式也和時間窗口徹底一致,只是調用的 API 不一樣,具體以下:
// 滾動計數窗口,每1000次點擊則計算一次
countWindow(1000)
// 滑動計數窗口,每10次點擊發生後,則計算過去1000次點擊的狀況
countWindow(1000,10)
複製代碼
實際上計數窗口內部就是調用的咱們上一部分介紹的全局窗口來實現的,其源碼以下:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
複製代碼
Flink Windows: ci.apache.org/projects/fl…
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南