1、簡介數據庫
開源流式處理系統在不斷地發展,從一開始只關注低延遲指標到如今兼顧延遲、吞吐與結果準確性,在發展過程當中解決了不少問題,編程API的易用性也在不斷地提升。本文介紹一下 Flink 中的核心概念,這些概念是學習與使用 Flink 十分重要的基礎知識,在後續開發 Flink 程序過程當中將會幫助開發人員更好地理解 Flink 內部的行爲和機制。apache
這裏引用一張圖來對經常使用的實時計算框架作個對比:編程
Flink 是有狀態的和容錯的,能夠在維護一次應用程序狀態的同時無縫地從故障中恢復。它支持大規模計算能力,可以在數千個節點上併發運行。它具備很好的吞吐量和延遲特性。同時,Flink 提供了多種靈活的窗口函數。Flink 在流式計算裏屬於真正意義上的單條處理,每一條數據都觸發計算,而不是像 Spark 同樣的 Mini Batch 做爲流式處理的妥協。Flink的容錯機制較爲輕量,對吞吐量影響較小,並且擁有圖和調度上的一些優化,使得 Flink 能夠達到很高的吞吐量。而 Strom 的容錯機制須要對每條數據進行ack,所以其吞吐量瓶頸也是備受詬病。後端
2、工做原理數組
Flink基本工做原理以下圖:網絡
JobClient:負責接收程序,解析和優化程序的執行計劃,而後提交執行計劃到JobManager。這裏執行的程序優化是將相鄰的Operator融合,造成Operator Chain,Operator的融合能夠減小task的數量,提升TaskManager的資源利用率。session
JobManagers:負責申請資源,協調以及控制整個job的執行過程,具體包括,調度任務、處理checkpoint、容錯等等。數據結構
TaskManager:TaskManager運行在不一樣節點上的JVM進程,負責接收並執行JobManager發送的task,而且與JobManager通訊,反饋任務狀態信息,若是說JobManager是master的話,那麼TaskManager就是worker用於執行任務。每一個TaskManager像是一個容器,包含一個或者多個Slot。併發
Slot:Slot是TaskManager資源粒度的劃分,每一個Slot都有本身獨立的內存。全部Slot平均分配TaskManager的內存,值得注意的是,Slot僅劃份內存,不涉及CPU的劃分,即CPU是共享使用。每一個Slot能夠運行多個task。Slot的個數就表明了一個程序的最高並行度。框架
Task:Task是在operators的subtask進行鏈化以後造成的,具體Flink job中有多少task和operator的並行度和鏈化的策略有關。
SubTask:由於Flink是分佈式部署的,程序中的每一個算子,在實際執行中被分隔爲一個或者多個subtask,運算符子任務(subtask)的數量是該特定運算符的並行度。數據流在算子之間流動,就對應到SubTask之間的數據傳輸。Flink容許同一個job中來自不一樣task的subtask能夠共享同一個slot。每一個slot能夠執行一個並行的pipeline。能夠將pipeline看做是多個subtask的組成的。
3、核心概念
1、Time(時間語義)
Flink 中的 Time 分爲三種:事件時間、達到時間與處理時間。
1)事件時間:是事件真實發生的時間。
2)達到時間:是系統接收到事件的時間,即服務端接收到事件的時間。
3)處理時間:是系統開始處理到達事件的時間。
在某些場景下,處理時間等於達到時間。由於處理時間沒有亂序的問題,因此服務端作基於處理時間的計算是比較簡單的,無遲到與亂序數據。
Flink 中只須要經過 env 環境變量便可設置Time:
//建立環境上下文 val env = StreamExecutionEnvironment.getExecutionEnvironment // 設置在當前程序中使用 ProcessingTime env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2、Window(窗口)
窗口本質就是將無限數據集沿着時間(或者數量)的邊界切分紅有限數據集。
1)Time Window:基於時間的,分爲Tumbling Window和Sliding Window 。
2)Count Window:基於數量的,分爲Tumbling Window和Sliding Window。
3)Session Window:基於會話的,一個session window關閉一般是因爲一段時間沒有收到元素。
4)Global Window:全局窗口。
在實際操做中,window又分爲兩大類型的窗口:Keyed Window 和 Non-keyed Window,兩種類型的窗口操做API有細微的差異。
3、Trigger
1)自定義觸發器
觸發器決定了窗口什麼時候會被觸發計算,Flink 中開發人員須要在 window 類型的操做以後才能調用 trigger 方法傳入觸發器定義。Flink 中的觸發器定義須要繼承並實現 Trigger 接口,該接口有如下方法:
以上方法會返回決定如何觸發執行的 TriggerResult:
2)預約義觸發器
若是開發人員未指定觸發器,則 Flink 會自動根據場景使用默認的預約義好的觸發器。在基於事件時間的窗口中使用 EventTimeTrigger,該觸發器會在watermark經過窗口邊界後當即觸發(即watermark出現關閉改窗口時)。在全局窗口(GlobalWindow)中使用 NeverTrigger,該觸發器永遠不會觸發,因此在使用全局窗口時用戶須要自定義觸發器。
4、State
Managed State 是由flink runtime管理來管理的,自動存儲、自動恢復,在內存管理上有優化機制。且Managed State 支持常見的多種數據結構,如value、list、map等,在大多數業務場景中都有適用之處。整體來講是對開發人員來講是比較友好的,所以 Managed State 是 Flink 中最經常使用的狀態。Managed State 又分爲 Keyed State 和 Operator State 兩種。
Raw State 由用戶本身管理,須要序列化,只能使用字節數組的數據結構。Raw State 的使用和維度都比 Managed State 要複雜,建議在自定義的Operator場景中酌情使用。
5、狀態存儲
Flink中狀態的實現有三種:MemoryState、FsState、RocksDBState。三種狀態存儲方式與使用場景各不相同,詳細介紹以下:
1)MemoryStateBackend
構造函數:MemoryStateBackend(int maxStateSize, boolean asyncSnapshot)
存儲方式:State存儲於各個 TaskManager內存中,Checkpoint存儲於 JobManager內存
容量限制:單個State最大5M、maxStateSize<=akka.framesize(10M)、總大小不超過JobManager內存
使用場景:無狀態或者JobManager掛掉不影響的測試環境等,不建議在生產環境使用
2)FsStateBackend
構造函數:FsStateBackend(URI checkpointUri, boolean asyncSnapshot)
存儲方式:State存儲於 TaskManager內存,Checkpoint存儲於 外部文件系統(本次磁盤 or HDFS)
容量限制:State總量不超過TaskManager內存、Checkpoint總大小不超過外部存儲空間
使用場景:常規使用狀態的做業,分鐘級的窗口聚合等,可在生產環境使用
3)RocksDBStateBackend
構造函數:RocksDBStateBackend(URI checkpointUri, boolean enableincrementCheckpoint)
存儲方式:State存儲於 TaskManager上的kv數據庫(內存+磁盤),Checkpoint存儲於 外部文件系統(本次磁盤 or HDFS)
容量限制:State總量不超過TaskManager內存+磁盤、單key最大2g、Checkpoint總大小不超過外部存儲空間
使用場景:超大狀態的做業,天級的窗口聚合等,對讀寫性能要求不高的場景,可在生產環境使用
根據業務場景須要用戶選擇最合適的 StateBackend ,代碼中只需在相應的 env 環境中設置便可:
// flink 上下文環境變量 val env = StreamExecutionEnvironment.getExecutionEnvironment // 設置狀態後端爲 FsStateBackend,數據存儲到 hdfs /tmp/flink/checkpoint/test 中 env.setStateBackend(new FsStateBackend("hdfs://ns1/tmp/flink/checkpoint/test", false))
6、Checkpoint
Checkpoint 是分佈式全域一致的,數據會被寫入hdfs等共享存儲中。且其產生是異步的,在不中斷、不影響運算的前提下產生。
用戶只需在相應的 env 環境中設置便可:
// 1000毫秒進行一次 Checkpoint 操做 env.enableCheckpointing(1000) // 模式爲準確一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 兩次 Checkpoint 之間最少間隔 500毫秒 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // Checkpoint 過程超時時間爲 60000毫秒,即1分鐘視爲超時失敗 env.getCheckpointConfig.setCheckpointTimeout(60000) // 同一時間只容許1個Checkpoint的操做在執行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
7、Watermark
Flink 程序並 不能自動提取數據源中哪一個字段/標識爲數據的事件時間,從而也就沒法本身定義 Watermark 。
開發人員須要經過 Flink 提供的 API 來 提取和定義 Timestamp/Watermark,能夠在 數據源或者數據流中 定義。
1)自定義數據源設置 Timestamp/Watermark
自定義的數據源類須要繼承並實現 SourceFunction[T] 接口,其中 run 方法是定義數據生產的地方:
//自定義的數據源爲自定義類型MyType class MySource extends SourceFunction[MyType]{ //重寫run方法,定義數據生產的邏輯 override def run(ctx: SourceContext[MyType]): Unit = { while (/* condition */) { val next: MyType = getNext() //設置timestamp從MyType的哪一個字段獲取(eventTimestamp) ctx.collectWithTimestamp(next, next.eventTimestamp) if (next.hasWatermarkTime) { //設置watermark從MyType的那個方法獲取(getWatermarkTime) ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } } }
2)在數據流中設置 Timestamp/Watermark
在數據流中,能夠設置 stream 的 Timestamp Assigner ,該 Assigner 將會接收一個 stream,並生產一個帶 Timestamp和Watermark 的新 stream。
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
8、廣播狀態(Broadcast State)
和 Spark 中的廣播變量同樣,Flink 也支持在各個節點中各存一份小數據集,所在的計算節點實例可在本地內存中直接讀取被廣播的數據,能夠避免Shuffle提升並行效率。
廣播狀態(Broadcast State)的引入是爲了支持一些來自一個流的數據須要廣播到全部下游任務的狀況,它存儲在本地,用於處理其餘流上的全部傳入元素。
// key the shapes by color KeyedStream<Item, Color> colorPartitionedStream = shapeStream.keyBy(new KeySelector<Shape, Color>(){...}); // a map descriptor to store the name of the rule (string) and the rule itself. MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint<Rule>() {})); // broadcast the rules and create the broadcast state BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor); DataStream<Match> output = colorPartitionedStream.connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<Color, Item, Rule, String>(){...});
9、Operator Chain
Flink做業中,能夠指定相關的chain將相關性很是強的轉換操做(operator)綁定在一塊兒,使得上下游的Task在同一個Pipeline中執行,避免由於數據在網絡或者線程之間傳輸致使的開銷。
通常狀況下Flink在Map類型的操做中默認開啓 Operator Chain 以提升總體性能,開發人員也能夠根據須要建立或者禁止 Operator Chain 對任務進行細粒度的鏈條控制。
//建立 chain dataStream.filter(...).map(...).startNewChain().map(...) //禁止 chain dataStream.map(...).disableChaining()
建立的鏈條只對當前的操做符和以後的操做符有效,不不影響其餘操做,如上代碼只針對兩個map操做進行鏈條綁定,對前面的filter操做無效,若是須要能夠在filter和map之間使用 startNewChain方法便可。
參考:
https://zhuanlan.zhihu.com/p/93507000
https://ci.apache.org/projects/flink/flink-docs-release-1.9/