Flink 在有贊實時計算的實踐

1、前言

這篇主要由五個部分來組成:數據庫

首先是有讚的實時平臺架構。緩存

其次是在調研階段咱們爲何選擇了 Flink。在這個部分,主要是 Flink 與 Spark 的 structured streaming 的一些對比和選擇 Flink 的緣由。架構

第三個就是比較重點的內容,Flink 在有讚的實踐。這其中包括了咱們在使用 Flink 的過程當中碰到的一些坑,也有一些具體的經驗。異步

第四部分是將實時計算 SQL 化,界面化的一些實踐。jvm

最後的話就是對 Flink 將來的一些展望。這塊能夠分爲兩個部分,一部分是咱們公司接下來會怎麼去更深刻的使用 Flink,另外一部分就是 Flink 之後可能會有的的一些新的特性。分佈式


2、有贊實時平臺架構

有讚的實時平臺架構呢有幾個主要的組成部分。函數

圖片描述

首先,對於實時數據來講,一個消息中間件確定是必不可少的。在有贊呢,除了業界經常使用的 Kafka 之外,還有 NSQ。與 Kafka 有別的是,NSQ 是使用 Go 開發的,因此公司封了一層 Java 的客戶端是經過 push 和 ack 的模式去保證消息至少投遞一次,因此 Connector 也會有比較大的差距,尤爲是實現容錯的部分。在實現的過程當中呢,參考了 Flink 官方提供的 Rabbit MQ 的鏈接器,結合 NSQ client 的特性作了一些改造。性能

接下來就是計算引擎了,最古老的就是 Storm 了,如今依然還有一些任務在 Storm 上面跑,至於新的任務基本已經不會基於它來開發了,由於除了開發成本高之外,語義的支持,SQL 的支持包括狀態管理的支持都作得不太好,吞吐量還比較低,將 Storm 的任務遷移到 Flink 上也是咱們接下來的任務之一。還有呢就是 Spark Streaming 了,相對來講 Spark 有一個比較好的生態,可是 Spark Streaming 是微批處理的,這給它帶來了不少限制,除了延遲高之外還會比較依賴外部存儲來保存中間狀態。 Flink 在有贊是比較新的引擎,爲何在有了 Spark 和 Storm 的狀況下咱們還要引入 Flink 呢,下一個部分我會提到。大數據

存儲引擎,除了傳統的 MySQL 之外,咱們還使用 HBase ,ES 和 ZanKV。ZanKV 是咱們公司開發的一個兼容 Redis 協議的分佈式 KV 數據庫,因此姑且就把它當成 Redis 來理解好了。優化

實時 OLAP 引擎的話基於 Druid,在多維的統計上面有很是好的應用。

最後是咱們的實時平臺。實時平臺提供了集羣管理,項目管理,任務管理和報警監控的功能。。

關於實時平臺的架構就簡單介紹到這裏,接下來是 Flink 在有讚的探索階段。在這個部分,我主要會對比的 Spark Structured Streaming。


3、爲何選擇引入 Flink 

至於爲何和 Spark Structured Streaming(SSS) 進行對比呢?由於這是實時SQL化這個大背景下比較有表明性的兩個引擎。

首先是性能上,從幾個角度來比較一下。首先是延遲,毫無疑問,Flink 做爲一個流式引擎是優於 SSS 的微批引擎的。雖說 Spark 也引入了一個連續的計算引擎,可是無論從語義的保證上,仍是從成熟度上,都是不如 Flink 的。據我所知,他們是經過將 rdd 長期分配到一個結點上來實現的。

其次比較直觀的指標就是吞吐了,這一點在某些場景下 Flink 略遜於 Spark 。可是當涉及到中間狀態比較大的任務呢,Flink 基於 RocksDB 的狀態管理就顯示出了它的優點。
 
Flink 在中間狀態的管理上可使用純內存,也可使用 RocksDB 。至於 RocksDB ,簡單點理解的話就是一個帶緩存的嵌入式數據庫。藉助持久化到磁盤的能力,Flink 相比 SSS 來講能夠保存的狀態量大得多,而且不容易OOM。而且在作 checkpoint 中選用了增量模式,應該是隻須要備份與上一次 checkpoint 時不一樣的 sst 文件。使用過程當中,發現 RocksDB 做爲狀態管理性能也是能夠知足咱們需求的。

聊完性能,接下來就說一說 SQL 化,這也是如今的一個大方向吧。我在開始嘗試 SSS 的時候,嘗試了一個 SQL 語句中有多個聚合操做,可是卻拋了異常。 後面仔細看了文檔,發現確實這在 SSS 中是不支持的。第二個是 distinct 也是不支持的。這兩點 Flink 是遠優於 SSS 的。因此從實時 SQL 的角度,Flink 又爲本身贏得了一票。除此以外,Flink 有更靈活的窗口。還有輸出的話,一樣參考的是 DataFlow 模型,Flink 實現支持刪除並更新的操做,SSS 僅支持更新的操做。(這邊 SSS 是基於 Spark 的 2.3版本)

API 的靈活性。在 SSS 中,誠然 table 帶來了比較大的方便,可是對於有一些操做依然會想經過 DStream 或者 rdd 的形式來操做,可是 SSS 並無提供這樣的轉換,只能編寫一些 UDF。可是在 Flink 中,Table 和 DataStream 能夠靈活地互相轉換,以應對更復雜的場景。


4、Flink在有讚的實踐

在真正開始使用 Flink 以前呢,第一個要考慮的就是部署的問題。由於現有的技術棧,因此選擇了部署在 Yarn 上,而且使用的是 Single Job 的模式,雖然會有更多的 ApplicationMaster,但無疑是增長了隔離性的。

4.1 問題一: FLINK-9567

在開始部署的時候我遇到了一個比較奇怪的問題。先講一下背景吧,由於還處於調研階段,因此使用的是 Yarn 的默認隊列,優先級比較低,在資源緊張的時候也容易被搶佔。
有一個上午,我起了一個任務,申請了5個 Container 來運行 TaskExecutor ,一個比較簡單地帶狀態的流式任務,想多跑一段時間看看穩定不穩定。這個 Flink 任務最後佔了100多個 container,還在不停增長,可是隻有五個 Container 在工做,其餘的 container 都註冊了 slot,而且 slot 都處於閒置的狀態。如下兩張圖分別表明正常狀態下的任務,和出問題的任務。

圖片描述

出錯後

圖片描述

在涉及到這個問題細節以前,我先介紹一下 Flink 是如何和 Yarn 整合到一塊的。根據下圖,咱們從下往上一個一個介紹這些組件是作什麼的。

圖片描述

TaskExecutor 是實際任務的執行者,它可能有多個槽位,每一個槽位執行一個具體的子任務。每一個 TaskExecutor 會將本身的槽位註冊到 SlotManager 上,並彙報本身的狀態,是忙碌狀態,仍是處於一個閒置的狀態。

SlotManager 既是 Slot 的管理者,也負責給正在運行的任務提供符合需求的槽位。還記錄了當前積壓的槽位申請。當槽位不夠的時候向Flink的ResourceManager申請容器。

Pending slots 積壓的 Slot 申請及計數器

Flink 的 ResourceManager 則負責了與 Yarn 的 ResourceManager 進行交互,進行一系列例如申請容器,啓動容器,處理容器的退出等等操做。由於採用的是異步申請的方式,因此還須要記錄當前積壓的容器申請,防止接收過多容器。

Pending container request 積壓容器的計數器

AMRMClient 是異步申請的執行者,CallbackHandler 則在接收到容器和容器退出的時候通知 Flink 的 ResourceManager。

Yarn 的 ResourceManager 則像是一個資源的分發器,負責接收容器請求,併爲 Client 準備好容器。

這邊一會兒引入的概念有點多,下面我用一個簡單地例子來描述一下這些組件在運行中起到的角色。

首先,咱們的配置是3個 TaskManager,每一個 TaskManager 有兩個 Slot,也就是總共須要6個槽位。當前已經擁有了4個槽位,任務的調度器向 Slot 申請還須要兩個槽位來運行子任務。

圖片描述

這時 SlotManager 發現全部的槽位都已經被佔用了,因此它將這個 slot 的 request 放入了 pending slots 當中。因此能夠看到 pending slots 的那個計數器從剛纔的0跳轉到了如今的2. 以後 SlotManager 就向 Flink 的 ResourceManager 申請一個新的 TaskExecutor,正好就能夠知足這兩個槽位的需求。因而 Flink 的 ResourceManager 將 pending container request 加1,並經過 AMRM Client 去向 Yarn 申請資源。

圖片描述

當 Yarn 將相應的 Container 準備好之後,經過 CallbackHandler 去通知 Flink 的 ResourceManager。Flink 就會根據在每個收到的 container 中啓動一個 TaskExecutor ,而且將 pending container request 減1,當 pending container request 變爲0以後,即便收到新的 container 也會立刻退回。

圖片描述

當 TaskExecutor 啓動以後,會向 SlotManager 註冊本身的兩個 Slot 可用,SlotManager 便會將兩個積壓的 SlotRequest 完成,通知調度器這兩個子任務能夠到這個新的 TaskExecutor 上執行,而且 pending requests 也被置爲0. 到這兒一切都符合預期。

圖片描述

那這個超發的問題又是如何出現的呢?首先咱們看一看這就是剛剛那個正常運行的任務。它佔用了6個 Slot。

若是在這個時候,出現了一些緣由致使了 TaskExecutor 非正常退出,好比說 Yarn 將資源給搶佔了。這時 Yarn 就會通知 Flink 的 ResourceManager 這三個 Container 已經異常退出。因此 Flink 的 ResourceManager 會當即申請三個新的 container。在這兒會討論的是一個 worst case,由於這個問題其實也不是穩定復現的。

CallbackHandler 兩次接收到回調發現 Container 是異常退出,因此當即申請新的 Container,pending container requests 也被置爲了3.

圖片描述

若是在這時,任務重啓,調度器會向 SlotManager 申請6個 Slot,SlotManager 中也沒有可用 Slot,就會向 Flink 的 ResourceManager 申請3個 Container,這時 pending container requests 變爲了6.

圖片描述

最後呢結果就如圖所示,起了6個 TaskExecutor,總共12個 Slot,可是隻有6個是被正常使用的,還有6個一直處於閒置的狀態。

圖片描述

在修復這個問題的過程當中,我有兩次嘗試。第一次嘗試,在 Container 異常退出之後,我不去當即申請新的 container。可是問題在於,若是 Container 在啓動 TaskExecutor 的過程當中出錯,那麼失去了這種補償的機制,有些 Slot Request 會被一直積壓,由於 SlotManager 已經爲它們申請了 Container。
 
第二次嘗試是在 Flink 的 ResourceManager 申請新的 container 以前先去檢查 pending slots,若是當前的積壓 slots 已經能夠被積壓的 container 給知足,那就沒有必要申請新的 container 了。

4.2 問題二: 監控

咱們使用過程當中踩到的第二個坑,實際上是跟延遲監控相關的。例子是一個很簡單的任務,兩個 source,兩個除了 source 以外的 operator,並行度都是2. 每一個 source 和 operator 它都有兩個子任務。

圖片描述

任務的邏輯是很簡單,可是呢當咱們打開延時監控。即便是這麼簡單的一個任務,它會記錄每個 source 的子任務到每個算子的子任務的延遲數據。這個延遲數據裏還包含了平均延遲,最大延遲,百分之99的延遲等等等等。那咱們能夠得出一個公式,延遲數據的數量是 source 的子任務數量乘以的 source 的數量乘以算子的並行度乘以算子的數量。N = n(subtasks per source) n(sources) n(subtasks per operator) * n(operator)

這邊我作一個比較簡單地假設,那就是 source 的子任務數量和算則的子任務數量都是 p - 並行度。從下面這個公式咱們能夠看出,監控的數量隨着並行度的上升呈平方增加。N = p^2 n(sources) n(operator)

圖片描述

若是咱們把上個任務提高到10個並行度,那麼就會收到400份的延遲數據。這可能看起來尚未太大的問題,這貌似並不影響組件的正常運行。

可是,在 Flink 的 dev mailing list 當中,有一個用戶反饋在開啓了延遲監控以後,JobMaster 很快就會掛掉。他收到了24000+的監控數據,而且包含這些數據的 ConcurrentHashMap 在內存中佔用了1.6 G 的內存。常規狀況 Flink 的 JobMaster 時會給到多少內存,我通常會配1-2 g,最後會致使長期 FullGC 和 OOM 的狀況。

那怎麼去解決這個問題呢?當延遲監控已經開始影響到系統的正常工做的時候,最簡單的辦法就是把它給關掉。但是把延時監控關掉,一方面咱們沒法得知當前任務的延時,另外一方面,又沒有辦法去針對延時作一些報警的功能。
 
因此另外一個解決方案就以下。首先是 Flink-10243,它提供了更多的延遲監控粒度的選項,從源頭上減小數量。好比說咱們使用了 Single 模式去採集這些數據,那它只會記錄每一個 operator 的子任務的延遲,忽略是從哪一個 source 或是 source 的子任務中來。這樣就能夠得出這樣一個公式,也能將以前咱們提到的十個並行度的任務產生的400個延時監控下降到了40個。這個功能發佈在了1.7.0中,而且 backport 回了1.5.5和1.6.2.
 
此外,Flink-10246 提出了改進 MetricQueryService。它包含了幾個子任務,前三個子任務爲監控服務創建了一個專有的低優先級的 ActorSystem,在這裏能夠簡單的理解爲一個獨立的線程池提供低優先級的線程去處理相關任務。它的目的也是爲了防止監控任務影響到主要的組件。這個功能發佈在了1.7.0中。
 
還有一個就是 Flink-10252,它還依舊處於 review 和改進當中,目的是爲了控制監控消息的大小。

 

4.3 具體實踐一

接下來會談一下 Flink 在有讚的一些具體應用。
 
首先是 Flink 結合 Spring。爲何要將這二者作結合呢,首先在有贊有不少服務都只暴露了 Dubbo 的接口,而用戶每每都是經過 Spring 去獲取這個服務的 client,在實時計算的一些應用中也是如此。
 
另外,有很多數據應用的開發也是 Java 工程師,他們但願能在 Flink 中使用 Spring 以及生態中的一些組件去簡化他們的開發。用戶的需求確定得獲得知足。接下來我會講一些錯誤的典型,以及最後是怎麼去使用的。

第一個錯誤的典型就是在 Flink 的用戶代碼中啓動一個 Spring 環境,而後在算子中取調用相關的 bean。可是事實上,最後這個 Spring Context 是啓動在 client 端的,也就是提交任務的這一端,在圖中有一個紅色的方框中間寫着 Spring Context 表示了它啓動的位置。但是用戶在實際調用時確實在 TaskManager 的 TaskSlot 中,它們都處在不一樣的 jvm,這明顯是不合理的。因此呢咱們又遇到了第二個錯誤。

圖片描述

第二個錯誤比第一個錯誤看起來要好多了,咱們在算子中使用了 RichFunction,而且在 open 方法中經過配置文件獲取了一個 Spring Context。可是先不說一個 TaskManager 中啓動幾個 Spring Context 是否是浪費,一個 Jvm 中啓動兩個 Spring Context 就會出問題。可能有用戶就以爲,那還不簡單,把 TaskSlot 設爲1不就好了。但是還有 OperatorChain 這個機制將幾個窄依賴的算子綁定到一塊運行在一個 TaskSlot 中。那咱們關閉 OperatorChain 不就好了?仍是不行,Flink可能會作基於 CoLocationGroup 的優化,將多個 subtask 放到一個 TaskSlot 中輪番執行。

圖片描述

但其實最後的解決方案仍是比較容易的,無非是使用單例模式來封裝 SpringContext,確保每一個jvm中只有一個,在算子函數的 open 方法中經過這個單例來獲取相應的 Bean。

圖片描述

但是在調用 Dubbo 服務的時候,一次響應每每最少也要在10 ms 以上。一個 TaskSlot 最大的吞吐也就在一千,能夠說對性能是大大的浪費。那麼解決這個問題的話能夠經過異步和緩存,對於屢次返回同一個值的調用可使用緩存,提高吞吐咱們可使用異步。

4.4 具體實踐二

但是若是想同時使用異步和緩存呢?剛開始我以爲這是一個挺容易實現的功能,但在實際寫 RichAsyncFunction 的時候我發現並無辦法使用 Flink 託管的 KeyedState。因此最初想到的方法就是作一個相似 LRU 的 Cache 去緩存數據。可是這徹底不能借助到 Flink 的狀態管理的優點。因此我研究了一下實現。

爲何不支持呢?

當一條記錄進入算子的時候,Flink 會先將 key 提取出來並將 KeyedState 指向與這個 key 關聯的存儲空間,圖上就指向了 key4 相關的存儲空間。可是若是此時 key1 關聯的異步操做完成了,但願把內容緩存起來,會將內容寫入到 key4 綁定的存儲空間。當下一次 key1 相關的記錄進入算子時,回去 key1 關聯的存儲空間查找,但是根本找不到數據,只好再次請求。

圖片描述

因此解決的方法是定製一個算子,每條記錄進入系統,都讓它指向同一個公用 key 的存儲空間。在這個空間使用 MapState 來作緩存。最後算子運行的 function 繼承 AbstractRichFunction 在 open 方法中來獲取 KeyedState,實現 AsyncFunction 接口來作異步操做。

圖片描述


5、實時計算 SQL 化與界面化

最先咱們使用 SDK 的方式來簡化 SQL 實時任務的開發,可是這對用戶來講也不算很是友好,因此如今講 SQL 實時任務界面化,用 Flink 做爲底層引擎去執行這些任務。

在作 SQL 實時任務時,首先是外部系統的抽象,將數據源和數據池抽象爲流資源,用戶將它們數據的 Schema 信息和元信息註冊到平臺中,平臺根據用戶所在的項目組管理讀寫的權限。在這裏消息源的格式若是能作到統一能下降不少複雜度。好比在有贊,想要接入的用戶必須保證是 Json 格式的消息,經過一條樣例消息能夠直接生成 Schema 信息。

接下來是根據用戶選擇的數據源和數據池,獲取相應的 Schema 信息和元信息,在 Flink 任務中註冊相應的外部系統 Table 鏈接器,再執行相應的 SQL 語句。

在 SQL 語義不支持的功能上儘可能使用 UDF 的方式來拓展。

有數據源和數據池之間的元信息,還能夠獲取實時任務之間可能存在的依賴關係,而且能作到整個鏈路的監控


6、將來與展望

Flink 的批處理和 ML 模塊的嘗試,會跟 Spark 進行對比,分析優劣勢。目前還處於調研階段,目前比較關注的是 Flink 和 Hive的結合,對應 FLINK-10566 這個 issue。

從 Flink 的發展來說呢,我比較關注並參與接下來對於調度和資源管理的優化。如今 Flink 的調度和任務執行圖是耦合在一塊的,使用比較簡單地調度機制。經過將調度器隔離出來,作成可插拔式的,能夠應用更多的調度機制。此外,基於新的調度器,還能夠去作更靈活的資源補充和減小機制,實現 Auto Scaling。這可能在接下來的版本中會是一個重要的特性。對應 FLINK-10404 和 FLINK-10429 這兩個 issue。


最後打個小廣告,有贊大數據團隊基礎設施團隊,主要負責有讚的數據平臺(DP), 實時計算(Storm, Spark Streaming, Flink),離線計算(HDFS,YARN,HIVE, SPARK SQL),在線存儲(HBase),實時 OLAP(Druid) 等數個技術產品,歡迎感興趣的小夥伴聯繫 yangshimin@youzan.com

圖片描述

相關文章
相關標籤/搜索