如何分析及處理 Flink 反壓?

反壓(backpressure)是實時計算應用開發中,特別是流式計算中,十分常見的問題。反壓意味着數據管道中某個節點成爲瓶頸,處理速率跟不上上游發送數據的速率,而須要對上游進行限速。因爲實時計算應用一般使用消息隊列來進行生產端和消費端的解耦,消費端數據源是 pull-based 的,因此反壓一般是從某個節點傳導至數據源並下降數據源(好比 Kafka consumer)的攝入速率。緩存

關於 Flink 的反壓機制,網上已經有很多博客介紹,中文博客推薦這兩篇1。簡單來講,Flink 拓撲中每一個節點(Task)間的數據都以阻塞隊列的方式傳輸,下游來不及消費致使隊列被佔滿後,上游的生產也會被阻塞,最終致使數據源的攝入被阻塞。而本文將着重結合官方的博客[4]分享筆者在實踐中分析和處理 Flink 反壓的經驗。網絡

反壓的影響

反壓並不會直接影響做業的可用性,它代表做業處於亞健康的狀態,有潛在的性能瓶頸並可能致使更大的數據處理延遲。一般來講,對於一些對延遲要求不過高或者數據量比較小的應用來講,反壓的影響可能並不明顯,然而對於規模比較大的 Flink 做業來講反壓可能會致使嚴重的問題。運維

這是由於 Flink 的 checkpoint 機制,反壓還會影響到兩項指標: checkpoint 時長和 state 大小。函數

  • 前者是由於 checkpoint barrier 是不會越過普通數據的,數據處理被阻塞也會致使 checkpoint barrier 流經整個數據管道的時長變長,於是 checkpoint 整體時間(End to End Duration)變長。
  • 後者是由於爲保證 EOS(Exactly-Once-Semantics,準確一次),對於有兩個以上輸入管道的 Operator,checkpoint barrier 須要對齊(Alignment),接受到較快的輸入管道的 barrier 後,它後面數據會被緩存起來但不處理,直到較慢的輸入管道的 barrier 也到達,這些被緩存的數據會被放到state 裏面,致使 checkpoint 變大。

這兩個影響對於生產環境的做業來講是十分危險的,由於 checkpoint 是保證數據一致性的關鍵,checkpoint 時間變長有可能致使 checkpoint 超時失敗,而 state 大小一樣可能拖慢 checkpoint 甚至致使 OOM (使用 Heap-based StateBackend)或者物理內存使用超出容器資源(使用 RocksDBStateBackend)的穩定性問題。性能

所以,咱們在生產中要儘可能避免出現反壓的狀況(順帶一提,爲了緩解反壓給 checkpoint 形成的壓力,社區提出了 FLIP-76: Unaligned Checkpoints[4] 來解耦反壓和 checkpoint)。優化

定位反壓節點

要解決反壓首先要作的是定位到形成反壓的節點,這主要有兩種辦法:spa

  1. 經過 Flink Web UI 自帶的反壓監控面板;
  2. 經過 Flink Task Metrics。

前者比較容易上手,適合簡單分析,後者則提供了更加豐富的信息,適合用於監控系統。由於反壓會向上遊傳導,這兩種方式都要求咱們從 Source 節點到 Sink 的逐一排查,直到找到形成反壓的根源緣由[4]。下面分別介紹這兩種辦法。線程

反壓監控面板

Flink Web UI 的反壓監控提供了 SubTask 級別的反壓監控,原理是經過週期性對 Task 線程的棧信息採樣,獲得線程被阻塞在請求 Buffer(意味着被下游隊列阻塞)的頻率來判斷該節點是否處於反壓狀態。默認配置下,這個頻率在 0.1 如下則爲 OK,0.1 至 0.5 爲 LOW,而超過 0.5 則爲 HIGH。日誌

_1

圖1. Flink 1.8 的 Web UI 反壓面板(來自官方博客)blog

若是處於反壓狀態,那麼有兩種可能性:

  1. 該節點的發送速率跟不上它的產生數據速率。這通常會發生在一條輸入多條輸出的 Operator(好比 flatmap)。
  2. 下游的節點接受速率較慢,經過反壓機制限制了該節點的發送速率。

若是是第一種情況,那麼該節點則爲反壓的根源節點,它是從 Source Task 到 Sink Task 的第一個出現反壓的節點。若是是第二種狀況,則須要繼續排查下游節點。

值得注意的是,反壓的根源節點並不必定會在反壓面板體現出高反壓,由於反壓面板監控的是發送端,若是某個節點是性能瓶頸並不會致使它自己出現高反壓,而是致使它的上游出現高反壓。整體來看,若是咱們找到第一個出現反壓的節點,那麼反壓根源要麼是就這個節點,要麼是它緊接着的下游節點。

那麼若是區分這兩種狀態呢?很遺憾只經過反壓面板是沒法直接判斷的,咱們還須要結合 Metrics 或者其餘監控手段來定位。此外若是做業的節點數不少或者並行度很大,因爲要採集全部 Task 的棧信息,反壓面板的壓力也會很大甚至不可用。

Task Metrics

Flink 提供的 Task Metrics 是更好的反壓監控手段,但也要求更加豐富的背景知識。

首先咱們簡單回顧下 Flink 1.5 之後的網路棧,熟悉的讀者能夠直接跳過。

TaskManager 傳輸數據時,不一樣的 TaskManager 上的兩個 Subtask 間一般根據 key 的數量有多個 Channel,這些 Channel 會複用同一個 TaskManager 級別的 TCP 連接,而且共享接收端 Subtask 級別的 Buffer Pool。

在接收端,每一個 Channel 在初始階段會被分配固定數量的 Exclusive Buffer,這些 Buffer 會被用於存儲接受到的數據,交給 Operator 使用後再次被釋放。Channel 接收端空閒的 Buffer 數量稱爲 Credit,Credit 會被定時同步給發送端被後者用於決定發送多少個 Buffer 的數據。

在流量較大時,Channel 的 Exclusive Buffer 可能會被寫滿,此時 Flink 會向 Buffer Pool 申請剩餘的 Floating Buffer。這些 Floating Buffer 屬於備用 Buffer,哪一個 Channel 須要就去哪裏。而在 Channel 發送端,一個 Subtask 全部的 Channel 會共享同一個 Buffer Pool,這邊就沒有區分 Exclusive Buffer 和 Floating Buffer。

_2

圖2. Flink Credit-Based 網絡

咱們在監控反壓時會用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有關,最爲有用的是如下幾個 Metrics:

Metris

描述

outPoolUsage

發送端 Buffer 的使用率

inPoolUsage

接收端 Buffer 的使用率

floatingBuffersUsage(1.9 以上)

接收端 Floating Buffer 的使用率

exclusiveBuffersUsage (1.9 以上)

接收端 Exclusive Buffer 的使用率

其中 inPoolUsage 等於 floatingBuffersUsage 與 exclusiveBuffersUsage 的總和。

分析反壓的大體思路是:若是一個 Subtask 的發送端 Buffer 佔用率很高,則代表它被下游反壓限速了;若是一個 Subtask 的接受端 Buffer 佔用很高,則代表它將反壓傳導至上游。反壓狀況能夠根據如下表格進行對號入座(圖片來自官網):

_3

圖3. 反壓分析表

outPoolUsage 和 inPoolUsage 同爲低或同爲高分別代表當前 Subtask 正常或處於被下游反壓,這應該沒有太多疑問。而比較有趣的是當 outPoolUsage 和 inPoolUsage 表現不一樣時,這多是出於反壓傳導的中間狀態或者代表該 Subtask 就是反壓的根源。

若是一個 Subtask 的 outPoolUsage 是高,一般是被下游 Task 所影響,因此能夠排查它自己是反壓根源的可能性。若是一個 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,則代表它有多是反壓的根源。由於一般反壓會傳導至其上游,致使上游某些 Subtask 的 outPoolUsage 爲高,咱們能夠根據這點來進一步判斷。值得注意的是,反壓有時是短暫的且影響不大,好比來自某個 Channel 的短暫網絡延遲或者 TaskManager 的正常 GC,這種狀況下咱們能夠不用處理。

對於 Flink 1.9 及以上版本,除了上述的表格,咱們還能夠根據 floatingBuffersUsage/exclusiveBuffersUsage 以及其上游 Task 的 outPoolUsage 來進行進一步的分析一個 Subtask 和其上游 Subtask 的數據傳輸。

_4

圖4. Flink 1.9 反壓分析表

一般來講,floatingBuffersUsage 爲高則代表反壓正在傳導至上游,而 exclusiveBuffersUsage 則代表了反壓是否存在傾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低爲有傾斜,由於少數 channel 佔用了大部分的 Floating Buffer)。

至此,咱們已經有比較豐富的手段定位反壓的根源是出如今哪一個節點,可是具體的緣由尚未辦法找到。另外基於網絡的反壓 metrics 並不能定位到具體的 Operator,只能定位到 Task。特別是 embarrassingly parallel(易並行)的做業(全部的 Operator 會被放入一個 Task,所以只有一個節點),反壓 metrics 則派不上用場。

分析具體緣由及處理

定位到反壓節點後,分析形成緣由的辦法和咱們分析一個普通程序的性能瓶頸的辦法是十分相似的,可能還要更簡單一點,由於咱們要觀察的主要是 Task Thread。

在實踐中,不少狀況下的反壓是因爲數據傾斜形成的,這點咱們能夠經過 Web UI 各個 SubTask 的 Records Sent 和 Record Received 來確認,另外 Checkpoint detail 裏不一樣 SubTask 的 State size 也是一個分析數據傾斜的有用指標。

此外,最多見的問題多是用戶代碼的執行效率問題(頻繁被阻塞或者性能問題)。最有用的辦法就是對 TaskManager 進行 CPU profile,從中咱們能夠分析到 Task Thread 是否跑滿一個 CPU 核:若是是的話要分析 CPU 主要花費在哪些函數裏面,好比咱們生產環境中就偶爾遇到卡在 Regex 的用戶函數(ReDoS);若是不是的話要看 Task Thread 阻塞在哪裏,多是用戶函數自己有些同步的調用,多是 checkpoint 或者 GC 等系統活動致使的暫時系統暫停。

固然,性能分析的結果也多是正常的,只是做業申請的資源不足而致使了反壓,這就一般要求拓展並行度。值得一提的,在將來的版本 Flink 將會直接在 WebUI 提供 JVM 的 CPU 火焰圖[5],這將大大簡化性能瓶頸的分析。

另外 TaskManager 的內存以及 GC 問題也可能會致使反壓,包括 TaskManager JVM 各區內存不合理致使的頻繁 Full GC 甚至失聯。推薦能夠經過給 TaskManager 啓用 G1 垃圾回收器來優化 GC,並加上 -XX:+PrintGCDetails 來打印 GC 日誌的方式來觀察 GC 的問題。

總結

反壓是 Flink 應用運維中常見的問題,它不只意味着性能瓶頸還可能致使做業的不穩定性。定位反壓能夠從 Web UI 的反壓監控面板和 Task Metric 二者入手,前者方便簡單分析,後者適合深刻挖掘。定位到反壓節點後咱們能夠經過數據分佈、CPU Profile 和 GC 指標日誌等手段來進一步分析反壓背後的具體緣由並進行鍼對性的優化。

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索