Flink 反壓 淺入淺出

前言

微信搜【Java3y】關注這個樸實無華的男人,點贊關注是對我最大的支持!git

文本已收錄至個人GitHubhttps://github.com/ZhongFuCheng3y/3y,有300多篇原創文章,最近在連載面試和項目系列!github

最近一直在遷移Flink相關的工程,期間也踩了些坑,checkpoint反壓是其中的一個。面試

敖丙太菜了,Flink都不會,只能我本身來了。看敖丙只能圖一樂,學技術仍是得看三歪數據庫

平時敖丙黑我都沒啥水平,拿點簡單的東西來就說我不會。我是敖丙的頭號黑粉apache

今天來分享一下 Flinkcheckpoint機制和背壓原理,我相信經過這篇文章,你們在玩Flink的時候能夠更加深入地瞭解Checkpoint是怎麼實現的,而且在設置相關參數以及使用的時候能夠更加地駕輕就熟。微信

上一篇已經寫過Flink的入門教程了,若是還不瞭解Flink的同窗能夠先去看看:《Flink入門教程架構

前排提醒,本文基於Flink 1.7併發

淺入淺出學習Flink的背壓知識》ide

開胃菜

在講解FlinkcheckPoint背壓機制以前,咱們先來看下checkpoint背壓的相關基礎,有助於後面的理解。學習

做爲用戶,咱們寫好Flink的程序,上管理平臺提交,Flink就跑起來了(只要程序代碼沒有問題),細節對用戶都是屏蔽的。

實際上大體的流程是這樣的:

  1. Flink會根據咱們所寫代碼,會生成一個StreamGraph的圖出來,來表明咱們所寫程序的拓撲結構。

  2. 而後在提交的以前會將StreamGraph這個圖優化一把(能夠合併的任務進行合併),變成JobGraph

  3. JobGraph提交給JobManager

  4. JobManager收到以後JobGraph以後會根據JobGraph生成ExecutionGraphExecutionGraphJobGraph 的並行化版本)

  5. TaskManager接收到任務以後會將ExecutionGraph生成爲真正的物理執行圖

能夠看到物理執行圖真正運行在TaskManagerTransformSink之間都會有ResultPartitionInputGate這倆個組件,ResultPartition用來發送數據,而InputGate用來接收數據。

屏蔽掉這些Graph,能夠發現Flink的架構是:Client->JobManager->TaskManager

從名字就能夠看出,JobManager是幹「管理」,而TaskManager是真正幹活的。回到咱們今天的主題,checkpoint就是由JobManager發出。

Flink自己就是有狀態的,Flink可讓你選擇執行過程當中的數據保存在哪裏,目前有三個地方,在Flink的角度稱做State Backends

  • MemoryStateBackend(內存)

  • FsStateBackend(文件系統,通常是HSFS)

  • RocksDBStateBackend(RocksDB數據庫)

一樣的,checkpoint信息也是保存在State Backends

耗子屎

最近在Storm遷移Flink的時候遇到個問題,我來簡單描述一下背景。

咱們從各個數據源從清洗出數據,藉助Flink清洗,組裝成一個寬模型,最後交由kylin作近實時數據統計和展現,供運營實時查看。

遷移的過程當中,發現訂單的topic消費延遲了很久,初步懷疑是由於訂單上游的併發度不夠所影響的,因此調整了兩端的並行度從新發布一把。

發佈的過程當中,系統起來之後,再去看topic 消費延遲的監控,就懵逼了。什麼?怎麼這麼久了啊?絲毫沒有降下去的意思。

這時候只能找組內的大神去尋求幫忙了,他排查一番後表示:這checkpoint一直沒作上,都堵住了,從新發布的時候只會在上一次checkpoint開始,因爲checkpoint長時間沒完成掉,因此從新發布數據量會很大。這沒啥好辦法了,只能在這個堵住的環節下扔掉吧,估計是業務邏輯出了問題。

畫外音:接收到訂單的數據,會去溯源點擊,判斷該訂單從哪一個業務來,通過了哪些的業務,最終是哪塊業務導致該訂單成交。

畫外音:外部真正使用時,依賴「訂單結果HBase」數據

咱們認爲點擊的數據有可能會比訂單的數據處理要慢一會,因此找不到的數據會間隔一段時間輪詢,又由於Flink提供State「狀態」 和checkpoint機制,咱們把找不到的數據放入ListState按必定的時間輪詢就行了(即使系統因爲重啓或其餘緣由掛了,也不會把數據丟了)。

理論上只要沒問題,這套方案是可行的。但如今結果告訴咱們:訂單數據報來了之後,一小批量數據一直在「訂單結果HBase」沒找到數據,就放置到ListState上,而後來一條數據就去遍歷ListState。致使的後果就是:

  • 數據消費不過來,造成反壓

  • checkpoint一直沒成功

當時處理的方式就是把ListState清空掉,暫時丟掉這一部分的數據,讓數據追上進度。

後來排查後發現是上游在消息報字段上作了「手腳」,解析失敗致使點擊丟失,形成這一連鎖的後果。

排查問題的關鍵是理解Flink反壓checkpoint的原理是什麼樣的,下面我來說述一下。

反壓

反壓backpressure是流式計算中很常見的問題。它意味着數據管道中某個節點成爲瓶頸,處理速率跟不上「上游」發送數據的速率,上游須要進行限速

上面的圖表明瞭是反壓極簡的狀態,說白了就是:下游處理不過來了,上游得慢點,要堵了!

最使人好奇的是:「下游是怎麼通知上游要發慢點的呢?

在前面Flink的基礎知識講解,咱們能夠看到ResultPartition用來發送數據,InputGate用來接收數據。

Flink在一個TaskManager內部讀寫數據的時候,會有一個BufferPool(緩衝池)供該TaskManager讀寫使用(一個TaskManager共用一個BufferPool),每一個讀寫ResultPartition/InputGate都會去申請本身的LocalBuffer

以上圖爲例,假設下游處理不過來,那InputGateLocalBuffer是否是被填滿了?填滿了之後,ResultPartition是否是沒辦法往InputGate發了?而ResultPartition無法發的話,它本身自己的LocalBuffer 也早晚被填滿,那是否是依照這個邏輯,一直到Source就不會拉數據了...

這個過程就猶如InputGate/ResultPartition都開了本身的有界阻塞隊列,反正「我」就只能處理這麼多,往我這裏發,我滿了就堵住唄,造成連鎖反應一直堵到源頭上...

上面是隻有一個TaskManager的狀況下的反壓,那多個TaskManager呢?(畢竟咱們不少時候都是有多個TaskManager在爲咱們工做的)

咱們再看回Flink通訊的整體數據流向架構圖:

從圖上能夠清洗地發現:遠程通訊用的Netty,底層是TCP Socket來實現的。

因此,從宏觀的角度看,多個TaskManager只不過多了兩個Buffer(緩衝區)。

按照上面的思路,只要InputGateLocalBuffer被打滿,Netty Buffer也早晚被打滿,而Socket Buffer一樣早晚也會被打滿(TCP 自己就帶有流量控制),再反饋到ResultPartition上,數據又又又發不出去了...致使整條數據鏈路都存在反壓的現象。

如今問題又來了,一個TaskManagertask但是有不少的,它們都共用一個TCP Buffer/Buffer Pool,那隻要其中一個task的鏈路存在問題,那不致使整個TaskManager跟着遭殃?

Flink 1.5版本以前,確實會有這個問題。而在Flink 1.5版本以後則引入了credit機制。

從上面咱們看到的Flink所實現的反壓,宏觀上就是直接依賴各個Buffer是否滿了,若是滿了則沒法寫入/讀取致使連鎖反應,直至Source端。

credit機制,實際上能夠簡單理解爲以「更細粒度」去作流量控制:每次InputGate會告訴ResultPartition本身還有多少的空閒量能夠接收,讓ResultPartition看着發。若是InputGate告訴ResultPartition已經沒有空閒量了,那ResultPartition就不發了。

那其實是怎麼實現的呢?擼源碼!

在擼源碼以前,咱們再來看看下面物理執行圖:實際上InPutGate下是InputChannelResultPartition下是ResultSubpartition(這些在源碼中都有體現)。

InputGate(接收端處理反壓)

咱們先從接收端看起吧。Flink接收數據的方法在org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

隨後定位處處理反壓的邏輯:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

進去getNextNonBlocked()方法看(選擇的是BarrierBuffer實現):

咱們就直接看null的狀況,看下從初始化階段開始是怎麼搞的,進去getNextBufferOrEvent()

進去方法裏面看到兩個比較重要的調用:

requestPartitions();

result = currentChannel.getNextBuffer();

先從requestPartitions()看起吧,發現裏邊套了一層(從InputChannel下獲取到subPartition):

因而再進requestSubpartition()(看RemoteInputChannel的實現吧)

在這裏看起來就是建立Client端,而後接收上游發送過來的數據:

先看看client端的建立姿式吧,進createPartitionRequestClient()方法看看(咱們看Netty的實現)。

點了兩層,咱們會進到createPartitionRequestClient()方法,看源碼註釋就能夠清晰發現,這會建立TCP鏈接而且建立出Client供咱們使用

咱們仍是看null的狀況,因而定位到這裏:

進去connect()方法看看:

咱們就看看具體生成邏輯的實現吧,因此進到getClientChannelHandlers

意外發現源碼還有個通訊簡要流程圖給咱們看(哈哈哈):

好了,來看看getClientChannelHandlers方法吧,這個方法不長,主要判斷了下要生成的client是否開啓creditBased機制:

public ChannelHandler[] getClientChannelHandlers() {
  NetworkClientHandler networkClientHandler =
   creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
    new PartitionRequestClientHandler();
  return new ChannelHandler[] {
   messageEncoder,
   new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
   networkClientHandler};
 }

因而咱們的networkClientHandler實例是CreditBasedPartitionRequestClientHandler

到這裏,咱們暫且就認爲Client端已經生成完了,再退回去getNextBufferOrEvent()這個方法,requestPartitions()方法是生成接收數據的Client端,具體的實例是CreditBasedPartitionRequestClientHandler

下面咱們進getNextBuffer()看看接收數據具體是怎麼處理的:

拿到數據後,就會開始執行咱們用戶的代碼了調用process方法了(這裏咱們先不看了)。仍是回到反壓的邏輯上,咱們好像還沒看到反壓的邏輯在哪裏。重點就是receivedBuffers這裏,是誰塞進去的呢?

因而咱們回看到Client具體的實例CreditBasedPartitionRequestClientHandler,打開方法列表一看,感受就是ChannelRead()沒錯了:

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  try {
   decodeMsg(msg);
  } catch (Throwable t) {
   notifyAllChannelsOfErrorAndClose(t);
  }
 }

跟着decodeMsg繼續往下走吧:

繼續下到decodeBufferOrEvent()

繼續下到onBuffer

因此咱們往onSenderBacklog上看看:

最後調用notifyCreditAvailableCredit往上游發送:

public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
  ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
 }

最後再畫張圖來理解一把(關鍵鏈路):

ResultPartition(發送端處理反壓)

發送端咱們從org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager開始看起

因而咱們進去看fromConfiguration()

進去start()去看,隨後進入connectionManager.start()(仍是看Netty的實例):

image-20201206141859451

image-20201206141859451

進去看service.init()方法作了什麼(又看到熟悉的身影):

好了,咱們再進去getServerChannelHandlers()看看吧:

有了上面經驗的咱們,直接進去看看它的方法,沒錯,又是channnelRead,只是此次是channelRead0

ok,咱們進去addCredit()看看:

reader.addCredit(credit)只是更新了下數量

public void addCredit(int creditDeltas) {
  numCreditsAvailable += creditDeltas;
 }

重點咱們看下enqueueAvailableReader() 方法,而enqueueAvailableReader()的重點就是判斷Credit是否足夠發送

isAvailable的實現也很簡單,就是判斷Credit是否大於0且有真實數據可發

writeAndFlushNextMessageIfPossible實際上就是往下游發送數據:

拿數據的時候會判斷Credit是否足夠,不足夠拋異常:

再畫張圖來簡單理解一下:

背壓總結

「下游」的處理速度跟不上「上游」的發送速度,從而下降了處理速度,看似是很美好的(畢竟看起來就是幫助咱們限流了)。

但在Flink裏,背壓再加上Checkponit機制,頗有可能致使State狀態一直變大,拖慢完成checkpoint速度甚至超時失敗。

checkpoint處理速度延遲時,會加重背壓的狀況(極可能大多數時間都在處理checkpoint了)。

checkpoint作不上時,意味着重啓Flink應用就會從上一次完成checkpoint從新執行(...

舉個我真實遇到的例子:

我有一個Flink任務,我只給了它一臺TaskManager去執行任務,在更新DB的時候發現會有併發的問題。

只有一臺TaskManager定位問題很簡單,稍微定位了下判斷:我更新DB的Sink 並行度調高了。

若是Sink的並行度設置爲1,那確定沒有併發的問題,但這樣處理起來太慢了。

因而我就在Sink以前根據userId進行keyBy(相同的userId都由同一個Thread處理,那這樣就沒併發的問題了)

看似很美好,但userId存在熱點數據的問題,致使下游數據處理造成反壓。本來一次checkpoint執行只須要30~40ms反壓後一次checkpoint須要2min+

checkpoint執行間隔相對頻繁(6s/次),執行時間2min+,最終致使數據一直處理不過來,整條鏈路的消費速度從原來的3000qps到背壓後的300qps,一直堵住(程序沒問題,就是處理速度大大降低,影響到數據的最終產出)。

最後

原本想着這篇文章把反壓和Checkpoint都一塊兒寫了,但寫着寫着發現有點長了,那checkpoint開下一篇吧。

相信我,只要你用到Flink,早晚會遇到這種問題的,如今可能有的同窗還沒看懂,不要緊,先點個贊👍,收藏起來,後面就用得上了。

參考資料:

三歪把【大廠面試知識點】、【簡歷模板】、【原創文章】所有整理成電子書,共有1263頁!點擊下方連接直接取就行了

PDF文檔的內容均爲手打,有任何的不懂均可以直接來問我

相關文章
相關標籤/搜索