【譯】如何調整ApacheFlink®集羣的大小How To Size Your Apache Flink® Cluster: A Back-of-the-Envelope Calculation

來自Flink Forward Berlin 2017的最受歡迎的會議是Robert Metzger的「堅持下去:如何可靠,高效地操做Apache Flink」。 Robert所涉及的主題之一是如何粗略地肯定Apache Flink集羣的大小。 Flink Forward的與會者提到他的羣集大小調整指南對他們有幫助,所以咱們將他的談話部分轉換爲博客文章。 請享用!apache

Flink社區中最多見的問題之一是如何在從開發階段轉向生產階段時肯定羣集的大小。 對這個問題的明確答案固然是「它取決於」,但這不是一個有用的答案。 這篇文章概述了一系列問題,要求您提供一些可用做指導的數字。windows

作計算並創建基線

第一步是仔細考慮應用程序的運營指標,以得到所需資源的基線。後端

要考慮的關鍵指標是:網絡

  • 每秒記錄數和每條記錄的大小
  • 您擁有的不一樣key的數量以及每一個key的狀態大小
  • 狀態更新的數量和狀態後端的訪問模式

最後,更實際的問題是您的服務水平協議(SLA)與客戶的停機時間,延遲和最大吞吐量有關,由於這些直接影響您的容量規劃。ide

接下來,根據您的預算查看您可用的資源。例如:性能

  • 網絡容量,考慮到也使用網絡的任何外部服務,如Kafka,HDFS等。
  • 您的磁盤帶寬,若是您依賴於基於磁盤的狀態後端(如RocksDB)(並考慮其餘磁盤使用,如Kafka或HDFS)
  • 機器的數量以及它們可用的CPU和內存

基於全部這些因素,您如今能夠構建正常操做的基線,以及用於恢復追趕或處理負載峯值的資源緩衝區。我建議您在創建基線時考慮檢查點期間使用的資源。ui

示例:讓咱們舉一些例子

我如今將計劃在假設的集羣上部署做業,以可視化創建資源使用基準的過程。 這些數字是粗略的「背後」值,而且它們並不全面 - 在帖子的最後,我還將肯定在進行此計算時我忽略的一些方面。spa

示例Flink流式處理做業和硬件

示例Flink Streaming做業拓撲3d

對於此示例,我將部署一個典型的Flink流式做業,該做業使用Flink的Kafka使用者從Kafka主題讀取數據。 而後使用鍵控聚合窗口運算符來變換流。 窗口操做符在5分鐘的時間窗口上執行聚合。 因爲老是有新數據,我將窗口配置爲一個滑動窗口,滑動時間爲1分鐘。code

這意味着我將得到每分鐘更新過去5分鐘的聚合。 流式傳輸做業爲每一個userId建立一個聚合。 從Kafka主題消耗的消息的大小(平均)爲2 KB。

吞吐量是每秒100萬條消息。 要了解窗口運算符的狀態大小,您須要知道不一樣鍵的數量。 在這種狀況下,它是userIds的數量,即500,000,000個惟一身份用戶。 對於每一個用戶,您計算四個數字,存儲爲長(8個字節)。

讓咱們總結一下這項工做的關鍵指標:

  • Message size: 2KB
  • Throughput: 1,000,000 msg/sec
  • Distinct keys: 500,000,000 (aggregation in window: 4 longs per key)
  • Checkpointing: Once every minute.

假設硬件設置

運行該做業的機器有五臺,每臺機器都運行Flink TaskManager(Flink的工做節點)。 磁盤是網絡鏈接的(在雲設置中很常見),從主交換機到運行TaskManager的每臺機器都有一個10千兆以太網鏈接。 Kafka broker分佈在不一樣的機器上運行。

每臺機器有16個CPU核心。 爲簡單起見,我不會考慮CPU和內存要求。 在現實世界中,根據您的應用程序邏輯和使用中的狀態後端,您須要注意內存。 此示例使用基於RocksDB的狀態後端,該後端功能強大且內存要求低。

單機的視角

要了解整個做業部署的資源需求,最簡單的方法是首先關注一臺機器和一臺TaskManager中的操做。 而後,您可使用從一臺計算機派生的數字來計算整體資源需求。

默認狀況下(若是全部運算符具備相同的並行性且沒有特殊的調度限制),則每一個計算機上都會運行流式做業的全部運算符。

在這種狀況下,Kafka源(或消費者),窗口操做符和Kafka接收器(或生產者)都在五臺機器中的每臺機器上運行。

機器視角 -  TaskManager n

keyBy是上圖中的一個單獨的運算符,所以計算資源需求更容易。 實際上,keyBy是一個API構造,並轉換爲Kafka源和窗口運算符之間鏈接的配置屬性。

我如今將從上到下遍歷每一個運算符,以瞭解他們的網絡資源需求。

The Kafka source

要計算單個Kafka源接收的數據量,首先計算聚合Kafka輸入。 源每秒接收1,000,000條消息,每條消息2KB。

2KB x 1,000,000/s = 2GB/s

將2GB / s除以機器數量(5)會產生如下結果:

2GB/s ÷ 5 machines = 400MB/s

羣集中運行的5個Kafka源中的每個都接收平均吞吐量爲400 MB / s的數據。

The Kafka source calculation

混洗和分區

接下來,您須要確保具備相同key的全部事件(在本例中爲userId)最終位於同一臺計算機上。 您正在讀取的Kafka主題中的數據可能會根據不一樣的分區方案進行分區。

混洗過程將具備相同key的全部數據發送到一臺計算機,所以您未來自Kafka的400MB / s數據流拆分爲userId分區流:

400MB/s ÷ 5 machines = 80MB/s

平均而言,您必須向每臺計算機發送80 MB / s的數據。 這個分析是從一臺機器的角度來看的,這意味着一些數據已經在指定的目標機器上,所以減去80MB / s來解釋:

400MB/s - 80MB = 320MB/s

每臺機器以320MB / s的速率接收和發送用戶數據。

混洗計算

Window Emit and Kafka Sink

接下來要問的問題是窗口操做員發出多少數據並將其發送到Kafka接收器。 它是67MB / s,讓咱們解釋一下咱們是如何達到這個數字的。

窗口運算符爲每一個鍵保留4個數字(表示爲長整數)的彙總。 每分鐘一次,操做員發出當前的聚合值。 每一個key從聚合中發出2個int(user_id,window_ts)和4個long:

(2 x 4 bytes) + (4 x 8 bytes) = 40 bytes per key

而後考慮key(500,000,000除以機器數量):

100,000,000 keys x 40 bytes = 4GB

......來自每臺機器。

而後計算每秒大小:

4GB/min ÷ 60 = 67MB/s

...由每一個TaskManager發出。

這意味着每一個TaskManager平均從窗口運算符發出67 MB / s的用戶數據。 因爲每一個TaskManager上都運行一個Kafka接收器(窗口運算符旁邊),而且沒有進一步的從新分區,這是從Flink發送到Kafka的數據量。

用戶數據:從Kafka,洗牌到窗口運算符,而後回到Kafka

窗口運算符的數據發射預計是「突發性的」,由於它們每分鐘發出一次數據。 實際上,運營商不會以67 MB / s的恆定速率發送數據,而是每分鐘最多可用帶寬幾秒鐘。

這總計爲:

  • Data in: 720MB/s (400 + 320) per machine
  • Data out: 387MB/s (320 + 67) per machine

狀態訪問和檢查點

這不是一切。 到目前爲止,我只查看了Flink正在處理的用戶數據。 您須要將存儲狀態和檢查點保存在RocksDB中而進行的磁盤訪問的開銷包括在內。 要了解磁盤訪問成本,請查看窗口運算符如何訪問狀態。 Kafka源也保持一些狀態,但與窗口運算符相比,它能夠忽略不計。

要了解窗口運算符的狀態大小,請從不一樣的角度查看它。 Flink正在計算5分鐘的窗戶,只需1分鐘的幻燈片。 Flink經過維護五個窗口來實現滑動窗口,每一個窗口對應一個「幻燈片」。如前所述,當使用執行急切聚合的窗口實現時,每一個窗口和聚合的每一個key保持40個字節的狀態。 對於每一個傳入事件,首先須要從磁盤檢索當前聚合值(讀取40個字節),更新聚合,而後再寫入新值(寫入40個字節)。

窗口狀態

這意味着:

40 bytes of state x 5 windows x 200,000 msg/s per machine = 40MB/s

...每臺機器的讀寫磁盤訪問權限。 如開頭所述,磁盤是網絡鏈接的,所以我須要將這些數字添加到總體吞吐量計算中。
總數如今是:

  • Data in: 760MB/s (400 MB/s data in + 320 MB/s shuffle + 40 MB/s state)
  • Data out: 427MB/s (320 MB/s shuffle + 67 MB/s data out + 40 MB/s state)

以上考慮用於狀態訪問,當新事件到達窗口操做符時,該訪問一致地發生。 您還能夠啓用容錯檢查點。 若是計算機或其餘任何其餘設備出現故障,您須要恢復窗口內容並繼續處理。

檢查點設置爲每分鐘一個檢查點的間隔,每一個檢查點將做業的整個狀態複製到網絡附加文件系統中。

讓咱們快速瞭解每臺機器上的整個狀態有多大:

40bytes of state x 5 windows x 100,000,000 keys = 20GB

而且,要得到每秒值:

20GB ÷ 60 = 333 MB/s.

與窗口運算符相似,檢查點具備突發模式,每分鐘一次,它會嘗試將其數據全速發送到外部存儲。 檢查點致使對RocksDB的額外狀態訪問(在此示例中位於網絡鏈接磁盤上)。 自Flink 1.3以來,RocksDB狀態後端支持增量檢查點,減小了每一個檢查點上所需的網絡傳輸,從概念上講,僅發送自上一個檢查點以來的「diff」,但此示例中未使用此功能。

這會將總計更新爲:

  • Data in: 760MB/s (400 + 320 + 40)
  • Data out: 760MB/s (320 + 67 + 40 + 333)

這意味着總體網絡流量爲:

760 + 760 x 5 + 400 + 2335 = 10335 MB/s

400是整個5臺機器上80MB狀態訪問(讀寫)進程的總和,2335是整個集羣中Kafka進出流程的總和。

或者只是上面硬件設置中可用網絡容量的一半以上。

網絡要求

我想補充一下免責聲明。 這些計算都不包括協議開銷,例如來自Flink,Kafka或文件系統的TCP,以太網和RPC調用。 這仍然是瞭解工做所需的硬件類型以及性能指標的良好起點。

擴大你的方式

根據個人分析,此示例使用5節點集羣,而且在典型操做中,每臺計算機須要處理760 MB / s的數據,包括輸入和輸出,總容量爲1250 MB / s。 這爲我所掩蓋的複雜性保留了大約40%的網絡容量,例如網絡協議開銷,從檢查點恢復時事件重放期間的高負載,以及由數據誤差致使的集羣內不均衡的負載平衡。

對於40%是不是適當的餘量,沒有一個通用的答案,但這個算術應該給你一個很好的起點。 嘗試上面的計算,更換機器數量,key數量或每秒消息數,以便選擇要考慮的值,而後根據預算和運營因素進行平衡。 快樂縮放!

原文鏈接:https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines

歡迎來騰訊雲社區:https://cloud.tencent.com/developer/support-plan?invite_code=1q904aao4zdgs

相關文章
相關標籤/搜索