上一篇《Flink接收端反壓機制》說到由於Flink每一個Task的接收端和發送端是共享一個bufferPool的,造成了自然的反壓機制,當Task接收數據的時候,接收端會根據積壓的數據量以及可用的buffer數量(可用的memorySegment數)來決定是否向上遊發送Credit(簡而言之就是當我還有空間的時候,我向上游也就是上一個Task的發送端發送一個ack消息,代表我還有空間你能夠發送數據過來,若是下游沒有給你Credit就證實下游已經堵了,沒有空間了也就不能繼續往下游發送了)html
如今從源碼來看一下Task的數據發送端,也就是Netty的Server端的實現java
先看Task初始化的時候TaskManagerRunner.java中startTaskManager()方法中bootstrap
這個connectionManager其實分爲兩種,Netty,local一看就知道netty這種確定是對應須要經過網絡傳輸,本地模式這裏就不講了網絡
這個地方看到Flink的client和server都初始化了,須要注意的是其實這個地方client端只是初始化了一些配置,並無調用bind()方法啓動起來,這裏看過上一篇文章的同窗就會知道,client只有當第一次須要拉取上游subpatition數據的時候纔會啓動起來也就是bind(),tcp
而server端在這裏也就是task啓動的時候就啓動起來了,繼續看server端如何啓動的server.init()方法spa
init方法中,這裏能夠看到,這是Flink1.6之前只有基於netty的tcp網絡層反壓,這裏是經過bootstrap的兩個參數3d
ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK 大小爲兩倍的memorySegment大小netty
ChannelOption.WRITE_BUFFER_LOW_WATER_MARK 大小爲memorySegment + 1server
接着htm
1處2處常規的Netty定長編解碼器,沒有什麼好說的
看看3處,4這裏先不講後面會提到
看到3是一個inboundHandler,反壓機制時他的用處是用來接收來自下游響應的Credit,來看他的ChannelRead0方法
當接收到的消息是一個Credit信任的時候
先是
增長了這個reader的可用的Credit可用數
而後
其實瞭解了接收端的反壓,發送端接收到了下游的credit,那發送數據的時候確定有一個地方會先判斷是否有可用的Credit才決定是否往下發數據
其實就是這個帶星號的地方判斷,而後下面就是常規的從queue中拉取reader往netty下游writeAndFlash()數據了,沒什麼好講的
來看一下他判斷Credit是否知足的地方
能夠看到只有當
有數據且可用的Credit數量大於0
或者有數據且數據是一個事件而不是record的時候,才返回true往下游發送
能夠看到這個 enqueueAvailableReader()方法比較重要,裏面包含了判斷credit以及日後下游發送數據的邏輯
那這個enqueueAvailableReader()方法除了會在接收到下游的Credit的時候觸發一次,還有哪會被觸發呢
既然是往下游發送數據那咱們task處理完數據之後應該也會調用這個方法
因而來看一下Task發送數據,之前的文章講過,這裏就不贅述了,直接看到RecordWriterOutput的emit()
會先將record寫入到這個Serializer裏面去
而後copyFromSerializerToTargetChannel()方法中
先去localBufferPool中請求buffer,這裏就是反壓了
請求到buffer了之後
這個調用鏈有點長不全列出來了
最後
這個requestQueue實際上是前面Netty初始化時具體邏輯中的4,是一個ChannelInboundHandlerAdapter
這個Inbound一開始我也很疑惑,這個Inbound沒有重寫他的channelRead()方法,那這個不就直接轉發數據了嗎,那他的做用是幹嗎的呢
繼續往下看
原來發送數據的時候會觸發這個inbound的eventTrigger
看下userEventTriggered()具體觸發了什麼
這個方法就很眼熟了,就是前面到接收到下游發送過來的Credit時會觸發一次的方法,用來判斷是否有Credit以及經過netty往下游發送數據
這裏在發送數據的時候果真又觸發了,後面就是判斷是否有Credit知足往下游發送數據的條件,而後往下游發送數據
也就是說
當接收到下游返回的Credit的時候會觸發一次,是否能往下游寫數據的判斷並拉queue數據寫數據
每次Task處理完數據之後emit,也會觸發一次判斷並拉queue數據寫數據