董亭亭,快手大數據架構實時計算引擎團隊負責人。目前負責 Flink 引擎在快手內的研發、應用以及周邊子系統建設。2013 年畢業於大連理工大學,曾就任於奇虎 360、58 集團。主要研究領域包括:分佈式計算、調度系統、分佈式存儲等系統。後端
本次的分享包括如下三個部分:緩存
快手計算鏈路是從 DB/Binlog 以及 WebService Log 實時入到 Kafka 中,而後接入 Flink 作實時計算,其中包括實時 ETL、實時分析、Interval Join 以及實時訓練,最後的結果存到 Druid、ES 或者 HBase 裏面,後面接入一些數據應用產品;同時這一份 Kafka 數據實時 Dump 一份到 Hadoop 集羣,而後接入離線計算。性能優化
Flink 在快手應用的類別主要分爲三大類:架構
Flink 在快手應用的典型場景包括:併發
快手目前集羣規模有 1500 臺左右,做業數量大約是 500 左右,日處理條目數總共有 1.7 萬億,峯值處理條目數大約是 3.7 千萬。集羣部署都是 On Yarn 模式,分爲離線集羣和實時集羣兩類集羣,其中離線集羣混合部署,機器經過標籤進行物理隔離,實時集羣是 Flink 專用集羣,針對隔離性、穩定性要求極高的業務部署。框架
快手 Flink 技術演進主要分爲三部分:運維
Interval Join 在快手的一個應用場景是廣告展示點擊流實時 Join 場景:打開快手 App 可能會收到廣告服務推薦的廣告視頻,用戶有時會點擊展示的廣告視頻。這樣在後端造成兩份數據流,一份是廣告展示日誌,一份是客戶端點擊日誌。這兩份數據需進行實時 Join,將 Join 結果做爲樣本數據用於模型訓練,訓練出的模型會被推送到線上的廣告服務。該場景下展示之後 20 分鐘的點擊被認爲是有效點擊,實時 Join 邏輯則是點擊數據 Join 過去 20 分鐘展示。其中,展示流的數據量相對比較大,20 分鐘數據在 1 TB 以上。最初實時 Join 過程是業務本身實現,經過 Redis 緩存廣告展示日誌,Kafka 延遲消費客戶端點擊日誌實現 Join 邏輯,該方式缺點是實時性不高,而且隨着業務增加須要堆積更多機器,運維成本很是高。基於 Flink 使用 Interval Join 完美契合此場景,而且實時性高,可以實時輸出 Join 後的結果數據,對業務來講維護成本很是低,只須要維護一個 Flink 做業便可。分佈式
1.2.1 Interval Join 原理:oop
Flink 實現 Interval join 的原理:兩條流數據緩存在內部 State 中,任意一數據到達,獲取對面流相應時間範圍數據,執行 joinFunction 進行 Join。隨着時間的推動,State 中兩條流相應時間範圍的數據會被清理。性能
在前面提到的廣告應用場景 Join 過去 20 分鐘數據,假設兩個流的數據徹底有序到達,Stream A 做爲展示流緩存過去 20 分鐘數據,Stream B 做爲點擊流每來一條數據到對面 Join 過去 20 分鐘數據便可。
Flink 實現 Interval Join:
KeyedStreamA.intervalJoin(KeyedStreamB) .between(Time.minutes(0),Time.minutes(20)) .process(joinFunction)
1.2.2 狀態存儲策略選擇
關於狀態存儲策略選擇,生產環境狀態存儲 Backend 有兩種方式:
在 Interval join 場景下,RocksDB 狀態存儲方式是將兩個流的數據存在兩個 Column Family 裏,RowKey 根據 keyGroupId+joinKey+ts 方式組織。
1.2.3 RocksDB 訪問性能問題
Flink 做業上線遇到的第一個問題是 RocksDB 訪問性能問題,表現爲:
進一步對問題分析,發現:該場景下,Flink 內部基於 RocksDB State 狀態存儲時,獲取某個 Join key 值某段範圍的數據,是經過前綴掃描的方式獲取某個 Join key 前綴的 entries 集合,而後再判斷哪些數據在相應的時間範圍內。前綴掃描的方式會致使掃描大量的無效數據,掃描的數據大多緩存在 PageCache 中,在 Decode 數據判斷數據是否爲 Delete 時,消耗大量 CPU。
以上圖場景爲例,藍色部分爲目標數據,紅色部分爲上下邊界以外的數據,前綴掃描時會過多掃描紅色部分無用數據,在對該大量無效數據作處理時,將單線程 CPU 消耗盡。
1.2.4 針對 RocksDB 訪問性能優化
快手在 Interval join 該場景下對 RocksDB 的訪問方式作了如下優化:
優化後的效果:P99 查詢時延性能提高 10 倍,即 nextKey 獲取 RocksDB 一條數據, P99 時延由 1000 毫秒到 100 毫秒之內。 做業吞吐反壓問題進而獲得解決。
1.2.5 RocksDB 磁盤壓力問題
Flink 做業上線遇到的第二個問題是隨着業務的增加, RocksDB 所在磁盤壓力即將達到上限,高峯時磁盤 util 達到 90%,寫吞吐在 150 MB/s。詳細分析發現,該問題是由如下幾個緣由疊加致使:
針對 RocksDB 磁盤壓力,快手內部作了如下優化:
首先介紹下視頻質量監控調度應用背景,有多個 Kafka Topic 存儲短視頻、直播相關質量日誌,包括短視頻上傳/下載、直播觀衆端日誌,主播端上報日誌等。Flink Job 讀取相應 Topic 數據實時統計各種指標,包括播放量、卡頓率、黑屏率以及開播失敗率等。指標數據會存到 Druid 提供後續相應的報警監控以及多維度的指標分析。同時還有一條流是進行直播 CDN 調度,也是經過 Flink Job 實時訓練、調整各 CDN 廠商的流量配比。以上 Kafka Topic 數據會同時落一份到 Hadoop 集羣,用於離線補償數據。實時計算跟離線補數據的過程共用同一份 Flink 代碼,針對不一樣的數據源,分別讀取 Kafka 數據或 HDFS 數據。
視頻應用場景下遇到的問題是:做業 DAG 比較複雜,同時從多個 Topic 讀取數據。一旦做業異常,做業失敗從較早狀態恢復,須要讀取部分歷史數據。此時,不一樣 Source 併發讀取數據速度不可控,會致使 Window 類算子 State 堆積、做業性能變差,最終致使做業恢復失敗。 另外,離線補數據,從不一樣 HDFS 文件讀數據一樣會遇到讀取數據不可控問題。在此以前,實時場景下臨時解決辦法是重置 GroupID 丟棄歷史數據,使得從最新位置開始消費。
針對該問題咱們但願從源頭控制多個 Source 併發讀取速度,因此設計了從 Source 源控速的策略。
Source 控速策略
Source 控速策略是 :
Source 控速策略詳細細節
SourceTask 共享狀態
協調中心 SourceCoordinator
以上圖爲例,A 時刻,4 個併發分別到達如圖所示位置,爲 A+interval 的時刻作預測,圖中藍色虛線爲預測各併發可以到達的位置,選擇最慢的併發的 Watermark 位置,浮動範圍值爲 Watermark + ∆t/2 的時間,圖中鮮紅色虛線部分爲限速的目標 Watermark,以此做爲全局決策發給下游 Task。
SourceTask 限速控制
該方案中,還有一些其餘考慮,例如:
Source 控速結果
拿線上做業,使用 Kafka 從最先位置(2 days ago)開始消費。如上圖,不限速狀況下State 持續增大,最終做業掛掉。使用限速策略後,最開始 State 有緩慢上升,可是 State 大小可控,最終能平穩追上最新數據,並 State 持續在 40 G 左右。
關於 JobManager 穩定性,遇到了兩類 Case,表現均爲:JobManager 在大併發做業場景 WebUI 卡頓明顯,做業調度會超時。進一步分析了兩種場景下的問題緣由。
場景一,JobManager 內存壓力大問題。JobManager 須要控制刪除已完成的 Checkpoint 在 HDFS 上的路徑。在 NameNode 壓力大時,Completed CheckPoint 路徑刪除慢,致使CheckPoint Path 在內存中堆積。 原來刪除某一次 Checkpoint 路徑策略爲:每刪除目錄下一個文件,需 List 該目錄判斷是否爲空,如爲空將目錄刪除。在大的 Checkpoint 路徑下, List 目錄操做爲代價較大的操做。針對該邏輯進行優化,刪除文件時直接調用 HDFS delete(path,false) 操做,語義保持一致,而且開銷小。
場景二,該 Case 發生在 Yarn Cgroup 功能上線以後,JobManager G1 GC 過程變慢致使阻塞應用線程。AppMaster 申請 CPU 個數硬編碼爲1,在上線 Cgroup 以後可用的 CPU 資源受到限制。解決該問題的方法爲,支持 AppMaster 申請 CPU 個數參數化配置。
機器故障形成做業頻繁失敗,具體的場景也有兩種:
場景一:磁盤問題致使做業持續調度失敗。磁盤出問題致使一些 Buffer 文件找不到。又由於 TaskManager 不感知磁盤健康情況,會頻繁調度做業到該 TaskManager,做業頻繁失敗。
場景二:某臺機器有問題致使 TaskManager 在某臺機器上頻繁出 Core,陸續分配新的 TaskManager 到這臺機器上,致使做業頻繁失敗。
針對機器故障問題解決方法:
快手的平臺化建設主要體如今青藤做業託管平臺。經過該平臺可進行做業操做、做業管理以及做業詳情查看等。做業操做包括提交、中止做業。做業管理包括管理做業存活、性能報警,自動拉起配置等;詳情查看,包括查看做業的各種 Metric 等。
上圖爲青藤做業託管平臺的一些操做界面。
咱們也常常須要給業務分析做業性能問題,幫助業務 debug 一些問題,過程相對繁瑣。因此該部分咱們也作了不少工做,儘可能提供更多的信息給業務,方便業務自主分析定位問題。首先,咱們將全部 Metric 入 Druid,經過 Superset 可從各個維度分析做業各項指標。第二,針對 Flink 的 WebUI 作了一些完善,支持 Web 實時打印 jstack,Web DAG 爲各 Vertex 增長序號,Subtask 信息中增長各併發 SubtaskId。第三,豐富異常信息提示,針對機器宕機等特定場景信息進行明確提示。第四,新增各類 Metric。
快手的將來規劃主要分爲兩個部分:
第一,目前在建設的 Flink SQL 相關工做。由於 SQL 可以減小用戶開發的成本,包括咱們如今也在對接實時數倉的需求,因此 Flink SQL 是咱們將來計劃的重要部分之一。
第二,咱們但願進行一些資源上的優化。目前業務在提做業時存在需求資源及併發預估不許確的狀況,可能會過多申請資源致使資源浪費。另外如何提高總體集羣資源的利用率問題,也是接下來須要探索的問題。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。