滴滴 Flink-1.10 升級之路

導讀:滴滴實時計算引擎從 Flink-1.4 無縫升級到 Flink-1.10 版本,作到了徹底對用戶透明。而且在新版本的指標、調度、SQL 引擎等進行了一些優化,在性能和易用性上相較舊版本都有很大提高。sql

這篇文章介紹了咱們升級過程當中遇到的困難和思考,但願能給你們帶來啓發。json

1、 背景

在本次升級以前,咱們使用的主要版本爲 Flink-1.4.2,而且在社區版本上進行了一些加強,提供了 StreamSQL 和低階 API 兩種服務形式。現有集羣規模達到了 1500 臺物理機,運行任務數超過 12000 ,日均處理數據 3 萬億條左右。數組

不過隨着社區的發展,尤爲是 Blink 合入 master 後有不少功能和架構上的升級,咱們但願能經過版本升級提供更好的流計算服務。今年 2 月份,里程碑版本 Flink-1.10 發佈,咱們開始在新版上上進行開發工做,踏上了充滿挑戰的升級之路。session

2、 Flink-1.10 新特性

做爲 Flink 社區至今爲止的最大的一次版本升級,加入的新特性解決了以前遇到不少的痛點。架構

1. 原生 DDL 語法與 Catalog 支持併發

Flink SQL 原生支持了 DDL 語法,好比 CREATE TABLE/CREATE FUNCTION,可使用 SQL 進行元數據的註冊,而不須要使用代碼的方式。函數

也提供了 Catalog 的支持,默認使用 InMemoryCatalog 將信息臨時保存在內存中,同時也提供了 HiveCatalog 能夠與 HiveMetastore 進行集成。也能夠經過本身拓展 Catalog 接口實現自定義的元數據管理。性能

2.Flink SQL 的加強單元測試

  • 基於 ROW_NUMBER 實現的 TopN 和去重語法,拓展了 StreamSQL 的使用場景。
  • 實現了 BinaryRow 類型做爲內部數據交互,將數據直接以二進制的方式構建而不是對象數組,好比使用一條數據中的某個字段時,能夠只反序列其中部分數據,減小了沒必要要的序列化開銷。
  • 新增了大量內置函數,例如字符串處理、FIRST/LAST_VALUE 等等,因爲不須要轉換爲外部類型,相較於自定義函數效率更高。
  • 增長了 MiniBatch 優化,經過微批的處理方式提高任務的吞吐

3.內存配置優化學習

以前對 Flink 內存的管理一直是一個比較頭疼的問題,尤爲是在使用 RocksDB 時,由於一個 TaskManager 中可能存在多個 RocksDB 實例,很差估算內存使用量,就致使常常發生內存超過限制被殺。

在新版上增長了一些內存配置,例如 state.backend.rocksdb.memory.fixed-per-slot 能夠輕鬆限制每一個 slot的RocksDB 內存的使用上限,避免了 OOM 的風險。

3、挑戰與應對

本次升級最大的挑戰是,如何保證 StreamSQL 的兼容性。StreamSQL 的目的就是爲了對用戶屏蔽底層細節,可以更加專一業務邏輯,而咱們能夠經過版本升級甚至更換引擎來提供更好的服務。保證任務的平滑升級是最基本的要求。

1. 內部 patch 如何兼容

因爲跨越多個版本架構差距巨大,內部 patch 基本沒法直接合入,須要在新版本上從新實現。咱們首先整理了全部的歷史 commit,篩選出那些必要的修改而且在新版上進行從新實現,目的是能覆蓋已有的全部功能,確保新版本能支持現有的全部任務需求。

例如:

  • 新增或修改 Connectors 以支持公司內部須要,例如 DDMQ(滴滴開源消息隊列產品),權限認證功能等。
  • 新增 Formats 實現,例如 binlog,內部日誌採集格式的解析等。
  • 增長 ADD JAR 語法,能夠在 SQL 任務中引用外部依賴,好比 UDF JAR,自定義 Source/Sink。
  • 增長 SET 語法,能夠在 SQL 中設置 TableConfig,指導執行計劃的生成

2. StreamSQL 語法兼容

社區在 1.4 版本時,FlinkSQL還處於比較初始的階段,也沒有原生的 DDL 語法支持,咱們使用 Antlr 實現了一套自定義的 DDL 語法。可是在 Flink1.10 版本上,社區已經提供了原生的 DDL 支持,並且與咱們內部的語法差異較大。如今擺在咱們面前有幾條路能夠選擇:

  • 放棄內部語法的支持,修改所有任務至新語法。(違背了平滑遷移的初衷,並且對已有用戶學習成本高)
  • 修改 Flink 內語法解析的模塊(sql-parser),支持對內部語法的解析。(實現較爲複雜,且不利於後續的版本升級)
  • 在 sql-parser 之上封裝一層語法轉換層,將本來的 SQL 解析提取有效信息後,經過字符串拼接的方式組織成社區語法再運行。

最終咱們選用了第三種方案,這樣能夠最大限度的減小和引擎的耦合,做爲插件運行,將來再有引擎升級徹底能夠複用現有的邏輯,可以下降不少的開發成本。

例如:咱們在舊版本上使用 "json-path" 的庫實現了 json 解析,經過在建表語句裏定義相似 $.status 的表達式表示如何提取此字段。

新版本上原生的 json 類型解析可使用 ROW 類型來表示嵌套結構,在轉換爲新語法的過程當中,將本來的表達是解析爲樹並構建出新的字段類型,再使用計算列的方式提取出原始表中的字段,確保表結構與以前一致。類型名稱、配置屬性也經過映射轉換爲社區語法。

3. 兼容性測試

最後是測試階段,須要進行完善的測試確保全部任務都能作到平滑升級。咱們本來的計劃是準備進行迴歸測試,對已有的全部任務替換配置後進行回放,可是在實際操做中有不少問題:

  • 測試流程過長,一次運行可能須要數個小時。
  • 出現問題時很差定位,可能發生在任務的整個生命週期的任何階段。
  • 沒法驗證計算結果,即新舊版本語義是否一致

因此咱們按任務的提交流程分紅多個階段進行測試,只有在當前階段可以所有測試經過後後進入下一個階段測試,提早發現問題,將問題定位範圍縮小到當前階段,提升測試效率。

  • 轉換測試:對全部任務進行轉換,測試結果符合預期,抽象典型場景爲單元測試。
  • 編譯測試:確保全部任務能夠經過 TablePlanner 生成執行計劃,再編譯成 JobGraph,真正提交運行前結束。
  • 迴歸測試:在測試環境對任務替換配置後進行回放,確認任務能夠提交運行
  • 對照測試:對採樣數據以文件的形式提交至新舊兩個版本中運行,對比結果是否徹底一致(由於部分任務結果不具備肯定性,因此使用舊版本連續運行 2 次,篩選出肯定性任務,做爲測試用例)

4、引擎加強

除了對舊版本的兼容,咱們也結合了新版本的特性,對引擎進行了加強。

1. Task-Load 指標

咱們一直但願能精確衡量任務的負載情況,使用反壓指標指標只能粗略的判斷任務的資源夠或者不夠。

結合新版的 Mailbox 線程模型,全部互斥操做所有運行在 TaskThread 中,只需統計出線程的佔用時間,就能夠精確計算任務負載的百分比。

將來可使用指標進行任務的資源推薦,讓任務負載維持在一個比較健康的水平。

2. SubTask 均衡調度

在 FLIP-6 後,Flink 修改了資源調度模型,移除了--container 參數,slot 按需申請確保不會有閒置資源。可是這也致使了一個問題,Source 的併發數經常是小於最大併發數的,而 SubTask 調度是按 DAG 的拓撲順序調度,這樣 SourceTask 就會集中在某些 TaskManager 中致使熱點。

咱們加入了"最小 slot 數"的配置,保證在 Flink session 啓動後當即申請相應數量的 slot,且閒置時也不主動退出,搭配 cluster.evenly-spread-out-slots 參數能夠保證在 slot 數充足的狀況下,SubTask 會均勻分佈在全部的 TaskManager 上。

3. 窗口函數加強

以滾動窗口爲例 TUMBLE(time_attr, INTERVAL '1' DAY),窗口爲一天時開始和結束時間固定爲天天 0 點 -24 點,沒法作到生產天天 12 點-第二天 12 點的窗口。

對於代碼能夠經過指定偏移量實現,可是 SQL 目前還未實現,經過增長參數 TUMBLE(time_attr, INTERVAL '1' DAY, TIME '12:00:00') 表示偏移時間爲 12 小時。

還有另一種場景,好比統計一天的 UV,同時但願展現當前時刻的計算結果,例如每分鐘觸發窗口計算。對於代碼開發的方式能夠經過自定義 Trigger 的方式決定窗口的觸發邏輯,並且 Flink 也內置了一些 Tigger 實現,好比 ContinuousTimeTrigger 就很適合這種場景。因此咱們又在窗口函數裏增長了一種可選參數,表明窗口的觸發週期,TUMBLE(time_attr, INTERVAL '1' DAY, INTERVAL '1' MINUTES) 。

經過增長 offset 和 tiggger 週期參數(TUMBLE(time_attr, size[,offset_time][,trigger_interval])),拓展了 SQL 中窗口的使用場景,相似上面的場景能夠直接使用 SQL 開發而不須要使用代碼的方式。

4. RexCall 結果複用

在不少 SQL 的使用場景裏,會屢次使用上一個計算結果,好比將 JSON 解析成 Map 並提取多個字段 。

雖然經過子查詢,看起來 json 解析只調用一次,可是通過引擎的優化後,經過結果表的投影 (Projection) 生成函數調用鏈 (RexCall),結果相似:

這樣會致使 json 解析的計算重複運行了3次,即便使用視圖分割成兩步操做,通過 Planner 的優化同樣會變成上邊的樣子。

對於肯定性 (isDeterministic=true) 的函數來講,相同的輸入必定表明相同的結果,重複執行 3 次 json 解析實際上是沒有意義的,如何優化才能實現對函數結果的複用呢?

在代碼生成時,將 RexCall 生成的惟一標識(Digest)和變量符號的映射保存在 CodeGenContext 中,若是遇到 Digest 相同的函數調用,則能夠複用已經存在的結果變量,這樣解析 JSON 只須要執行第一次,以後就能夠複用第一次的結果。

5、總結

經過幾個月的努力,新版本已經上線運行,而且做爲 StreamSQL 的默認引擎,任務重啓後直接使用新版本運行。兼容性測試的經過率達到 99.9%,能夠基本作到對用戶的透明升級。對於新接觸 StreamSQL 用戶可使用社區 SQL 語法進行開發,已有任務也能夠修改 DML 部分語句來使用新特性。如今新版本已經支持了公司內許多業務場景,例如公司實時數據倉庫團隊依託於新版本更強的表達能力和性能,承接了多種多樣的數據需求作到穩定運行且與離線口徑保持一致。

版本升級不是咱們的終點,隨着實時計算的發展,公司內也有愈來愈多的團隊須要使用 Flink 引擎, 也向咱們提出了更多的挑戰,例如與 Hive 的整合作到將結果直接寫入 Hive 或直接使用 Flink 做爲批處理引擎,這些也是咱們探索和發展的方向,經過不斷的迭代向用戶提供更加簡單好用的流計算服務。

做者|Alan

原文連接

本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索