1.背景
Apache Flink 和 Apache Storm 是當前業界普遍使用的兩個分佈式實時計算框架。其中 Apache Storm(如下簡稱「Storm」)在美團點評實時計算業務中已有較爲成熟的運用(可參考 Storm 的 可靠性保證測試),有管理平臺、經常使用 API 和相應的文檔,大量實時做業基於 Storm 構建。而 Apache Flink(如下簡稱「Flink」)在近期倍受關注,具備高吞吐、低延遲、高可靠和精確計算等 特性,對事件窗口有很好的支持,目前在美團點評實時計算業務中也已有必定應用。爲深刻熟悉瞭解 Flink 框架,驗證其穩定性和可靠性,評估其實時處理性能,識別該體系中的 缺點,找到其性能瓶頸並進行優化,給用戶提供最適合的實時計算引擎,咱們以實踐經驗豐富 的 Storm 框架做爲對照,進行了一系列實驗測試 Flink 框架的性能,計算 Flink 做爲確保「至 少一次」和「剛好一次」語義的實時計算框架時對資源的消耗,爲實時計算平臺資源規劃、框 架選擇、性能調優等決策及 Flink 平臺的建設提出建議並提供數據支持,爲後續的 SLA 建設提供必定參考。Flink 與 Storm 兩個框架對比:數據庫
流計算框架Flink與Storm 的性能對比
後端
|
Storm
|
Flink
|
狀態管理 |
無狀態,需用戶自行進行狀態管理
|
有狀態
|
窗口支持 |
對事件窗口支持較弱,緩存整個窗口的全部 數據,窗口結束時一塊兒計算
|
窗口支持較爲完善,自帶一些窗口聚合方法,並 且會自動管理窗口狀態。
|
消息投遞 |
At Most Once At Least Once
|
At Most Once At Least Once Exactly Once
|
容錯方式 |
ACK機制:對每一個消息進行全鏈路跟蹤,失敗 或超時進行重發。
|
檢查點機制:經過分佈式一致性快照機制,對數 據流和算子狀態進行保存。在發生錯誤時,使系 統可以進行回滾。 |
應用現狀 |
在美團點評實時計算業務中已有較爲成熟的 運用,有管理平臺、經常使用 API 和相應的文檔, 大量實時做業基於 Storm 構建。 |
在美團點評實時計算業務中已有必定應用,但 是管理平臺、API 及文檔等仍需進一步完善。 |
2.測試目標
評估不一樣場景、不一樣數據壓力下 Flink 和 Storm 兩個實時計算框架目前的性能表現,獲取其詳 細性能數據並找處處理性能的極限;瞭解不一樣配置對 Flink 性能影響的程度,分析各類配置的 適用場景,從而得出調優建議。緩存
2.1 測試場景
「輸入-輸出」簡單處理場景
經過對「輸入-輸出」這樣簡單處理邏輯場景的測試,儘量減小其它因素的干擾,反映兩個框 架自己的性能。同時測算框架處理能力的極限,處理更加複雜的邏輯的性能不會比純粹「輸入-輸出」更高。併發
用戶做業耗時較長的場景
若是用戶的處理邏輯較爲複雜,或是訪問了數據庫等外部組件,其執行時間會增大,做業的性 能會受到影響。所以,咱們測試了用戶做業耗時較長的場景下兩個框架的調度性能。框架
窗口統計場景
實時計算中常有對時間窗口或計數窗口進行統計的需求,例如一天中每五分鐘的訪問量,每 100 個訂單中有多少個使用了優惠等。Flink 在窗口支持上的功能比 Storm 更增強大,API 更 加完善,可是咱們同時也想了解在窗口統計這個經常使用場景下兩個框架的性能。分佈式
精確計算場景(即消息投遞語義爲「剛好一次」)
Storm 僅能保證「至多一次」 (At Most Once) 和「至少一次」 (At Least Once) 的消息投遞語義, 便可能存在重複發送的狀況。有不少業務場景對數據的精確性要求較高,但願消息投遞不重不 漏。Flink 支持「剛好一次」 (Exactly Once) 的語義,可是在限定的資源條件下,更加嚴格的精 確度要求可能帶來更高的代價,從而影響性能。所以,咱們測試了在不一樣消息投遞語義下兩個 框架的性能,但願爲精確計算場景的資源規劃提供數據參考。函數
2.2 性能指標
- 吞吐量(Throughput)
- 單位時間內由計算框架成功地傳送數據的數量,本次測試吞吐量的單位爲:條/秒。
- 反映了系統的負載能力,在相應的資源條件下,單位時間內系統能處理多少數據。 •
- 吞吐量經常使用於資源規劃,同時也用於協助分析系統性能瓶頸,從而進行相應的資源調整以 保證系統能達到用戶所要求的處理能力。假設商家每小時能作二十份午飯(吞吐量 20 份/ 小時),一個外賣小哥每小時只能送兩份(吞吐量 2 份/小時),這個系統的瓶頸就在小哥配 送這個環節,能夠給該商家安排十個外賣小哥配送。
- 延遲(Latency)
- 數據從進入系統到流出系統所用的時間,本次測試延遲的單位爲:毫秒。
- 反映了系統處理的實時性。
- 金融交易分析等大量實時計算業務對延遲有較高要求,延遲越低,數據實時性越強。
- 假設商家作一份午飯須要 5 分鐘,小哥配送須要 25 分鐘,這個流程中用戶感覺到了 30 分鐘的延遲。若是更換配送方案後延遲變成了 60 分鐘,等送到了飯菜都涼了,這個新的方案就是沒法接受的。
3.測試環境
爲 Storm 和 Flink 分別搭建由 1 臺主節點和 2 臺從節點構成的 Standalone 集羣進行本次測試。其中爲了觀察 Flink 在實際生產環境中的性能,對於部分測內容也進行了 on Yarn 環境的測試。oop
3.1 集羣參數
參數項
|
參數值
|
CPU
|
QEMU Virtual CPU version 1.1.2 2.6GHz |
Core
|
8
|
Memory |
16GB
|
Disk
|
500G
|
OS
|
CentOS release 6.5 (Final)
|
3.2 框架參數
參數項
|
Storm 配置
|
Flink 配置
|
Version
|
Storm 1.1.0-mt002
|
Flink 1.3.0
|
Master Memory |
2600M
|
2600M
|
Slave Memory
|
1600M * 16
|
12800M * 2
|
Parallelism
|
2 supervisor 16 worker |
2 Task Manager 16 Task slots |
4.測試方法
4.1 測試流程
數據生產
Data Generator 按特定速率生成數據,帶上自增的 id 和 eventTime 時間戳寫入 Kafka 的一個 Topic(Topic Data)。性能
數據處理
Storm Task 和 Flink Task (每一個測試用例不一樣)從 Kafka Topic Data 相同的 Offset 開始消費, 並將結果及相應 inTime、outTime 時間戳分別寫入兩個 Topic(Topic Storm 和 Topic Flink)中。測試
指標統計
Metrics Collector 按 outTime 的時間窗口從這兩個 Topic 中統計測試指標,每五分鐘將相應的 指標寫入 MySQL 表中。Metrics Collector 按 outTime 取五分鐘的滾動時間窗口,計算五分鐘的平均吞吐(輸出數據的 條數)、五分鐘內的延遲(outTime - eventTime 或 outTime - inTime)的中位數及 99 線等指標, 寫入 MySQL 相應的數據表中。最後對 MySQL 表中的吞吐計算均值,延遲中位數及延遲 99 線 選取中位數,繪製圖像並分析。
4.2 默認參數
- Storm 和 Flink 默認均爲At Least Once語義。
Storm 開啓 ACK,ACKer 數量爲 1。
Flink 的 Checkpoint 時間間隔爲 30 秒,默認 StateBackend 爲 Memory。
- 保證 Kafka 不是性能瓶頸,儘量排除 Kafka 對測試結果的影響。
- 測試延遲時數據生產速率小於數據處理能力,假設數據被寫入 Kafka 後馬上被讀取,即 eventTime 等於數據進入系統的時間。
- 測試吞吐量時從 Kafka Topic 的最舊開始讀取,假設該 Topic 中的測試數據量充足。
4.3 測試用例
Identity
- Identity 用例主要模擬「輸入-輸出」簡單處理場景,反映兩個框架自己的性能。
- 輸入數據爲「msgId, eventTime」,其中 eventTime 視爲數據生成時間。單條輸入數據約 20 B。
- 進入做業處理流程時記錄 inTime,做業處理完成後(準備輸出時)記錄 outTime。
- 做業從 Kafka Topic Data 中讀取數據後,在字符串末尾追加時間戳,而後直接輸出到 Kafka。
- 輸出數據爲「msgId, eventTime, inTime, outTime」。單條輸出數據約 50 B。
Sleep
- Sleep 用例主要模擬用戶做業耗時較長的場景,反映複雜用戶邏輯對框架差別的削弱,比較 兩個框架的調度性能。
- 輸入數據和輸出數據均與 Identity 相同。
- 讀入數據後,等待必定時長(1 ms)後在字符串末尾追加時間戳後輸出
Windowed Word Count
- Windowed Word Count 用例主要模擬窗口統計場景,反映兩個框架在進行窗口統計時性能 的差別。
- 此外,還用其進行了精確計算場景的測試,反映 Flink 剛好一次投遞的性能。
- 輸入爲 JSON 格式,包含 msgId、eventTime 和一個由若干單詞組成的句子,單詞之間由空 格分隔。單條輸入數據約 150 B。
- 讀入數據後解析 JSON,而後將句子分割爲相應單詞,帶 eventTime 和 inTime 時間戳發給 CountWindow 進行單詞計數,同時記錄一個窗口中最大最小的 eventTime 和 inTime,最後 帶 outTime 時間戳輸出到 Kafka 相應的 Topic。
- Spout/Source 及 OutputBolt/Output/Sink 併發度恆爲 1,增大併發度時僅增大 JSONParser、 CountWindow 的併發度。
- 因爲 Storm 對 window 的支持較弱,CountWindow 使用一個 HashMap 手動實現,Flink 用了原生的 CountWindow 和相應的 Reduce 函數。
5.測試結果
5.1 Identity 單線程吞吐量
- 上圖中藍色柱形爲單線程 Storm 做業的吞吐,橙色柱形爲單線程 Flink 做業的吞吐。
- Identity 邏輯下,Storm 單線程吞吐爲8.7萬條/秒,Flink 單線程吞吐可達35萬條/秒。
- 當 Kafka Data 的 Partition 數爲 1 時,Flink 的吞吐約爲 Storm 的 3.2 倍;當其 Partition 數爲 8 時,Flink 的吞吐約爲 Storm 的 4.6 倍。
- 由此能夠看出,Flink 吞吐約爲 Storm 的 3-5 倍。
5.2 Identity 單線程做業延遲
- 採用 outTime - eventTime 做爲延遲,圖中藍色折線爲 Storm,橙色折線爲 Flink。虛線爲 99 線,實線爲中位數。
- 從圖中能夠看出隨着數據量逐漸增大,Identity 的延遲逐漸增大。其中 99 線的增大速度比中位數快,Storm 的 增大速度比 Flink 快。
- 其中 QPS 在 80000 以上的測試數據超過了 Storm 單線程的吞吐能力,沒法對 Storm 進 行測試,只有 Flink 的曲線。
- 對比折線最右端的數據能夠看出,Storm QPS 接近吞吐時延遲中位數約 100 毫秒,99 線約 700 毫秒,Flink 中位數約 50 毫秒,99 線約 300 毫秒。Flink 在滿吞吐時的延遲約爲 Storm 的一半。
5.3 Sleep 吞吐量
- 從圖中能夠看出,Sleep 1 毫秒時,Storm 和 Flink 單線程的吞吐均在 900 條/秒左右,且隨着併發增大基本呈線性增大。
- 對比藍色和橙色的柱形能夠發現,此時兩個框架的吞吐能力基本一致。
5.4 Sleep 單線程做業延遲(中位數)
- 依然採用 outTime - eventTime 做爲延遲,從圖中能夠看出,Sleep 1 毫秒時,Flink 的延遲仍低於 Storm。
5.5 Windowed Word Count 單線程吞吐量
- 單線程執行大小爲 10 的計數窗口,吞吐量統計如圖。
- 從圖中能夠看出,Storm 吞吐約爲 1.2 萬條/秒,Flink Standalone 約爲 4.3 萬條/秒。Flink 吞吐依然爲 Storm 的 3 倍以上。
5.6 Windowed Word Count Flink At Least Once 與 Exactly Once 吞吐量對比
- 因爲同一算子的多個並行任務處理速度可能不一樣,在上游算子中不一樣快照裏的內容,通過中間並行算子的處理,到達下游算子時可能被計入同一個快照中。這樣一來,這部分數據會 被重複處理。所以,Flink 在 Exactly Once 語義下須要進行對齊,即當前最先的快照中全部 數據處理完以前,屬於下一個快照的數據不進行處理,而是在緩存區等待。當前測試用例 中,在 JSON Parser 和 CountWindow、CountWindow 和 Output 之間均須要進行對齊,有 必定消耗。爲體現出對齊場景,Source/Output/Sink 併發度的併發度仍爲 1,提升了 JSONParser/CountWindow 的併發度。具體流程細節參見前文 Windowed Word Count 流程圖。
- 上圖中橙色柱形爲 At Least Once 的吞吐量,黃色柱形爲 Exactly Once 的吞吐量。對比二者能夠看出,在當前併發條件下,Exactly Once 的吞吐較 At Least Once 而言降低了 6.3%
5.7 Windowed Word Count Storm At Least Once 與 At Most Once 吞吐量對比
- Storm 將 ACKer 數量設置爲零後,每條消息在發送時就自動 ACK,再也不等待 Bolt 的 ACK, 也再也不重發消息,爲 At Most Once 語義。
- 上圖中藍色柱形爲 At Least Once 的吞吐量,淺藍色柱形爲 At Most Once 的吞吐量。對比二者能夠看出,在當前併發條件下,At Most Once 語義下的吞吐較 At Least Once 而言提升了 16.8%
5.8 Windowed Word Count 單線程做業延遲
- Identity 和 Sleep 觀測的都是 outTime - eventTime,由於做業處理時間較短或 Thread.sleep() 精度不高,outTime - inTime 爲零或沒有比較意義;Windowed Word Count 中能夠有效測得 outTime - inTime 的數值,將其與 outTime - eventTime 畫在同一張圖上,其中 outTime - eventTime 爲虛線,outTime - InTime 爲實線。 • 觀察橙色的兩條折線能夠發現,Flink 用兩種方式統計的延遲都維持在較低水平;觀察兩條 藍色的曲線能夠發現,Storm 的 outTime - inTime 較低,outTime - eventTime 一直較高,即 inTime 和 eventTime 之間的差值一直較大,可能與 Storm 和 Flink 的數據讀入方式有關。
- 藍色折線代表 Storm 的延遲隨數據量的增大而增大,而橙色折線代表 Flink 的延遲隨着數 據量的增大而減少(此處未測至 Flink 吞吐量,接近吞吐時 Flink 延遲依然會上升)。 • 即便僅關注 outTime - inTime(即圖中實線部分),依然能夠發現,當 QPS 逐漸增大的時候, Flink 在延遲上的優點開始體現出來。
5.9 Windowed Word Count Flink At Least Once 與 Exactly Once 延遲對比
- 圖中黃色爲 99 線,橙色爲中位數,虛線爲 At Least Once,實線爲 Exactly Once。圖中相應 顏色的虛實曲線都基本重合,能夠看出 Flink Exactly Once 的延遲中位數曲線與 At Least Once 基本貼合,在延遲上性能沒有太大差別。
5.10 Windowed Word Count Storm At Least Once 與 At Most Once 延遲對比
- 圖中藍色爲 99 線,淺藍色爲中位數,虛線爲 At Least Once,實線爲 At Most Once。QPS 在 4000 及之前的時候,虛線實線基本重合;QPS 在 6000 時二者已有差別,虛線略高;QPS 接近 8000 時,已超過 At Least Once 語義下 Storm 的吞吐,所以只有實線上的點。 • 能夠看出,QPS 較低時 Storm At Most Once 與 At Least Once 的延遲觀察不到差別,隨着 QPS 增大差別開始增大,At Most Once 的延遲較低。
5.11 Windowed Word Count Flink 不一樣 StateBackends 吞吐量對比
- Flink 支持 Standalone 和 on Yarn 的集羣部署模式,同時支持 Memory、FileSystem、RocksDB 三種狀態存儲後端(StateBackends)。因爲線上做業須要,測試了這三種 StateBackends 在 兩種集羣部署模式上的性能差別。其中,Standalone 時的存儲路徑爲 JobManager 上的一 個文件目錄,on Yarn 時存儲路徑爲 HDFS 上一個文件目錄。
- 對比三組柱形能夠發現,使用 FileSystem 和 Memory 的吞吐差別不大,使用 RocksDB 的 吞吐僅其他二者的十分之一左右。
- 對比兩種顏色能夠發現,Standalone 和 on Yarn 的整體差別不大,使用 FileSystem 和 Memory 時 on Yarn 模式下吞吐稍高,使用 RocksDB 時 Standalone 模式下的吞吐稍高。
5.12 Windowed Word Count Flink 不一樣 StateBackends 延遲對比
- 使用 FileSystem 和 Memory 做爲 Backends 時,延遲基本一致且較低。
- 使用 RocksDB 做爲 Backends 時,延遲稍高,且因爲吞吐較低,在達到吞吐瓶頸前的延遲陡增。其中 on Yarn 模式下吞吐更低,接近吞吐時的延遲更高。
6.結論及建議
6.1 框架自己性能
- 由 5.一、5.5 的測試結果能夠看出,Storm 單線程吞吐約爲 8.7 萬條/秒,Flink 單線程吞吐 可達 35 萬條/秒。Flink 吞吐約爲 Storm 的 3-5 倍。
- 由 5.二、5.8 的測試結果能夠看出,Storm QPS 接近吞吐時延遲(含 Kafka 讀寫時間)中位 數約 100 毫秒,99 線約 700 毫秒,Flink 中位數約 50 毫秒,99 線約 300 毫秒。Flink 在 滿吞吐時的延遲約爲 Storm 的一半,且隨着 QPS 逐漸增大,Flink 在延遲上的優點開始體現出來。
- 綜上可得,Flink 框架自己性能優於 Storm。
6.2 複雜用戶邏輯對框架差別的削弱
- 對比 5.1 和 5.三、5.2 和 5.4 的測試結果能夠發現,單個 Bolt Sleep 時長達到 1 毫秒時, Flink 的延遲仍低於 Storm,但吞吐優點已基本沒法體現。
- 所以,用戶邏輯越複雜,自己耗時越長,針對該邏輯的測試體現出來的框架的差別越小。
6.3 不一樣消息投遞語義的差別
- 由 5.六、5.七、5.九、5.10 的測試結果能夠看出,Flink Exactly Once 的吞吐較 At Least Once 而 言降低 6.3%,延遲差別不大;Storm At Most Once 語義下的吞吐較 At Least Once 提高 16.8%,延遲稍有降低。
- 因爲 Storm 會對每條消息進行 ACK,Flink 是基於一批消息作的檢查點,不一樣的實現原理導 致二者在 At Least Once 語義的花費差別較大,從而影響了性能。而 Flink 實現 Exactly Once 語義僅增長了對齊操做,所以在算子併發量不大、沒有出現慢節點的狀況下對 Flink 性能的 影響不大。Storm At Most Once 語義下的性能仍然低於 Flink。
6.4 Flink 狀態存儲後端選擇
• Flink 提供了內存、文件系統、RocksDB 三種 StateBackends,結合 5.十一、5.12 的測試結果, 三者的對好比下:
StateBackend 過程狀態存儲 檢查點存儲 吞吐 推薦使用場景 Memory TM Memory JM Memory 高(3-5 倍 Storm) 調試、無狀態或對數據是否 丟失重複無要求 FileSystem TM Memory FS/HDFS 高(3-5 倍 Storm) 普通狀態、窗口、KV 結構 (建議做爲默認 Backend)
RocksDB RocksDB on TM FS/HDFS 低(0.3-0.5 倍 Storm) 超大狀態、超長窗口、大型 KV 結構
### 6.5 推薦使用 Flink 的場景 複製代碼
綜合上述測試結果,如下實時計算場景建議考慮使用 Flink 框架進行計算:
- 要求消息投遞語義爲Exactly Once的場景;
- 數據量較大,要求高吞吐低延遲的場景;
- 須要進行狀態管理或窗口統計的場景。
7.展望
- 本次測試中尚有一些內容沒有進行更加深刻的測試,有待後續測試補充。例如:
- Exactly Once 在併發量增大的時候是否吞吐會明顯降低?
- 用戶耗時到 1ms 時框架的差別已經再也不明顯(Thread.sleep() 的精度只能到毫秒),用 戶耗時在什麼範圍內 Flink 的優點依然能體現出來?
- 本次測試僅觀察了吞吐量和延遲兩項指標,對於系統的可靠性、可擴展性等重要的性能指 標沒有在統計數據層面進行關注,有待後續補充。
- Flink 使用 RocksDBStateBackend 時的吞吐較低,有待進一步探索和優化。
- 關於 Flink 的更高級 API,如 Table API & SQL 及 CEP 等,須要進一步瞭解和完善。
8.參考內容
分佈式流處理框架——功能對比和性能評估
intel-hadoop/HiBench: HiBench is a big data benchmark suite
Yahoo的流計算引擎基準測試
Extending the Yahoo! Streaming Benchmark
本文選自《不只僅是流計算 Apache Flink實踐》
更多Flink博文:
更多Flink原理知識:
穿梭時空的實時計算框架——Flink對時間的處理
大數據實時處理的王者-Flink
統一批處理流處理——Flink批流一體實現原理
Flink快速入門--安裝與示例運行
快速構建第一個Flink工程
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算: