1、Timejava
在Flink的流式處理中,會涉及到時間的不一樣概念web
Event Time(事件時間):是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳apache
Ingestion Time(採集時間):是數據進入Flink的時間windows
Processing Time(處理時間):是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。api
例如一條日誌進入Flink的時間爲2017-11-12 10:00:00.123 到達window的系統時間爲 2017-11-12 10:00:01.234,日誌內容以下:網絡
2017-11-02 18:37:15.624 INFO Fair over to rm2session
對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?----- eventTime,由於咱們要根據日誌的生成時間進行統計。socket
若是要想聚合,不可能對無解數據流進行聚合。ide
2、Windowspa
一、streaming流式計算是一種被設計用於處理處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增加的本質上無限的數據集,而window是一種切割無限數據爲有限塊進行處理的手段。
Window是無限數據流處理的核心,Window將一個無限的stream拆分紅有限大小的"buckets"桶,咱們能夠在這些桶上作計算操做。
共有兩類,五種時間窗口。
二、Window類型(兩類)
2.一、CountWindow:按照指定的數據條數生成一個window,與時間無關
2.二、TimeWindow:按照時間生成window。(按照Processing Time來劃分Window)
對於TimeWindow和CountWindow,能夠根據窗口實現原理的不一樣分紅三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
(1)滾動窗口(Tumbling Windows)
將數據依據固定的窗口長度對數據進行切分。
特色:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。
(2)滑動窗口(Sliding Windows)
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特色:時間對齊,窗口長度固定,有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。
所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。
使用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警。)
(3)會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成。相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特色:時間無對齊。
session 窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的
時間週期內再也不收到元素,即非活動間隔產生,那這個窗口就會關閉。一個Session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍
週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。
3、Window API
3.一、CountWindow
CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的全部元素的總數。
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} /** * CountWindow 中的滾動窗口(Tumbling Windows) * 將數據依據固定的窗口長度對數據進行切分。 */ object TimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost",11111) val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0) //注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的全部元素的總數。 val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5) .reduce((item1, item2)=>(item1._1,item1._2+item2._2)) streamWindow.print() env.execute("TimeAndWindow") } }
3.2
import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} /** * CountWindow 中的滑動窗口(Sliding Windows) * 將數據依據固定的窗口長度對數據進行切分。 */ object TimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream: DataStream[String] = env.socketTextStream("localhost",11111) val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0) //注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的全部元素的總數。 //知足步長,就執行一次,按第一個參數的長度 val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2) .reduce((item1, item2)=>(item1._1,item1._2+item2._2)) streamWindow.print() env.execute("TimeAndWindow") } }
4、EventTime與Window
一、EventTime的引入
在Flink的流式處理中,絕大部分的業務都會使用eventTime,通常只在eventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime。
若是要使用EventTime,那麼須要引入EventTime的時間戳,引入方式以下所示:
二、Watermark
概念:咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的
事件戳順序來的,可是也不排除因爲網絡、背壓等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的EventTime順序排列的。
Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。
Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現。
數據流中的Watermark用於表示eventTime小於Watermark的數量,都已經到達了,所以,window的執行也是由Watermark觸發的。
Watermark能夠理解成一個延遲觸發機制。咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime 小於
maxEventTime-t 的全部數據都已經到達。若是有窗口的中止時間等於maxEventTime-t,那麼這個窗口被觸發執行。
滾動窗口/滑動窗口/會話窗口
import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindow/** * TimeWindow */object EventTimeAndWindow { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //開啓watermark //從調用時刻開始給env建立的每個stream追加時間特徵。 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) { override def extractTimestamp(element: String): Long = { // event word eventTime是日誌生成時間,咱們從日誌中解析EventTime val eventTime = element.split(" ")(0).toLong println(eventTime) eventTime } } ).map(item => (item.split(" ")(1),1L)).keyBy(0) //加上滾動窗口,窗口大小是5s,調用window的api// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5))) //滑動窗口// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) //會話窗口 val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5))) val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2)) streamReduce.print() env.execute("EventTimeAndWindow") }}