Flink中的Time與Window

1、Timejava

在Flink的流式處理中,會涉及到時間的不一樣概念web

Event Time(事件時間):是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳apache

Ingestion Time(採集時間):是數據進入Flink的時間windows

Processing Time(處理時間):是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。api

例如一條日誌進入Flink的時間爲2017-11-12 10:00:00.123 到達window的系統時間爲 2017-11-12 10:00:01.234,日誌內容以下:網絡

2017-11-02 18:37:15.624 INFO Fair over to rm2session

對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?----- eventTime,由於咱們要根據日誌的生成時間進行統計。socket

  

若是要想聚合,不可能對無解數據流進行聚合。ide

 

2、Windowspa

一、streaming流式計算是一種被設計用於處理處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增加的本質上無限的數據集,而window是一種切割無限數據爲有限塊進行處理的手段。

Window是無限數據流處理的核心,Window將一個無限的stream拆分紅有限大小的"buckets"桶,咱們能夠在這些桶上作計算操做。

共有兩類,五種時間窗口。

二、Window類型(兩類)

2.一、CountWindow:按照指定的數據條數生成一個window,與時間無關

2.二、TimeWindow:按照時間生成window。(按照Processing Time來劃分Window)

對於TimeWindow和CountWindow,能夠根據窗口實現原理的不一樣分紅三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。

(1)滾動窗口(Tumbling Windows)

將數據依據固定的窗口長度對數據進行切分。

特色:時間對齊,窗口長度固定,沒有重疊。

滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。

(2)滑動窗口(Sliding Windows)

滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。

特色:時間對齊,窗口長度固定,有重疊。

滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。

所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。

使用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警。)

(3)會話窗口(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成。相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。

特色:時間無對齊。

session 窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的

時間週期內再也不收到元素,即非活動間隔產生,那這個窗口就會關閉。一個Session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍

週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。

 

3、Window API

3.一、CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。

注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的全部元素的總數。

import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} /** * CountWindow 中的滾動窗口(Tumbling Windows) * 將數據依據固定的窗口長度對數據進行切分。 */ object TimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost",11111) val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0) //注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的全部元素的總數。
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5) .reduce((item1, item2)=>(item1._1,item1._2+item2._2)) streamWindow.print() env.execute("TimeAndWindow") } }

3.2

import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} /** * CountWindow 中的滑動窗口(Sliding Windows) * 將數據依據固定的窗口長度對數據進行切分。 */ object TimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost",11111) val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0) //注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的全部元素的總數。 //知足步長,就執行一次,按第一個參數的長度
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2) .reduce((item1, item2)=>(item1._1,item1._2+item2._2)) streamWindow.print() env.execute("TimeAndWindow") } }

4、EventTime與Window

一、EventTime的引入

在Flink的流式處理中,絕大部分的業務都會使用eventTime,通常只在eventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime。

若是要使用EventTime,那麼須要引入EventTime的時間戳,引入方式以下所示:

二、Watermark

  概念:咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的

事件戳順序來的,可是也不排除因爲網絡、背壓等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的EventTime順序排列的。

  Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。

  Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現。

  數據流中的Watermark用於表示eventTime小於Watermark的數量,都已經到達了,所以,window的執行也是由Watermark觸發的。

  Watermark能夠理解成一個延遲觸發機制。咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime 小於

maxEventTime-t 的全部數據都已經到達。若是有窗口的中止時間等於maxEventTime-t,那麼這個窗口被觸發執行。

滾動窗口/滑動窗口/會話窗口

 
  
import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindow/**  * TimeWindow  */object EventTimeAndWindow {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    //開啓watermark    //從調用時刻開始給env建立的每個stream追加時間特徵。    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks(      new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {        override def extractTimestamp(element: String): Long = {          // event word  eventTime是日誌生成時間,咱們從日誌中解析EventTime          val eventTime = element.split(" ")(0).toLong          println(eventTime)          eventTime        }      }    ).map(item => (item.split(" ")(1),1L)).keyBy(0)    //加上滾動窗口,窗口大小是5s,調用window的api//    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))    //滑動窗口//    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))    //會話窗口    val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))    val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))    streamReduce.print()        env.execute("EventTimeAndWindow")  }}
相關文章
相關標籤/搜索