單個 TaskManager 上的緩衝區總數一般不須要配置。須要配置時請參閱配置網絡緩衝區文檔。html
每當子任務的發送緩衝池耗盡時——也就是緩存駐留在結果子分區的緩存隊列中或更底層的基於 Netty 的網絡棧中時——生產者就被阻塞了,沒法繼續工做,並承受背壓。接收器也是相似:較底層網絡棧中傳入的 Netty 緩存須要經過網絡緩衝區提供給 Flink。若是相應子任務的緩衝池中沒有可用的網絡緩存,Flink 將在緩存可用前中止從該通道讀取。這將對這部分多路傳輸鏈路發送的全部子任務形成背壓,所以也限制了其餘接收子任務。下圖中子任務 B.4 過載了,它會對這條多路傳輸鏈路形成背壓,還會阻止子任務 B.3 接收和處理新的緩存。java
爲了防止這種狀況發生,Flink 1.5 引入了本身的流量控制機制。git
基於信用的流量控制可確保「線上」的任何內容都能被接收器處理。它是 Flink 原有機制的天然拓展,基於網絡緩衝區的可用性實現。每一個遠程輸入通道如今都有本身的一組獨佔緩衝區,而非使用共享的本地緩衝池。而本地緩衝池中的緩存稱爲浮動緩存,由於它們會浮動並可用於全部輸入通道。github
接收器將緩存的可用性聲明爲發送方的信用(1 緩存 = 1 信用)。每一個結果子分區將跟蹤其通道信用值。若是信用可用,則緩存僅轉發到較底層的網絡棧,而且發送的每一個緩存都會讓信用值減去一。除了緩存外,咱們還發送有關當前backlog大小的信息,從而指定在此子分區的隊列中等待的緩存數量。接收器將使用它來請求適當數量的浮動緩衝區,以便更快處理 backlog。它將嘗試獲取與 backlog 大小同樣多的浮動緩衝區,但有時並不會如意,可能只獲取一點甚至獲取不到緩衝。接收器將使用檢索到的緩存,並將繼續監聽可用的緩存。apache
基於信用的流量控制將使用每通道緩衝區來指定本地緩衝池(可選(3))的獨佔(強制)緩存數和每一個門的浮動緩衝區,從而實現與沒有流量控制時相同的緩衝區上限。這兩個參數的默認值會使流量控制的最大(理論)吞吐量至少與沒有流量控制時同樣高,前提是網絡的延遲處於通常水平上。你可能須要根據實際的網絡延遲和帶寬來調整這些參數。api
可選(3):若是沒有足夠的緩存,每一個緩衝池將從全局可用緩衝池中獲取相同份額(±1)。數組
相比沒有流量控制的接收器的背壓機制,信用機制提供了更直接的控制邏輯:若是接收器能力不足,其可用信用將減到 0,並阻止發送方將緩存轉發到較底層的網絡棧上。這樣只在這個邏輯信道上存在背壓,而且不須要阻止從多路複用 TCP 信道讀取內容。所以,其餘接收器在處理可用緩存時就不受影響了。緩存
經過流量控制,多路複用鏈路中的信道就不會阻塞鏈路中的另外一個邏輯信道,提高了總體資源利用率。此外,咱們還能經過徹底控制「在線」數據的數量來改善檢查點對齊狀況:若是沒有流量控制,通道須要一段時間才能填滿網絡堆棧的內部緩衝區,並廣播接收器已經中止讀取的消息。這段時間裏會多出不少緩存。全部檢查點障礙都必須在這些緩存後面排隊,所以必須等到全部這些緩存處理完畢後才能啓動(「障礙永遠不會越過記錄!」)。服務器
可是,來自接收器的附加通告消息可能會產生一些額外開銷,尤爲是在使用 SSL 加密通道的設置中更是如此。此外,單個輸入通道不能使用緩衝池中的全部緩存,由於獨佔緩存不能共享。它也不能當即開始發送儘量多的數據,因此在加速期間(生成數據的速度比計算信用的速度更快時)可能須要更長時間才能發送數據。雖然這可能會影響你的做業性能,但這些代價相比收益來講仍是值得的。你可能但願經過每一個通道的緩衝區增長獨佔緩存的數量,但代價是使用更多內存。但與以前的實現相比整體內存佔用可能仍是要少一些,由於較底層的網絡棧再也不須要緩存大量數據了,咱們老是能夠當即將其傳輸到 Flink 中。網絡
還有一件事要注意:因爲咱們在發送方和接收方之間緩存的數據更少了,你可能會更早地遇到背壓。但這也在預料之中,並且緩存的數據再多也沒什麼用。若是你想要緩存更多數據,同時還要有流量控制,能夠考慮經過每一個門的浮動緩衝區來提高浮動緩存的數量。
Advantages | Disadvantages |
---|---|
• better resource utilisation with data skew in multiplexed connections 經過多路複用鏈接中的數據傾斜提高資源利用率 • improved checkpoint alignment 改善了檢查點對齊 • reduced memory use (less data in lower network layers)減小內存佔用(較底層網絡層中的數據更少) |
• additional credit-announce messages 額外的信用通知消息 • additional backlog-announce messages (piggy-backed with buffer messages, almost no overhead)額外的 backlog 通知消息(緩存消息附帶,幾乎沒有開銷) • potential round-trip latency 潛在的往返延遲 |
• backpressure appears earlier 背壓出現得更早 |
注意:若是你須要關閉基於信用的流量控制,能夠將下列代碼添加到 flink-conf.yaml:taskmanager.network.credit-model: false。但此參數已棄用,最終將與不基於信用的流控制代碼一塊兒被移除。
下面的視圖比以前的級別更高一些,其中包含網絡棧及其周圍組件的更多詳細信息:
一個記錄被建立並傳遞以後(例如經過 Collector #colle()),它會被遞交到RecordWriter,其未來自 Java 對象的記錄序列化爲一個字節序列,後者最終成爲網絡緩存,而後像前文提到的那樣被處理。RecordWriter 首先使用SpanningRecordSerializer將記錄序列化爲一個靈活的堆上字節數組。而後,它嘗試將這些字節寫入目標網絡通道的關聯網絡緩存。
在接收方,較底層的網絡棧(netty)將接收到的緩存寫入適當的輸入通道。最後(流式)任務的線程從這些隊列中讀取並嘗試在RecordReader的幫助下,經過SpillingAdaptiveSpanningRecordDeserializer將積累的數據反序列化爲 Java 對象。與序列化器相似,這個反序列化器還必須處理特殊狀況,例如跨越多個網絡緩衝區的記錄——這多是由於記錄大於網絡緩衝區(默認爲 32KiB,經過taskmanager.memory.segment-size設置);或者是由於序列化記錄被添加到了沒有足夠剩餘空間的網絡緩衝區中。無論怎樣,Flink 將使用這些數據,並繼續將剩餘數據寫入新的網絡緩衝區。
在上圖中,基於信用的流量控制機制實際上位於「Netty 服務器」(和「Netty 客戶端」)組件內部,RecordWriter 寫入的緩存始終以空狀態添加到結果子分區中,而後逐漸填滿(序列化)記錄。可是何時 Netty 真的獲得了緩存呢?顯然,只要它們可用時就不能接收數據了,由於這不只會由於跨線程通訊和同步而增長大量成本,並且還會讓整個緩存都過期。
在 Flink 中,有三種狀況下 Netty 服務器能夠消費緩存:
寫入記錄時緩衝區變滿
緩存超時命中
發送特殊事件,例如檢查點障礙
RecordWriter 與本地序列化緩衝區一塊兒使用當前記錄,並將這些數據逐漸寫入位於相應結果子分區隊列的一個或多個網絡緩衝區。雖然 RecordWriter 能夠處理多個子分區,但每一個子分區只有一個 RecordWriter 向其寫入數據。另外一方面,Netty 服務器正在從多個結果子分區讀取並將適當的分區複用到單個信道中,如上所述。這是一個典型的生產者——消費者模式,網絡緩衝區位於中間位置,以下圖所示。在(1)序列化和(2)將數據寫入緩衝區以後,RecordWriter 相應地更新緩衝區的寫入器索引。一旦緩衝區被徹底填滿,記錄寫入器將(3)從其本地緩衝池中獲取當前記錄(或下一個記錄)的全部剩餘數據生成新的緩存,並將新的緩存添加到子分區隊列。這將(4)通知 Netty 服務器還有數據可用(注 4)。每當 Netty 有能力處理此通知時,它將(5)獲取緩存並沿適當的 TCP 通道發送它。
注4:若是隊列中有更多處理完的緩存,咱們能夠假設 Netty 已經收到了通知
爲了下降延遲,咱們不能在緩衝區填滿以後才向下遊發送數據。有些狀況下某個通訊信道沒有流過那麼多記錄,這樣會帶來無心義的延遲。爲此,一個名爲輸出刷新器的按期進程將刷新堆棧中可用的任何數據。能夠經過StreamExecutionEnvironment#setBufferTimeout配置週期間隔,這個間隔對於低吞吐量通道來講就是延遲上限(注 5)。下圖顯示了它與其餘組件的交互方式:RecordWriter 仍是會序列化並寫入網絡緩衝區,但同時,若是 Netty 服務器還沒有知曉,輸出刷新器能夠(3,4)通知 Netty 服務器有數據可用(相似上面的「緩衝區已滿」場景)。當 Netty 處理此通知(5)時,它將使用緩衝區中的可用數據並更新緩衝區的讀取器索引。緩存保留在隊列中——從 Netty 服務器端對此緩存作進一步操做後,將在下次繼續讀取讀取器索引。
注5:嚴格來講,輸出刷新器無法給出任何保證——它只會向 Netty 發送通知而已,後者是否響應通知則要取決於其意願和能力。這也意味着若是通道在經受背壓,輸出刷新器就沒用了。
某些特殊事件若是經過 RecordWriter 發送,也會觸發當即刷新。最重要的特殊事件是檢查點障礙或分區結束事件,顯然它們應該快速執行,而不是等待輸出刷新器啓動。
相比 Flink 1.5 以前的版本,請注意(a)網絡緩衝區如今直接放在子分區隊列中,(b)咱們不會在每次刷新時關閉緩衝區。這也帶來了一些好處:
同步開銷較少(輸出刷新和 RecordWriter 是各自獨立的)
在高負載場景中,當 Netty 是瓶頸時(由於背壓或直接緣由),咱們仍然能夠在不完整的緩衝區中積累數據
Netty 通知明顯減小
但在低負載狀況下 CPU 使用率和 TCP 包速率可能會增長。這是由於新版 Flink 將使用全部可用的 CPU 週期來維持所需的延遲。當負載增長時它將經過填充更多的緩衝區來自我調整。因爲同步開銷減小了,高負載場景不會受到影響,甚至能夠得到更大的吞吐量。
若是你想更深刻地瞭解如何在 Flink 中實現生產者——消費者機制,請仔細查看 Flink 1.5 中引入的BufferBuilder和BufferConsumer類。雖然讀取多是按緩存逐個進行的,但寫入是按記錄進行的這樣 Flink 中的全部網絡通訊都走熱路徑。所以,咱們很是清楚咱們須要在任務的線程和 Netty 線程之間創建輕量鏈接,這不會致使過多的同步開銷。詳細信息能夠參閱源代碼。
引入網絡緩衝區能得到更高的資源利用率和吞吐量,代價是讓一些記錄在緩衝區中等待一段時間。雖然能夠經過緩衝區超時設置來限制這個延遲,但你極可能想要知道延遲和吞吐量之間的權衡關係——顯然它們不可兼得。下圖顯示了緩衝區超時設置的不一樣值——從 0 開始(每一個記錄都刷新)到 100 毫秒(默認值)——以及在有 100 個節點,每一個節點 8 個插槽各運行一個做業的集羣上對應的吞吐量;做業沒有業務邏輯,只用來測試網絡棧。爲了對比,咱們還加入了 Flink 1.4 版本的狀況。
如你所見,使用 Flink 1.5+ 版本時即便是很是低的緩衝區超時(例如 1ms,適合低延遲場景)也設置也只比默認超時設置高出最多 75%的吞吐量。
如今你瞭解告終果分區、批處理和流式傳輸的各類網絡鏈接和調度類型。你還了解了基於信用的流量控制以及網絡棧的內部工做機制,知道怎樣調整網絡相關的參數,知道怎樣判斷某些做業行爲。本系列的後續文章將基於這些知識探討更多操做細節,包括須要查看的相關指標、進一步的網絡棧調整以及要避免的常見反模式。敬請期待。
原文連接:
https://flink.apache.org/2019/06/05/flink-network-stack.html