參考書籍web
Stream Processing with Apache Flinkhttps://www.oreilly.com/library/view/stream-processing-with/9781491974285/算法
《基於Apache Flink的流處理》https://book.douban.com/subject/34912177/數據庫
注:本文主要是針對《基於Apache Flink的流處理》的筆記apache
1-8章筆記下載地址後端
在這一章中,咱們對Flink的架構進行了一個高層次的介紹,並描述了Flink如何解決咱們以前討論過的流處理相關問題。特別地,咱們重點解釋Flink的分佈式架構,展現它在流處理應用中是如何處理時間和狀態的,並討論了它的容錯機制。緩存
Flink是一個用於狀態化並行數據流處理的分佈式系統。Flink設置由多個進程組成,這些進程一般分佈在多臺機器上運行。網絡
分佈式系統須要解決的常見挑戰是數據結構
Flink自己並無實現全部這些功能。它只關注於其核心功能——分佈式數據流處理,可是利用了不少現有的開源中間件和框架來實現其餘非核心部分。多線程
Flink的搭建由四個不一樣的組件組成,它們一塊兒工做來執行流應用程序。這些組件是JobManager、ResourceManager、TaskManager和Dispatcher。因爲Flink是用Java和Scala實現的,因此全部組件都運行在Java虛擬機(jvm)上。各組成部分的職責將在下面四個子小節分別介紹。架構
應用管理
JobManager是控制 單個應用程序執行的主進程,每一個應用程序由一個的JobManager控制。(一對一關係)
資源管理
Flink爲 不一樣的環境和資源提供者(如YARN、Mesos、Kubernetes和獨立部署)提供了多個資源管理器。
工做進程,執行任務的
TaskManager是Flink的工做進程(worker process,工人)
與用戶直接對話
Dispatcher提供一個REST接口讓用戶提交要執行的應用。
Flink應用程序能夠以兩種不一樣的模式來部署。
在這種模式下,Flink應用程序被打包到一個JAR文件中,並由客戶端提交給一個正在運行的服務。該服務能夠是Flink Dispatcher、Flink JobManager或YARN的ResourceManager。
在這種模式下,Flink應用程序被綁定在一個應用程序特定的容器鏡像中,好比Docker鏡像。
第一種模式比較傳統,第二種模式經常使用於微服務中。
TaskManager能夠同時執行多個任務。
這些任務能夠
TaskManager提供固定數量的處理槽來控制它可以併發執行的任務的數量。一個處理槽能夠執行應用程序的某個算子的一個並行任務。下圖是一個TaskManager、處理槽、任務以及算子關係的例子。
左側是一個JobGraph(應用程序的非並行表示,邏輯圖)。
它由5個算子組成。
算子A和C是數據源,算子E是數據匯。
右側是一個ExecutionGraph(物理圖)
每一個TaskManager是一個JVM,而每一個Slot是JVM中的一個線程。TaskManager在同一個JVM進程中以多線程方式執行它的任務。線程比單獨的進程更輕量,通訊成本更低,但不會嚴格地將任務彼此隔離。所以,一個行爲不正常的任務能夠殺死整個TaskManager進程和運行在它上面的全部任務。
流式應用程序一般設計爲24x7運行。所以,即便內部進程失敗,也不能中止運行。
而要想從失敗中恢復
本小節主要學習如何從新啓動失敗的進程。
下面舉例說明TaskManager故障應該如何處理
假設咱們的應用程序要以最大並行度爲8來執行,那麼四個TaskManager(每一個TaskManager提供兩個插槽)能夠知足咱們對並行度的需求。
若是其中一個TaskManager發生故障,可用插槽的數量將減小到6個。
在這種狀況下,JobManager將請求ResourceManager提供更多處理槽。
若是請求失敗,JobManager會按照必定的時間間隔連續地重啓應用。直到重啓成功(有足夠多的空閒插槽就能重啓成功)。
比TaskManager失敗更具挑戰性的問題是JobManager失敗。
JobManager控制流應用程序的執行,並保存有關其執行的元數據,例如指向已完成檢查點的指針。
若是負責的JobManager進程失敗,流應用程序將沒法繼續處理。
這使得JobManager成爲Flink中的應用程序的一個單點失效組件(也就是若是這個組件失效,那麼整個系統失效)。
爲了克服這個問題,Flink支持一種高可用模式,該模式能夠在原始JobManager失效時將應用的管理權和應用的元數據 遷移到另外一個JobManager。
Flink的高可用模式 基於 ZooKeeper
它是一個分佈式系統,來提供分佈式協調和共識服務。
Flink使用ZooKeeper進行領袖選舉,並將其做爲一個高可用性和持久的數據存儲。
在高可用性模式下操做時,JobManager將JobGraph和全部必需的元數據(如應用程序的JAR文件)寫入遠程持久存儲系統。
此外,JobManager將一個指向存儲位置的指針 寫入ZooKeeper的數據存儲中。
在應用程序執行期間,JobManager接收各個任務檢查點的狀態句柄(存儲位置)。當檢查點完成後,JobManager將狀態寫入遠程存儲,並將指向此遠程存儲位置的指針寫入ZooKeeper。
所以,從JobManager故障中恢復所需的全部數據都存儲在遠程存儲中,而ZooKeeper持有指向存儲位置的指針。
圖3-3說明了這種設計。
當JobManager失敗時,接管它工做的新JobManager執行如下步驟:
最後還有一個問題,當TaskManager或者JobManager失效時,誰會觸發它們的重啓?
在運行過程當中,應用的任務不斷地交換數據。TaskManager 負責將數據從發送任務發送到接收任務。TaskManager的網絡組件在發送記錄以前在緩衝區中收集記錄,就是說,記錄不是一個一個發送的,而是先緩存到緩衝區中而後一批一批發送。這種技術是有效使用網絡資源和實現高吞吐量的基礎。
每一個TaskManager都有一個 網絡緩衝池(默認大小爲32 KB)用於發送和接收數據。
圖3-4顯示了這個架構。
當發送方任務和接收方任務在同一個TaskManager進程中運行時
經過網絡鏈接發送單條記錄很低效,而且形成很大的開銷。緩衝是充分利用網絡鏈接的帶寬的關鍵。在流處理上下文中,緩衝的一個缺點是增長了延遲,由於記錄是在緩衝區中收集的,而不是當即發送的。
Flink實現了一個基於信用值的流控制機制,其工做原理以下。
基於信用值的好處
Flink提供了一種被稱爲任務連接的優化技術,它能夠減小特定條件下本地通訊的開銷。
圖3-6描述瞭如何在任務連接模式下執行管道。
Flink在默認狀況下會開啓任務連接,可是也能夠經過配置關閉這個功能
正如上一節所述,事件時間語義會生成可重複且一致性的結果,這是許多流應用的剛性需求。下面,咱們將描述Flink如何在內部實現和處理事件時間戳和水位線,以支持具備事件時間語義的流應用。
Flink事件時間流應用處理的全部記錄都必須帶時間戳。時間戳將記錄與特定的時間點關聯起來,一般是記錄所表示的事件發生的時間點。此外,在現實環境中,時間戳亂序幾乎不可避免。
當Flink以事件時間模式處理數據流時,它會根據記錄的事件時間戳來觸發基於時間的算子操做。
水位線用於標註事件時間應用程序中每一個任務當前的事件時間。
在Flink中,水位線被實現爲一種帶時間戳的特殊記錄。如圖3-8所示,水位線像常規記錄同樣在數據流中移動。
水位線有兩個基本特性:
第二個屬性用於處理數據流中時間戳亂序的記錄,例如圖3-8中具備時間戳2和5的記錄。
水位線的一個意義是,它們容許應用控制結果的完整性和延遲。
在本節中,咱們將討論算子如何處理水位線。
當一個任務收到水位線時,會發生如下操做:
考慮到任務並行,咱們將詳細介紹一個任務如何將水位線發送到多個下游任務,以及它從多個上游任務獲取水位線以後如何推進事件時間時鐘前進。具體的方式以下
下圖舉了一個有4個輸入分區和3個輸出分區的任務在接受到水位線以後是如何更新它的分區水位線和事件時間時鐘的。
Flink的水位線傳播算法確保算子任務所發出帶時間戳的記錄和水位線必定會對齊。
對於具備兩個輸入流且水位線差距很大的算子,也會出現相似的效果。具備兩個輸入流的任務的事件時間時鐘將受制於較慢的流,一般較快的流的記錄或中間結果將處於緩衝狀態,直到事件時間時鐘容許處理它們。
下面介紹時間戳和水位線是如何產生的。
時間戳和水位線一般是在流應用接收數據流時 分配和生成的。Flink DataStream應用能夠經過三種方式完成該工做
大多數流應用是有狀態的。許多算子不斷讀取和更新某種狀態。無論是內置狀態仍是用戶自定義狀態,Flink的處理方式都是同樣的。
在本節中,咱們將討論
一般,須要任務去維護並用於計算結果的數據都屬於任務的狀態。圖3-10顯示了任務與其狀態之間的典型交互。
然而,高效可靠的狀態管理更具挑戰性。這包括處理很是大的狀態(可能超過內存),並確保在發生故障時不會丟失任何狀態。全部與狀態一致性、故障處理、高效存儲和訪問相關的問題都由Flink處理,以便開發人員可以將重點放在應用程序的邏輯上。
在Flink中,狀態老是與一個特定的算子相關聯。爲了讓Flink的運行時知道算子有哪些狀態,算子須要對其狀態進行註冊。根據做用域的不一樣,有兩種類型的狀態:算子狀態和鍵值分區狀態
算子狀態的做用域爲算子的單個任務。這意味着由同一並行任務以內的記錄均可以訪問同一狀態。算子狀態不能被其餘任務訪問。以下圖
Flink爲算子狀態提供了三類原語
鍵值分區狀態是根據算子輸入記錄中定義的鍵來維護和訪問的。Flink爲每一個鍵維護一個狀態實例,該狀態實例老是位於那個處理對應鍵值記錄的任務上。當任務處理一個記錄時,它自動將狀態訪問範圍限制到當前記錄的鍵。所以,具備相同鍵值分區的全部記錄都訪問相同的狀態。圖3-12顯示了任務如何與鍵值分區狀態交互。
鍵值分區狀態是一個在算子的全部並行任務上進行分區的分佈式鍵值映射。鍵值分區狀態原語以下
爲了確保快速的狀態訪問,每一個並行任務都在本地維護其狀態。至於狀態的具體存儲、訪問和維護,則由一個稱爲狀態後端的可拔插組件來完成。
狀態後端負責兩件事:
對於本地狀態管理,Flink提供兩種實現
狀態檢查點很重要,由於Flink是一個分佈式系統,狀態只能在本地維護。TaskManager進程可能在任什麼時候間點失敗。所以,它的存儲必須被認爲是易失的。狀態後端負責將任務的狀態檢查點指向遠程和持久存儲。用於檢查點的遠程存儲能夠是分佈式文件系統或數據庫系統。狀態後端在狀態檢查點的方式上有所不一樣。例如,RocksDB狀態後端支持增量檢查點,這能夠顯著減小很是大的狀態的檢查點開銷。
流應用的一個基本需求是根據輸入速率的增長或減小而調整算子的並行性。有狀態算子,調整並行度比較難。由於咱們須要把狀態從新分組,分配到與以前數量不等的並行任務上。
帶有鍵值分區狀態的算子能夠經過將鍵從新劃分來進行任務的擴縮容。可是,爲了提升效率,Flink不會以鍵爲單位來進行劃分。相反,Flink以鍵組做爲單位來從新分配,每一個鍵組裏面包含了多個鍵。
帶有算子列表狀態的算子在擴縮容時會對列表中的條目進行從新分配。理論上來講,全部並行任務的列表項會被統一收集起來,並再均勻從新分配。若是列表項的數量少於算子的新並行度,一些任務將以空狀態開始。圖3-14顯示了操做符列表狀態的從新分配。
帶有算子聯合狀態的算子會在擴縮容時把狀態列表中的所有條目 廣播到所有任務中。而後,任務本身來選擇使用哪些項和丟棄哪些項。如圖3-15顯示。
帶有算子廣播狀態的算子在擴縮容時會把狀態拷貝到所有新任務上。這樣作是由於廣播狀態要確保全部任務具備相同的狀態。在縮容的狀況下,直接簡單地停掉多餘的任務便可。如圖3-16顯示。
Flink是一個分佈式的數據處理系統,且任務在本地維護它們的狀態,Flink必須確保這種狀態不會丟失,而且在發生故障時保持一致。
在本節中,咱們將介紹Flink的檢查點和故障恢復機制,看一下它們是如何提供精確一次的狀態一致性保障。此外,咱們還討論了Flink獨特的保存點(savepoint)功能,它就像一把瑞士軍刀,解決了運行流式應用過程當中的諸多難題。
有狀態流應用程序的一致檢查點是在全部任務都處理完等量的原始輸出後對所有任務狀態進行的一個拷貝。咱們能夠經過一個樸素算法來對應用創建一致性檢查點的過程進行解釋。樸素算法的步驟爲:
下圖展現了一個一致性檢查點的例子,這個算法讀取數據,而後對奇數和偶數分別求和
在流應用執行期間,Flink週期性爲應用程序生成檢查點。一旦發生故障,Flink會使用最新的檢查點將應用狀態恢復到某個一致性的點並重啓應用。圖3-18顯示了恢復過程。
應用程序恢復分爲三個步驟:
假設全部算子都將它們的狀態寫入檢查點並從中恢復,而且全部輸入流的消費位置都能重置到檢查點生成那一刻,那麼這種檢查點和恢復機制能夠爲整個應用提供精確一次的一致性保障。輸入流是否能夠重置,取決於它的具體實現以及所消費外部系統是否提供相關接口。例如,像Apache Kafka這樣的事件日誌能夠從以前的某個偏移讀取記錄。相反,若是是從socket消費而來則沒法重置,由於socket一旦消耗了數據就會丟棄數據。
咱們必須指出,Flink的檢查點和恢復機制只能重置流應用內部的狀態。根據應用所採用的數據彙算子,在恢復期間,某些結果記錄可能被屢次發送到下游系統,例如事件日誌、文件系統或數據庫。對於某些存儲系統,Flink提供的數據匯能夠保證了精確一次輸出。
Flink基於Chandy-Lamport的分佈式快照算法來實現檢查點。該算法並不會暫停整個應用程序,在部分任務持久化狀態的過程當中,其餘任務能夠繼續執行。
Flink的檢查點算法使用一種稱爲檢查點分隔符的特殊類型的記錄,它與水位線相似。檢查點分隔符攜帶一個檢查點ID來標識它所屬的檢查點,分隔符從邏輯上將流分割爲兩個部分。由檢查點以前的記錄 引發的全部狀態修改都包含在分隔符對應的檢查點中,而由屏障以後的記錄引發的全部修改都不包含在分隔符對應的檢查點中。
下面咱們經過一個簡單的例子來解釋這個算法
咱們使用一個簡單的流應用程序示例逐步解釋該算法。應用程序由兩個數據源任務組成,每一個數據源任務消耗一個不斷增加的數字流。數據源任務的分別輸出奇數分區和偶數分區。每一個分區都由一個任務處理,該任務計算全部接收到的數字的總和,並將更新後的總和發送給下游數據匯。該應用程序如圖3-19所示。
JobManager經過向每一個數據源任務 發送一個新的帶有檢查點編號的消息來啓動檢查點生成流程,如圖3-20所示。
當數據源任務接收到檢查點消息時,
數據源發出的檢查點分隔符被廣播給下游任務。當下游任務接收到新的檢查點分隔符時,將繼續等待來自全部其餘上游任務的分隔符到達檢查點。在等待期間,它繼續處理那些還沒有提供分隔符的上游任務的記錄,而那些提供了分隔符的上游任務的記錄會被緩存,等待稍後處理。等待全部檢查點到達的過程稱爲檢查點對齊,如圖3-22所示。
一旦一個任務從它的全部上游任務收到分隔符,它就會讓狀態後端生成一個檢查點,並將檢查點分隔符廣播給它的全部下游任務,如圖3-23所示。
在發出檢查點分隔符後,任務就開始處理緩衝的記錄。在處理完全部緩衝記錄以後,任務會繼續處理其輸入流。圖3-24顯示了此時的應用程序。
最後,檢查點分隔符到達數據匯。當數據匯接收到分割符時,會先進行對齊操做,而後將自身狀態寫入檢查點,並向JobManager確認接收到該分隔符。一旦應用的全部任務都發送了檢查點確認,JobManager就會將應用程序的檢查點記錄爲已完成。圖3-25顯示了檢查點算法的最後一步。如前所述,已完成的檢查點可用於從故障中恢復應用。
Flink的檢查點算法從流應用中產生一致的分佈式檢查點,而不會中止整個應用。可是,它會增長應用的處理延遲。Flink實現了一些調整,能夠在某些條件下減輕性能影響。
任務在將其狀態寫入檢查點的過程當中,將被阻塞。一種好的方法是先將檢查點寫入本地,而後任務繼續執行它的常規處理,另外一個進程負責將檢查點傳到遠端存儲。
此外,還能夠在分隔符對齊的過程當中不緩存那些已經收到分隔符所對應分區的記錄,而是直接處理。但這會讓一致性保證從精確一次下降到至少一次
Flink最有價值和最獨特的功能之一是保存點。原則上,保存點的生成算法與檢查點生成算法同樣,所以能夠把保存點看做是帶有一些額外元數據的檢查點。Flink不會自動生成保存點,而是須要用戶顯式的調用來生成保存點。
給定一個應用和一個兼容的保存點,咱們能夠從該保存點啓動應用。這將把應用的狀態初始化爲保存點的狀態,並從獲取保存點的位置運行應用。
保存點能夠用在不少狀況
在本節中,咱們將描述Flink在從保存點啓動時如何去初始化應用狀態。
一個典型的應用程序包含多個狀態,它們分佈在不一樣算子的不一樣任務上。
下圖顯示了一個具備三個算子的應用程序,每一個算子各運行兩個任務。其中一個算子(OP-1)有一個算子狀態(OS-1),另外一個算子(OP-2)有兩個鍵值分區狀態(KS-1和KS-2)。當生成保存點時,全部任務的狀態都會被複制到一個持久化存儲位置上。
保存點中的狀態副本會按照算子標識符和狀態名稱進行組織。該算子標識符和狀態名須要可以將保存點的狀態數據映射到應用啓動後的狀態上。當從保存點啓動應用程序時,Flink將保存點數據從新分發給相應算子的任務。
若是應用發生了修改,只有那些算子標識符和狀態名稱沒變的狀態副本才能被成功還原。默認狀況下,Flink會分配惟一的算子標識符。可是,算子的標識符是基於其前面算子的標識符生成的。這樣,假如上游的算子標識符發生了變化,那麼下游的算子也會變化。所以,咱們強烈建議爲操做符手動分配惟一標識符,而不依賴於Flink的默認賦值。