7月7日,Flink 1.11.0 正式發佈了,做爲這個版本的 release manager 之一,我想跟你們分享一下其中的經歷感覺以及一些表明性 feature 的解讀。在進入深度解讀前,咱們先簡單瞭解下社區發佈的通常流程,幫助你們更好的理解和參與 Flink 社區的工做。數據庫
Flink 1.11.0 從 3 月初的功能規劃到 7 月初的正式發佈,歷經了差很少 4 個月的時間,對 Flink 的生態、易用性、生產可用性、穩定性等方面都進行了加強和改善,下面將一一跟你們分享。json
Flink 1.11.0 從 Feature 凍結後發佈了 4 次 Candidate 才最終經過。經統計,一共有 236 個貢獻者參與了此次版本開發,解決了 1474 個 Jira 問題,涉及 30 多個 FLIP,提交了 2325 個 Commit。緩存
縱觀近五次版本發佈,能夠看出從 1.9.0 開始 Flink 進入了一個快速發展階段,各個維度指標相比以前都有了幾乎翻倍的提升。也是從 1.9.0 開始阿里巴巴內部的 Blink 項目開始被開源 Flink 整合,到 1.10.0 通過兩個大版本已經所有整合完畢,對 Flink 從生態建設、功能性、性能和生產穩定性上都有了大幅的加強。 網絡
Flink 1.11.0 版本的最初定位是重點解決易用性問題,提高用戶業務的生產使用體驗,總體上不作大的架構調整和功能開發,傾向於快速迭代的小版本開發。可是從上面統計的各個指標來看,所謂的「小版本」在各個維度的數據也絲絕不遜色於前兩個大版本,解決問題的數量和參與的貢獻者人數也在持續增長,其中來自中國的貢獻者比例達到 62%。架構
下面咱們會深度剖析 Flink 1.11.0 帶來了哪些讓你們期待已久的特性,從用戶直接使用的 API 層一直到執行引擎層,咱們都會選擇一些有表明性的 Feature 從不一樣維度解讀,更完整的 Feature 列表請你們關注發佈的 Release Blog。併發
這兩個維度在某種程度上是相輔相成的,很難嚴格區分開,生態兼容上的缺失經常形成使用上的不便,提高易用性的過程每每也是不斷完善相關生態的過程。在這方面用戶感知最明顯的應該就是 Table & SQL API 層面的使用。app
CDC 被普遍使用在複製數據、更新緩存、微服務間同步數據、審計日誌等場景,不少公司都在使用開源的 CDC 工具,如 MySQL CDC。經過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個強需求,在過往的不少討論中都被說起過,能夠幫助用戶以實時的方式處理 Changelog 流,進一步擴展 Flink 的應用場景,例如把 MySQL 中的數據同步到 PG 或 ElasticSearch 中,低延時的 Temporal Join 一個 Changelog 等。框架
除了考慮到上面的真實需求,Flink 中定義的「Dynamic Table」概念在流上有兩種模型:Append 模式和 Update 模式。經過 Append 模式把流轉化爲「Dynamic Table」在以前的版本中已經支持,所以在 1.11.0 中進一步支持 Update 模式也從概念層面完整的實現了「Dynamic Table」。機器學習
爲了支持解析和輸出 Changelog,如何在外部系統和 Flink 系統之間編解碼這些更新操做是首要解決的問題。考慮到 Source 和 Sink 是銜接外部系統的一個橋樑,所以 FLIP-95 在定義全新的 Table Source 和 Table Sink 接口時解決了這個問題。異步
在公開的 CDC 調研報告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 Changelog 到其它的系統中,如消息隊列。據此,FLIP-105 首先支持了 Debezium 和 Canal 這兩種格式,並且 Kafka Source 也已經能夠支持解析上述格式並輸出更新事件,在後續的版本中會進一步支持 Avro(Debezium) 和 Protobuf(Canal)。
CREATE TABLE my_table ( ...) WITH ( 'connector'='...', -- e.g. 'kafka' 'format'='debezium-json', 'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema) 'debezium-json.ignore-parse-errors'='true' -- default: false );
1.11.0 以前,用戶若是依賴 Flink 的 Source/Sink 讀寫關係型數據庫或讀取 Changelog 時,必需要手動建立對應的 Schema。並且當數據庫中的 Schema 發生變化時,也須要手動更新對應的 Flink 做業以保持一致和類型匹配,任何不匹配都會形成運行時報錯使做業失敗。用戶常常抱怨這個看似冗餘且繁瑣的流程,體驗極差。
實際上對於任何和 Flink 鏈接的外部系統均可能有相似的上述問題,在 1.11.0 中重點解決了和關係型數據庫對接的這個問題。FLIP-93 提供了 JDBC catalog 的基礎接口以及 Postgres catalog 的實現,這樣方便後續實現與其它類型的關係型數據庫的對接。
1.11.0 版本後,用戶使用 Flink SQL 時能夠自動獲取表的 Schema 而再也不須要輸入 DDL。除此以外,任何 Schema 不匹配的錯誤都會在編譯階段提早進行檢查報錯,避免了以前運行時報錯形成的做業失敗。這是提高易用性和用戶體驗的一個典型例子。
從 1.9.0 版本開始 Flink 從生態角度致力於集成 Hive,目標打造批流一體的 Hive 數倉。通過前兩個版本的迭代,已經達到了 Batch 兼容且生產可用,在 TPC-DS 10T Benchmark 下性能達到 Hive 3.0 的 7 倍以上。
1.11.0 在 Hive 生態中重點實現了實時數倉方案,改善了端到端流式 ETL 的用戶體驗,達到了批流一體 Hive 數倉的目標。同時在兼容性、性能、易用性方面也進一步進行了增強。
在實時數倉的解決方案中,憑藉 Flink 的流式處理優點作到實時讀寫 Hive:
在 Hive 可用性方面的提高:
在 Hive 性能方面,1.10.0 中已經支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中咱們補全了全部版本的 Parquet 和 ORC 向量化支持來提高性能。
前面也提到過,Source 和 Sink 是 Flink 對接外部系統的一個橋樑,對於完善生態、可用性及端到端的用戶體驗是很重要的環節。社區早在一年前就已經規劃了 Source 端的完全重構,從 FLIP-27 的 ID 就能夠看出是很早的一個 Feature。可是因爲涉及到不少複雜的內部機制和考慮到各類 Source Connector 的實現,設計上須要考慮的很全面。從 1.10.0 就開始作 POC 的實現,最終遇上了 1.11.0 版本的發佈。
先簡要回顧下 Source 以前的主要問題:
FLIP-27 在設計時充分考慮了上述的痛點:
目前 Flink 已有的 Source Connector 會在後續的版本中基於新架構來從新實現,Legacy Source 也會繼續維護幾個版本保持兼容性,用戶也能夠按照 Release 文檔中的說明來嘗試體驗新 Source 的開發。
衆所周知,Python 語言在機器學習和數據分析領域有着普遍的使用。Flink 從 1.9.0 版本開始發力兼容 Python 生態,Python 和 Flink 協力爲 PyFlink,把 Flink 的實時分佈式處理能力輸出給 Python 用戶。前兩個版本 PyFlink 已經支持了 Python Table API 和 UDF,在 1.11.0 中擴大對 Python 生態庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時 Python UDF 性能有了極大的提高。
具體來講,以前普通的 Python UDF 每次調用只能處理一條數據,並且在 Java 端和 Python 端都須要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 做業中自定義和使用向量化 Python UDF,用戶只須要在 UDF 修飾中額外增長一個參數 udf_type=「pandas」 便可。這樣帶來的好處是:
除此以外,1.11.0 中 PyFlink 還支持:
上述解讀的都是側重 API 層面,用戶開發做業能夠直接感知到的易用性的提高。下面咱們看看執行引擎層在 1.11.0 中都有哪些值得關注的變化。
1.11.0 版本前,Flink 主要支持以下兩種模式運行:
以上兩種模式的共同問題是須要在客戶端執行用戶代碼,編譯生成對應的 Job Graph 提交到集羣運行。在這個過程須要下載相關 Jar 包並上傳到集羣,客戶端和網絡負載壓力容易成爲瓶頸,尤爲當一個客戶端被多個用戶共享使用。
1.11.0 中引入了 Application 模式(FLIP-85)來解決上述問題,按照 Application 粒度來啓動一個集羣,屬於這個 Application 的全部 Job 在這個集羣中運行。核心是 Job Graph 的生成以及做業的提交不在客戶端執行,而是轉移到 JM 端執行,這樣網絡下載上傳的負載也會分散到集羣中,再也不有上述 Client 單點上的瓶頸。
用戶能夠經過 bin/flink run-application 來使用 Application 模式,目前 Yarn 和 Kubernetes(K8s)都已經支持這種模式。Yarn application 會在客戶端將運行做業須要的依賴都經過 Yarn Local Resource 傳遞到 JM。K8s Application 容許用戶構建包含用戶 Jar 與依賴的鏡像,同時會根據做業自動建立 TM,並在結束後銷燬整個集羣,相比 Session 模式具備更好的隔離性。K8s 再也不有嚴格意義上的 Per-Job 模式,Application 模式至關於 Per-Job 在集羣進行提交做業的實現。
除了支持 Application 模式,Flink 原生 K8s 在 1.11.0 中還完善了不少基礎的功能特性(FLINK-14460),以達到生產可用性的標準。例如 Node Selector、Label、Annotation、Toleration 等。爲了更方便的與 Hadoop 集成,也支持根據環境變量自動掛載 Hadoop 配置的功能。
Checkpoint 和 Savepoint 機制一直是 Flink 保持先進性的核心競爭力之一,社區在這個領域的改動很謹慎,最近的幾個大版本中幾乎沒有大的功能和架構上的調整。在用戶郵件列表中,咱們常常能看到用戶反饋和抱怨的相關問題:好比 Checkpoint 長時間作不出來失敗,Savepoint 在做業重啓後不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提升生產可用性和穩定性。
1.11.0 以前, Savepoint 中 Meta 數據和 State 數據分別保存在兩個不一樣的目錄中,這樣若是想遷移 State 目錄很難識別這種映射關係,也可能致使目錄被誤刪除,對於目錄清理也一樣有麻煩。1.11.0 把兩部分數據整合到一個目錄下,這樣方便總體轉移和複用。另外,以前 Meta 引用 State 採用的是絕對路徑,這樣 State 目錄遷移後路徑發生變化也不可用,1.11.0 把 State 引用改爲了相對路徑解決了這個問題(FLINK-5763),這樣 Savepoint 的管理維護、複用更加靈活方便。
實際生產環境中,用戶常常遭遇 Checkpoint 超時失敗、長時間不能完成帶來的困擾。一旦做業 failover 會形成回放大量的歷史數據,做業長時間沒有進度,端到端的延遲增長。1.11.0 從不一樣維度對 Checkpoint 的優化和提速作了改進,目標實現分鐘甚至秒級的輕量型 Checkpoint。
首先,增長了 Checkpoint Coordinator 通知 Task 取消 Checkpoint 的機制(FLINK-8871),這樣避免 Task 端還在執行已經取消的 Checkpoint 而對系統帶來沒必要要的壓力。同時 Task 端放棄已經取消的 Checkpoint,能夠更快的參與執行 Coordinator 新觸發的 Checkpoint,某種程度上也能夠避免新 Checkpoint 再次執行超時而失敗。這個優化也對後面默認開啓 Local Recovery 提供了便利,Task 端能夠及時清理失效 Checkpoint 的資源。
其次,在反壓場景下,整個數據鏈路堆積了大量 Buffer,致使 Checkpoint Barrier 排在數據 Buffer 後面,不能被 Task 及時處理對齊,也就致使了 Checkpoint 長時間不能執行。1.11.0 中從兩個維度對這個問題進行解決:
1)嘗試減小數據鏈路中的 Buffer 總量(FLINK-16428),這樣 Checkpoint Barrier 能夠儘快被處理對齊。
這個優化有一部分工做已經在 1.11.0 中完成,剩餘部分會在下個版本繼續推動完成。
2)實現了全新的 Unaligned Checkpoint 機制(FLIP-76)從根本上解決了反壓場景下 Checkpoint Barrier 對齊的問題。實際上這個想法早在 1.10.0 版本以前就開始醞釀設計,因爲涉及到不少模塊的大改動,實現機制和線程模型也很複雜。咱們實現了兩種不一樣方案的原型 POC 進行了測試、性能對比,肯定了最終的方案,所以直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執行引擎層惟一的一個重量級 Feature。其基本思想能夠歸納爲:
Unaligned Checkpoint 在反壓嚴重的場景下能夠明顯加速 Checkpoint 的完成時間,由於它再也不依賴於總體的計算吞吐能力,而和系統的存儲性能更加相關,至關於計算和存儲的解耦。可是它的使用也有必定的侷限性,它會增長總體 State 的大小,對存儲 IO 帶來額外的開銷,所以在 IO 已是瓶頸的場景下就不太適合使用 Unaligned Checkpoint 機制。
1.11.0 中 Unaligned Checkpoint 尚未做爲默認模式,須要用戶手動配置來開啓,而且只在 Exactly-Once 模式下生效。但目前還不支持 Savepoint 模式,由於 Savepoint 涉及到做業的 Rescale 場景,Channel State 目前還不支持 State 拆分,在後面的版本會進一步支持,因此 Savepoint 目前仍是會使用以前的 Aligned 模式,在反壓場景下有可能須要很長時間才能完成。
Flink 1.11.0 版本的開發過程當中,咱們看到愈來愈多來自中國的貢獻者參與到核心功能的開發中,見證了 Flink 在中國的生態發展愈來愈繁榮,好比來自騰訊公司的貢獻者參與了 K8s、Checkpoint 等功能開發,來自字節跳動公司的貢獻者參與了 Table & SQL 層以及引擎網絡層的一些開發。但願更多的公司可以參與到 Flink 開源社區中,分享在不一樣領域的經驗,使 Flink 開源技術一直保持先進性,可以普惠到更多的受衆。
通過 1.11.0 「小版本」的短暫調整,Flink 正在醞釀下一個大版本的 Feature,相信必定會有不少重量級的特性登場,讓咱們拭目以待!