Flink 的網絡協議棧是組成 flink-runtime 模塊的核心組件之一,是每一個 Flink 做業的核心。它鏈接全部 TaskManager 的各個子任務(Subtask),所以,對於 Flink 做業的性能包括吞吐與延遲都相當重要。與 TaskManager 和 JobManager 之間經過基於 Akka 的 RPC 通訊的控制通道不一樣,TaskManager 之間的網絡協議棧依賴於更加底層的 Netty API。數組
本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,而後詳細介紹 Flink 網絡協議棧的物理實現和各類優化、優化的效果以及 Flink 在吞吐量和延遲之間的權衡。緩存
Flink 的網絡協議棧爲彼此通訊的子任務提供如下邏輯視圖,例如在 A 經過 keyBy() 操做進行數據 Shuffle :網絡
這一過程創建在如下三種基本概念的基礎上:併發
▼ 子任務輸出類型(ResultPartitionType):
Pipelined(有限的或無限的):一旦產生數據就能夠持續向下遊發送有限數據流或無限數據流。
Blocking:僅在生成完整結果後向下游發送數據。運維
▼ 調度策略:
同時調度全部任務(Eager):同時部署做業的全部子任務(用於流做業)。
上游產生第一條記錄部署下游(Lazy):一旦任何生產者生成任何輸出,就當即部署下游任務。
上游產生完整數據部署下游:當任何或全部生產者生成完整數據後,部署下游任務。性能
▼ 數據傳輸:
高吞吐:Flink 不是一個一個地發送每條記錄,而是將若干記錄緩衝到其網絡緩衝區中並一次性發送它們。這下降了每條記錄的發送成本所以提升了吞吐量。
低延遲:當網絡緩衝區超過必定的時間未被填滿時會觸發超時發送,經過減少超時時間,能夠經過犧牲必定的吞吐來獲取更低的延遲。測試
咱們將在下面深刻 Flink 網絡協議棧的物理實現時看到關於吞吐延遲的優化。對於這一部分,讓咱們詳細說明輸出類型與調度策略。首先,須要知道的是子任務的輸出類型和調度策略是緊密關聯的,只有二者的一些特定組合纔是有效的。優化
Pipelined 結果是流式輸出,須要目標 Subtask 正在運行以便接收數據。所以須要在上游 Task 產生數據以前或者產生第一條數據的時候調度下游目標 Task 運行。批處理做業生成有界結果數據,而流式處理做業產生無限結果數據。ui
批處理做業也可能以阻塞方式產生結果,具體取決於所使用的算子和鏈接模式。在這種狀況下,必須等待上游 Task 先生成完整的結果,而後才能調度下游的接收 Task 運行。這可以提升批處理做業的效率而且佔用更少的資源。加密
下表總結了 Task 輸出類型以及調度策略的有效組合:
註釋:
[1]目前 Flink 未使用
[2]批處理 / 流計算統一完成後,可能適用於流式做業
此外,對於具備多個輸入的子任務,調度以兩種方式啓動:當全部或者任何上游任務產生第一條數據或者產生完整數據時調度任務運行。要調整批處理做業中的輸出類型和調度策略,能夠參考 ExecutionConfig#setExecutionMode()——尤爲是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。
爲了理解物理數據鏈接,請回想一下,在 Flink 中,不一樣的任務能夠經過 Slotsharing group 共享相同 Slot。TaskManager 還能夠提供多個 Slot,以容許將同一任務的多個子任務調度到同一個 TaskManager 上。
對於下圖所示的示例,咱們假設 2 個併發爲 4 的任務部署在 2 個 TaskManager 上,每一個 TaskManager 有兩個 Slot。TaskManager 1 執行子任務 A.1,A.2,B.1 和 B.2,TaskManager 2 執行子任務 A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 鏈接類型,好比來自於 A 的 keyBy() 操做,在每一個 TaskManager 上會有 2x4 個邏輯鏈接,其中一些是本地的,另外一些是遠程的:
不一樣任務(遠程)之間的每一個網絡鏈接將在 Flink 的網絡堆棧中得到本身的 TCP 通道。可是,若是同一任務的不一樣子任務被調度到同一個 TaskManager,則它們與同一個 TaskManager 的網絡鏈接將多路複用並共享同一個 TCP 信道以減小資源使用。在咱們的例子中,這適用於 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,以下圖所示:
每一個子任務的輸出結果稱爲 ResultPartition,每一個 ResultPartition 被分紅多個單獨的 ResultSubpartition- 每一個邏輯通道一個。Flink 的網絡協議棧在這一點的處理上,再也不處理單個記錄,而是將一組序列化的記錄填充到網絡緩衝區中進行處理。每一個子任務本地緩衝區中最多可用 Buffer 數目爲(每一個發送方和接收方各一個):
#channels * buffers-per-channel + floating-buffers-per-gate
單個 TaskManager 上的網絡層 Buffer 總數一般不須要配置。有關如何在須要時進行配置的詳細信息,請參閱配置網絡緩衝區的文檔。
▼ 形成反壓(1)
每當子任務的數據發送緩衝區耗盡時——數據駐留在 Subpartition 的緩衝區隊列中或位於更底層的基於 Netty 的網絡堆棧內,生產者就會被阻塞,沒法繼續發送數據,而受到反壓。接收端以相似的方式工做:Netty 收到任何數據都須要經過網絡 Buffer 傳遞給 Flink。若是相應子任務的網絡緩衝區中沒有足夠可用的網絡 Buffer,Flink 將中止從該通道讀取,直到 Buffer 可用。這將反壓該多路複用上的全部發送子任務,所以也限制了其餘接收子任務。下圖說明了過載的子任務 B.4,它會致使多路複用的反壓,也會致使子任務 B.3 沒法接受和處理數據,即便是 B.3 還有足夠的處理能力。
爲了防止這種狀況發生,Flink 1.5 引入了本身的流量控制機制。
Credit-based 流量控制可確保發送端已經發送的任何數據,接收端都具備足夠的能力(Buffer)來接收。新的流量控制機制基於網絡緩衝區的可用性,做爲 Flink 以前機制的天然延伸。每一個遠程輸入通道(RemoteInputChannel)如今都有本身的一組獨佔緩衝區(Exclusive buffer),而不是隻有一個共享的本地緩衝池(LocalBufferPool)。與以前不一樣,本地緩衝池中的緩衝區稱爲流動緩衝區(Floating buffer),由於它們會在輸出通道間流動而且可用於每一個輸入通道。
數據接收方會將自身的可用 Buffer 做爲 Credit 告知數據發送方(1 buffer = 1 credit)。每一個 Subpartition 會跟蹤下游接收端的 Credit(也就是可用於接收數據的 Buffer 數目)。只有在相應的通道(Channel)有 Credit 的時候 Flink 纔會向更底層的網絡協議棧發送數據(以 Buffer 爲粒度),而且每發送一個 Buffer 的數據,相應的通道上的 Credit 會減 1。除了發送數據自己外,數據發送端還會發送相應 Subpartition 中有多少正在排隊發送的 Buffer 數(稱之爲 Backlog)給下游。數據接收端會利用這一信息(Backlog)去申請合適數量的 Floating buffer 用於接收發送端的數據,這能夠加快發送端堆積數據的處理。接收端會首先申請和 Backlog 數量相等的 Buffer,但可能沒法申請到所有,甚至一個都申請不到,這時接收端會利用已經申請到的 Buffer 進行數據接收,並監聽是否有新的 Buffer 可用。
Credit-based 的流控使用 Buffers-per-channel 來指定每一個 Channel 有多少獨佔的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩衝池(Local buffer pool)大小(可選3),經過共享本地緩衝池,Credit-based 流控可使用的 Buffer 數目能夠達到與原來非 Credit-based 流控一樣的大小。這兩個參數的默認值是被精心選取的,以保證新的 Credit-based 流控在網絡健康延遲正常的狀況下至少能夠達到與原策略相同的吞吐。能夠根據實際的網絡 RRT (round-trip-time)和帶寬對這兩個參數進行調整。
註釋3:若是沒有足夠的 Buffer 可用,則每一個緩衝池將得到全局可用 Buffer 的相同份額(±1)。
▼ 形成反壓(2)
與沒有流量控制的接收端反壓機制不一樣,Credit 提供了更直接的控制:若是接收端的處理速度跟不上,最終它的 Credit 會減小成 0,此時發送端就不會在向網絡中發送數據(數據會被序列化到 Buffer 中並緩存在發送端)。因爲反壓只發生在邏輯鏈路上,所以不必阻斷從多路複用的 TCP 鏈接中讀取數據,也就不會影響其餘的接收者接收和處理數據。
▼ Credit-based 的優點與問題
因爲經過 Credit-based 流控機制,多路複用中的一個信道不會因爲反壓阻塞其餘邏輯信道,所以總體資源利用率會增長。此外,經過徹底控制正在發送的數據量,咱們還可以加快 Checkpoint alignment:若是沒有流量控制,通道須要一段時間才能填滿網絡協議棧的內部緩衝區並代表接收端再也不讀取數據了。在這段時間裏,大量的 Buffer 不會被處理。任何 Checkpoint barrier(觸發 Checkpoint 的消息)都必須在這些數據 Buffer 後排隊,所以必須等到全部這些數據都被處理後纔可以觸發 Checkpoint(「Barrier 不會在數據以前被處理!」)。
可是,來自接收方的附加通告消息(向發送端通知 Credit)可能會產生一些額外的開銷,尤爲是在使用 SSL 加密信道的場景中。此外,單個輸入通道( Input channel)不能使用緩衝池中的全部 Buffer,由於存在沒法共享的 Exclusive buffer。新的流控協議也有可能沒法作到當即發送儘量多的數據(若是生成數據的速度快於接收端反饋 Credit 的速度),這時則可能增加發送數據的時間。雖然這可能會影響做業的性能,但因爲其全部優勢,一般新的流量控制會表現得更好。可能會經過增長單個通道的獨佔 Buffer 數量,這會增大內存開銷。然而,與先前實現相比,整體內存使用可能仍然會下降,由於底層的網絡協議棧再也不須要緩存大量數據,由於咱們老是能夠當即將其傳輸到 Flink(必定會有相應的 Buffer 接收數據)。
在使用新的 Credit-based 流量控制時,可能還會注意到另外一件事:因爲咱們在發送方和接收方之間緩衝較少的數據,反壓可能會更早的到來。然而,這是咱們所指望的,由於緩存更多數據並無真正得到任何好處。若是要緩存更多的數據而且保留 Credit-based 流量控制,能夠考慮經過增長單個輸入共享 Buffer 的數量。
注意:若是須要關閉 Credit-based 流量控制,能夠將這個配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。可是,此參數已過期,最終將與非 Credit-based 流控制代碼一塊兒刪除。
下圖從上面的擴展了更高級別的視圖,其中包含網絡協議棧及其周圍組件的更多詳細信息,從發送算子發送記錄(Record)到接收算子獲取它:
在生成 Record 並將其傳遞出去以後,例如經過 Collector#collect(),它被傳遞給 RecordWriter,RecordWriter 會將 Java 對象序列化爲字節序列,最終存儲在 Buffer 中按照上面所描述的在網絡協議棧中進行處理。RecordWriter 首先使用 SpanningRecordSerializer 將 Record 序列化爲靈活的堆上字節數組。而後,它嘗試將這些字節寫入目標網絡 Channel 的 Buffer 中。咱們將在下面的章節回到這一部分。
在接收方,底層網絡協議棧(Netty)將接收到的 Buffer 寫入相應的輸入通道(Channel)。流任務的線程最終從這些隊列中讀取並嘗試在 RecordReader 的幫助下經過 SpillingAdaptiveSpanningRecordDeserializer 將累積的字節反序列化爲 Java 對象。與序列化器相似,這個反序列化器還必須處理特殊狀況,例如跨越多個網絡 Buffer 的 Record,或者由於記錄自己比網絡緩衝區大(默認狀況下爲32KB,經過 taskmanager.memory.segment-size 設置)或者由於序列化 Record 時,目標 Buffer 中已經沒有足夠的剩餘空間保存序列化後的字節數據,在這種狀況下,Flink 將使用這些字節空間並繼續將其他字節寫入新的網絡 Buffer 中。
在上圖中,Credit-based 流控制機制實際上位於「Netty Server」(和「Netty Client」)組件內部,RecordWriter 寫入的 Buffer 始終以空狀態(無數據)添加到 Subpartition 中,而後逐漸向其中填寫序列化後的記錄。可是 Netty 在何時真正的獲取併發送這些 Buffer 呢?顯然,不能是 Buffer 中只要有數據就發送,由於跨線程(寫線程與發送線程)的數據交換與同步會形成大量的額外開銷,而且會形成緩存自己失去意義(若是是這樣的話,不如直接將將序列化後的字節發到網絡上而沒必要引入中間的 Buffer)。
在 Flink 中,有三種狀況可使 Netty 服務端使用(發送)網絡 Buffer:
▼ 在 Buffer 滿後發送
RecordWriter 將 Record 序列化到本地的序列化緩衝區中,並將這些序列化後的字節逐漸寫入位於相應 Result subpartition 隊列中的一個或多個網絡 Buffer中。雖然單個 RecordWriter 能夠處理多個 Subpartition,但每一個 Subpartition 只會有一個 RecordWriter 向其寫入數據。另外一方面,Netty 服務端線程會從多個 Result subpartition 中讀取並像上面所說的那樣將數據寫入適當的多路複用信道。這是一個典型的生產者 - 消費者模式,網絡緩衝區位於生產者與消費者之間,以下圖所示。在(1)序列化和(2)將數據寫入 Buffer 以後,RecordWriter 會相應地更新緩衝區的寫入索引。一旦 Buffer 徹底填滿,RecordWriter 會(3)爲當前 Record 剩餘的字節或者下一個 Record 從其本地緩衝池中獲取新的 Buffer,並將新的 Buffer 添加到相應 Subpartition 的隊列中。這將(4)通知 Netty服務端線程有新的數據可發送(若是 Netty 還不知道有可用的數據的話4)。每當 Netty 有能力處理這些通知時,它將(5)從隊列中獲取可用 Buffer 並經過適當的 TCP 通道發送它。
註釋4:若是隊列中有更多已完成的 Buffer,咱們能夠假設 Netty 已經收到通知。
▼ 在 Buffer 超時後發送
爲了支持低延遲應用,咱們不能只等到 Buffer 滿了才向下遊發送數據。由於可能存在這種狀況,某種通訊信道沒有太多數據,等到 Buffer 滿了在發送會沒必要要地增長這些少許 Record 的處理延遲。所以,Flink 提供了一個按期 Flush 線程(the output flusher)每隔一段時間會將任何緩存的數據所有寫出。能夠經過 StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的間隔,並做爲延遲5的上限(對於低吞吐量通道)。下圖顯示了它與其餘組件的交互方式:RecordWriter 如前所述序列化數據並寫入網絡 Buffer,但同時,若是 Netty 還不知道有數據能夠發送,Output flusher 會(3,4)通知 Netty 服務端線程數據可讀(相似與上面的「buffer已滿」的場景)。當 Netty 處理此通知(5)時,它將消費(獲取併發送)Buffer 中的可用數據並更新 Buffer 的讀取索引。Buffer 會保留在隊列中——從 Netty 服務端對此 Buffer 的任何進一步操做將在下次從讀取索引繼續讀取。
註釋5:嚴格來講,Output flusher 不提供任何保證——它只向 Netty 發送通知,而 Netty 線程會按照能力與意願進行處理。這也意味着若是存在反壓,則 Output flusher 是無效的。
▼ 特殊消息後發送
一些特殊的消息若是經過 RecordWriter 發送,也會觸發當即 Flush 緩存的數據。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應該儘快被髮送,而不該該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush。
▼ 進一步的討論
與小於 1.5 版本的 Flink 不一樣,請注意(a)網絡 Buffer 如今會被直接放在 Subpartition 的隊列中,(b)網絡 Buffer 不會在 Flush 以後被關閉。這給咱們帶來了一些好處:
可是,在低負載狀況下,可能會出現 CPU 使用率和 TCP 數據包速率的增長。這是由於,Flink 將使用任何可用的 CPU 計算能力來嘗試維持所需的延遲。一旦負載增長,Flink 將經過填充更多的 Buffer 進行自我調整。因爲同步開銷減小,高負載場景不會受到影響,甚至能夠實現更高的吞吐。
更深刻地瞭解 Flink 中是如何實現生產者 - 消費者機制,須要仔細查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 類。雖然讀取是以 Buffer 爲粒度,但寫入它是按 Record 進行的,所以是 Flink 中全部網絡通訊的核心路徑。所以,咱們須要在任務線程(Task thread)和 Netty 線程之間實現輕量級鏈接,這意味着儘可能小的同步開銷。你能夠經過查看源代碼獲取更加詳細的信息。
引入網絡 Buffer 的目是得到更高的資源利用率和更高的吞吐,代價是讓 Record 在 Buffer 中等待一段時間。雖然能夠經過 Buffer 超時給出此等待時間的上限,但可能很想知道有關這兩個維度(延遲和吞吐)之間權衡的更多信息,顯然,沒法二者同時兼得。下圖顯示了不一樣的 Buffer 超時時間下的吞吐,超時時間從 0 開始(每一個 Record 直接 Flush)到 100 毫秒(默認值),測試在具備 100 個節點每一個節點 8 個 Slot 的羣集上運行,每一個節點運行沒有業務邏輯的 Task 所以只用於測試網絡協議棧的能力。爲了進行比較,咱們還測試了低延遲改進(如上所述)以前的 Flink 1.4 版本。
如圖,使用 Flink 1.5+,即便是很是低的 Buffer 超時(例如1ms)(對於低延遲場景)也提供高達超時默認參數(100ms)75% 的最大吞吐,但會緩存更少的數據。
瞭解 Result partition,批處理和流式計算的不一樣網絡鏈接以及調度類型,Credit-Based 流量控制以及 Flink 網絡協議棧內部的工做機理,有助於更好的理解網絡協議棧相關的參數以及做業的行爲。後續咱們會推出更多 Flink 網絡棧的相關內容,並深刻更多細節,包括運維相關的監控指標(Metrics),進一步的網絡調優策略以及須要避免的常見錯誤等。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。