Apache Flink:特性、概念、組件棧、架構及原理分析(全)

Apache Flink是一個面向分佈式數據流處理和批量數據處理的開源計算平臺,它可以基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理做爲兩種不一樣的應用類型,由於他們它們所提供的SLA是徹底不相同的:流處理通常須要支持低延遲、Exactly-once保證,而批處理須要支持高吞吐、高效處理,因此在實現的時候一般是分別給出兩套實現方法,或者經過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapReduce、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。
Flink在實現流處理和批處理時,與傳統的一些方案徹底不一樣,它從另外一個視角看待流處理和批處理,將兩者統一塊兒來:Flink是徹底支持流處理,也就是說做爲流處理看待時輸入數據流是無界的;批處理被做爲一種特殊的流處理,只是它的輸入數據流被定義爲有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。html

基本特性web

關於Flink所支持的特性,我這裏只是經過分類的方式簡單作一下梳理,涉及到具體的一些概念及其原理會在後面的部分作詳細說明。算法

流處理特性apache

  • 支持高吞吐、低延遲、高性能的流處理
  • 支持帶有事件時間的窗口(Window)操做
  • 支持有狀態計算的Exactly-once語義
  • 支持高度靈活的窗口(Window)操做,支持基於time、count、session,以及data-driven的窗口操做
  • 支持具備Backpressure功能的持續流模型
  • 支持基於輕量級分佈式快照(Snapshot)實現的容錯
  • 一個運行時同時支持Batch on Streaming處理和Streaming處理
  • Flink在JVM內部實現了本身的內存管理
  • 支持迭代計算
  • 支持程序自動優化:避免特定狀況下Shuffle、排序等昂貴操做,中間結果有必要進行緩存

API支持api

  • 對Streaming數據類應用,提供DataStream API
  • 對批處理類應用,提供DataSet API(支持Java/Scala)

Libraries支持緩存

  • 支持機器學習(FlinkML)
  • 支持圖分析(Gelly)
  • 支持關係數據處理(Table)
  • 支持復瑣事件處理(CEP)

整合支持網絡

  • 支持Flink on YARN
  • 支持HDFS
  • 支持來自Kafka的輸入數據
  • 支持Apache HBase
  • 支持Hadoop程序
  • 支持Tachyon
  • 支持ElasticSearch
  • 支持RabbitMQ
  • 支持Apache Storm
  • 支持S3
  • 支持XtreemFS

基本概念session

Stream & Transformation & Operator架構

用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一箇中間結果數據,而Transformation是一個操做,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射爲Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它相似於一個DAG圖,在啓動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。
下面是一個由Flink程序映射爲Streaming Dataflow的示意圖,以下所示:
flink-streaming-dataflow-example
上圖中,FlinkKafkaConsumer是一個Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一個Sink Operator。併發

Parallel Dataflow

在Flink中,程序天生是並行和分佈式的:一個Stream能夠被分紅多個Stream分區(Stream Partitions),一個Operator能夠被分紅多個Operator Subtask,每個Operator Subtask是在不一樣的線程中獨立執行的。一個Operator的並行度,等於Operator Subtask的個數,一個Stream的並行度老是等於生成它的Operator的並行度。
有關Parallel Dataflow的實例,以下圖所示:
flink-parallel-dataflow
上圖Streaming Dataflow的並行視圖中,展示了在兩個Operator之間的Stream的兩種模式:

  • One-to-one模式

好比從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。

  • Redistribution模式

這種模式改變了輸入數據流的分區,好比從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下遊的多個不一樣的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關係。
另外,Source Operator對應2個Subtask,因此並行度爲2,而Sink Operator的Subtask只有1個,故而並行度爲1。

Task & Operator Chain

在Flink分佈式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每一個執行鏈會在TaskManager上一個獨立的線程中執行,以下圖所示:
flink-tasks-chains
上圖中上半部分表示的是一個Operator Chain,多個Operator經過Stream鏈接,而每一個Operator在運行時對應一個Task;圖中下半部分是上半部分的一個並行版本,也就是對每個Task都並行化爲多個Subtask。

Time & Window

Flink支持基於時間窗口操做,也支持基於數據的窗口操做,以下圖所示:
flink-window
上圖中,基於時間的窗口操做,在每一個相同的時間間隔對Stream中的記錄進行處理,一般各個時間間隔內的窗口操做處理的記錄數不固定;而基於數據驅動的窗口操做,能夠在Stream中選擇固定數量的記錄做爲一個窗口,對該窗口中的記錄進行處理。
有關窗口操做的不一樣類型,能夠分爲以下幾種:傾斜窗口(Tumbling Windows,記錄沒有重疊)、滑動窗口(Slide Windows,記錄有重疊)、會話窗口(Session Windows),具體能夠查閱相關資料。
在處理Stream中的記錄時,記錄中一般會包含各類典型的時間字段,Flink支持多種時間的處理,以下圖所示:
flink-event-ingestion-processing-time
上圖描述了在基於Flink的流處理系統中,各類不一樣的時間所處的位置和含義,其中,Event Time表示事件建立時間,Ingestion Time表示事件進入到Flink Dataflow的時間 ,Processing Time表示某個Operator對事件進行處理事的本地系統時間(是在TaskManager節點上)。這裏,談一下基於Event Time進行處理的問題,一般根據Event Time會給整個Streaming應用帶來必定的延遲性,由於在一個基於事件的處理系統中,進入系統的事件可能會基於Event Time而發生亂序現象,好比事件來源於外部的多個系統,爲了加強事件處理吞吐量會將輸入的多個Stream進行天然分區,每一個Stream分區內部有序,可是要保證全局有序必須同時兼顧多個Stream分區的處理,設置必定的時間窗口進行暫存數據,當多個Stream分區基於Event Time排列對齊後才能進行延遲處理。因此,設置的暫存數據記錄的時間窗口越長,處理性能越差,甚至嚴重影響Stream處理的實時性。
有關基於時間的Streaming處理,能夠參考官方文檔,在Flink中借鑑了Google使用的WaterMark實現方式,能夠查閱相關資料。

基本架構

Flink系統的架構與Spark相似,是一個基於Master-Slave風格的架構,以下圖所示:
flink-system-architecture
Flink集羣啓動時,會啓動一個JobManager進程、至少一個TaskManager進程。在Local模式下,會在同一個JVM內部啓動一個JobManager進程和TaskManager進程。當Flink程序提交後,會建立一個Client來進行預處理,並轉換爲一個並行數據流,這是對應着一個Flink Job,從而能夠被JobManager和TaskManager執行。在實現上,Flink基於Actor實現了JobManager和TaskManager,因此JobManager與TaskManager之間的信息交換,都是經過事件的方式來進行處理。
如上圖所示,Flink系統主要包含以下3個主要的進程:

JobManager

JobManager是Flink系統的協調者,它負責接收Flink Job,調度組成Job的多個Task的執行。同時,JobManager還負責收集Job的狀態信息,並管理Flink集羣中從節點TaskManager。JobManager所負責的各項管理功能,它接收到並處理的事件主要包括:

  • RegisterTaskManager

在Flink集羣啓動的時候,TaskManager會向JobManager註冊,若是註冊成功,則JobManager會向TaskManager回覆消息AcknowledgeRegistration。

  • SubmitJob

Flink程序內部經過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。

  • CancelJob

請求取消一個Flink Job的執行,CancelJob消息中包含了Job的ID,若是成功則返回消息CancellationSuccess,失敗則返回消息CancellationFailure。

  • UpdateTaskExecutionState

TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態信息,更新成功則返回true。

  • RequestNextInputSplit

運行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。

  • JobStatusChanged

ExecutionGraph向JobManager發送該消息,用來表示Flink Job的狀態發生的變化,例如:RUNNING、CANCELING、FINISHED等。

TaskManager

TaskManager也是一個Actor,它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。每一個TaskManager負責管理其所在節點上的資源信息,如內存、磁盤、網絡,在啓動的時候將資源的狀態向JobManager彙報。TaskManager端能夠分紅兩個階段:

  • 註冊階段

TaskManager會向JobManager註冊,發送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,而後TaskManager就能夠進行初始化過程。

  • 可操做階段

該階段TaskManager能夠接收並處理與Task有關的消息,如SubmitTask、CancelTask、FailTask。若是TaskManager沒法鏈接到JobManager,這是TaskManager就失去了與JobManager的聯繫,會自動進入「註冊階段」,只有完成註冊才能繼續處理Task相關的消息。

Client

當用戶提交一個Flink程序時,會首先建立一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集羣中處理,因此Client須要從用戶提交的Flink程序配置中獲取JobManager的地址,並創建到JobManager的鏈接,將Flink Job提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 而且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的以下信息:JobID、Job名稱、配置信息、一組JobVertex等。

組件棧

Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。Flink分層的組件棧以下圖所示:
flink-component-stack
下面,咱們自下而上,分別針對每一層進行解釋說明:

  • Deployment層

該層主要涉及了Flink的部署模式,Flink支持多種部署模式:本地、集羣(Standalone/YARN)、雲(GCE/EC2)。Standalone部署模式與Spark相似,這裏,咱們看一下Flink on YARN的部署模式,以下圖所示:
flink-on-yarn
瞭解YARN的話,對上圖的原理很是熟悉,實際Flink也實現了知足在YARN集羣上運行的各個組件:Flink YARN Client負責與YARN RM通訊協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。經過上圖能夠看到,YARN AM與Flink JobManager在同一個Container中,這樣AM能夠知道Flink JobManager的地址,從而AM能夠申請Container去啓動Flink TaskManager。待Flink成功運行在YARN集羣上,Flink YARN Client就能夠提交Flink Job到Flink JobManager,並進行後續的映射、調度和計算處理。

  • Runtime層

Runtime層提供了支持Flink計算的所有核心實現,好比:支持分佈式Stream處理、JobGraph到ExecutionGraph的映射、調度等等,爲上層API層提供基礎服務。

  • API層

API層主要實現了面向無界Stream的流處理和麪向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。

  • Libraries層

該層也能夠稱爲Flink應用框架層,根據API層的劃分,在API層之上構建的知足特定應用的實現計算框架,也分別對應於面向流處理和麪向批處理兩類。面向流處理支持:CEP(復瑣事件處理)、基於SQL-like的操做(基於Table的關係操做);面向批處理支持:FlinkML(機器學習庫)、Gelly(圖處理)。

內部原理

容錯機制

Flink基於Checkpoint機制實現容錯,它的原理是不斷地生成分佈式Streaming數據流Snapshot。在流處理失敗時,經過這些Snapshot能夠恢復數據流處理。理解Flink的容錯機制,首先須要瞭解一下Barrier這個概念:
Stream Barrier是Flink分佈式Snapshotting中的核心元素,它會做爲數據流的記錄被同等看待,被插入到數據流中,將數據流中記錄的進行分組,並沿着數據流的方向向前推動。每一個Barrier會攜帶一個Snapshot ID,屬於該Snapshot的記錄會被推向該Barrier的前方。由於Barrier很是輕量,因此並不會中斷數據流。帶有Barrier的數據流,以下圖所示:
flink-stream-barriers
基於上圖,咱們經過以下要點來講明:

  • 出現一個Barrier,在該Barrier以前出現的記錄都屬於該Barrier對應的Snapshot,在該Barrier以後出現的記錄屬於下一個Snapshot
  • 來自不一樣Snapshot多個Barrier可能同時出如今數據流中,也就是說同一個時刻可能併發生成多個Snapshot
  • 當一箇中間(Intermediate)Operator接收到一個Barrier後,它會發送Barrier到屬於該Barrier的Snapshot的數據流中,等到Sink Operator接收到該Barrier後會向Checkpoint Coordinator確認該Snapshot,直到全部的Sink Operator都確認了該Snapshot,才被認爲完成了該Snapshot

這裏還須要強調的是,Snapshot並不只僅是對數據流作了一個狀態的Checkpoint,它也包含了一個Operator內部所持有的狀態,這樣纔可以在保證在流處理系統失敗時可以正確地恢復數據流處理。也就是說,若是一個Operator包含任何形式的狀態,這種狀態必須是Snapshot的一部分。
Operator的狀態包含兩種:一種是系統狀態,一個Operator進行計算處理的時候須要對數據進行緩衝,因此數據緩衝區的狀態是與Operator相關聯的,以窗口操做的緩衝區爲例,Flink系統會收集或聚合記錄數據並放到緩衝區中,直到該緩衝區中的數據被處理完成;另外一種是用戶自定義狀態(狀態能夠經過轉換函數進行建立和修改),它能夠是函數中的Java對象這樣的簡單變量,也能夠是與函數相關的Key/Value狀態。
對於具備輕微狀態的Streaming應用,會生成很是輕量的Snapshot並且很是頻繁,但並不會影響數據流處理性能。Streaming應用的狀態會被存儲到一個可配置的存儲系統中,例如HDFS。在一個Checkpoint執行過程當中,存儲的狀態信息及其交互過程,以下圖所示:
flink-checkpointing
在Checkpoint過程當中,還有一個比較重要的操做——Stream Aligning。當Operator接收到多個輸入的數據流時,須要在Snapshot Barrier中對數據流進行排列對齊,以下圖所示:
flink-stream-aligning
具體排列過程以下:

  1. Operator從一個incoming Stream接收到Snapshot Barrier n,而後暫停處理,直到其它的incoming Stream的Barrier n(不然屬於2個Snapshot的記錄就混在一塊兒了)到達該Operator
  2. 接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中
  3. 一旦最後一個Stream接收到Barrier n,Operator會emit全部暫存在Buffer中的記錄,而後向Checkpoint Coordinator發送Snapshot n
  4. 繼續處理來自多個Stream的記錄

基於Stream Aligning操做可以實現Exactly Once語義,可是也會給流處理應用帶來延遲,由於爲了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤爲是在數據流並行度很高的場景下可能更加明顯,一般以最遲對齊Barrier的一個Stream爲處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,若是關掉則Exactly Once會變成At least once。

調度機制

在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射爲一個ExecutionGraph,以下圖所示:
flink-job-and-execution-graph
經過上圖能夠看出:
JobGraph是一個Job的用戶邏輯視圖表示,將一個用戶要對數據流進行的處理表示爲單個DAG圖(對應於JobGraph),DAG圖由頂點(JobVertex)和中間結果集(IntermediateDataSet)組成,其中JobVertex表示了對數據流進行的轉換操做,好比map、flatMap、filter、keyBy等操做,而IntermediateDataSet是由上游的JobVertex所生成,同時做爲下游的JobVertex的輸入。
而ExecutionGraph是JobGraph的並行表示,也就是實際JobManager調度一個Job在TaskManager上運行的邏輯視圖,它也是一個DAG圖,是由ExecutionJobVertex、IntermediateResult(或IntermediateResultPartition)組成,ExecutionJobVertex實際對應於JobGraph圖中的JobVertex,只不過在ExecutionJobVertex內部是一種並行表示,由多個並行的ExecutionVertex所組成。另外,這裏還有一個重要的概念,就是Execution,它是一個ExecutionVertex的一次運行Attempt,也就是說,一個ExecutionVertex可能對應多個運行狀態的Execution,好比,一個ExecutionVertex運行產生了一個失敗的Execution,而後還會建立一個新的Execution來運行,這時就對應這個2次運行Attempt。每一個Execution經過ExecutionAttemptID來惟一標識,在TaskManager和JobManager之間進行Task狀態的交換都是經過ExecutionAttemptID來實現的。
下面看一下,在物理上進行調度,基於資源的分配與使用的一個例子,來自官網,以下圖所示:
flink-scheduled-task-slots
說明以下:

  • 左上子圖:有2個TaskManager,每一個TaskManager有3個Task Slot
  • 左下子圖:一個Flink Job,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction,對應一個JobGraph
  • 左下子圖:用戶提交的Flink Job對各個Operator進行的配置——data source的並行度設置爲4,MapFunction的並行度也爲4,ReduceFunction的並行度爲3,在JobManager端對應於ExecutionGraph
  • 右上子圖:TaskManager 1上,有2個並行的ExecutionVertex組成的DAG圖,它們各佔用一個Task Slot
  • 右下子圖:TaskManager 2上,也有2個並行的ExecutionVertex組成的DAG圖,它們也各佔用一個Task Slot
  • 在2個TaskManager上運行的4個Execution是並行執行的

迭代機制

機器學習和圖計算應用,都會使用到迭代計算,Flink經過在迭代Operator中定義Step函數來實現迭代算法,這種迭代算法包括Iterate和Delta Iterate兩種類型,在實現上它們反覆地在當前迭代狀態上調用Step函數,直到知足給定的條件纔會中止迭代。下面,對Iterate和Delta Iterate兩種類型的迭代算法原理進行說明:

  • Iterate

Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,經過該輪迭代計算出下一輪計算所須要的輸入(也稱爲Next Partial Solution),知足迭代的終止條件後,會輸出最終迭代結果,具體執行流程以下圖所示:
flink-iterations-iterate-operator
Step函數在每一輪迭代中都會被執行,它能夠是由map、reduce、join等Operator組成的數據流。下面經過官網給出的一個例子來講明Iterate Operator,很是簡單直觀,以下圖所示:
flink-iterations-iterate-operator-example
上面迭代過程當中,輸入數據爲1到5的數字,Step函數就是一個簡單的map函數,會對每一個輸入的數字進行加1處理,而Next Partial Solution對應於通過map函數處理後的結果,好比第一輪迭代,對輸入的數字1加1後結果爲2,對輸入的數字2加1後結果爲3,直到對輸入數字5加1後結果爲變爲6,這些新生成結果數字2~6會做爲第二輪迭代的輸入。迭代終止條件爲進行10輪迭代,則最終的結果爲11~15。

  • Delta Iterate

Delta Iterate Operator實現了增量迭代,它的實現原理以下圖所示:
flink-iterations-delta-iterate-operator
基於Delta Iterate Operator實現增量迭代,它有2個輸入,其中一個是初始Workset,表示輸入待處理的增量Stream數據,另外一個是初始Solution Set,它是通過Stream方向上Operator處理過的結果。第一輪迭代會將Step函數做用在初始Workset上,獲得的計算結果Workset做爲下一輪迭代的輸入,同時還要增量更新初始Solution Set。若是反覆迭代知道知足迭代終止條件,最後會根據Solution Set的結果,輸出最終迭代結果。
好比,咱們如今已知一個Solution集合中保存的是,已有的商品分類大類中購買量最多的商品,而Workset輸入的是來自線上實時交易中最新達成購買的商品的人數,通過計算會生成新的商品分類大類中商品購買量最多的結果,若是某些大類中商品購買量忽然增加,它須要更新Solution Set中的結果(原來購買量最多的商品,通過增量迭代計算,可能已經不是最多),最後會輸出最終商品分類大類中購買量最多的商品結果集合。更詳細的例子,能夠參考官網給出的「Propagate Minimum in Graph」,這裏再也不累述。

Backpressure監控

Backpressure在流式計算系統中會比較受到關注,由於在一個Stream上進行處理的多個Operator之間,它們處理速度和方式可能很是不一樣,因此就存在上游Operator若是處理速度過快,下游Operator處可能機會堆積Stream記錄,嚴重會形成處理延遲或下游Operator負載太重而崩潰(有些系統可能會丟失數據)。所以,對下游Operator處理速度跟不上的狀況,若是下游Operator可以將本身處理狀態傳播給上游Operator,使得上游Operator處理速度慢下來就會緩解上述問題,好比經過告警的方式通知現有流處理系統存在的問題。
Flink Web界面上提供了對運行Job的Backpressure行爲的監控,它經過使用Sampling線程對正在運行的Task進行堆棧跟蹤採樣來實現,具體實現方式以下圖所示:
flink-back-pressure-sampling
JobManager會反覆調用一個Job的Task運行所在線程的Thread.getStackTrace(),默認狀況下,JobManager會每間隔50ms觸發對一個Job的每一個Task依次進行100次堆棧跟蹤調用,根據調用調用結果來肯定Backpressure,Flink是經過計算獲得一個比值(Radio)來肯定當前運行Job的Backpressure狀態。在Web界面上能夠看到這個Radio值,它表示在一個內部方法調用中阻塞(Stuck)的堆棧跟蹤次數,例如,radio=0.01,表示100次中僅有1次方法調用阻塞。Flink目前定義了以下Backpressure狀態:

  • OK: 0 <= Ratio <= 0.10
  • LOW: 0.10 < Ratio <= 0.5
  • HIGH: 0.5 < Ratio <= 1

另外,Flink還提供了3個參數來配置Backpressure監控行爲:

參數名稱 默認值 說明
jobmanager.web.backpressure.refresh-interval 60000 默認1分鐘,表示採樣統計結果刷新時間間隔
jobmanager.web.backpressure.num-samples 100 評估Backpressure狀態,所使用的堆棧跟蹤調用次數
jobmanager.web.backpressure.delay-between-samples 50 默認50毫秒,表示對一個Job的每一個Task依次調用的時間間隔

經過上面個定義的Backpressure狀態,以及調整相應的參數,能夠肯定當前運行的Job的狀態是否正常,而且保證不影響JobManager提供服務。

參考連接

相關文章
相關標籤/搜索