你們好,我是後來,我會分享我在學習和工做中遇到的點滴,但願有機會個人某篇文章可以對你有所幫助,全部的文章都會在公衆號首發,歡迎你們關注個人公衆號" 後來X大數據 ",感謝你的支持與承認。
經過前2篇flink的學習,已經基本掌握了flink的基本使用,可是關於flink真正內核的東西還沒開始說,那先簡單介紹一下,flink的核心亮點:html
咱們在第一篇的學習瞭解到了flink的wordCount,以及在第二篇的API 中,咱們也只是獲取到數據,進行簡單的轉換,就直接把數據輸出。java
可是咱們在以前都是以事件爲驅動,等於說是來了一條數據,我就處理一次,可是如今遇到的問題是:apache
咱們能夠簡單的把wordCount的需求比作公司的訂單金額,也就是訂單金額會隨着訂單的增長而只增不減,那麼若是運營部門提了如下需求:編程
那麼面對這個需求,由於時間一直是流動的,你們有什麼想法?windows
基於這些需求,咱們來說一下flink的窗口。api
窗口:不管是hive中的開窗函數,仍是Spark中的批次計算中的窗口,仍是咱們這裏講的窗口,本質上都是對數據進行劃分,而後對劃分後的數據進行計算。網絡
那麼Windows是處理無限流的核心。Windows將流分紅有限大小的「存儲桶」,咱們能夠在其上應用計算。app
在flink中,窗口式Flink程序通常有2類,框架
stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag"
stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag"
惟一的區別是:對鍵控流的keyBy(…)調用window(…),而非鍵控流則是調用windowAll(…)。socket
咱們上面說窗口就是對數據進行劃分到不一樣的「桶」中,而後進行計算,那麼什麼開始有這個桶,何時就算是分完了呢?
簡而言之,一旦應屬於該窗口的第一個元素到達,就會建立一個窗口,當時間超過用戶設置的時間戳時,flink將刪除這個窗口。
那咱們來理解一下窗口的類型:
TimeWindow:按照時間生成Window。
從文字也不難看出,CountWindow就是按照數據條數生成窗口,樣例代碼以下:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ object CountWindowsTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val wordDS = env.socketTextStream("master102",3456) wordDS .map((_,1)) .keyBy(0) //累計單個Key中3條數據就進行處理 .countWindow(3) .sum(1) .print("測試:") env.execute() } }
執行結果以下:
能夠看出,不一樣的單詞根據keyby進入不一樣的窗口,而後當窗口中的單個key的數據個數達到3個以後進行輸出。
接下來,咱們主要來講一下時間窗口,這些窗口的結束與開始都是根據數據的時間來判斷的,因此這裏就引出了咱們今天的第二個重點:時間語義
Flink 在流式傳輸程序中支持不一樣的時間概念:
咱們根據業務的需求還判斷使用哪一個時間類型,通常來講使用Event Time更多,好比:在統計最近5分鐘的訂單總金額時,咱們須要的是真實的訂單時間,而不是進入flink的時間或者是處理時間。
在Flink的流式處理中,絕大部分的業務都會使用EventTime,通常只在EventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime。默認狀況下,Flink框架中處理的時間語義爲ProcessingTime,若是要使用EventTime,那麼須要引入EventTime的時間屬性,引入方式以下所示:
import org.apache.flink.streaming.api.TimeCharacteristic val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
這裏注意:若是要使用事件時間,那麼必需要爲數據定義事件時間,而且還要註冊水位線
好了,又是一個新的知識點:水位線
咱們暫時先有這些概念,而後咱們再返回來繼續說咱們的窗口的類型。說完窗口類型,再詳細說水位線的應用。
因此這也爲後面的數據亂序埋下了坑,好比,2條訂單,它們的訂單時間差很少,一前一後,可是由於先下單的這條訂單的網絡狀況很差,致使後到達flink窗口,也就是咱們常說的數據亂序,那麼這種狀況該怎麼辦?咱們後面再說這個問題
特別注意:窗口是左閉右開的。
滾動窗口具備固定的尺寸和不重疊,例如,若是指定大小爲5分鐘的滾動窗口,則每五分鐘將啓動一個新窗口,以下圖所示。
樣例代碼以下:
import java.text.SimpleDateFormat import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector /** * @description: ${description} * @author: Liu Jun Jun * @create: 2020-06-29 13:59 **/ object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val dataDS = env.socketTextStream("bigdata101", 3456) val tsDS = dataDS.map(str => { val strings = str.split(",") (strings(0), strings(1).toLong, 1) }).keyBy(0) //窗口大小爲5s的滾動窗口 //.timeWindow(Time.seconds(5))和下面的這種寫法都是能夠的 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .apply { (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }") out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }") } }.print("windows:>>>") env.execute() } }
經過運行,大概會發現,咱們輸入的時間戳並不會起做用,默認使用的確實是處理時間:
同時,能夠看出,滾動窗口的時間窗口不會有重疊,一條數據只會屬於一個窗口,並且,窗口是左閉右開的。
滑動窗口也是固定長度的窗口,不過因爲滑動的頻率,當滑動頻率小於窗口大小時,滑動窗口會重疊,在這種狀況下,一個元素被分配到多個窗口。
例如:指定大小爲10分鐘的窗口滑動5分鐘。這樣,您每隔5分鐘就會獲得一個窗口,其中包含最近10分鐘內到達的事件,以下圖所示。
接下來,我只貼改動代碼,其他代碼和上面的滾動代碼是同樣的:
//滾動5秒,滑動3秒 //.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))和下面的這句話是同樣的 .timeWindow(Time.seconds(5),Time.seconds(3))
很是關鍵的是:你們發現,flink默認的分配窗口是從每秒從0開始數的,舉例:會把5秒的窗口分爲:
[0-5),[5,10),[10-15),....
3秒的窗口爲:
[0-3),[3,6),[6-9),....
與滾動窗口和滑動窗口相比,會話窗口不重疊且沒有固定的開始和結束時間。相反,會話窗口在必定時間段內未收到元素時(即,出現不活動間隙時)關閉。隨後的元素將分配給新的會話窗口。
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
能夠看出,此次的窗口大小並非固定的,那麼我在測試輸入的時候,輸完一些後等了一下子才繼續輸入的,那麼就出現了第一個窗口,因此只要processtime間隔時間超過10s,就會輸出上一個窗口。
若是使用timewindow()方法,那麼會隨着事件時間的指定會更改成以事件時間爲標準的窗口,而若是使用window()方法,那麼其中的參數會發生變化。
//滾動窗口 //事件時間 .window(TumblingEventTimeWindows.of(Time.seconds(5))) //處理時間 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //滑動窗口 //事件時間 .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) //處理時間 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) //會話窗口 //事件時間 .window(EventTimeSessionWindows.withGap(Time.minutes(10))) //處理時間 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
[0-5),[5,10),[10-15),....
3秒的窗口爲:
[0-3),[3,6),[6-9),....
那麼可不能夠作到窗口的劃分爲[1-6),[6,11)...
固然能夠,flink有窗口偏移設置。通常用不到,我在這裏簡單貼一下使用方式:
//5秒的窗口偏移3秒 .window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
能從上圖看出,窗口從原來的80-85,偏移到了83-88。那我再把方法總結一下
//窗口偏移方法總結 //滾動窗口 //事件時間 .window(TumblingEventTimeWindows.of(Time.seconds(5),Time.seconds(3))) //處理時間 .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3))) //滑動窗口 //事件時間 .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))) //處理時間 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3))) //會話窗口 //事件時間 .window(EventTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3))) //處理時間 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3)))
關於窗口的使用基本上差很少了,接下來只要說一說水位線
WaterMark,叫作水位線,那它是幹啥的?支持事件時間的流處理器須要一種衡量事件時間進度的方法。
這裏要注意:使用事件時間必需要使用註冊水位線,而水位線也是事件時間專用的
例如,當以事件時間開窗1小時,目前窗口剛超過一個小時,須要通知構建每小時窗口的窗口操做員,關閉正在進行中的這個窗口程序。
那問題來了,怎麼衡量時間到了沒?因此Flink中用於衡量事件時間進度的機制是水位線。
強調:並非每條數據都會生成水位線。水位線也是一條數據,是流數據的一部分,watermark是一個全局的值,不是某一個key下的值,因此即便不是同一個key的數據,其warmark也會增長。
同時,水位線還有一個重要做用,就是處理延遲數據,咱們在文章開頭的部分也提到了,數據亂序怎麼處理,那麼有些數據由於網絡的緣由,延遲了幾秒,因此也能夠把水位線看做是窗口最後的執行時間。
好比說,咱們規定滾動窗口爲5秒,也就是[5-10),同時咱們預測數據通常可能延遲3秒,因此咱們但願窗口是當10s的數據到達後,繼續等待3秒,看這3秒內,仍是否有本來是[5-10)中的數據,一塊兒歸併到這個窗口中,等到出現了時間爲大於等於13s的數據時,就會觸發[5-10)這個窗口的數據執行。這就是延遲處理。(代碼案例看下面的週期性水位線)
有兩種分配時間戳和生成水位線的方法:
Event Time的使用必定要指定數據源中的時間戳。不然程序沒法知道事件的事件時間是什麼(數據源裏的數據沒有時間戳的話,就只能使用Processing Time了)。
那咱們就利用第二種方式來生成水位線吧,注意要在事件時間的第一個操做(例如第一個窗口操做)以前指定分配器,例如:
咱們發現註冊水位線的有2個接口能夠實現:
一個一個說,先說週期性生成水位線:
//flink默認200ms(毫秒)生成一條水位線,那咱們也能夠修改 @PublicEvolving public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { this.timeCharacteristic = Preconditions.checkNotNull(characteristic); if (characteristic == TimeCharacteristic.ProcessingTime) { getConfig().setAutoWatermarkInterval(0); } else { getConfig().setAutoWatermarkInterval(200); } } //單位是毫秒,因此我這裏模擬設置的爲10s env.getConfig.setAutoWatermarkInterval(10000)
那麼這裏的時間間隔指的是系統時間的10s,可不是事件時間的10s,這個不要弄混,不相信的話能夠等會看個人測試案例。
import java.text.SimpleDateFormat import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector /** * @description: ${本案例模擬的是:以事件時間爲標準,窗口滾動時間爲5秒} * @author: Liu Jun Jun * @create: 2020-06-28 18:31 **/ object WaterMarkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //設置以事件時間爲基準 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //並行度設置爲1。關於並行度的案例會在後面測試 env.setParallelism(1) //設置10s生成一次水位線 env.getConfig.setAutoWatermarkInterval(10000) val dataDS = env.socketTextStream("bigdata101", 3456) val tsDS = dataDS.map(str => { val strings = str.split(",") (strings(0), strings(1).toLong, 1) }).assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks[(String,Long,Int)]{ var maxTs :Long= 0 //獲得水位線,週期性調用這個方法,獲得水位線,我這裏設置的也就是延遲5秒 override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000) //負責抽取事件事件 override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = { maxTs = maxTs.max(element._2 * 1000L) element._2 * 1000L } } /*new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)) { override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000 }*/ ) val result = tsDS .keyBy(0) //窗口大小爲5s的滾動窗口 .timeWindow(Time.seconds(5)) .apply { (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }") out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }") } } tsDS.print("water") result.print("windows:>>>") env.execute() } }
那麼從結果能夠看出:
【10- 15)的窗口是20這條數據觸發的,在我輸入20這條數據等了幾秒後輸出了第一個窗口
證明:10s的間隔時間爲系統時間,同時水位線=當前時間戳 - 延遲時間 ,若是窗口的end time <= 水位線,則會觸發這個窗口的執行
【15- 20)的窗口是25這條數據觸發的,一樣符合窗口的end time <= 水位線
那麼若是數據的窗口已經觸發了,但還有一點數據仍是遲到了怎麼辦?
全部還有個概念就是allowedLateness(容許接收延遲數據),而且還會繼續把數據放入對應的窗口。看代碼吧:
//其他代碼和上面案例的同樣,只是在開窗以後多了一行 .keyBy(0) .timeWindow(Time.seconds(5)) //具體這2秒錶明什麼意思,看完測試結果案例就懂了 .allowedLateness(Time.seconds(2) .apply{}
經過看圖應該能明白這裏allowedLateness(Time.seconds(2)是什麼意思了,只要是窗口觸發後,時間小於設定的延遲時間,收到的延遲數據均可以處理,但要是沒有設置allowedLateness(Time.seconds(2)),那麼窗口觸發後的延遲數據都不會處理。
數據的延遲老是不可徹底預測的,假如時間已經超過了容許接收的延遲數據時間,還有一點點數據遲到,就是上圖中,在22這條數據以後我輸入的14這條數據,那怎麼辦?這種狀況下,咱們不能爲了偶爾的一點數據就把全部窗口的等待時間延遲好久,全部還有個概念就是側輸出流,將晚到的數據放置在側輸出流中。來看代碼:
//只加了3行,其他的和以前的代碼同樣 val outputTag = new OutputTag[(String, Long, Int)]("lateData")//新加的 val result = tsDS .keyBy(0) .timeWindow(Time.seconds(5)) .allowedLateness(Time.seconds(2)) .sideOutputLateData(outputTag)//新加的 .apply {} result.getSideOutput(outputTag).print("side>>>")//新加的
知識不少東西是想通的,因此開始講延遲數據就巴拉巴拉一堆,再繼續說間標記水位線,爲何叫作標記呢?由於這種水位線的生成與時間無關,而是決定於什麼時候收到標記事件。
默認狀況下,全部的數據都屬於標記事件,意味着每條數據都會生成水位線。
因此使用這種方式的時候,須要對某些特定事件進行標記。
object WaterMarkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataDS = env.socketTextStream("bigdata101", 3456) val tsDS = dataDS.map(str => { val strings = str.split(",") (strings(0), strings(1).toLong, 1) }).assignTimestampsAndWatermarks( new AssignerWithPunctuatedWatermarks[(String,Long,Int)] { override def checkAndGetNextWatermark(lastElement: (String, Long, Int), extractedTimestamp: Long): Watermark = { if (lastElement._1 .contains("later")){ println("間歇性生成了水位線.....") // 間歇性生成水位線數據 new Watermark(extractedTimestamp) } return null } override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = { element._2 * 1000L } } ) val result = tsDS .keyBy(0) .timeWindow(Time.seconds(5)) .apply { (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }") } } tsDS.print("water") result.print("calc") env.execute() } }
看一下個人測試結果:
固然即使咱們設置了標記,在TPS很高的場景下依然會產生大量的Watermark,在必定程度上對下游算子形成壓力,因此只有在實時性要求很是高的場景纔會選擇Punctuated的方式進行Watermark的生成。
細心的小夥伴也會發現,我在上面的全部的案例中,使用的並行度都是1,但實際生產中確定不是1啊,這個會有什麼變化麼?固然是有的。
我先說結論:
若是並行度不爲1,那麼在計算窗口時,是按照各自的並行度單獨計算的。只有當全部並行度中都觸發了同一個窗口,那麼這個窗口才會觸發。
口說無憑,咱們來看案例,此次放完整代碼:
import java.text.SimpleDateFormat import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector /** * @description: ${模擬多並行度下,窗口如何觸發} * @author: Liu Jun Jun * @create: 2020-06-28 18:31 **/ object WaterMarkTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //注掉了並行度爲1,默認並行度=cpu核數,我這裏cpu爲4個 //env.setParallelism(1) env.getConfig.setAutoWatermarkInterval(10000) val dataDS = env.socketTextStream("bigdata101", 3456) val tsDS = dataDS.map(str => { val strings = str.split(",") (strings(0), strings(1).toLong, 1) }).assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks[(String,Long,Int)]{ var maxTs :Long= 0 override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000) override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = { maxTs = maxTs.max(element._2 * 1000L) element._2 * 1000L } } ) //該案例中,爲了簡單,去掉了allowedLateness和側輸出流 val result = tsDS .keyBy(0) .timeWindow(Time.seconds(5)) .apply { (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => { val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }") out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }") } } tsDS.print("water") result.print("calc") env.execute() } }
看一下測試結果吧:
好了,到這裏,窗口、時間語義以及水位線的基本原理就說完了,理解了這些再看看文章開頭提到了4個需求,是否是就有些想法了呢?
到目前爲止,咱們只是對數據進行了開窗,可是數據在一個窗口內怎麼處理尚未說,那麼下一章就來講處理函數,以及Flink的狀態編程。
在此次學習中發現的不錯的帖子:https://www.cnblogs.com/rossi...