先上一張圖總體瞭解Flink中的反壓java
能夠看到每一個task都會有本身對應的IG(inputgate)對接上游發送過來的數據和RS(resultPatation)對接往下游發送數據, 整個反壓機制經過inputgate,resultPatation公用一個必定大小的memorySegmentPool來實現(Flink中memorySegment做爲內存使用的抽象,類比bytebuffer), 公用一個pool當接收上游數據時Decoder,往下游發送數據時Encoder,都會向pool中請求內存memorySegment 。由於是公共pool,也就是說運行時,當接受的數據佔用的內存多了,往下游發送的數據就少了,這樣是個什麼樣的狀況呢?緩存
好比說你sink端堵塞了,背壓了寫不進去,那這個task的resultPatation沒法發送數據了,也就沒法釋放memorySegment了,相應的用於接收數據的memorySegment就會愈來愈少,直到接收數據端拿不到memorySegment了,也就沒法接收上游數據了,既然這個task沒法接收數據了,天然引發這個task的上一個task數據發送端沒法發送,那上一個task又反壓了,因此這個反壓從發生反壓的地方,依次的往上游擴散直到source,這個就是flink的自然反壓。tcp
從源碼來看一下flink是如何實現的編碼
來到數據接收的地方StreamInputProcessor.java中processInput()方法中spa
這裏經過經過handler的getNextNonBlocked()方法獲取到了bufferOrEvent後面就會將這個bufferOrEvent解析成record數據而後使用用戶的代碼處理了3d
其實這裏的handler分爲兩種netty
區別主要是barrierbuffer實現了barrier對齊的數據緩存,用於實現一次語義,這裏之後隨緣更新到容錯機制的時候講code
來看一下getNextNonBlocked()方法對象
這個看到了經過會經過上游inputGate獲取數據,具體看一下getNextBufferOrEvent()其中有兩個比較重要的調用blog
先看requestPartitions()
先遍歷了全部的inputchannel而後調用了requestSubpartition()在其中
先看一下1處,這裏返回了一個Netty的Client來看一下createPartitionRequestClient是怎麼建立的
能夠看到源碼的描述,這裏其實就是建立與上游發送數據端的tcp鏈接的client端,用來接收上游數據的
接着
這裏若是已經創建TCP鏈接就直接拿,與上游尚未創建tcp鏈接的話就會先初始化Client端,經過這個connect()方法
來看一下第一次是如何初始化鏈接的
看到這個應該熟悉Netty的同窗一眼就瞭解了,在1處就是Client的具體邏輯了,而後與上游端口創建鏈接
來看一下具體的Client端具體的邏輯,這裏最好對netty有必定的認識
2處是用於Decoder的ChannelinboundHandler常規的解碼器沒有什麼好說的
PartitionRequestClientHandler: 不帶信任機制的
CreditBasedPartitionRequestClientHandler:帶credit信任機制的
這裏取出了全部的帶有信任的上游inputChannel而且向其響應發送了一個Credit對象
那帶Credit機制的handler什麼時候觸發userEventTriggered()來觸發向上遊發送Credit呢?
先不慌,先來看下client接收到數據後作了什麼,看下Nettyclient端的channelRead()方法(這裏只看credit機制的)
decodeMsg()方法中
decodeBufferOrEvent()方法
在沒有Credit機制的PartitionRequestClientHandler中
requestBuffer()方法就是請求memorySegmentPool中的memorySegment
這裏不能確保能獲取到,因此會用一個while(true)一直掛着
在Credit機制的CreditBasedPartitionRequestClientHandler中
請求requestBuffer()方法就是請求memorySegmentPool中的memorySegment由於信任機制在請求前就已經保證有足夠的memorySegment因此不會請求不到,這裏請求不到直接就拋異常了
而後OnBuffer( )方法
1處將將這個buffer加入到了這個receivedBuffers的ArrayDeque中,這裏要注意receivedBuffers,這個queue後面會用到(後面處理數據就是循環的從這個queue中poll拉數據出來)
這裏還要注意onBuffer方法還傳入了backlog參數,這裏是一個積壓的數據量
接着會根據積壓的數據量
當可用的buffer數 <(擠壓的數據量 + 已經分配給信任Credit的buffer量) 時,就會向Pool中繼續請求buffer,這裏請求不到也會一直while造成柱塞反壓
而後經過notifyCreditAvailable()方法發送Credit,具體來看一下
可用看到這裏就觸發了前面說到的向上遊發送Credit的方法了
到這裏,Nettyclient端的初始化以及Netty的處理邏輯就講完了
如今回到最最開始的地方
requestPartition()那裏建立nettyclient後
currentChannel.getNextBuffer()方法中
前面咱們說到的NettyClient端channelRead讀取數據後會把數據放到一個recivedBuffers的queue中,這裏就是去那個queue中取數據而後返回到咱們的 數據接收的地方StreamInputProcessor.java中processInput()方法中的獲得上游數據之後,就是開始執行咱們用戶的代碼了調用processElement方法了。
而後while(true)開始了下一輪拉取數據而後處理的過程