一文搞懂 Flink 網絡流控與反壓機制

     https://www.jianshu.com/p/2779e73abcb8html

  • 看完本文,你能get到如下知識
    • Flink 流處理爲何須要網絡流控?
    • Flink V1.5 版以前網絡流控介紹
    • Flink V1.5 版以前的反壓策略存在的問題
    • Credit的反壓策略實現原理,Credit是如何解決 Flink 1.5 以前的問題?
    • 對比spark,都說flink延遲低,來一條處理一條,真是這樣嗎?其實Flink內部也有Buffer機制,Buffer機制具體是如何實現的?
    • Flink 如何在吞吐量和延遲之間作權衡?
  • 後續相關博客
    • Flink 反壓相關 Metrics 介紹
    • 基於 Flink 的流控機制和反壓如何定位 Flink 任務的瓶頸。或者說,若是一個平時正常的 Flink 任務忽然出現延遲了,怎麼來定位問題?究竟是 Kafka 讀取數據慢,仍是中間某個計算環節比較消耗資源使得變慢,仍是因爲最後的寫入外部存儲時比較慢?

Flink 流處理爲何須要網絡流控?

分析一個簡單的 Flink 流任務,下圖是一個簡單的Flink流任務執行圖:任務首先從 Kafka 中讀取數據、 map 算子對數據進行轉換、keyBy 按照指定 key 對數據進行分區(相同 key 的數據通過 keyBy 後分到同一個 subtask 實例中),keyBy 後對數據接着進行 map 轉換,而後使用 Sink 將數據輸出到外部存儲。git

 
Flink任務簡單示例圖.png

衆所周知,在大數據處理中,不管是批處理仍是流處理,單點處理的性能老是有限的,咱們的單個 Job 通常會運行在多個節點上,多個節點共同配合來提高整個系統的處理性能。圖中,任務被切分紅 4 個可獨立執行的 subtask( A0、A一、B0、B1),在數據處理過程當中,就會存在 shuffle(數據傳輸)的過程。例如,subtask A0 處理完的數據通過 keyBy 後發送到 subtask B0、B1 所在節點去處理。github

那麼問題來了,下圖中,上游 Producer 向下遊 Consumer 發送數據,在發送端和接受端都有相應的 Send Buffer 和 Receive Buffer,可是上游 Producer 生成數據的速率比下游 Consumer 消費數據的速率快。Producer 生產數據 2MB/s, Consumer 消費數據 1MB/s,Receive Buffer 只有 5MB,因此過了5秒後,接收端的 Receive Buffer 滿了。(能夠把下圖中的 Producer 當作上面案例中的 subtask A0,把下圖中的 Consumer 當作上面案例中的 subtask B0)apache

 
流控-存在的問題.png

下游接收區的 Receive Buffer 有限,若是上游一直有源源不斷的數據,那麼將會面臨着如下兩個狀況:緩存

  1. 下游消費者會丟棄新到達的數據,由於下游消費者的緩衝區放不下網絡

  2. 爲了避免丟棄數據,因此下游消費者的 Receive Buffer 持續擴張,最後耗盡消費者的內存,OOM,程序掛掉性能

常識告訴咱們,這兩種狀況在生產環境都是不能接受的,第一種會把數據丟棄、第二種會把咱們的應用程序掛掉。因此,該問題的解決方案不該該是下游 Receive Buffer 一直累積數據,而是上游 Producer 發現下游 Consumer 處理比較慢以後,應該在 Producer 端作出限流的策略,防止在下游 Consumer 端無限制的數據堆積。學習

那上游 Producer 端該如何作限流呢?能夠採用下圖所示靜態限流的策略:測試

 
流控-靜態限速.png

靜態限速的思想就是,提早已知下游 Consumer 的消費速率,而後經過在上游 Producer 端使用相似令牌桶的思想,限制 Producer 端生產數據的速率,從而控制上游 Producer 端向下遊 Consumer 端發送數據的速率。可是靜態限速會存在問題:大數據

  1. 一般沒法事先預估下游 Consumer 端能承受的最大速率
  2. 就算經過某種方式預估出下遊 Consumer 端能承受的最大速率,下游應用程序也可能會由於網絡抖動、 CPU 共享競爭、內存緊張、IO阻塞等緣由形成下游應用程序的吞吐量下降,而後又會出現上面所說的下游接收區的 Receive Buffer 有限,上游一直有源源不斷的數據發送到下游的問題,仍是會形成下游要麼丟數據,要麼爲了避免丟數據 buffer 不斷擴充致使下游 OOM的問題

綜上所述,咱們發現了,上游 Producer 端必須有一個限流的策略,且靜態限流是不可靠的,因而就須要一個動態限流的策略。能夠採用下圖動態反饋所示:

 
流控-動態反饋.png

下游 Consumer 端會頻繁地向上遊 Producer 端進行動態反饋,告訴 Producer 下游 Consumer 的負載能力,從而 Producer 端動態調整向下遊 Consumer 發送數據的速率實現 Producer 端的動態限流。當 Consumer 端處理較慢時,Consumer 將負載反饋到 Producer 端,Producer端會根據反饋適當下降 Producer 自身從上游或者 Source 端讀數據的速率來下降向下遊 Consumer 發送數據的速率。當 Consumer 處理負載能力提高後,又及時向 Producer 端反饋,Producer 會經過提高從上游或 Source 端讀數據的速率來提高向下遊發送數據的速率。經過這個動態反饋來提高整個系統的吞吐量。

補充一點,以下圖所示,假如咱們的 Job 分爲 Task A、B、C,Task A 是 Source Task、Task B 處理數據、Task C 是 Sink Task。假如 Task C 因爲各類緣由吞吐量下降,會將負載信息反饋給 Task B,Task B 會下降向 Task C 發送數據的速率,此時若是 Task B 若是仍是一直從 Task A 讀取數據,那麼按照一樣的道理,數據會把 Task B 的 Send Buffer 和 Receive Buffer 撐爆,又會出現上面描述的問題。因此,當 Task B 的 Send Buffer 和 Receive Buffer 被用完後,Task B 會用一樣的原理將負載信息反饋給 Task A,Task A 收到 Task B 的負載信息後,會下降 給 Task B 發送數據的速率,以此類推。

 
簡單的3個Task圖示.png

上面這個流程,就是 Flink 動態限流(反壓機制)的簡單描述。咱們能夠看到 Flink 的反壓實際上是從下游往上游傳播的,一直往上傳播到 Source Task 後,Source Task 最終會下降從 Source 端讀取數據的速率。若是下游 Task C 的負載能力提高後,會及時反饋給 Task B,因而 Task B 會提高往 Task C 發送數據的速率,Task B 又將負載提高的信息反饋給 Task A,Task A 就會提高從 Source 端讀取數據的速率,從而提高整個系統的負載能力。

讀到這裏,咱們應該知道 Flink 爲何須要一個網絡流控機制了,而且知道 Flink 的網絡流控機制必須是一個動態反饋的過程。可是還有如下幾個問題:

  1. 數據具體是怎麼從上游 Producer 端發送到下游 Consumer 端的?
  2. Flink 的動態限流具體是怎麼實現的?下游的負載能力和壓力是如何傳遞給上游的?

咱們帶着這兩個問題,學習下面的 Flink 網絡流控與反壓機制

Flink V1.5 版以前網絡流控介紹

在 Flink V1.5 版以前,其實 Flink 並無刻意作上述所說的動態反饋。那麼問題來了,沒有作上述的動態反饋機制,Flink 難道不怕數據丟失或者上游和下游的一些 Buffer 把內存撐爆嗎?固然不怕了,由於 Flink 已經依賴其餘機制來實現了所謂的動態反饋。其實很簡單,讓咱們繼續往下看。

以下圖所示,對於一個 Flink 任務,動態反饋能夠抽象成如下兩個階段:

  1. 跨 Task,動態反饋如何從下游 Task 的 Receive Buffer 反饋給上游 Task 的 Send Buffer
  • 當下遊 Task C 的 Receive Buffer 滿了,如何告訴上游 Task B 應該下降數據發送速率

  • 當下遊 Task C 的 Receive Buffer 空了,如何告訴上游 Task B 應該提高數據發送速率

  • 注:這裏又分了兩種狀況,Task B 和 Task C 可能在同一臺節點上運行,也有可能不在同一個臺節點運行

    • Task B 和 Task C 在同一臺節點上運行指的是:一臺節點運行了一個或多個 TaskManager,包含了多個 Slot,Task B 和 Task C 都運行在這臺節點上,且 Task B 是 Task C 的上游,給 Task C 發送數據。此時 Task B 給 Task C 發送數據其實是同一個 JVM 內的數據發送,因此不存在網絡通訊
    • Task B 和 Task C 不在同一臺節點上運行指的是:Task B 和 Task C 運行在不一樣的 TaskManager 中,且 Task B 是 Task C 的上游,給 Task C 發送數據。此時 Task B 給 Task C 發送數據是跨節點的,因此會存在網絡通訊
  1. Task 內,動態反饋如何從內部的 Send Buffer 反饋給內部的 Receive Buffer
  • 當 Task B 的 Send Buffer 滿了,如何告訴 Task B 內部的 Receive Buffer 下游 Send Buffer 滿了、下游處理性能不行了?由於要讓 Task B 的 Receive Buffer 感覺到壓力,才能把下游的壓力傳遞到 Task A
  • 當 Task B 的 Send Buffer 空了,如何告訴 Task B 內部的 Receive Buffer 下游 Send Buffer 空了,下游處理性能很強,上游加快處理數據吧
 
簡單的3個Task反壓圖示.png

跨 TaskManager,反壓如何向上遊傳播

先了解一下 Flink 的 TaskManager 之間網絡傳輸的數據流向:

 
Flink 網絡傳輸的數據流向.png

圖中,咱們能夠看到 TaskManager A 給 TaskManager B 發送數據,TaskManager A 作爲 Producer,TaskManager B 作爲 Consumer。Producer 端的 Operator 實例會產生數據,最後經過網絡發送給 Consumer 端的 Operator 實例。Producer 端 Operator 實例生產的數據首先緩存到 TaskManager 內部的 NetWork Buffer。NetWork 依賴 Netty 來作通訊,Producer 端的 Netty 內部有 ChannelOutbound Buffer,Consumer 端的 Netty 內部有 ChannelInbound Buffer。Netty 最終仍是要經過 Socket 發送網絡請求,Socket 這一層也會有 Buffer,Producer 端有 Send Buffer,Consumer 端有 Receive Buffer。

總結一下,如今有兩個 TaskManager A、B,TaskManager A 中 Producer Operator 處理完的數據由 TaskManager B 中 Consumer Operator 處理。那麼 Producer Operator 處理完的數據是怎麼到達 Consumer Operator 的?首先 Producer Operator 從本身的上游或者外部數據源讀取到數據後,對一條條的數據進行處理,處理完的數據首先輸出到 Producer Operator 對應的 NetWork Buffer 中。Buffer 寫滿或者超時後,就會觸發將 NetWork Buffer 中的數據拷貝到 Producer 端 Netty 的 ChannelOutbound Buffer,以後又把數據拷貝到 Socket 的 Send Buffer 中,這裏有一個從用戶態拷貝到內核態的過程,最後經過 Socket 發送網絡請求,把 Send Buffer 中的數據發送到 Consumer 端的 Receive Buffer。數據到達 Consumer 端後,再依次從 Socket 的 Receive Buffer 拷貝到 Netty 的 ChannelInbound Buffer,再拷貝到 Consumer Operator 的 NetWork Buffer,最後 Consumer Operator 就能夠讀到數據進行處理了。這就是兩個 TaskManager 之間的數據傳輸過程,咱們能夠看到發送方和接收方各有三層的 Buffer。

瞭解了數據傳輸流程,咱們再具體瞭解一下跨 TaskManager 的反壓過程,以下圖所示,Producer 端生產數據速率爲 2,Consumer 消費數據速率爲 1。持續下去,下游消費較慢,Buffer 容量又是有限的,那 Flink 反壓是怎麼作的?

 
跨 TaskManager 數據傳輸.png

上面介紹後,咱們知道每一個 Operator 計算數據時,輸出和輸入都有對應的 NetWork Buffer,這個 NetWork Buffer 對應到 Flink 就是圖中所示的 ResultSubPartition 和 InputChannel。ResultSubPartition 和 InputChannel 都是向 LocalBufferPool 申請 Buffer 空間,而後 LocalBufferPool 再向 NetWork BufferPool 申請內存空間。這裏,NetWork BufferPool 是 TaskManager 內全部 Task 共享的 BufferPool,TaskManager 初始化時就會向堆外內存申請 NetWork BufferPool。LocalBufferPool 是每一個 Task 本身的 BufferPool,假如一個 TaskManager 內運行着 5 個 Task,那麼就會有 5 個 LocalBufferPool,但 TaskManager 內永遠只有一個 NetWork BufferPool。Netty 的 Buffer 也是初始化時直接向堆外內存申請內存空間。雖然能夠申請,可是必須明白內存申請確定是有限制的,不可能無限制的申請,咱們在啓動任務時能夠指定該任務最多可能申請多大的內存空間用於 NetWork Buffer。

咱們繼續分析咱們的場景, Producer 端生產數據速率爲2,Consumer 端消費數據速率爲1。數據從 Task A 的 ResultSubPartition 按照上面的流程最後傳輸到 Task B 的 InputChannel 供 Task B 讀取並計算。持續一段時間後,因爲 Task B 消費比較慢,致使 InputChannel 被佔滿了,因此 InputChannel 向 LocalBufferPool 申請新的 Buffer 空間,LocalBufferPool 分配給 InputChannel 一些 Buffer。

 
跨 TaskManager 數據反壓1.png

再持續一段時間後,InputChannel 重複向 LocalBufferPool 申請 Buffer 空間,致使 LocalBufferPool 也滿了,因此 LocalBufferPool 向 NetWork BufferPool 申請 Buffer 空間,NetWork BufferPool 給 LocalBufferPool 分配 Buffer。

 
跨 TaskManager 數據反壓2.png

再持續下去,NetWork BufferPool 滿了,或者說 NetWork BufferPool 不能把本身的 Buffer 全分配給 Task B 對應的 LocalBufferPool ,由於 TaskManager 上通常會運行了多個 Task,每一個 Task 只能使用 NetWork BufferPool 中的一部分。因此,能夠認爲 Task B 把本身可使用的 InputChannel 、 LocalBufferPool 和 NetWork BufferPool 都用完了。此時 Netty 還想把數據寫入到 InputChannel,可是發現 InputChannel 滿了,因此 Socket 層會把 Netty 的 autoRead disable,Netty 不會再從 Socket 中去讀消息。能夠看到下圖中多個 ❌,表示 Buffer 已滿,數據已經不能往下游寫了,發生了阻塞。

 
跨 TaskManager 數據反壓3.png

因爲 Netty 不從 Socket 的 Receive Buffer 讀數據了,因此很快 Socket 的 Receive Buffer 就會變滿,TCP 的 Socket 通訊有動態反饋的流控機制,會把容量爲0的消息反饋給上游發送端,因此上游的 Socket 就不會往下游再發送數據 。

 
跨 TaskManager 數據反壓4.png

Task A 持續生產數據,發送端 Socket 的 Send Buffer 很快被打滿,因此 Task A 端的 Netty 也會中止往 Socket 寫數據。

 
跨 TaskManager 數據反壓5.png

接下來,數據會在 Netty 的 Buffer 中緩存數據,但 Netty 的 Buffer 是無界的。但能夠設置 Netty 的高水位,即:設置一個 Netty 中 Buffer 的上限。因此每次 ResultSubPartition 向 Netty 中寫數據時,都會檢測 Netty 是否已經到達高水位,若是達到高水位就不會再往 Netty 中寫數據,防止 Netty 的 Buffer 無限制的增加。

 
跨 TaskManager 數據反壓6.png

接下來,數據會在 Task A 的 ResultSubPartition 中累積,ResultSubPartition 滿了後,會向 LocalBufferPool 申請新的 Buffer 空間,LocalBufferPool 分配給 ResultSubPartition 一些 Buffer。

 
跨 TaskManager 數據反壓7.png

持續下去 LocalBufferPool 也會用完,LocalBufferPool 再向 NetWork BufferPool 申請 Buffer。

 
跨 TaskManager 數據反壓8.png

而後 NetWork BufferPool 也會用完,或者說 NetWork BufferPool 不能把本身的 Buffer 全分配給 Task A 對應的 LocalBufferPool ,由於 TaskManager 上通常會運行了多個 Task,每一個 Task 只能使用 NetWork BufferPool 中的一部分。此時,Task A 已經申請不到任何的 Buffer 了,Task A 的 Record Writer 輸出就被 wait ,Task A 再也不生產數據。

 
跨 TaskManager 數據反壓9.png

經過上述的這個流程,來動態反饋,保障各個 Buffer 都不會由於數據太多致使內存溢出。上面描述了整個阻塞的流程,當下遊 Task B 持續消費,Buffer 的可用容量會增長,全部被阻塞的數據通道會被一個個打開,以後 Task A 又能夠開始正常的生產數據了。

以前介紹,Task 之間的數據傳輸可能存在上游的 Task A 和下游的 Task B 運行在同一臺節點的狀況,整個流程與上述相似,只不過因爲 Task A 和 B 運行在同一個 JVM,因此不須要網絡傳輸的環節,Task B 的 InputChannel 會直接從 Task A 的 ResultSubPartition 讀取數據。

Task 內部,反壓如何向上遊傳播

假如 Task A 的下游全部 Buffer 都佔滿了,那麼 Task A 的 Record Writer 會被 block,Task A 的 Record Reader、Operator、Record Writer 都屬於同一個線程,因此 Task A 的 Record Reader 也會被 block。

 
Task 內數據反壓1.png

而後能夠把這裏的 Task A 類比成上面所說的 Task B,Task A 上游持續高速率發送數據到 Task A 就會致使可用的 InputChannel、 LocalBufferPool 和 NetWork BufferPool 都會被用完。而後 Netty 、Socket 同理將壓力傳輸到 Task A 的上游。

 
Task 內數據反壓4.png

假設 Task A 的上游是 Task X,那麼 Task A 將壓力反饋給 Task X 的過程與 Task B 將壓力反饋給 Task A 的過程是同樣的。整個 Flink 的反壓是從下游往上游傳播的,一直傳播到 Source Task,Source Task 有壓力後,會下降從外部組件中讀取數據的速率,例如:Source Task 會下降從 Kafka 中讀取數據的速率,來下降整個 Flink Job 中緩存的數據,從而下降負載。

因此得出的結論是:Flink 1.5以前並無特殊的機制來處理反壓,由於 Flink 中的數據傳輸至關於已經提供了應對反壓的機制。

Flink V1.5 版以前的反壓策略存在的問題

看着挺完美的反壓機制,實際上是有問題的。以下圖所示,咱們的任務有4個 SubTask,SubTask A 是 SubTask B的上游,即 SubTask A 給 SubTask B 發送數據。Job 運行在兩個 TaskManager中, TaskManager 1 運行着 SubTask A.1 和 SubTask A.2, TaskManager 2 運行着 SubTask B.3 和 SubTask B.4。如今假如因爲CPU共享或者內存緊張或者磁盤IO瓶頸形成 SubTask B.4 遇到瓶頸、處理速率有所降低,可是上游源源不斷地生產數據,因此致使 SubTask A.2 與 SubTask B.4 產生反壓。

 
V1.5以前存在的問題.png

這裏須要明確一點:不一樣 Job 之間的每一個(遠程)網絡鏈接將在 Flink 的網絡堆棧中得到本身的TCP通道。 可是,若是同一 Task 的不一樣 SubTask 被安排到同一個TaskManager,則它們與其餘 TaskManager 的網絡鏈接將被多路複用並共享一個TCP信道以減小資源使用。例如,圖中的 A.1 -> B.三、A.1 -> B.四、A.2 -> B.三、A.2 -> B.4 這四條將會多路複用共享一個 TCP 信道。

如今 SubTask B.3 並無壓力,從上面跨 TaskManager 的反壓流程,咱們知道當上圖中 SubTask A.2 與 SubTask B.4 產生反壓時,會把 TaskManager1 端該任務對應 Socket 的 Send Buffer 和 TaskManager2 端該任務對應 Socket 的 Receive Buffer 佔滿,多路複用的 TCP 通道已經被佔住了,會致使 SubTask A.1 和 SubTask A.2 要發送給 SubTask B.3 的數據全被阻塞了,從而致使原本沒有壓力的 SubTask B.3 如今接收不到數據了。因此,Flink 1.5 版以前的反壓機制會存在當一個 Task 出現反壓時,可能致使其餘正常的 Task 接收不到數據。

Credit的反壓策略實現原理

Flink 1.5 以後,爲了解決上述所描述的問題,引入了基於 Credit 的反壓機制。以下圖所示,反壓機制做用於 Flink 的應用層,即在 ResultSubPartition 和 InputChannel 這一層引入了反壓機制。每次上游 SubTask A.2 給下游 SubTask B.4 發送數據時,會把 Buffer 中的數據和上游 ResultSubPartition 堆積的數據量 Backlog size發給下游,下游會接收上游發來的數據,並向上遊反饋目前下游如今的 Credit 值,Credit 值表示目前下游能夠接收上游的 Buffer 量,1 個Buffer 等價於 1 個 Credit 。

 
Credit 數據傳輸.png

例如,上游 SubTask A.2 發送完數據後,還有 5 個 Buffer 被積壓,那麼會把發送數據和 Backlog size = 5 一塊發送給下游 SubTask B.4,下游接受到數據後,知道上游積壓了 5 個Buffer,因而向 Buffer Pool 申請 Buffer,因爲容量有限,下游 InputChannel 目前僅有 2 個 Buffer 空間,因此,SubTask B.4 會向上遊 SubTask A.2 反饋 Channel Credit = 2。而後上游下一次最多隻給下游發送 2 個 Buffer 的數據,這樣每次上游發送的數據都是下游 InputChannel 的 Buffer 能夠承受的數據量,因此經過這種反饋策略,保證了不會在公用的 Netty 和 TCP 這一層數據堆積而影響其餘 SubTask 通訊。

ResultSubPartition 會把 buffer 和 backlog size 同時發送給下游,下游向上遊反饋 credit。再用一個案例來詳細地描述一下整個過程。

 
Task 內數據反壓1.png

Task A 向 Task B 發送了數據 <8,9> 和 backlog size =3,下游 InputChannel 接受完 <8,9> 後,發現上游目前積壓了 3 條數據,可是本身的緩衝區不夠,因而向 LocalBufferPool 申請 buffer 空間,申請成功後,向上遊反饋 credit = 3,表示下游目前能夠接受 3 條記錄(其實是以 Buffer 爲單位,而不是記錄數,Flink 將真實記錄序列化後的二進制數據放到 Buffer 中),而後上游下次最多發送 3 條數據給下游。

 
Credit based反壓過程2.png

持續下去,上游生產數據速率比下游消費速率快,因此 LocalBufferPool 和 NetWork BufferPool 都會被申請完,下游的 InputChannel 沒有可用的緩衝區了,因此會向上遊反饋 credit = 0,而後上游就不會發送數據到 Netty。因此基於 Credit 的反壓策略不會致使 Netty 和 Socket 的數據積壓。固然上游也不會一直不發送數據到下游,上游會按期地僅發送 backlog size 給下游,直到下游反饋 credit > 0 時,上游就會繼續發送真正的數據到下游了。

 
Credit based反壓過程3.png

基於 Credit 的反壓機制還帶來了一個優點:因爲咱們在發送方和接收方之間緩存較少的數據,可能會更早地將反壓反饋給上游,緩衝更多數據只是把數據緩衝在內存中,並無提升處理性能。

Flink 如何在吞吐量和延遲之間作權衡?

Flink 自然支持流式處理,即每來一條數據就能處理一條,而不是像 Spark Streaming 同樣,徹底是微批處理。可是爲了提升吞吐量,默認使用的 Flink 並非每來一條數據就處理一條。那這個究竟是怎麼控制的呢?

咱們分析了上述的網絡傳輸後,知道每一個 SubTask 輸出的數據並非直接輸出到下游,而是在 ResultSubPartition 中有一個 Buffer 用來緩存一批數據後,再 Flush 到 Netty 發送到下游 SubTask。那到底哪些狀況會觸發 Buffer Flush 到 Netty 呢?

  1. Buffer 變滿時

  2. Buffer timeout 時

  3. 特殊事件來臨時,例如:CheckPoint 的 barrier 來臨時

Flink 在數據傳輸時,會把數據序列化成二進制而後寫到 Buffer 中,當 Buffer 滿了,須要 Flush(默認爲32KiB,經過taskmanager.memory.segment-size設置)。可是當流量低峯或者測試環節,可能1分鐘都沒有 32 KB的數據,就會致使1分鐘內的數據都積攢在 Buffer 中不會發送到下游 Task 去處理,從而致使數據出現延遲,這並非咱們想看到的。因此 Flink 有一個 Buffer timeout 的策略,意思是當數據量比較少,Buffer 一直沒有變滿時,後臺的 Output flusher 線程會強制地將 Buffer 中的數據 Flush 到下游。Flink 中默認 timeout 時間是 100ms,即:Buffer 中的數據要麼變滿時 Flush,要麼最多等 100ms 也會 Flush 來保證數據不會出現很大的延遲。固然這個能夠經過 env.setBufferTimeout(timeoutMillis) 來控制超時時間。

  • timeoutMillis > 0 表示最長等待 timeoutMillis 時間,就會flush
  • timeoutMillis = 0 表示每條數據都會觸發 flush,直接將數據發送到下游,至關於沒有Buffer了(避免設置爲0,可能致使性能降低)
  • timeoutMillis = -1 表示只有等到 buffer滿了或 CheckPoint的時候,纔會flush。至關於取消了 timeout 策略

嚴格來說,Output flusher 不提供任何保證——它只向 Netty 發送通知,而 Netty 線程會按照能力與意願進行處理。這也意味着若是存在反壓,則 Output flusher 是無效的。言外之意,若是反壓很嚴重,下游 Buffer 都滿了,固然不能強制一直往下游發數據。

一些特殊的消息若是經過 RecordWriter 發送,也會觸發當即 Flush 緩存的數據。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應該儘快被髮送,而不該該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush。固然若是出現反壓,CheckPoint barrier 也會等待,不能發送到下游。

引入 Network buffers 以得到更高的資源利用率和更高的吞吐量,代價是讓一些記錄在 Buffer 中等待一段時間。雖然能夠經過緩衝區超時給出此等待時間的上限,但你可能知道有關這兩個維度(延遲和吞吐量)之間權衡的更多信息:顯然,沒法同時得到這二者。下圖是 Flink 官網的博客展現的不一樣的 buffer timeout 下對應的吞吐量,從0毫秒開始(每一個記錄都 flush)到100毫秒(默認值),測試在具備 100 個節點每一個節點 8 個 Slot 的羣集上運行,每一個節點運行沒有業務邏輯的 Task,所以只用於測試網絡協議棧。爲了進行比較,還測試了低延遲改進以前的 Flink 1.4 版本。

 
flink-network-stack9.png

如圖,使用 Flink 1.5+,即便是很是低的 Buffer timeout(例如1ms,對於低延遲場景)也提供高達超時默認參數(100ms)75% 的最大吞吐,但會緩存更少的數據。可是筆者仍然不理解爲何 timeout 設置爲0時,吞吐量居然能比 Flink 1.4 的吞吐量提升那麼多。Credit 只是解決了反壓的問題,並不能優化低延遲的吞吐量。楊華老師的回答是網絡協議棧作了其餘優化並且性能測試是在特定場景下作的。筆者後續會繼續深刻學習研究 Flink 網絡通訊來解決筆者目前的疑問。

參考文獻:

Flink官網

flink-china系列課程----2.7 Flink網絡流控及反壓剖析

Flink 官網兩篇關於 Flink 網絡協議棧的博客:

A Deep-Dive into Flink's Network Stack

Flink Network Stack Vol. 2: Monitoring, Metrics, and that Backpressure Thing

相關文章
相關標籤/搜索