Flink中發送端反壓以及Credit機制(源碼分析)

上一篇《Flink接收端反壓機制》說到由於Flink每一個Task的接收端和發送端是共享一個bufferPool的,造成了自然的反壓機制,當Task接收數據的時候,接收端會根據積壓的數據量以及可用的buffer數量(可用的memorySegment數)來決定是否向上遊發送Credit(簡而言之就是當我還有空間的時候,我向上游也就是上一個Task的發送端發送一個ack消息,代表我還有空間你能夠發送數據過來,若是下游沒有給你Credit就證實下游已經堵了,沒有空間了也就不能繼續往下游發送了)html

如今從源碼來看一下Task的數據發送端,也就是Netty的Server端的實現java

先看Task初始化的時候TaskManagerRunner.java中startTaskManager()方法中bootstrap

 這個connectionManager其實分爲兩種,Netty,local一看就知道netty這種確定是對應須要經過網絡傳輸,本地模式這裏就不講了網絡

 

這個地方看到Flink的client和server都初始化了,須要注意的是其實這個地方client端只是初始化了一些配置,並無調用bind()方法啓動起來,這裏看過上一篇文章的同窗就會知道,client只有當第一次須要拉取上游subpatition數據的時候纔會啓動起來也就是bind(),tcp

而server端在這裏也就是task啓動的時候就啓動起來了,繼續看server端如何啓動的server.init()方法spa

 

 init方法中,這裏能夠看到,這是Flink1.6之前只有基於netty的tcp網絡層反壓,這裏是經過bootstrap的兩個參數3d

    ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK  大小爲兩倍的memorySegment大小netty

    ChannelOption.WRITE_BUFFER_LOW_WATER_MARK   大小爲memorySegment + 1server

接着htm

  

       

1處2處常規的Netty定長編解碼器,沒有什麼好說的

看看3處,4這裏先不講後面會提到

 看到3是一個inboundHandler,反壓機制時他的用處是用來接收來自下游響應的Credit,來看他的ChannelRead0方法

 當接收到的消息是一個Credit信任的時候

 

 

 先是

 

 

 增長了這個reader的可用的Credit可用數

而後

 

 

 其實瞭解了接收端的反壓,發送端接收到了下游的credit,那發送數據的時候確定有一個地方會先判斷是否有可用的Credit才決定是否往下發數據

其實就是這個帶星號的地方判斷,而後下面就是常規的從queue中拉取reader往netty下游writeAndFlash()數據了,沒什麼好講的

來看一下他判斷Credit是否知足的地方

 

 

能夠看到只有當

    有數據且可用的Credit數量大於0

    或者有數據且數據是一個事件而不是record的時候,才返回true往下游發送

 

能夠看到這個 enqueueAvailableReader()方法比較重要,裏面包含了判斷credit以及日後下游發送數據的邏輯

那這個enqueueAvailableReader()方法除了會在接收到下游的Credit的時候觸發一次,還有哪會被觸發呢

既然是往下游發送數據那咱們task處理完數據之後應該也會調用這個方法

因而來看一下Task發送數據,之前的文章講過,這裏就不贅述了,直接看到RecordWriterOutput的emit()

 

 

 

 

 

會先將record寫入到這個Serializer裏面去

而後copyFromSerializerToTargetChannel()方法中

 

 

 先去localBufferPool中請求buffer,這裏就是反壓了

請求到buffer了之後

 

 

這個調用鏈有點長不全列出來了

最後

 

 

 這個requestQueue實際上是前面Netty初始化時具體邏輯中的4,是一個ChannelInboundHandlerAdapter

 

 

 

 這個Inbound一開始我也很疑惑,這個Inbound沒有重寫他的channelRead()方法,那這個不就直接轉發數據了嗎,那他的做用是幹嗎的呢

繼續往下看

 

 

原來發送數據的時候會觸發這個inbound的eventTrigger

看下userEventTriggered()具體觸發了什麼

 

這個方法就很眼熟了,就是前面到接收到下游發送過來的Credit時會觸發一次的方法,用來判斷是否有Credit以及經過netty往下游發送數據

這裏在發送數據的時候果真又觸發了,後面就是判斷是否有Credit知足往下游發送數據的條件,而後往下游發送數據

也就是說

    當接收到下游返回的Credit的時候會觸發一次,是否能往下游寫數據的判斷並拉queue數據寫數據

    每次Task處理完數據之後emit,也會觸發一次判斷並拉queue數據寫數據

相關文章
相關標籤/搜索