我們從頭至尾講一次 Flink 網絡流控和反壓剖析

做者:張俊
整理:張友亮(Apache Flink 社區志願者)網絡

本文共 4745字,預計閱讀時間 15min。架構

本文根據 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大數據平臺研發負責人張俊老師分享。主要內容以下:大數據

  • 網絡流控的概念與背景
  • TCP的流控機制
  • Flink TCP-based 反壓機制(before V1.5)
  • Flink Credit-based 反壓機制 (since V1.5)
  • 總結與思考

網絡流控的概念與背景

爲何須要網絡流控

1_04

首先咱們能夠看下這張最精簡的網絡流控的圖,Producer 的吞吐率是 2MB/s,Consumer 是 1MB/s,這個時候咱們就會發如今網絡通訊的時候咱們的 Producer 的速度是比 Consumer 要快的,有 1MB/s 的這樣的速度差,假定咱們兩端都有一個 Buffer,Producer 端有一個發送用的 Send Buffer,Consumer 端有一個接收用的 Receive Buffer,在網絡端的吞吐率是 2MB/s,過了 5s 後咱們的 Receive Buffer 可能就撐不住了,這時候會面臨兩種狀況:優化

  • 1.若是 Receive Buffer 是有界的,這時候新到達的數據就只能被丟棄掉了。
  • 2.若是 Receive Buffer 是無界的,Receive Buffer 會持續的擴張,最終會致使 Consumer 的內存耗盡。

網絡流控的實現:靜態限速

2_05

爲了解決這個問題,咱們就須要網絡流控來解決上下游速度差的問題,傳統的作法能夠在 Producer 端實現一個相似 Rate Limiter 這樣的靜態限流,Producer 的發送速率是 2MB/s,可是通過限流這一層後,往 Send Buffer 去傳數據的時候就會降到 1MB/s 了,這樣的話 Producer 端的發送速率跟 Consumer 端的處理速率就能夠匹配起來了,就不會致使上述問題。可是這個解決方案有兩點限制:spa

  • 一、事先沒法預估 Consumer 到底能承受多大的速率
  • 二、 Consumer 的承受能力一般會動態地波動

網絡流控的實現:動態反饋/自動反壓

3_06

針對靜態限速的問題咱們就演進到了動態反饋(自動反壓)的機制,咱們須要 Consumer 可以及時的給 Producer 作一個 feedback,即告知 Producer 可以承受的速率是多少。動態反饋分爲兩種:計算機網絡

  • 一、負反饋:接受速率小於發送速率時發生,告知 Producer 下降發送速率
  • 二、正反饋:發送速率小於接收速率時發生,告知 Producer 能夠把發送速率提上來

讓咱們來看幾個經典案例線程

案例一:Storm 反壓實現

4_07

上圖就是 Storm 裏實現的反壓機制,能夠看到 Storm 在每個 Bolt 都會有一個監測反壓的線程(Backpressure Thread),這個線程一但檢測到 Bolt 裏的接收隊列(recv queue)出現了嚴重阻塞就會把這個狀況寫到 ZooKeeper 裏,ZooKeeper 會一直被 Spout 監聽,監聽到有反壓的狀況就會中止發送,經過這樣的方式匹配上下游的發送接收速率。3d

案例二:Spark Streaming 反壓實現

5_08

Spark Streaming 裏也有作相似這樣的 feedback 機制,上圖 Fecher 會實時的從 Buffer、Processing 這樣的節點收集一些指標而後經過 Controller 把速度接收的狀況再反饋到 Receiver,實現速率的匹配。orm

疑問:爲何 Flink(before V1.5)裏沒有用相似的方式實現 feedback 機制?

首先在解決這個疑問以前咱們須要先了解一下 Flink 的網絡傳輸是一個什麼樣的架構。中間件

6_10

這張圖就體現了 Flink 在作網絡傳輸的時候基本的數據的流向,發送端在發送網絡數據前要經歷本身內部的一個流程,會有一個本身的 Network Buffer,在底層用 Netty 去作通訊,Netty 這一層又有屬於本身的 ChannelOutbound Buffer,由於最終是要經過 Socket 作網絡請求的發送,因此在 Socket 也有本身的 Send Buffer,一樣在接收端也有對應的三級 Buffer。學過計算機網絡的時候咱們應該瞭解到,TCP 是自帶流量控制的。實際上 Flink (before V1.5)就是經過 TCP 的流控機制來實現 feedback 的。

TCP 流控機制

根據下圖咱們來簡單的回顧一下 TCP 包的格式結構。首先,他有 Sequence number 這樣一個機制給每一個數據包作一個編號,還有 ACK number 這樣一個機制來確保 TCP 的數據傳輸是可靠的,除此以外還有一個很重要的部分就是 Window Size,接收端在回覆消息的時候會經過 Window Size 告訴發送端還能夠發送多少數據。

7_13

接下來咱們來簡單看一下這個過程。

TCP 流控:滑動窗口

8_14

TCP 的流控就是基於滑動窗口的機制,如今咱們有一個 Socket 的發送端和一個 Socket 的接收端,目前咱們的發送端的速率是咱們接收端的 3 倍,這樣會發生什麼樣的一個狀況呢?假定初始的時候咱們發送的 window 大小是 3,而後咱們接收端的 window 大小是固定的,就是接收端的 Buffer 大小爲 5。

9_15

首先,發送端會一次性發 3 個 packets,將 1,2,3 發送給接收端,接收端接收到後會將這 3 個 packets 放到 Buffer 裏去。

10_16

接收端一次消費 1 個 packet,這時候 1 就已經被消費了,而後咱們看到接收端的滑動窗口會往前滑動一格,這時候 2,3 還在 Buffer 當中 而 4,5,6 是空出來的,因此接收端會給發送端發送 ACK = 4 ,表明發送端能夠從 4 開始發送,同時會將 window 設置爲 3 (Buffer 的大小 5 減去已經存下的 2 和 3),發送端接收到迴應後也會將他的滑動窗口向前移動到 4,5,6。

11_17

這時候發送端將 4,5,6 發送,接收端也能成功的接收到 Buffer 中去。

12_18

到這一階段後,接收端就消費到 2 了,一樣他的窗口也會向前滑動一個,這時候他的 Buffer 就只剩一個了,因而向發送端發送 ACK = 七、window = 1。發送端收到以後滑動窗口也向前移,可是這個時候就不能移動 3 格了,雖然發送端的速度容許發 3 個 packets 可是 window 傳值已經告知只能接收一個,因此他的滑動窗口就只能往前移一格到 7 ,這樣就達到了限流的效果,發送端的發送速度從 3 降到 1。

13_19
14_20

咱們再看一下這種狀況,這時候發送端將 7 發送後,接收端接收到,可是因爲接收端的消費出現問題,一直沒有從 Buffer 中去取,這時候接收端向發送端發送 ACK = 八、window = 0 ,因爲這個時候 window = 0,發送端是不能發送任何數據,也就會使發送端的發送速度降爲 0。這個時候發送端不發送任何數據了,接收端也不進行任何的反饋了,那麼如何知道消費端又開始消費了呢?

15_21
16_22
17_23

TCP 當中有一個 ZeroWindowProbe 的機制,發送端會按期的發送 1 個字節的探測消息,這時候接收端就會把 window 的大小進行反饋。當接收端的消費恢復了以後,接收到探測消息就能夠將 window 反饋給發送端端了從而恢復整個流程。TCP 就是經過這樣一個滑動窗口的機制實現 feedback。

Flink TCP-based 反壓機制(before V1.5)

示例:WindowWordCount

18_25

大致的邏輯就是從 Socket 裏去接收數據,每 5s 去進行一次 WordCount,將這個代碼提交後就進入到了編譯階段。

編譯階段:生成 JobGraph

19_26

這時候尚未向集羣去提交任務,在 Client 端會將 StreamGraph 生成 JobGraph,JobGraph 就是作爲向集羣提交的最基本的單元。在生成 JobGrap 的時候會作一些優化,將一些沒有 Shuffle 機制的節點進行合併。有了 JobGraph 後就會向集羣進行提交,進入運行階段。

運行階段:調度 ExecutionGraph

20_27

JobGraph 提交到集羣后會生成 ExecutionGraph ,這時候就已經具有基本的執行任務的雛形了,把每一個任務拆解成了不一樣的 SubTask,上圖 ExecutionGraph 中的 Intermediate Result Partition 就是用於發送數據的模塊,最終會將 ExecutionGraph 交給 JobManager 的調度器,將整個 ExecutionGraph 調度起來。而後咱們概念化這樣一張物理執行圖,能夠看到每一個 Task 在接收數據時都會經過這樣一個 InputGate 能夠認爲是負責接收數據的,再往前有這樣一個 ResultPartition 負責發送數據,在 ResultPartition 又會去作分區跟下游的 Task 保持一致,就造成了 ResultSubPartition 和 InputChannel 的對應關係。這就是從邏輯層上來看的網絡傳輸的通道,基於這麼一個概念咱們能夠將反壓的問題進行拆解。

問題拆解:反壓傳播兩個階段

21_28

反壓的傳播其實是分爲兩個階段的,對應着上面的執行圖,咱們一共涉及 3 個 TaskManager,在每一個 TaskManager 裏面都有相應的 Task 在執行,還有負責接收數據的 InputGate,發送數據的 ResultPartition,這就是一個最基本的數據傳輸的通道。在這時候假設最下游的 Task (Sink)出現了問題,處理速度降了下來這時候是如何將這個壓力反向傳播回去呢?這時候就分爲兩種狀況:

  • 跨 TaskManager ,反壓如何從 InputGate 傳播到 ResultPartition
  • TaskManager 內,反壓如何從 ResultPartition 傳播到 InputGate

跨 TaskManager 數據傳輸

22_29

前面提到,發送數據須要 ResultPartition,在每一個 ResultPartition 裏面會有分區 ResultSubPartition,中間還會有一些關於內存管理的 Buffer。
對於一個 TaskManager 來講會有一個統一的 Network BufferPool 被全部的 Task 共享,在初始化時會從 Off-heap Memory 中申請內存,申請到內存的後續內存管理就是同步 Network BufferPool 來進行的,不須要依賴 JVM GC 的機制去釋放。有了 Network BufferPool 以後能夠爲每個 ResultSubPartition 建立 Local BufferPool 。
如上圖左邊的 TaskManager 的 Record Writer 寫了 <1,2> 這個兩個數據進來,由於 ResultSubPartition 初始化的時候爲空,沒有 Buffer 用來接收,就會向 Local BufferPool 申請內存,這時 Local BufferPool 也沒有足夠的內存因而將請求轉到 Network BufferPool,最終將申請到的 Buffer 按原鏈路返還給 ResultSubPartition,<1,2> 這個兩個數據就能夠被寫入了。以後會將 ResultSubPartition 的 Buffer 拷貝到 Netty 的 Buffer 當中最終拷貝到 Socket 的 Buffer 將消息發送出去。而後接收端按照相似的機制去處理將消息消費掉。
接下來咱們來模擬上下游處理速度不匹配的場景,發送端的速率爲 2,接收端的速率爲 1,看一下反壓的過程是怎樣的。

跨 TaskManager 反壓過程

23_30

由於速度不匹配就會致使一段時間後 InputChannel 的 Buffer 被用盡,因而他會向 Local BufferPool 申請新的 Buffer ,這時候能夠看到 Local BufferPool 中的一個 Buffer 就會被標記爲 Used。

24_31

發送端還在持續以不匹配的速度發送數據,而後就會致使 InputChannel 向 Local BufferPool 申請 Buffer 的時候發現沒有可用的 Buffer 了,這時候就只能向 Network BufferPool 去申請,固然每一個 Local BufferPool 都有最大的可用的 Buffer,防止一個 Local BufferPool 把 Network BufferPool 耗盡。這時候看到 Network BufferPool 仍是有可用的 Buffer 能夠向其申請。

25_32

一段時間後,發現 Network BufferPool 沒有可用的 Buffer,或是 Local BufferPool 的最大可用 Buffer 到了上限沒法向 Network BufferPool 申請,沒有辦法去讀取新的數據,這時 Netty AutoRead 就會被禁掉,Netty 就不會從 Socket 的 Buffer 中讀取數據了。

26_33

顯然,再過不久 Socket 的 Buffer 也被用盡,這時就會將 Window = 0 發送給發送端(前文提到的 TCP 滑動窗口的機制)。這時發送端的 Socket 就會中止發送。

27_34

很快發送端的 Socket 的 Buffer 也被用盡,Netty 檢測到 Socket 沒法寫了以後就會中止向 Socket 寫數據。

28_35

Netty 中止寫了以後,全部的數據就會阻塞在 Netty 的 Buffer 當中了,可是 Netty 的 Buffer 是無界的,能夠經過 Netty 的水位機制中的 high watermark 控制他的上界。當超過了 high watermark,Netty 就會將其 channel 置爲不可寫,ResultSubPartition 在寫以前都會檢測 Netty 是否可寫,發現不可寫就會中止向 Netty 寫數據。

29_36

這時候全部的壓力都來到了 ResultSubPartition,和接收端同樣他會不斷的向 Local BufferPool 和 Network BufferPool 申請內存。

30_38

Local BufferPool 和 Network BufferPool 都用盡後整個 Operator 就會中止寫數據,達到跨 TaskManager 的反壓。

TaskManager 內反壓過程

瞭解了跨 TaskManager 反壓過程後再來看 TaskManager 內反壓過程就更好理解了,下游的 TaskManager 反壓致使本 TaskManager 的 ResultSubPartition 沒法繼續寫入數據,因而 Record Writer 的寫也被阻塞住了,由於 Operator 須要有輸入纔能有計算後的輸出,輸入跟輸出都是在同一線程執行, Record Writer 阻塞了,Record Reader 也中止從 InputChannel 讀數據,這時上游的 TaskManager 還在不斷地發送數據,最終將這個 TaskManager 的 Buffer 耗盡。具體流程能夠參考下圖,這就是 TaskManager 內的反壓過程。

31_39
32_40
33_41
34_42

Flink Credit-based 反壓機制(since V1.5)

TCP-based 反壓的弊端

35_44

在介紹 Credit-based 反壓機制以前,先分析下 TCP 反壓有哪些弊端。

  • 在一個 TaskManager 中可能要執行多個 Task,若是多個 Task 的數據最終都要傳輸到下游的同一個 TaskManager 就會複用同一個 Socket 進行傳輸,這個時候若是單個 Task 產生反壓,就會致使複用的 Socket 阻塞,其他的 Task 也沒法使用傳輸,checkpoint barrier 也沒法發出致使下游執行 checkpoint 的延遲增大。
  • 依賴最底層的 TCP 去作流控,會致使反壓傳播路徑太長,致使生效的延遲比較大。

引入 Credit-based 反壓

這個機制簡單的理解起來就是在 Flink 層面實現相似 TCP 流控的反壓機制來解決上述的弊端,Credit 能夠類比爲 TCP 的 Window 機制。

Credit-based 反壓過程

36_46

如圖所示在 Flink 層面實現反壓機制,就是每一次 ResultSubPartition 向 InputChannel 發送消息的時候都會發送一個 backlog size 告訴下游準備發送多少消息,下游就會去計算有多少的 Buffer 去接收消息,算完以後若是有充足的 Buffer 就會返還給上游一個 Credit 告知他能夠發送消息(圖上兩個 ResultSubPartition 和 InputChannel 之間是虛線是由於最終仍是要經過 Netty 和 Socket 去通訊),下面咱們看一個具體示例。

37_47

假設咱們上下游的速度不匹配,上游發送速率爲 2,下游接收速率爲 1,能夠看到圖上在 ResultSubPartition 中累積了兩條消息,10 和 11, backlog 就爲 2,這時就會將發送的數據 <8,9> 和 backlog = 2 一同發送給下游。下游收到了以後就會去計算是否有 2 個 Buffer 去接收,能夠看到 InputChannel 中已經不足了這時就會從 Local BufferPool 和 Network BufferPool 申請,好在這個時候 Buffer 仍是能夠申請到的。

38_48

過了一段時間後因爲上游的發送速率要大於下游的接受速率,下游的 TaskManager 的 Buffer 已經到達了申請上限,這時候下游就會向上遊返回 Credit = 0,ResultSubPartition 接收到以後就不會向 Netty 去傳輸數據,上游 TaskManager 的 Buffer 也很快耗盡,達到反壓的效果,這樣在 ResultSubPartition 層就能感知到反壓,不用經過 Socket 和 Netty 一層層地向上反饋,下降了反壓生效的延遲。同時也不會將 Socket 去阻塞,解決了因爲一個 Task 反壓致使 TaskManager 和 TaskManager 之間的 Socket 阻塞的問題。

總結與思考

總結

  • 網絡流控是爲了在上下游速度不匹配的狀況下,防止下游出現過載
  • 網絡流控有靜態限速和動態反壓兩種手段
  • Flink 1.5 以前是基於 TCP 流控 + bounded buffer 實現反壓
  • Flink 1.5 以後實現了本身託管的 credit - based 流控機制,在應用層模擬 TCP 的流控機制

思考

有了動態反壓,靜態限速是否是徹底沒有做用了?

39_52

實際上動態反壓不是萬能的,咱們流計算的結果最終是要輸出到一個外部的存儲(Storage),外部數據存儲到 Sink 端的反壓是不必定會觸發的,這要取決於外部存儲的實現,像 Kafka 這樣是實現了限流限速的消息中間件能夠經過協議將反壓反饋給 Sink 端,可是像 ES 沒法將反壓進行傳播反饋給 Sink 端,這種狀況下爲了防止外部存儲在大的數據量下被打爆,咱們就能夠經過靜態限速的方式在 Source 端去作限流。因此說動態反壓並不能徹底替代靜態限速的,須要根據合適的場景去選擇處理方案。

 

 

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索