58 集團大規模 Storm 任務平滑遷移至 Flink 的祕密

Flink-Storm 是 Flink 官方提供的用於 Flink 兼容 Storm 程序 beta 工具,而且在 Release 1.8 以後去掉相關代碼。本文主要講述 58 實時計算平臺如何優化 Flink-Storm 以及基於 Flink-Storm 實現真實場景下大規模 Storm 任務平滑遷移 Flink。後端

背景

58 實時計算平臺旨在爲集團業務部門提供穩定高效實時計算服務,主要基於 Storm 和 Spark Streaming 構建,但在使用過程當中也面臨一些問題,主要包括 Storm 在吞吐量不足以及多集羣帶來運維問題,Spark Streaming 又沒法知足低延遲的要求。Apache Flink 開源以後,其在架構設計、計算性能和穩定性上體現出的優點,使咱們決定採用 Flink 做爲新一代實時計算平臺的計算引擎。同時基於 Flink 開發了一站式高性能實時計算平臺 Wstream,支持 Flink jar,Stream Sql,Flink-Storm 等多樣化任務構建方式。架構

在完善 Flink 平臺建設的同時,咱們也啓動 Storm 任務遷移 Flink 計劃,旨在提高實時計算平臺總體效率,減小機器成本和運維成本。app

Storm vs Flink

儘管 Flink 做爲高性能計算引擎能夠很好兼容 Storm,但在業務遷移過程當中,咱們仍然遇到了一些問題:運維

1.用戶對 Flink 的學習成本;maven

  1. 從新基於 Flink 開發耗費工做量;
  2. Stream-SQL 雖然能夠知足快速開發減小學習成本和開發工做量但沒法知足一些複雜場景。

所以咱們決定採用 Flink 官方提供的 Flink-Storm 進行遷移,在保障遷移穩定性同時無需用戶修改 Storm 代碼邏輯。工具

Flink-Storm 原理

經過 Storm 原生 TopologyBuilder 構建好 Storm topology。性能

FlinkTopology.createTopology(builder) 將 StormTopology 轉換爲 Flink 對應的 Streaming Dataflow。學習

SpoutWrapper 用於將 spout 轉換爲 RichParallelSourceFunction,spout 的OutputFields轉換成 source 的T ypeInformation。大數據

BoltWrapper 用於將 bolt 轉換成對應的 operator,其中 grouping 轉換爲對 spout 的 DataStream 的對應操做。優化

構建完 FlinkTopology 以後,就能夠經過 StreamExecutionEnvironment 生成 StreamGraph 獲取 JobGraph,以後將 JobGraph 提交到 Flink 運行時環境。

實踐

Flink-Storm 做爲官方提供 Flink 兼容 Storm 程序爲咱們實現無縫遷移提供了可行性,可是做爲 beta 版本,在實際使用過程當中存在不少沒法知足現實場景的狀況,主要包括版本,功能 bug,複雜邏輯兼容,沒法支持 yarn 等,下面將主要分爲平臺層面和用戶層面講述咱們的使用和改進。

平臺層面

1. 版本

當前線上使用 Apache Flink 1.6 版本,Flink-Storm 模塊基於 Storm 1.0 開發,咱們平臺運行 Storm 版本爲 0.9.5 和 1.2 。

1.1 對於 Storm 1.2 運行任務,Storm 1.0 API 徹底兼容 1.2 版本,所以只需切換 Flink-Storm 模塊依賴的 storm-core 到 1.2.

1.2 對於 Storm 0.9.5 任務,因爲 Storm 1.0 API 沒法兼容 0.9.5,須要修改依賴 storm-core 爲 0.9.5,同時修改 Flink-Storm 模塊中全部與 Storm 相關的 API,主要是切換 package 路徑。

1.3 從新構建 flink-storm 包 mvn clean package -Dmaven.test.skip=true -Dcheckstyle.skip=true

2.功能

2.1 傳遞語義保證

Storm 使用 ACK 機制來實現傳遞語義保證,咱們沒有將 Storm 的 ACK 機制移植到Flink-Storm。所以,某些依賴 ACK 機制的功能會受到限制。好比,Kafka spout 將消費狀態存儲在 ZK,狀態的更新須要依賴 ACK 機制,tuple 樹結束後,spout 纔會觸發狀態更新,表示這條消息已經被徹底處理,從而實現 at least once 的傳遞保證。Storm 也提供了at most once 的支持,spout 發送消息後,無需等待 tuple 樹結束直接觸發狀態更新。咱們使用了 Storm 的實現 at most once 的方式,在 Kafka spout 實現 at most once 的基礎上,經過實現 Flink Checkpoint 的狀態機制,實現了 Flink-storm 任務的 at least once。Storm 任務遷移到 Flink,傳遞保證不變。

2.2 tick tuple 機制

Storm 使用 tick tuple 機制實現定時功能,消息超時重發、Bolt 定時觸發等功能都要依賴 tick tuple 機制。Storm 0.9.5 版本沒有實現窗口功能,用戶可使用 tick tuple 機制簡單實現窗口功能。咱們一樣爲 Flink-Storm 增長了 tick tuple 機制的支持,使用方式也和 Storm 中使用方式同樣,配置 topology.tick.tuple. freq.secs 參數,即開啓了 tick tuple 功能。

2.3 多輸入下 AllGrouping 支持

AllGrouping 分組方式對應於 Flink 是 Broadcast。如圖,bolt-1 有兩個輸入,這種狀況下,原 flink-storm 的實現,spout-2 到 bolt-1 的數據分區的表現形式和Rebalance(Flink 術語)同樣,而不是 Broadcast。咱們優化了這種場景,使其數據分組表現和 Storm 中是同樣的。

3.Runtime

Flink-Storm 默認支持 local 和 standalong 模式任務提交,沒法將任務提交到 yarn 集羣,咱們在建設 Flink 集羣一開始就選擇了 yarn 模式,便於集羣資源管理和統一實時計算平臺,所以須要自行實現支持 yarn 的 runtime 功能,這裏主要涉及 yarn client 端設計。

YARN Client 實現機制

整個模塊主要分爲四個部分,其中 client 用於調用 Flink-Storm 程序轉化接口,獲得 Flink jobGraph。配置參數用於初始化 Flink 及 yarn 相關配置,構建運行時環境,命令行工具主要用於更加靈活的管理。yarnClient 主要實現 ApplicationClientProtocol 接口,完成與 ResourceManager 與 ApplicationMaster 的交互,實現 Flink job 提交和監控。

4.任務部署

爲便於任務提交和集成到 Wstream 平臺,提供相似 Flink 命令行提交方式:

用戶層面

1.maven 依賴

平臺將編譯好的包上傳到公司 maven 私服供用戶下載對應版本 Flink-Storm 依賴包:

2.代碼改動

用戶須要將 Storm 提交任務的方式改爲 Flink 提交,其餘無需變更。

總結

經過對 Fink-Storm 的優化和使用,咱們已經順利完成多個 Storm 集羣任務遷移和下線,在保障實時性及吞吐量的基礎上能夠節約計算資源 40% 以上,同時藉助 yarn 統一管理實時計算平臺無需維護多套 Storm 集羣,總體提高了平臺資源利用率,減輕平臺運維工做量。

做者介紹:萬石康,來自 58 集團 TEG,後端高級工程師,專一於大數據實時計算架構設計。



閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索