日均處理萬億數據!Flink在快手的應用實踐與技術演進之路

董亭亭,快手大數據架構實時計算引擎團隊負責人。目前負責 Flink 引擎在快手內的研發、應用以及周邊子系統建設。2013 年畢業於大連理工大學,曾就任於奇虎 360、58 集團。主要研究領域包括:分佈式計算、調度系統、分佈式存儲等系統。後端

本次的分享包括如下三個部分:緩存

  1. 介紹 Flink 在快手的應用場景以及目前規模;
  2. 介紹 Flink 在落地過程的技術演進過程;
  3. 討論 Flink 在快手的將來計劃。

一.Flink 在快手應用場景與規模

1. Flink 在快手應用場景

快手計算鏈路是從 DB/Binlog 以及 WebService Log 實時入到 Kafka 中,而後接入 Flink 作實時計算,其中包括實時 ETL、實時分析、Interval Join 以及實時訓練,最後的結果存到 Druid、ES 或者 HBase 裏面,後面接入一些數據應用產品;同時這一份 Kafka 數據實時 Dump 一份到 Hadoop 集羣,而後接入離線計算。性能優化

Flink 在快手應用的類別主要分爲三大類:架構

  • 80% 統計監控:實時統計,包括各項數據的指標,監控項報警,用於輔助業務進行實時分析和監控;
  • 15% 數據處理:對數據的清洗、拆分、Join 等邏輯處理,例如大 Topic 的數據拆分、清洗;
  • 5% 數據處理:實時業務處理,針對特定業務邏輯的實時處理,例如實時調度。

Flink 在快手應用的典型場景包括:併發

  • 快手是分享短視頻跟直播的平臺,快手短視頻、直播的質量監控是經過 Flink 進行實時統計,好比直播觀衆端、主播端的播放量、卡頓率、開播失敗率等跟直播質量相關的多種監控指標;
  • 用戶增加分析,實時統計各投放渠道拉新狀況,根據效果實時調整各渠道的投放量;
  • 實時數據處理,廣告展示流、點擊流實時 Join,客戶端日誌的拆分等;
  • 直播 CDN 調度,實時監控各 CDN 廠商質量,經過 Flink 實時訓練調整各個CDN廠商流量配比。

2.Flink 集羣規模

快手目前集羣規模有 1500 臺左右,做業數量大約是 500 左右,日處理條目數總共有 1.7 萬億,峯值處理條目數大約是 3.7 千萬。集羣部署都是 On Yarn 模式,分爲離線集羣和實時集羣兩類集羣,其中離線集羣混合部署,機器經過標籤進行物理隔離,實時集羣是 Flink 專用集羣,針對隔離性、穩定性要求極高的業務部署。框架

二.快手 Flink 技術演進

快手 Flink 技術演進主要分爲三部分:運維

  1. 基於特定場景優化,包括 Interval Join 場景優化;
  2. 穩定性改進,包括數據源控速,JobManager 穩定性,做業頻繁失敗;
  3. 平臺建設。

1.場景優化

1.1 Interval Join 應用場景

Interval Join 在快手的一個應用場景是廣告展示點擊流實時 Join 場景:打開快手 App 可能會收到廣告服務推薦的廣告視頻,用戶有時會點擊展示的廣告視頻。這樣在後端造成兩份數據流,一份是廣告展示日誌,一份是客戶端點擊日誌。這兩份數據需進行實時 Join,將 Join 結果做爲樣本數據用於模型訓練,訓練出的模型會被推送到線上的廣告服務。該場景下展示之後 20 分鐘的點擊被認爲是有效點擊,實時 Join 邏輯則是點擊數據 Join 過去 20 分鐘展示。其中,展示流的數據量相對比較大,20 分鐘數據在 1 TB 以上。最初實時 Join 過程是業務本身實現,經過 Redis 緩存廣告展示日誌,Kafka 延遲消費客戶端點擊日誌實現 Join 邏輯,該方式缺點是實時性不高,而且隨着業務增加須要堆積更多機器,運維成本很是高。基於 Flink 使用 Interval Join 完美契合此場景,而且實時性高,可以實時輸出 Join 後的結果數據,對業務來講維護成本很是低,只須要維護一個 Flink 做業便可。分佈式

1.2 Interval Join 場景優化

1.2.1 Interval Join 原理:oop

Flink 實現 Interval join 的原理:兩條流數據緩存在內部 State 中,任意一數據到達,獲取對面流相應時間範圍數據,執行 joinFunction 進行 Join。隨着時間的推動,State 中兩條流相應時間範圍的數據會被清理。性能

在前面提到的廣告應用場景 Join 過去 20 分鐘數據,假設兩個流的數據徹底有序到達,Stream A 做爲展示流緩存過去 20 分鐘數據,Stream B 做爲點擊流每來一條數據到對面 Join 過去 20 分鐘數據便可。

Flink 實現 Interval Join:

KeyedStreamA.intervalJoin(KeyedStreamB)
         .between(Time.minutes(0),Time.minutes(20))
         .process(joinFunction)

1.2.2 狀態存儲策略選擇

關於狀態存儲策略選擇,生產環境狀態存儲 Backend 有兩種方式:

  1. FsStateBackend:State 存儲在內存,Checkpoint 時持久化到 HDFS;
  2. RocksDBStateBackend:State 存儲在 RocksDB 實例,可增量 Checkpoint,適合超大 State。在廣告場景下展示流 20 分鐘數據有 1 TB 以上,從節省內存等方面綜合考慮,快手最終選擇的是 RocksDBStateBackend。

在 Interval join 場景下,RocksDB 狀態存儲方式是將兩個流的數據存在兩個 Column Family 裏,RowKey 根據 keyGroupId+joinKey+ts 方式組織。

1.2.3 RocksDB 訪問性能問題

Flink 做業上線遇到的第一個問題是 RocksDB 訪問性能問題,表現爲:

  • 做業在運行一段時間以後出現反壓,吞吐降低。
  • 經過 Jstack 發現程序邏輯頻繁處於 RocksDB get 請求處。
  • 經過 Top 發現存在單線程 CPU 持續被打滿。

進一步對問題分析,發現:該場景下,Flink 內部基於 RocksDB State 狀態存儲時,獲取某個 Join key 值某段範圍的數據,是經過前綴掃描的方式獲取某個 Join key 前綴的 entries 集合,而後再判斷哪些數據在相應的時間範圍內。前綴掃描的方式會致使掃描大量的無效數據,掃描的數據大多緩存在 PageCache 中,在 Decode 數據判斷數據是否爲 Delete 時,消耗大量 CPU。

以上圖場景爲例,藍色部分爲目標數據,紅色部分爲上下邊界以外的數據,前綴掃描時會過多掃描紅色部分無用數據,在對該大量無效數據作處理時,將單線程 CPU 消耗盡。

1.2.4 針對 RocksDB 訪問性能優化

快手在 Interval join 該場景下對 RocksDB 的訪問方式作了如下優化:

  • 在 Interval join 場景下,是能夠精確的肯定需訪問的數據邊界範圍。因此用全 Key 範圍掃描代替前綴掃描,精確拼出查詢上下邊界 Full Key 即 keyGroupId+joinKey+ts[lower,upper]。
  • 範圍查詢 RocksDB ,能夠更加精確 Seek 到上下邊界,避免無效數據掃描和校驗。

優化後的效果:P99 查詢時延性能提高 10 倍,即 nextKey 獲取 RocksDB 一條數據, P99 時延由 1000 毫秒到 100 毫秒之內。 做業吞吐反壓問題進而獲得解決。

1.2.5 RocksDB 磁盤壓力問題

Flink 做業上線遇到的第二個問題是隨着業務的增加, RocksDB 所在磁盤壓力即將達到上限,高峯時磁盤 util 達到 90%,寫吞吐在 150 MB/s。詳細分析發現,該問題是由如下幾個緣由疊加致使:

  • Flink 機器選型爲計算型,大內存、單塊 HDD 盤,在集羣規模不是很大的狀況下,單個機器會有 4-5 個該做業 Container,同時使用一塊 HDD 盤。
  • RocksDB 後臺會頻繁進行 Compaction 有寫放大狀況,同時 Checkpoint 也在寫磁盤。

針對 RocksDB 磁盤壓力,快手內部作了如下優化:

  • 針對 RocksDB 參數進行調優,目的是減小 Compaction IO 量。優化後 IO 總量有一半左右的降低。
  • 爲更加方便的調整 RocksDB 參數,在 Flink 框架層新增 Large State RocksDB 配置套餐。同時支持 RocksDBStateBackend 自定義配置各類 RocksDB 參數。
  • 將來計劃,考慮將 State 用共享存儲的方式存儲,進一步作到減小 IO 總量,而且快速Checkpoint 和恢復。

2.穩定性改進

首先介紹下視頻質量監控調度應用背景,有多個 Kafka Topic 存儲短視頻、直播相關質量日誌,包括短視頻上傳/下載、直播觀衆端日誌,主播端上報日誌等。Flink Job 讀取相應 Topic 數據實時統計各種指標,包括播放量、卡頓率、黑屏率以及開播失敗率等。指標數據會存到 Druid 提供後續相應的報警監控以及多維度的指標分析。同時還有一條流是進行直播 CDN 調度,也是經過 Flink Job 實時訓練、調整各 CDN 廠商的流量配比。以上 Kafka Topic 數據會同時落一份到 Hadoop 集羣,用於離線補償數據。實時計算跟離線補數據的過程共用同一份 Flink 代碼,針對不一樣的數據源,分別讀取 Kafka 數據或 HDFS 數據。

2.1 數據源控速

視頻應用場景下遇到的問題是:做業 DAG 比較複雜,同時從多個 Topic 讀取數據。一旦做業異常,做業失敗從較早狀態恢復,須要讀取部分歷史數據。此時,不一樣 Source 併發讀取數據速度不可控,會致使 Window 類算子 State 堆積、做業性能變差,最終致使做業恢復失敗。 另外,離線補數據,從不一樣 HDFS 文件讀數據一樣會遇到讀取數據不可控問題。在此以前,實時場景下臨時解決辦法是重置 GroupID 丟棄歷史數據,使得從最新位置開始消費。

針對該問題咱們但願從源頭控制多個 Source 併發讀取速度,因此設計了從 Source 源控速的策略。

Source 控速策略

Source 控速策略是 :

  • SourceTask 共享速度狀態提供給 JobManager。
  • JobManager 引入 SourceCoordinator,該 Coordinator 擁有全局速度視角,制定相應的策略,並將限速策略下發給 SourceTask。
  • SourceTask 根據 JobManager 下發的速度調節信息執行相應控速邏輯。
  • 一個小細節是 DAG 圖有子圖的話, 不一樣子圖 Source 源之間互相不影響。

Source 控速策略詳細細節

SourceTask 共享狀態

  • SourceTask 按期彙報狀態給 JobManager,默認 10 s 間隔。
  • 彙報內容爲。

協調中心 SourceCoordinator

  • 限速閾值:最快併發 Watermark - 最慢併發 Watermark > ∆t(默認 5 分鐘)。只要在達到限速閾值狀況下,才進行限速策略制定。
  • 全局預測:各併發 targetWatermark=base+speed*time;Coordinator 先進行全局預測,預測各併發接下來時間間隔能運行到的 Watermark 位置。
  • 全局決策:targetWatermark = 預測最慢 Watermark+∆t/2;Coordinator 根據全局預測結果,取預測最慢併發的 Watermark 值再浮動一個範圍做爲下個週期全侷限速決策的目標值。
  • 限速信息下發:。將全局決策的信息下發給全部的 Source task,限速信息包括下一個目標的時間和目標的 Watermark 位置。

以上圖爲例,A 時刻,4 個併發分別到達如圖所示位置,爲 A+interval 的時刻作預測,圖中藍色虛線爲預測各併發可以到達的位置,選擇最慢的併發的 Watermark 位置,浮動範圍值爲 Watermark + ∆t/2 的時間,圖中鮮紅色虛線部分爲限速的目標 Watermark,以此做爲全局決策發給下游 Task。

SourceTask 限速控制

  • SourceTask 獲取到限速信息後,進行限速控制。
  • 以 KafkaSource 爲例,KafkaFetcher 獲取數據時,根據限速信息 Check 當前進度,肯定是否須要限速等待。

該方案中,還有一些其餘考慮,例如:

  • 時間屬性:只針對 EventTime 狀況下進行限速執行。
  • 開關控制:支持做業開關控制是否開啓 Source 限速策略。
  • DAG 子圖 Source 源之間互相不影響。
  • 是否會影響 CheckPoint Barrier 下發。
  • 數據源發送速度不恆定,Watermark 突變狀況。

Source 控速結果

拿線上做業,使用 Kafka 從最先位置(2 days ago)開始消費。如上圖,不限速狀況下State 持續增大,最終做業掛掉。使用限速策略後,最開始 State 有緩慢上升,可是 State 大小可控,最終能平穩追上最新數據,並 State 持續在 40 G 左右。

2.2 JobManager 穩定性

關於 JobManager 穩定性,遇到了兩類 Case,表現均爲:JobManager 在大併發做業場景 WebUI 卡頓明顯,做業調度會超時。進一步分析了兩種場景下的問題緣由。

場景一,JobManager 內存壓力大問題。JobManager 須要控制刪除已完成的 Checkpoint 在 HDFS 上的路徑。在 NameNode 壓力大時,Completed CheckPoint 路徑刪除慢,致使CheckPoint Path 在內存中堆積。 原來刪除某一次 Checkpoint 路徑策略爲:每刪除目錄下一個文件,需 List 該目錄判斷是否爲空,如爲空將目錄刪除。在大的 Checkpoint 路徑下, List 目錄操做爲代價較大的操做。針對該邏輯進行優化,刪除文件時直接調用 HDFS delete(path,false) 操做,語義保持一致,而且開銷小。

場景二,該 Case 發生在 Yarn Cgroup 功能上線以後,JobManager G1 GC 過程變慢致使阻塞應用線程。AppMaster 申請 CPU 個數硬編碼爲1,在上線 Cgroup 以後可用的 CPU 資源受到限制。解決該問題的方法爲,支持 AppMaster 申請 CPU 個數參數化配置。

2.3 做業頻繁失敗

機器故障形成做業頻繁失敗,具體的場景也有兩種:

場景一:磁盤問題致使做業持續調度失敗。磁盤出問題致使一些 Buffer 文件找不到。又由於 TaskManager 不感知磁盤健康情況,會頻繁調度做業到該 TaskManager,做業頻繁失敗。

場景二:某臺機器有問題致使 TaskManager 在某臺機器上頻繁出 Core,陸續分配新的 TaskManager 到這臺機器上,致使做業頻繁失敗。

針對機器故障問題解決方法:

  • 針對磁盤問題,TaskManager 增長 DiskChecker 磁盤健康檢查,發現磁盤有問題 TaskManager 自動退出;
  • 針對有些機器頻繁出現 TaskManager 出現問題,根據必定的策略將有問題機器加到黑名單中,而後經過軟黑名單機制,告知 Yarn 儘可能不要調度 Container 到該機器。

3.平臺化建設

3.1 平臺建設:

快手的平臺化建設主要體如今青藤做業託管平臺。經過該平臺可進行做業操做、做業管理以及做業詳情查看等。做業操做包括提交、中止做業。做業管理包括管理做業存活、性能報警,自動拉起配置等;詳情查看,包括查看做業的各種 Metric 等。

上圖爲青藤做業託管平臺的一些操做界面。

3.2 問題定位流程優化:

咱們也常常須要給業務分析做業性能問題,幫助業務 debug 一些問題,過程相對繁瑣。因此該部分咱們也作了不少工做,儘可能提供更多的信息給業務,方便業務自主分析定位問題。首先,咱們將全部 Metric 入 Druid,經過 Superset 可從各個維度分析做業各項指標。第二,針對 Flink 的 WebUI 作了一些完善,支持 Web 實時打印 jstack,Web DAG 爲各 Vertex 增長序號,Subtask 信息中增長各併發 SubtaskId。第三,豐富異常信息提示,針對機器宕機等特定場景信息進行明確提示。第四,新增各類 Metric。

三.將來計劃

快手的將來規劃主要分爲兩個部分:

第一,目前在建設的 Flink SQL 相關工做。由於 SQL 可以減小用戶開發的成本,包括咱們如今也在對接實時數倉的需求,因此 Flink SQL 是咱們將來計劃的重要部分之一。
第二,咱們但願進行一些資源上的優化。目前業務在提做業時存在需求資源及併發預估不許確的狀況,可能會過多申請資源致使資源浪費。另外如何提高總體集羣資源的利用率問題,也是接下來須要探索的問題。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索