360深度實踐:Flink 與 Storm 協議級對比

做者:張馨予算法

本文從數據傳輸和數據可靠性的角度出發,對比測試了 Storm 與 Flink 在流處理上的性能,並對測試結果進行分析,給出在使用 Flink 時提升性能的建議。安全

Apache Storm、Apache Spark 和 Apache Flink 都是開源社區中很是活躍的分佈式計算平臺,在不少公司可能同時使用着其中兩種甚至三種。對於實時計算來講,Storm 與 Flink 的底層計算引擎是基於流的,本質上是一條一條的數據進行處理,且處理的模式是流水線模式,即全部的處理進程同時存在,數據在這些進程之間流動處理。而 Spark 是基於批量數據的處理,即一小批一小批的數據進行處理,且處理的邏輯在一批數據準備好以後纔會進行計算。在本文中,咱們把一樣基於流處理的 Storm 和 Flink 拿來作對比測試分析。網絡

在咱們作測試以前,調研了一些已有的大數據平臺性能測試報告,好比,雅虎的 Streaming-benchmarks,或者 Intel 的 HiBench 等等。除此以外,還有不少的論文也從不一樣的角度對分佈式計算平臺進行了測試。雖然這些測試 case 各有不一樣的側重點,但他們都用到了一樣的兩個指標,即吞吐和延遲。吞吐表示單位時間內所能處理的數據量,是能夠經過增大併發來提升的。延遲表明處理一條數據所須要的時間,與吞吐量成反比關係。併發

在咱們設計計算邏輯時,首先考慮一下流處理的計算模型。上圖是一個簡單的流計算模型,在 Source 中將數據取出,發往下游 Task ,並在 Task 中進行處理,最後輸出。對於這樣的一個計算模型,延遲時間由三部分組成:數據傳輸時間、 Task 計算時間和數據排隊時間。咱們假設資源足夠,數據不用排隊。則延遲時間就只由數據傳輸時間和 Task 計算時間組成。而在 Task 中處理所須要的時間與用戶的邏輯息息相關,因此對於一個計算平臺來講,數據傳輸的時間才更能反映這個計算平臺的能力。所以,咱們在設計測試 Case 時,爲了更好的體現出數據傳輸的能力,Task 中沒有設計任何計算邏輯。分佈式

在肯定數據源時,咱們主要考慮是在進程中直接生成數據,這種方法在不少以前的測試標準中也一樣有使用。這樣作是由於數據的產生不會受到外界數據源系統的性能限制。但因爲在咱們公司內部大部分的實時計算數據都來源於 kafka ,因此咱們增長了從 kafka 中讀取數據的測試。性能

對於數據傳輸方式,能夠分爲兩種:進程間的數據傳輸和進程內的數據傳輸。測試

進程間的數據傳輸是指這條數據會通過序列化、網絡傳輸和反序列化三個步驟。在 Flink 中,2個處理邏輯分佈在不一樣的 TaskManager 上,這兩個處理邏輯之間的數據傳輸就能夠叫作進程間的數據傳輸。Flink 網絡傳輸是採用的 Netty 技術。在 Storm 中,進程間的數據傳輸是 worker 之間的數據傳輸。早版本的 storm 網絡傳輸使用的 ZeroMQ,如今也改爲了 Netty。大數據

進程內的數據傳輸是指兩個處理邏輯在同一個進程中。在 Flink 中,這兩個處理邏輯被 Chain 在了一塊兒,在一個線程中經過方法調用傳參的形式進程數據傳輸。在 Storm 中,兩個處理邏輯變成了兩個線程,經過一個共享的隊列進行數據傳輸。優化

Storm 和 Flink 都有各自的可靠性機制。在 Storm 中,使用 ACK 機制來保證數據的可靠性。而在 Flink 中是經過 checkpoint 機制來保證的,這是來源於 chandy-lamport 算法。spa

事實上 Exactly-once 可靠性的保證跟處理的邏輯和結果輸出的設計有關。好比結果要輸出到kafka中,而輸出到kafka的數據沒法回滾,這就沒法保證 Exactly-once。咱們在測試的時候選用的 at-least-once 語義的可靠性和不保證可靠性兩種策略進行測試。

上圖是咱們測試的環境和各個平臺的版本。

上圖展現的是 Flink 在自產數據的狀況下,不一樣的傳輸方式和可靠性的吞吐量:在進程內+不可靠、進程內+可靠、進程間+不可靠、進程間+可靠。能夠看到進程內的數據傳輸是進程間的數據傳輸的3.8倍。是否開啓 checkpoint 機制對 Flink 的吞吐影響並不大。所以咱們在使用 Flink 時,進來使用進程內的傳輸,也就是儘量的讓算子能夠 Chain 起來。

那麼咱們來看一下爲何 Chain 起來的性能好這麼多,要如何在寫 Flink 代碼的過程當中讓 Flink 的算子 Chain 起來使用進程間的數據傳輸。

你們知道咱們在 Flink 代碼時必定會建立一個 env,調用 env 的 disableOperatorChainning() 方法會使得全部的算子都沒法 chain 起來。咱們通常是在 debug 的時候回調用這個方法,方便調試問題。

若是容許 Chain 的狀況下,上圖中 Source 和 mapFunction 就會 Chain 起來,放在一個 Task 中計算。反之,若是不容許 Chain,則會放到兩個 Task 中。

對於沒有 Chain 起來的兩個算子,他們被放到了不一樣的兩個 Task 中,那麼他們之間的數據傳輸是這樣的:SourceFunction 取到數據序列化後放入內存,而後經過網絡傳輸給 MapFunction 所在的進程,該進程將數據方序列化後使用。

對於 Chain 起來的兩個算子,他們被放到同一個Task中,那麼這兩個算子之間的數據傳輸則是:SourceFunction 取到數據後,進行一次深拷貝,而後 MapFunction 把深拷貝出來的這個對象做爲輸入數據。

雖然 Flink 在序列化上作了不少優化,跟不用序列化和不用網絡傳輸的進程內數據傳輸對比,性能仍是差不少。因此咱們儘量的把算子 Chain 起來。

不是任何兩個算子均可以 Chain 起來的,要把算子 Chain 起來有不少條件:第一,下游算子只能接受一種上游數據流,好比Map接受的流不能是一條 union 後的流;其次上下游的併發數必定要同樣;第二,算子要使用同一個資源 Group,默認是一致的,都是 default;第三,就是以前說的 env 中不能調用 disableOperatorChainning() 方法,最後,上游發送數據的方法是 Forward 的,好比,開發時沒有調用 rebalance() 方法,沒有 keyby(),沒有 boardcast 等。

對比一下自產數據時,使用進程內通訊,且不保證數據可靠性的狀況下,Flink 與 Storm 的吞吐。在這種狀況下,Flink 的性能是 Storm 的15倍。Flink 吞吐能達到2060萬條/s。不只如此,若是在開發時調用了env.getConfig().enableObjectReuse() 方法,Flink 的但併發吞吐能達到4090萬條/s。

當調用了 enableObjectReuse 方法後,Flink 會把中間深拷貝的步驟都省略掉,SourceFunction 產生的數據直接做爲 MapFunction 的輸入。但須要特別注意的是,這個方法不能隨便調用,必需要確保下游 Function 只有一種,或者下游的 Function 均不會改變對象內部的值。不然可能會有線程安全的問題。

當對比在不一樣可靠性策略的狀況下,Flink 與 Storm 的表現時,咱們發現,保證可靠性對 Flink 的影響很是小,但對 Storm 的影響很是大。總的來講,在保證可靠的狀況下,Flink 單併發的吞吐是 Storm 的15倍,而不保證可靠的狀況下,Flink 的性能是 Storm 的66倍。會產生這樣的結果,主要是由於 Flink 與 Storm 保證數據可靠性的機制不一樣。

而 Storm 的 ACK 機制爲了保證數據的可靠性,開銷更大。

左邊的圖展現的是 Storm 的 ACK 機制。Spout 每發送一條數據到 Bolt,就會產生一條 ACK 的信息給 ACKer ,當 Bolt 處理完這條數據後也會發送 ACK 信息給 ACKer。當 ACKer 收到這條數據的全部 ACK 信息時,會回覆 Spout 一條 ACK 信息。也就是說,對於一個只有兩級(spout+bolt)的拓撲來講,每發送一條數據,就會傳輸3條 ACK 信息。這3條 ACK 信息則是爲了保證可靠性所須要的開銷。

右邊的圖展現的是 Flink 的 Checkpoint 機制。Flink 中 Checkpoint 信息的發起者是 JobManager。它不像 Storm 中那樣,每條信息都會有 ACK 信息的開銷,並且按時間來計算花銷。用戶能夠設置作 checkpoint 的頻率,好比10秒鐘作一次 checkpoint。每作一次 checkpoint,花銷只有從 Source 發往 map 的1條 checkpoint 信息(JobManager 發出來的 checkpoint 信息走的是控制流,與數據流無關)。與 Storm 相比,Flink 的可靠性機制開銷要低得多。這也就是爲何保證可靠性對 Flink 的性能影響較小,而 Storm 的影響確很大的緣由。

最後一組自產數據的測試結果對比是 Flink 與 Storm 在進程間的數據傳輸的對比,能夠看到進程間數據傳輸的狀況下,Flink 但併發吞吐是 Storm 的4.7倍。保證可靠性的狀況下,是 Storm 的14倍。

上圖展現的是消費 kafka 中數據時,Storm 與 Flink 的但併發吞吐狀況。由於消費的是 kafka 中的數據,因此吞吐量確定會收到 kafka 的影響。咱們發現性能的瓶頸是在 SourceFunction 上,因而增長了 topic 的 partition 數和 SourceFunction 取數據線程的併發數,可是 MapFunction 的併發數仍然是1.在這種狀況下,咱們發現 Flink 的瓶頸轉移到上游往下游發數據的地方。而 Storm 的瓶頸確是在下游收數據反序列化的地方。

以前的性能分析使咱們基於數據傳輸和數據可靠性的角度出發,單純的對 Flink 與 Storm 計算平臺自己進行了性能分析。但實際使用時,task 是確定有計算邏輯的,這就勢必更多的涉及到 CPU,內存等資源問題。咱們未來打算作一個智能分析平臺,對用戶的做業進行性能分析。經過收集到的指標信息,分析出做業的瓶頸在哪,並給出優化建議。

相關文章
相關標籤/搜索