原創不易,轉載請註明出處。對大數據和AI感興趣的朋友能夠加個人微信 aistevelu,相互交流學習。程序員
要想熟練掌握一個大數據框架,僅僅是學習一些網絡上的樣例程序是遠遠不夠的,咱們必須系統地瞭解它背後的設計和運行原理。算法
本文將以WordCount的案例爲主線,主要介紹Flink的設計和運行原理。後續還將不斷更新相關文章,深度解讀Flink技術。數據庫
本文內容主要包括:編程
Flink的數據流圖,以及如何將數據流圖從邏輯視角轉化爲物理執行圖;後端
Flink分佈式架構;緩存
Flink時間處理機制;微信
Flink狀態與檢查點機制;網絡
閱讀完本章後,讀者能夠對Flink的設計和運行原理有一個全面的認識。session
咱們開始對數據流作處理,計算數據流中單詞出現的頻次。若是輸入數據流是「Hello Flink Hello World「,這個程序將統計出Hello的頻次爲2,Flink和World的頻次爲1。在大數據領域,詞頻統計(WordCount)程序就像是一個編程語言的HelloWorld程序,它展現了一個大數據引擎的基本規範。麻雀雖小,五臟俱全,從這個樣例中,咱們能夠一窺Flink設計和運行原理。數據結構
圖 1 Flink樣例程序示意圖如圖 1所示,程序分爲三大部分,第一部分讀取數據源(Source),第二部分對數據作轉換操做(Transformation),最後將轉換結果輸出到一個目的地(Sink)。 代碼中的函數被稱爲算子(Operator),是Flink提供給程序員的接口,程序員須要經過這些算子對數據作操做。
咱們能夠把算子理解爲1 + 2 運算中的加號,加號(+)是這個算子的一個符號表示,它表示對數字1和數字2作加法運算。一樣,在Flink或Spark這樣的大數據引擎中,算子對數據進行某種操做,程序員能夠根據本身的需求調用合適的算子,完成所需計算任務。經常使用的算子有map、flatMap、keyBy、timeWindow等,它們分別對數據流執行不一樣類型的操做。
圖 2 WordCont程序的邏輯視圖圖 2展現了WordCount程序中數據從不一樣算子間流動的狀況。圖中,圓圈表明算子,圓圈間的箭頭表明數據流,數據流在Flink程序中通過不一樣算子的計算,最終生成爲目標數據。按照算子對數據的操做內容,通常將算子分爲Source算子、Transformation算子和Sink算子。Source算子讀取數據源中的數據,數據源能夠是數據流、也能夠存儲在文件系統中的文件。Transformation算子對數據進行必要的計算處理。Sink算子將處理結果輸出,數據通常被輸出到數據庫、文件系統或下一個數據流程序。
咱們先對這個樣例程序中各個算子作一個簡單的介紹,關於這些算子的具體使用方式將在後續文章中詳細說明。
map函數對數據流中每一條數據作一個操做,生成一條新的數據。本例中map(word => (word, 1))
表示取輸入的每一個單詞,用變量word表示,而後生成一個二元對(word, 1),1是表示出現了一次。注意,map的一條輸入數據對應一條輸出數據。
在解釋flatMap前,咱們先對split
函數作一個簡單介紹。split(「\\s」)
函數以空白字符爲分隔符,將文本切分紅單詞列表。若是輸入爲「Hello Flink「,那麼通過這個函數切分後,獲得結果爲[「Hello」,」Flink」]組成的單詞列表。
本例中flatMap(line => line.split(「\\s」))
表示取出輸入的每一行文本,用變量line表示,將文本中以空格作切分,生成一個單詞列表,到這裏仍然列表,flatMap接着對列表打平,輸出單個單詞。flatMap
先作map
所作的操做,而後對輸出的各個列表打平,所以,flatMap
的一條輸入數據可能有多條輸出。
keyBy
根據某個Key作數據重分佈,將全部數據中包含該Key的數據都發送到同一個分區上。本例中是將二元組中第一項做爲Key,即以單詞爲Key,包含一樣單詞的二元對都發送到同一分區上。
timeWindow
是時間窗口函數,以界定對多長時間以內的數據作統計。
sum
爲求和函數。sum(1)
表示對二元組中第二個元素求和,由於通過前面的keyBy
,全部單詞都被髮送到了同一個分區,所以,在這一個分區上,將單詞出現次數作加和,就獲得出現的總次數。
對於詞頻統計這個案例,邏輯上來說無非是對數據流中的單詞作提取,而後使用一個Key-Value結構對單詞作詞頻計數,最後輸出結果便可,這樣的邏輯本能夠用幾行代碼完成,改爲這樣的算子形式,反而讓新人看着一頭霧水,爲何必定要用算子的形式來寫程序呢?實際上,算子進化成當前這個形態,就像人類從石塊計數,到手指計數,到算盤計數,再到計算器計數這樣的進化過程同樣,儘管更低級的方式能夠完成必定的計算任務,可是隨着計算規模的增加,古老的計數方式存在着低效的弊端,沒法完成更高級別和更大規模的計算需求。試想,若是咱們不使用大數據引擎提供的算子,而是本身實現一套上述的計算邏輯,儘管咱們能夠快速完成當前的詞頻統計的任務,可是當面臨一個新計算任務時,咱們須要再次從新編寫程序,完成一整套計算任務。咱們本身編寫代碼的橫向擴展性可能很低,當輸入數據暴增時,咱們須要作很大改動,以部署在更多機器上。
大數據引擎的算子對計算作了一些抽象,對於新人來講有必定學習成本,而一旦掌握這門技術,人們所能處理的數據規模將成倍增長。大數據引擎的算子出現,正是針對數據分佈在多個分區的大數據場景須要一種統一的計算描述語言來對數據作計算而進化出的新計算形態。基於大數據引擎的算子,咱們能夠定義一個數據流的邏輯視圖,以此完成對大數據的計算。剩下那些數據交換、橫向擴展、故障恢復等問題全交由大數據引擎來解決。
在絕大多數的大數據處理場景下,一臺機器節點沒法處理全部數據,數據被切分到多臺節點上。在大數據領域,當數據量大到超過單臺機器處理能力時,就將一份數據切分到多個分區(Partition)上,每一個分區分佈在一臺虛擬機或物理機上。
前一小節已經提到,大數據引擎的算子提供了編程接口,使用算子咱們能夠構建數據流的邏輯視圖。考慮到數據分佈在多個節點的狀況,邏輯視圖只是一種抽象,須要將邏輯視圖轉化爲物理執行圖,才能在分佈式環境下執行。
圖 3 樣例程序物理執行示意圖圖 3爲1.1中的樣例程序的物理執行圖,這裏數據流分佈在2個分區上。箭頭部分表示數據流分區,圓圈部分表示算子在分區上的算子子任務(Operator Subtask)。從邏輯視圖變爲物理執行圖後,map算子在每一個分區都有一個算子子任務,以處理該分區上的數據:map[1/2]算子子任務處理第一個數據流分區上的數據,map[2/2]算子子任務處理第二個數據流分區上的數據。keyBy算子會將數據按照某個key作數據重分佈,在詞頻統計的例子中是以單詞爲key,例如,輸入數據爲「Hello Flink Hello World」,keyBy算子會將全部的」Hello」歸結到一個分區上。
算子子任務是物理執行的基本單元,算子子任務之間是相互獨立的,某個算子子任務有本身的線程,不一樣算子子任務可能分佈在不一樣的節點上。後文在Flink的資源分配部分咱們還會重點介紹算子子任務。
從圖 3中能夠看到,除去Sink外的算子都被分紅了2個算子子任務,這樣配置的並行度(Parallelism)爲2,Sink算子的並行度爲1。並行度是能夠被設置的,實際應用中通常根據數據量的大小,計算資源的多少等多方面的因素來設置並行度。
圖 3中keyBy算子子任務將數據作了從新分配,即數據在不一樣分區上進行着數據交換,產生了數據流動的現象。不管是Hadoop、Spark仍是Flink,當涉及數據分佈在多個分區時,對數據的處理都會涉及到數據交換策略。在Flink中,數據交換策略包括圖 4中涉及到的四種策略:
圖 4 Flink數據交換策略前向傳播(Forward):前一個算子子任務將數據直接傳遞給後一個算子子任務,數據不存在跨分區的交換,也避免了因數據交換產生的各種開銷,圖 3中Source和和flatMap之間就是這樣的情形。
全局廣播(Broadcast):將某份數據發送到全部分區上,這種策略涉及到了數據拷貝和網絡通訊,所以很是消耗資源。
基於Key的數據重分佈:數據以(Key, Value)形式存在,該策略將全部數據作一次從新分佈,並保證相同Key的數據被髮送到同一個分區上。圖 3中keyBy算子將單詞做爲Key,把某個單詞都發送到同一分區,以方便後續算子來統計這個單詞出現的頻次。
隨機策略(Random):該策略將全部數據隨機均勻地發送到多個分區上,以保證數據平均分配到不一樣分區上。該策略一般爲了防止數據傾斜到某些分區,致使部分分區數據稀疏,部分分區數據擁堵,甚至超過該分區上算子的處理能力。
爲了實現支持分佈式運行,Flink跟其餘大數據引擎同樣,採用了主從(Master-Worker)架構,運行時主要包括兩個組件:
• JobManager,又被稱爲Master,是一個Flink應用的主節點。
• TaskManager,又被稱爲Worker,執行計算任務的節點。
一個Flink應用通常含有至少一個JobManager,一個或多個TaskManager。
用戶編寫Flink程序並提交任務的具體流程爲:
用戶編寫應用程序代碼,並經過Flink客戶端(Client)提交做業。程序通常爲Java或Scala語言,調用Flink API,構建基於邏輯視角的數據流圖,代碼和相關配置文件被編譯打包,並被提交到JobManager上,造成一個應用做業(Application)。
JobManager接受到做業後,將邏輯視圖轉化成可並行的物理執行圖。
JobManager將物理執行圖發送給各TaskManager。
TaskManager接收到物理執行圖後,會初始化並開始執行被分配的任務。
TaskManager在執行任務過程當中可能會與其餘TaskManager交換數據,會使用圖 4提到的一些數據交換策略。
TaskManager將任務啓動、運行、性能指標、結束或終止等狀態信息會反饋給JobManager。
用戶可使用Flink Web儀表盤來監控提交的做業。
圖 6對Flink的各個組件描述得更爲詳細,咱們再對涉及到的各個組件進行更爲詳細的介紹。
當用戶提交一個Flink程序時,會首先建立一個客戶端(Client)。該Client會對用戶提交的Flink程序進行預處理,並把做業提交到Flink集羣中處理。Client須要從用戶提交的Flink程序配置中獲取JobManager的地址,並創建到JobManager的鏈接,將Flink做業提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph。
JobManager是Flink的協調者,它負責接收Flink做業,調度任務。同時,JobManager還負責管理TaskManager,收集做業的狀態信息,生成檢查點和故障恢復等問題。JobManager會將Client提交的JobGraph轉化爲ExceutionGraph,ExecutionGraph是JobGraph的並行版本,但還不是最終的物理執行圖。
TaskManager是實際負責執行計算的節點,在其上執行物理執行圖。同時,TaskManager還要處理必要的數據緩存和交換。每一個TaskManager負責管理其所在節點上的資源信息,包括內存、磁盤、網絡,TaskManager啓動的時候會將資源的狀態向JobManager彙報。
瞭解了Flink的分佈式架構和核心組件,這裏咱們從更細粒度上來分析1.2介紹的從邏輯視圖轉化爲物理執行圖過程,該過程能夠分紅四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。
StreamGraph:是根據用戶經過 DataStream API 編寫的代碼生成的最初的圖,用來表示程序的拓撲結構。在這張圖中,節點就是用戶調用的算子,邊表示數據流。
JobGraph:JobGraph是提交給 JobManager 的數據結構。StreamGraph通過優化後生成了 JobGraph,主要的優化爲,將多個符合條件的節點連接在一塊兒做爲一個節點,這樣能夠減小數據交換所須要的序列化、反序列化以及傳輸消耗。這個連接的過程叫作算子鏈,會在下一節簡單介紹。
ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
物理執行圖:JobManager 根據 ExecutionGraph 對做業進行調度後,在各個TaskManager 上部署任務造成的圖,物理執行並非一個具體的數據結構。
能夠看到,Flink在數據流圖上可謂煞費苦心,僅各種圖就有四種之多。對於新人來講,能夠不用太關心這些很是細節的底層實現,只須要了解如下幾個核心概念:
Flink採用主從架構,JobManager起着管理協調做用,TaskManager負責物理執行,在執行過程當中會發生一些數據交換、生命週期管理等事情。
用戶調用Flink API,構造邏輯視圖,Flink會對邏輯視圖優化,並轉化爲物理執行圖,最後被執行的是物理執行圖。
在分佈式運行的過程當中,Flink將一些算子子任務(Subtask)連接在一塊兒,組成算子鏈(Operator Chain),連接後將以任務(Task)的形式被TaskManager調度執行。使用算子鏈是一個很是有效的優化,它能夠有效下降算子子任務之間的傳輸開銷。連接以後造成的Task是TaskManager中的一個線程。
圖 7 任務、子任務與算子鏈例如,數據從Source算子前向傳播到 flatMap算子,再由flatMap算子前向傳播到map算子,中間沒有發生跨分區的數據交換,所以,咱們徹底能夠將Source、flatMap和map幾個Operator Subtask組合在一塊兒,造成一個Task。keyBy算子發生了數據重分佈,數據會跨越分區,所以map和keyBy沒法被連接到一塊兒。一樣,咱們也不能把sum和Sink連接到一塊兒。
默認狀況下,Flink會盡可能將更多的Subtask連接在一塊兒,但一個Subtask有超過一個輸入或發生數據交換時,連接就沒法創建。儘管將算子連接到一塊兒會下降一些傳輸開銷,可是也有一些狀況並不須要太多連接。好比,有時候咱們須要將一個很是長的算子鏈拆開,這樣咱們就能夠將原來集中在一個線程中的計算拆分到多個線程中來並行計算。Flink手動配置是否對某些算子啓用算子鏈。
根據前文的介紹,咱們已經瞭解到TaskManager負責具體的任務執行。TaskManager是一個Java虛擬機進程,在TaskManager中能夠並行運行多個Task。在程序執行以前,通過優化,部分Subtask被連接在一塊兒,組成一個Task。每一個Task是一個線程,須要TaskManager爲其分配相應的資源,TaskManager使用任務槽位(Task Slot)給任務分配資源,簡稱槽位(Slot)。
在解釋任務槽位的概念前,咱們先回顧一下進程與線程的概念。在操做系統層面,進程(Process)是進行資源分配和調度的一個獨立單位,線程(Thread)是是CPU調度的基本單位。好比,咱們經常使用的Office Word軟件,在啓動後就佔用操做系統的一個進程。Windows上可使用任務管理器來查看當前活躍進程,Linux上可使用top命令來查看。線程是進程的一個子集,一個線程通常專一於處理一些特定任務,不獨立擁有系統資源,只擁有一些運行中必要的資源,如程序計數器。一個進程至少有一個線程,也能夠有多個線程。多線程場景下,每一個線程都處理一小個任務,多個線程以高併發的方式同時處理多個小任務,能夠提升處理能力。
回到Flink的槽位分配機制上,一個TaskManager是一個進程,TaskManager能夠管理一至多個Task,每一個Task是一個線程,佔用一個槽位。
圖 8 Task Slot與Task Manager假設咱們給WordCount程序分配兩個TaskManager,每一個TaskManager又分配3個槽位,因此總共是6個槽位。結合圖 7中對這個做業的並行度設置,整個做業被劃分爲5個Task,使用5個線程,這5個線程能夠按照圖 8所示的方式分配到6個槽位中。
每一個槽位的資源是整個TaskManager資源的子集,好比這裏的TaskManager下有3個槽位,每一個槽位佔用TaskManager所管理的1/3的內存,在第一個槽位內運行的任務不會與在第二個槽位內運行的任務互相爭搶內存資源。注意,在分配資源時,Flink並無將CPU資源明確分配給各個槽位。
Flink容許用戶設置TaskManager中槽位的數目,這樣用戶就能夠肯定以怎樣的粒度將任務作相互隔離。若是每一個TaskManager只包含一個槽位,那麼運行在該槽位內的任務將獨享JVM。若是TaskManager包含多個槽位,那麼多個槽位內的任務能夠共享JVM資源,好比共享TCP鏈接、心跳信息、部分數據結構等。如無特殊須要,能夠將槽位數目設置爲TaskManager下可用的CPU核心數,那麼平均下來,每一個槽位都能得到至少一個CPU核心。
圖 8中展現了任務的一種資源分配方式,默認狀況下, Flink還提供了一種槽位共享(Slot Sharing)的優化機制,進一步優化數據傳輸開銷,充分利用計算資源。將圖 8中的任務作槽位共享優化後,結果如圖 9所示。
圖 9 槽位共享示意圖開啓槽位共享後,Flink容許將獨佔一個槽位的任務與同一個做業中的其餘任務共享槽位。因而能夠將一個做業從開頭到結尾的全部Subtask都放置在一個槽位中,如圖 9中最左側的數據流,這樣槽位內的數據交換成本更低。並且,對於一個數據流圖來講,Source、map等算子的計算量相對不大,window算子的計算量比較大,計算量較大的Subtask與計算量較小的Subtask相互互補,能夠騰出更多的槽位,分配給更多Task,這樣能夠更好地利用資源。而不開啓槽位共享,計算量小的Source、map算子子任務獨佔了槽位,形成必定的資源浪費。
圖 3中提到了並行度,在WordCount的例子中,除去Sink算子的並行度爲1外,其餘算子的並行度均爲2,也就是說在並行度爲2的狀況下,每一個算子只能拆分爲2個Subtask。圖 8中的方式共佔用5個槽位,支持槽位共享後,圖 9只佔用2個槽位,這裏故意將剩下的幾個槽位置空,只是爲了演示須要,若是這個做業的數據量很是大,佔用的數據分區不少,其實徹底能夠經過增長並行度,將這些槽位填充,爲更多的並行任務提供資源。
圖 10 並行度與槽位數目爲了充分利用空槽位,佔滿圖 9中多餘的4個槽位,咱們能夠把除Sink外的其餘算子的並行度都設置爲6。圖 2‑10展現了將並行度增長後,資源分配狀況。
並行度和槽位數目的概念可能容易讓人混淆,這裏再次闡明一下。用戶使用Flink提供的API算子能夠構建一個邏輯視圖,須要將任務並行才能被物理執行。整個做業將被切分爲多個實例,每一個實例處理整個做業輸入數據的一部分。若是輸入數據過大,增大並行度能夠增長更多的實例,加快數據處理速度。可見,並行度是Flink對任務並行切分的一種描述。槽位數目是在資源設置時,對單個TaskManager的資源切分粒度。並行度、槽位數目和TaskManager數可大體按照公式 2‑1來計算。
公式 1 並行度、TaskManager數與Task Slot數關係其中,ceil爲上限函數,表示對除法結果向上取整。關於並行度、槽位數目等配置,將在後續文章中詳細說明。
咱們以前討論的WordCount例子中,一直使用的是Flink提供的DataStream API,即在數據流上的操做。除了DataStream API,Flink給編程人員不一樣層次API,主要有三層:
Flink最底層提供的是有狀態的流式計算引擎,流(Stream)、狀態(State)和時間(Time)等流式計算概念都在這一層獲得了實現。
通常狀況下,應用程序不會使用上述底層接口,而是使用Flink提供的核心API:針對有界和無界數據流的DataStream API和針對有界數據集的DataSet API。用戶可使用這兩個API進行經常使用的數據處理:轉換(Transformation)、鏈接(Join)、聚合(Aggregation)、窗口(Window)以及對狀態(State)的操做。這一層有點像Spark提供的RDD級別的接口。
Table API和SQL是更高級別的抽象。在這一層,數據被轉換成了關係型數據庫式的表格,每一個表格擁有一個表模式(Schema),用戶能夠像操做表格那樣操做流式數據,例如可使用針對結構化數據的select、join、group-by等操做。若是用戶熟悉SQL語句,那麼能夠很快上手Flink的Table API和SQL。不少公司的數據流很是依賴SQL,Flink SQL下降了從其餘框架遷移至Flink的成本。
咱們將在後續文章中介紹DataStream API、Table API和SQL。
瞭解Flink的主從架構以及API結構後,咱們能夠將Flink的核心組件分層來剖析。
圖 12 Flink組件棧大數據引擎首先須要部署在物理機或虛擬機上。Flink支持多種部署方式,能夠部署在單機、集羣,以及雲上。
運行時(Runtime)層爲Flink各種計算提供了實現。這一層作了前面章節中提到的將數據流圖轉化爲物理執行圖、資源分配以及分佈式調度與執行等大部分工做。
API層主要實現了面向數據流的流處理DataStream API和麪向數據集的批處理DataSet API。在這兩個API之上,Flink還提供了更豐富的工具:
面向數據流處理的:CEP(Complex Event Process,復瑣事件處理)、基於類SQL的Table API和SQL
面向數據集批處理的:FlinkML(機器學習計算庫)、Gelly(圖計算庫)
在批處理場景下,數據已是按照某個時間維度分批次地存儲了。一些公司常常將用戶行爲日誌按天存儲在一個文件目錄下,另一些開放數據集都會說明數據採集的時間始末。所以,對於批處理任務,處理一個數據集,其實就是對該數據集對應的時間窗口內的數據進行處理。在流計算場景下,數據以源源不斷的流的形式存在,數據一直在產生,沒有始末。咱們要對數據進行處理時,每每須要明確一個時間窗口,好比,數據在「每秒」、「每小時」、「天天」的維度下的一些特性。通常有以下幾種定義時間窗口的方式。
滾動窗口(Tumbling Window)模式下窗口之間互不重疊,且窗口長度是固定的,長度能夠是數據的條數,也能夠是時間間隔。圖 13是固定長度爲4的滾動窗口,圖 14是固定長度爲10分鐘的滾動窗口。定長滾動窗口是常常用到的一種窗口模式。在本文最開始的WordCount例子中,咱們使用的是定長爲5秒的滾動窗口。
滑動窗口(Sliding Window)也是一種窗口長度定長的模式。與滾動窗口不一樣,滑動窗口模式下窗口和窗口之間有滑動間隔(Slide)。再以WordCount爲例,咱們要統計10分鐘內的詞頻,而且每隔1分鐘統計一次,就須要使用滑動窗口。
會話(Session)是一個用戶交互概念,經常出如今互聯網應用上,通常指用戶在某APP或某網站上短時間內產生的一系列行爲。好比,用戶在手機淘寶上短期有大量的搜索和點擊的行爲,這系列行爲事件組成了一個Session,接着可能由於一些其餘因素,用戶暫停了與APP的交互,過一會用戶又返回APP,通過一系列搜索、點擊、與客服溝通,最終下單。Session窗口的長度並不固定,所以不能用上面兩種形式的窗口來建模。
圖 16 會話窗口Session沒有固定長度,那如何將數據劃分到不一樣的窗口呢?Flink提供了Session Gap的概念。
咱們繼續以用戶在手機淘寶上的行爲爲例,如今有3個用戶,每一個用戶產生了不一樣的行爲,果兩個行爲數據的時間戳小於session gap,則被劃歸到同一個窗口中,圖 17中user2的window4,如兩個行爲數據的時間戳大於了session gap,則被劃歸到兩個不一樣的窗口中,user2的window1和window2之間的時間間隔大於最小的session gap,數據被劃歸爲了兩個窗口。
咱們將在後續文章詳細介紹以上幾種窗口的使用方法。
若是咱們要定義基於時間的窗口,那麼首先要定義時間。在程序中,時間通常基於Unix時間戳,即以1970-01-01-00:00:00.000爲起始點。時間戳毫秒精度是時間距離該起點的毫秒總數,時間戳微秒精度是事件距離該起點的微秒總數。
圖 18 三種時間語義以前文章中咱們提到了流處理的時間語義問題,在Flink中通常有三種時間概念,如圖 18所示。
事件時間(Event Time)是事件實際發生的時間,一般是事件發生時嵌入到事件上的時間,好比某個傳感器在生成數據時,會將時間戳打入這個數據上。
接收時間(Ingestion Time)是事件進入Flink的時間,確切的說,是該事件進入Source算子時,Source算子的當前時間。
處理時間(Processing Time)是各個時間算子處理該事件的當前時間。通常狀況下,處理時間會比攝入時間更晚一些。
Processing Time是最簡單的時間概念,只須要算子獲取當前運行機器的系統時間,不須要考慮其餘任何因素,所以使用Processing Time做爲時間,能夠得到最好的性能和最低的延遲。但Processing Time並不能表明事件實際發生的時間,從事件實際發生到算子處理的過程有大量的不肯定性,以Processing Time來計算,極可能致使事件的處理是亂序的,產生不可復現的結果。
Event Time能夠保證事件順序的可靠性,所以能夠獲得一致的、可復現的結果。Event Time雖然準確,但也有其弊端:咱們沒法預知某個時間下,是否全部數據均已到達,所以須要使用水位線機制處理延遲數據。
以前文章已經提到,水位線(Watermark)機制假設在某個時間點上,不會有比這個時間點更晚的上報數據。Watermark常被做爲一個時間窗口的結束時間。
圖 19 一個帶有Watermark的數據流Flink中的Watermark是被系統插入到數據流的特殊數據。Watermark的時間戳單調遞增,且與事件時間戳相關。如上圖的數據流所示,方塊是事件,三角形是該事件對應的時間戳,圓圈爲Watermark。當Flink接受到時間戳值爲5的Watermark時,系統假設時間戳小於5的事件均已到達,後續到達的小於5的事件均爲延遲數據。Flink處理到最新的Watermark,會開啓這個時間窗口的計算,把這個Watermark以前的數據歸入進這次計算,延遲數據則不能被歸入進來,所以使用Watermark會致使微小的偏差。
流數據中的事件時間戳與Watermark高度相關,事件時間戳的抽取和Watermark的生成也基本是同時進行的,抽取的過程會遇到下面兩種狀況:
數據流中已經包含了事件時間戳和Watermark。
使用抽取算子生成事件時間戳和Watermark,這也是實際應用中更爲常見的場景。由於後續的計算都依賴時間,Watermark抽取算子最好在數據接入後立刻調用。具體而言,Watermark抽取算子包含兩個函數:第一個函數從數據流的事件中抽取時間戳,並將時間戳賦值到事件的元數據上,第二個函數生成Watermark。
Flink有兩種方式來生成Watermark:
週期性(Periodic)生成Watermark:Flink每隔必定時間間隔,按期調用Watermark生成函數。這種方式下,Watermark的生成與時間有周期性的關係。
斷點式(Punctuated)生成Watermark:數據流中某些帶有特殊標記的數據自帶了Watermark信息,Flink監控數據流中的每一個事件,當接收到帶有特殊標記數據時,會觸發Watermark的生成。這種方式下,Watermark的生成與時間無關,與什麼時候接收到特殊標記數據有關。
不管是以上那種方式,Flink都會生成Watermark並插入到數據流中。一旦時間戳和Watermark生成後,後續的算子將以Event Time的時間語義來處理這個數據流。Flink把時間處理部分的代碼都作了封裝,會在內部處理各種時間問題,用戶不須要擔憂延遲數據等任什麼時候間相關問題。用戶只須要在數據接入的一開始生成時間戳和Watermark,Flink會負責剩下的事情。
Flink有一些機制專門收集和處理延遲數據。遲到事件在Watermark以後到達,通常處理的方式有三種:
將遲到事件做爲錯誤事件直接丟棄
將遲到事件收集起來另外再處理
從新觸發計算
對於第二種方式,用戶可使用Flink提供的Side Output
機制,將遲到事件放入一個單獨的數據流,以便再對其單獨處理。
對於第三種方式,用戶可使用Flink提供的Allowed Lateness
機制,設置一個容許的最大遲到時長,原定的時間窗口關閉後,Flink仍然會保存該窗口的狀態,直至超過遲到時長,遲到的事件加上原來的事件一塊兒從新被計算。
咱們將在後續文章中詳細介紹Event Time的使用、Watermark生成、延遲數據處理等技術細節。
在以前的文章中咱們已經提到了狀態的概念:流式大數據處理引擎會根據流入數據持續更新狀態數據。狀態能夠是當前所處理事件的位置偏移(Offset)、一個時間窗口內的某種輸入數據、或與具體做業有關的自定義變量。
對於WordCount的例子來講,已經處理了一個」Hello」單詞,而且正在處理一個」Hello」,對於Source算子來講,當前數據的位置偏移爲3,全部已處理的數據中,單詞」Hello」的出現次數爲2。這個做業的狀態包括當前處理的位置偏移、已處理過的單詞出現次數等變量信息。
在一個有狀態的流處理做業中,爲保證高吞吐和低延遲,Flink的每一個Task須要高效讀寫狀態數據,Task會在本地的TaskManager中存儲狀態數據。然而,因爲大數據系統通常運行在多臺機器上,可能會遇到進程被殺、機器宕機、網絡抖動等問題,一旦出現宕機等問題,該機器上的狀態以及相應的計算會丟失,所以須要一種恢復機制來應對這些潛在問題。
Flink使用一致性檢查點(Consistent Checkpoint)技術來作故障恢復。檢查點機制通常是按期將狀態數據生成快照(Snapshot),持久化存儲起來,一旦發生意外,Flink主動重啓應用,並從最近的快照中恢復,再繼續處理新流入數據。一致性檢查點技術能夠將分佈在多臺節點上的全部狀態都記錄下來,並提供了Exactly-Once的投遞保障,其背後是使用了Chandy-Lamport算法,將本地的狀態數據保存到一個存儲空間上,故障發生後及時恢復最近的快照數據。咱們將在後續文章中詳細介紹一致性檢查點的算法原理。
Task在本地內存中保存一份狀態數據,但在分佈式系統中,某個Task在任意時間點均可能發生故障,所以Task上的本地狀態數據能夠被認爲是脆弱的。Flink按期將本地的狀態數據持久化保存到一個存儲空間上。用戶能夠選擇以怎樣的方式來保存這些狀態數據,這種機制被稱爲狀態後端(State Backend)。Flink提供了三種狀態後端:內存、文件系統和RocksDB。
內存確定是讀寫性能最優的方式,單個節點的內存有限,所以這種狀態後端會對狀態數據的大小有限制。相比內存,本地磁盤的速度更慢,其所能承擔的數據量更大,RocksDB 就是一種基於本地磁盤的狀態後端。此外,Flink還容許將數據存儲到分佈式文件系統,如Hadoop的HDFS和AWS的S3上,分佈式文件系統的數據存儲能力很是大,足以應付海量數據的存儲需求。咱們將在後續文章中詳細介紹三種狀態後端的使用方法。
在容錯上,除了Checkpoint,Flink還提供了Savepoint機制。從名稱和實現上,這兩個機制都極其類似,甚至Savepoint會使用Checkpoint的數據,但實際上,這兩個機制的定位不一樣。
Checkpoint是Flink按期觸發並自動執行的故障恢復機制,以應對各類意外狀況,其設計初衷主要是針對容錯和故障恢復。Savepoint會使用Checkpoint生成的快照數據,但與Checkpoint不一樣點在於,Savepoint須要編程人員手動介入,用來恢復暫停做業。相比而言,Checkpoint是自動執行,Savepoint是手動管理。
當咱們想要手動處理以前已經處理過的數據,就可使用Savepoint,所以Savepoint常常被用來調試程序:
咱們能夠給同一份做業設置不一樣的並行度,來找到最佳的並行度設置
咱們想測試一個新功能或修復一個已知的bug,並用新的程序邏輯處理原來的數據
進行一些A/B實驗,使用相同的數據源測試程序的不一樣版本
由於狀態能夠被持久化存儲到分佈式文件系統上,咱們甚至能夠將一樣一份應用程序從一個集羣遷移到另外一個集羣,只需保證不一樣的集羣均可以訪問這個文件系統
Checkpoint 和 Savepoint 是Flink提供的兩個類似的功能,它們知足了不一樣的需求,以確保一致性、容錯性,知足了做業升級、BUG 修復、遷移、A/B測試等不一樣需求。