做者:陳守元、戴資力算法
Apache Flink 是一個分佈式大數據處理引擎,可對有限數據流和無限數據流進行有狀態或無狀態的計算,可以部署在各類集羣環境,對各類規模大小的數據進行快速計算。windows
瞭解 Flink 應用開發須要先理解 Flink 的 Streams、State、Time 等基礎處理語義以及 Flink 兼顧靈活性和方便性的多層次 API。後端
在架構部分,主要分爲如下四點:網絡
第一, Flink 具有統一的框架處理有界和無界兩種數據流的能力架構
第二, 部署靈活,Flink 底層支持多種資源調度器,包括 Yarn、Kubernetes 等。Flink 自身帶的 Standalone 的調度器,在部署上也十分靈活。框架
第三, 極高的可伸縮性,可伸縮性對於分佈式系統十分重要,阿里巴巴雙11大屏採用 Flink 處理海量數據,使用過程當中測得 Flink 峯值可達 17 億/秒。運維
第四, 極致的流式處理性能。Flink 相對於 Storm 最大的特色是將狀態語義徹底抽象到框架中,支持本地狀態讀取,避免了大量網絡 IO,能夠極大提高狀態存取的性能。分佈式
後面會有專門課程講解,此處簡單分享 Flink 關於運維及業務監控的內容:性能
Data Pipeline 的核心場景相似於數據搬運並在搬運的過程當中進行部分數據清洗或者處理,而整個業務架構圖的左邊是 Periodic ETL,它提供了流式 ETL 或者實時 ETL,可以訂閱消息隊列的消息並進行處理,清洗完成後實時寫入到下游的 Database 或 File system 中。場景舉例:大數據
當下遊要構建實時數倉時,上游則可能須要實時的 Stream ETL。這個過程會進行實時清洗或擴展數據,清洗完成後寫入到下游的實時數倉的整個鏈路中,可保證數據查詢的時效性,造成實時數據採集、實時數據處理以及下游的實時 Query。
搜索引擎這塊以淘寶爲例,當賣家上線新商品時,後臺會實時產生消息流,該消息流通過 Flink 系統時會進行數據的處理、擴展。而後將處理及擴展後的數據生成實時索引,寫入到搜索引擎中。這樣當淘寶賣家上線新商品時,能在秒級或者分鐘級實現搜索引擎的搜索。
Data Analytics,如圖,左邊是 Batch Analytics,右邊是 Streaming Analytics。Batch Analysis 就是傳統意義上使用相似於 Map Reduce、Hive、Spark Batch 等,對做業進行分析、處理、生成離線報表,Streaming Analytics 使用流式分析引擎如 Storm,Flink 實時處理分析數據,應用較多的場景如實時大屏、實時報表。
從某種程度上來講,全部的實時的數據處理或者是流式數據處理都是屬於 Data Driven,流計算本質上是 Data Driven 計算。應用較多的如風控系統,當風控系統須要處理各類各樣複雜的規則時,Data Driven 就會把處理的規則和邏輯寫入到Datastream 的 API 或者是 ProcessFunction 的 API 中,而後將邏輯抽象到整個 Flink 引擎中,當外面的數據流或者是事件進入就會觸發相應的規則,這就是 Data Driven 的原理。在觸發某些規則後,Data Driven 會進行處理或者是進行預警,這些預警會發到下游產生業務通知,這是 Data Driven 的應用場景,Data Driven 在應用上更多應用於復瑣事件的處理。
傳統批處理方法是持續收取數據,以時間做爲劃分多個批次的依據,再週期性地執行批次運算。但假設須要計算每小時出現事件轉換的次數,若是事件轉換跨越了所定義的時間劃分,傳統批處理會將中介運算結果帶到下一個批次進行計算;除此以外,當出現接收到的事件順序顛倒狀況下,傳統批處理仍會將中介狀態帶到下一批次的運算結果中,這種處理方式也不盡如人意。
第一點,要有理想方法,這個理想方法是引擎必需要有能力能夠累積狀態和維護狀態,累積狀態表明着過去歷史中接收過的全部事件,會影響到輸出。
第二點,時間,時間意味着引擎對於數據完整性有機制能夠操控,當全部數據都徹底接受到後,輸出計算結果。
第三點,理想方法模型須要實時產生結果,但更重要的是採用新的持續性數據處理模型來處理實時數據,這樣才最符合 continuous data 的特性。
流式處理簡單來說即有一個無窮無盡的數據源在持續收取數據,以代碼做爲數據處理的基礎邏輯,數據源的數據通過代碼處理後產生出結果,而後輸出,這就是流式處理的基本原理。
假設 Input Streams 有不少個使用者,每一個使用者都有本身的 ID,若是計算每一個使用者出現的次數,咱們須要讓同一個使用者的出現事件流到同一運算代碼,這跟其餘批次須要作 group by 是一樣的概念,因此跟 Stream 同樣須要作分區,設定相應的 key,而後讓一樣的 key 流到同一個 computation instance 作一樣的運算。
如圖,上述代碼中定義了變數 X,X 在數據處理過程當中會進行讀和寫,在最後輸出結果時,能夠依據變數 X 決定輸出的內容,即狀態 X 會影響最終的輸出結果。這個過程當中,第一個重點是先進行了狀態 co-partitioned key by,一樣的 key 都會流到 computation instance,與使用者出現次數的原理相同,次數即所謂的狀態,這個狀態必定會跟同一個 key 的事件累積在同一個 computation instance。
至關於根據輸入流的 key 從新分區的 狀態,當分區進入 stream 以後,這個 stream 會累積起來的狀態也變成 copartiton 了。第二個重點是 embeded local state backend。有狀態分散式流式處理的引擎,狀態可能會累積到很是大,當 key 很是多時,狀態可能就會超出單一節點的 memory 的負荷量,這時候狀態必須有狀態後端去維護它;在這個狀態後端在正常情況下,用 in-memory 維護便可。
當咱們考慮狀態容錯時不免會想到精確一次的狀態容錯,應用在運算時累積的狀態,每筆輸入的事件反映到狀態,更改狀態都是精確一次,若是修改超過一次的話也意味着數據引擎產生的結果是不可靠的。
仍是以使用者出現次數來看,若是某個使用者出現的次數計算不許確,不是精確一次,那麼產生的結果是沒法做爲參考的。在考慮精確的容錯保證前,咱們先考慮最簡單的使用場景,如無限流的數據進入,後面單一的 Process 進行運算,每處理完一筆計算即會累積一次狀態,這種狀況下若是要確保 Process 產生精確一次的狀態容錯,每處理完一筆數據,更改完狀態後進行一次快照,快照包含在隊列中並與相應的狀態進行對比,完成一致的快照,就能確保精確一次。
Flink 做爲分佈式的處理引擎,在分佈式的場景下,進行多個本地狀態的運算,只產生一個全域一致的快照,如須要在不中斷運算值的前提下產生全域一致的快照,就涉及到分散式狀態容錯。
關於 Global consistent snapshot,當 Operator 在分佈式的環境中,在各個節點作運算,首先產生 Global consistent snapshot 的方式就是處理每一筆數據的快照點是連續的,這筆運算流過全部的運算值,更改完全部的運算值後,可以看到每個運算值的狀態與該筆運算的位置,便可稱爲 consistent snapshot,固然,Global consistent snapshot 也是簡易場景的延伸。
首先了解一下 Checkpoint,上面提到連續性快照每一個 Operator 運算值本地的狀態後端都要維護狀態,也就是每次將產生檢查點時會將它們傳入共享的 DFS 中。當任何一個 Process 掛掉後,能夠直接從三個完整的 Checkpoint 將全部的運算值的狀態恢復,從新設定到相應位置。Checkpoint 的存在使整個 Process 可以實現分散式環境中的 Exactly-once。
關於 Flink 如何在不中斷運算的情況下持續產生 Global consistent snapshot,其方式是基於用 simple lamport 演算法機制下延伸的。已知的一個點 Checkpoint barrier, Flink 在某個 Datastream 中會一直安插 Checkpoint barrier,Checkpoint barrier 也會 N — 1等等,Checkpoint barrier N 表明着全部在這個範圍裏面的數據都是Checkpoint barrier N。
舉例:假設如今須要產生 Checkpoint barrier N,但實際上在 Flink 中是由 job manager 觸發 Checkpoint,Checkpoint 被觸發後開始從數據源產生 Checkpoint barrier。當 job 開始作 Checkpoint barrier N 的時候,能夠理解爲 Checkpoint barrier N 須要逐步填充左下角的表格。
如圖,當部分事件標爲紅色,Checkpoint barrier N 也是紅色時,表明着這些數據或事件都由 Checkpoint barrier N 負責。Checkpoint barrier N 後面白色部分的數據或事件則不屬於 Checkpoint barrier N。
在以上的基礎上,當數據源收到 Checkpoint barrier N 以後會先將本身的狀態保存,以讀取 Kafka 資料爲例,數據源的狀態就是目前它在 Kafka 分區的位置,這個狀態也會寫入到上面提到的表格中。下游的 Operator 1 會開始運算屬於 Checkpoint barrier N 的數據,當 Checkpoint barrier N 跟着這些數據流動到 Operator 1 以後,Operator 1 也將屬於 Checkpoint barrier N 的全部數據都反映在狀態中,當收到 Checkpoint barrier N 時也會直接對 Checkpoint 去作快照。
當快照完成後繼續往下游走,Operator 2 也會接收到全部數據,而後搜索 Checkpoint barrier N 的數據並直接反映到狀態,當狀態收到 Checkpoint barrier N 以後也會直接寫入到 Checkpoint N 中。以上過程到此能夠看到 Checkpoint barrier N 已經完成了一個完整的表格,這個表格叫作 Distributed Snapshots,即分佈式快照。分佈式快照能夠用來作狀態容錯,任何一個節點掛掉的時候能夠在以前的 Checkpoint 中將其恢復。繼續以上 Process,當多個 Checkpoint 同時進行,Checkpoint barrier N 已經流到 job manager 2,Flink job manager 能夠觸發其餘的 Checkpoint,好比 Checkpoint N + 1,Checkpoint N + 2 等等也同步進行,利用這種機制,能夠在不阻擋運算的情況下持續地產生 Checkpoint。
狀態維護即用一段代碼在本地維護狀態值,當狀態值很是大時須要本地的狀態後端來支持。
如圖,在 Flink 程序中,能夠採用 getRuntimeContext().getState(desc); 這組 API 去註冊狀態。Flink 有多種狀態後端,採用 API 註冊狀態後,讀取狀態時都是經過狀態後端來讀取的。Flink 有兩種不一樣的狀態值,也有兩種不一樣的狀態後端:
Flink 目前支持以上兩種狀態後端,一種是純 memory 的狀態後端,另外一種是有資源磁盤的狀態後端,在維護狀態時能夠根據狀態的數量選擇相應的狀態後端。
3.1 不一樣時間種類
在 Flink 及其餘進階的流式處理引擎出現以前,大數據處理引擎一直只支持 Processing-time 的處理。假設定義一個運算 windows 的窗口,windows 運算設定每小時進行結算。以 Processing-time 進行運算時能夠發現數據引擎將 3 點至 4 點間收到的數據作結算。實際上在作報表或者分析結果時是想了解真實世界中 3 點至 4 點之間實際產生數據的輸出結果,瞭解實際數據的輸出結果就必須採用 Event – Time 了。
如圖,Event - Time 至關於事件,它在數據最源頭產生時帶有時間戳,後面都須要用時間戳來進行運算。用圖來表示,最開始的隊列收到數據,每小時對數據劃分一個批次,這就是 Event - Time Process 在作的事情。
3.2 Event - Time 處理
Event - Time 是用事件真實產生的時間戳去作 Re-bucketing,把對應時間 3 點到 4 點的數據放在 3 點到 4 點的 Bucket,而後 Bucket 產生結果。因此 Event - Time 跟 Processing - time 的概念是這樣對比的存在。
Event - Time 的重要性在於記錄引擎輸出運算結果的時間。簡單來講,流式引擎連續 24 小時在運行、蒐集資料,假設 Pipeline 裏有一個 windows Operator 正在作運算,每小時能產生結果,什麼時候輸出 windows 的運算值,這個時間點就是 Event - Time 處理的精髓,用來表示該收的數據已經收到。
3.3 Watermarks
Flink 其實是用 watermarks 來實現 Event - Time 的功能。Watermarks 在 Flink 中也屬於特殊事件,其精髓在於當某個運算值收到帶有時間戳「 T 」的 watermarks 時就意味着它不會接收到新的數據了。使用 watermarks 的好處在於能夠準確預估收到數據的截止時間。舉例,假設預期收到數據時間與輸出結果時間的時間差延遲 5 分鐘,那麼 Flink 中全部的 windows Operator 搜索 3 點至 4 點的數據,但由於存在延遲須要再多等5分鐘直至收集完 4:05 分的數據,此時方能斷定 4 點鐘的資料收集完成了,而後纔會產出 3 點至 4 點的數據結果。這個時間段的結果對應的就是 watermarks 的部分。
流式處理應用無時無刻不在運行,運維上有幾個重要考量:
Checkpoint 完美符合以上需求,不過 Flink 中還有另一個名詞保存點(Savepoint),當手動產生一個 Checkpoint 的時候,就叫作一個 Savepoint。Savepoint 跟 Checkpoint 的差異在於檢查點是 Flink 對於一個有狀態應用在運行中利用分佈式快照持續週期性的產生 Checkpoint,而 Savepoint 則是手動產生的 Checkpoint,Savepoint 記錄着流式應用中全部運算元的狀態。
如圖,Savepoint A 和 Savepoint B,不管是變動底層代碼邏輯、修 bug 或是升級 Flink 版本,從新定義應用、計算的平行化程度等,最早須要作的事情就是產生 Savepoint。
Savepoint 產生的原理是在 Checkpoint barrier 流動到全部的 Pipeline 中手動插入從而產生分佈式快照,這些分佈式快照點即 Savepoint。Savepoint 能夠放在任何位置保存,當完成變動時,能夠直接從 Savepoint 恢復、執行。
從 Savepoint 的恢復執行須要注意,在變動應用的過程當中時間在持續,如 Kafka 在持續收集資料,當從 Savepoint 恢復時,Savepoint 保存着 Checkpoint 產生的時間以及 Kafka 的相應位置,所以它須要恢復到最新的數據。不管是任何運算,Event - Time 均可以確保產生的結果徹底一致。
假設恢復後的從新運算用 Process Event - Time,將 windows 窗口設爲 1 小時,從新運算可以在 10 分鐘內將全部的運算結果都包含到單一的 windows 中。而若是使用 Event – Time,則相似於作 Bucketing。在 Bucketing 的情況下,不管從新運算的數量多大,最終從新運算的時間以及 windows 產生的結果都必定能保證徹底一致。
本文首先從 Apache Flink 的定義、架構、基本原理入手,對大數據流計算相關的基本概念進行辨析,在此基礎上簡單回顧了大數據處理方式的歷史演進以及有狀態的流式數據處理的原理,最後從目前有狀態的流式處理面臨的挑戰分析 Apache Flink 做爲業界公認爲最好的流計算引擎之一所具有的自然優點。但願有助於你們釐清大數據流式處理引擎涉及的基本概念,可以更加駕輕就熟的使用 Flink。