做者:王治江apache
本文講述的shuffle概念範圍以下圖虛線框所示,從上游算子產出數據到下游算子消費數據的所有流程,基本能夠劃分紅三個子模塊:數組
上游寫數據:算子產出的record序列化成buffer數據結構插入到sub partition隊列;緩存
網絡傳輸:上下游可能調度部署到不一樣的container中,上游的數據須要通過網絡傳輸到下游,涉及到數據拷貝和編解碼流程;服務器
下游讀數據:從網絡上接收到的buffer反序列化成record給op處理。網絡
當job被調度開始運行後,除了算子內部的業務邏輯開銷外,整個runtime引擎的運行時開銷基本都在shuffle過程,其中涉及了數據序列化、編解碼、內存拷貝和網絡傳輸等複雜操做,所以能夠說shuffle的總體性能決定了runtime引擎的性能。數據結構
Flink對於batch和streaming job的shuffle架構設計是統一的,從性能的角度咱們設計實現了統一的網絡流控機制,針對序列化和內存拷貝進行了優化。從batch job可用性角度,咱們實現了external shuffle service以及重構了插件化的shuffle manager機制,在功能、性能和擴展性方面進行了全方位的提高,下面從三個主要方面分別具體介紹。架構
Flink原有的網絡傳輸機制是上游隨機push,下游被動接收模式:併發
一個container容器一般部署多個task併發線程執行op的業務邏輯,不一樣task線程會複用同一個TCP channel進行網絡數據傳輸,這樣能夠減小大規模場景下進程之間的網絡鏈接數量;框架
Flink定義一種buffer數據結構用來緩存上下游的輸入和輸出,不一樣op的輸入和輸出端都維護一個獨立有限的local buffer pool,這樣可讓上下游以pipelined模式並行運行的更平滑;socket
上游op產出的數據序列化寫到flink buffer中,網絡端的netty線程從partition queue中取走flink buffer拷貝到netty buffer中,flink buffer被回收到local buffer pool中繼續給op複用,netty buffer最終寫入到socket buffer後回收;
下游網絡端netty線程從socket buffer中讀取數據拷貝到netty buffer中,通過decode後向local buffer pool申請flink buffer進行數據拷貝,flink buffer插入到input channel隊列,通過input processor反序列化成record給op消費,再被回收到local buffer pool中繼續接收網絡上的數據;
整個鏈路輸入輸出端的local buffer pool若是能夠緩衝抵消上下游生產和消費的能力差別時,這種模式不會形成性能上的影響。
實際job運行過程當中,常常會看到整個鏈路上下游的inqueue和outqueue隊列所有塞滿buffer形成反壓,尤爲在追數據和負載不均衡的場景下。
如上圖所示,當下遊輸入端local buffer pool中的資源耗盡時,網絡端的netty線程沒法申請到flink buffer來拷貝接收到的數據,爲了不把數據spill到磁盤,出於內存資源的保護而被迫臨時關閉channel通道上的read操做。但因爲TCP channel是被多個op共享的,一旦關閉會致使全部其它的正常op都不能接收上游的數據;
TCP自身的流控機制使下游client端ack的advertise window逐漸減少到0,致使上游server再也不繼續發送網絡數據,最終socket send buffer被逐漸塞滿;
上游的netty buffer因爲不能寫入到socket send buffer,致使netty buffer水位線逐漸上升,當到達閾值後netty線程再也不從partition隊列中取flink buffer,這樣flink buffer不能被及時回收致使local buffer pool資源最終耗盡;
上游op因爲拿不到flink buffer沒法繼續輸出數據被block中止工做,這樣一層層反壓直到整個拓撲的source節點。
反壓雖然是很難避免的,但現有的流控機制加重了反壓的影響:
因爲進程間的TCP共享複用,一個task線程的瓶頸會致使整條鏈路上全部task線程都不能接收數據,影響總體tps;
一旦數據傳輸通道臨時關閉,checkpoint barrier也沒法在網絡上傳輸,checkpoint長期作不出來,一旦發生failover須要回放大量的歷史數據;
除了輸入輸出端的flink buffer被耗盡,還會額外佔用netty內部的buffer資源以及通道關閉前接收到的臨時buffer overhead,在大規模場景下容易出現oom不穩定因素。
經過上面分析能夠看出,上下游信息不對稱致使上游按照數據產出驅動盲目的向下遊推送,當下遊沒有能力接收新數據時而被迫關閉了數據通道。所以須要一種上層更細粒度的流控機制,可以讓複用同一個物理通道的全部邏輯鏈路互不影響進行數據傳輸。
咱們藉助了credit思想讓下游隨時反饋本身的接收能力,這樣上游能夠有針對性的選擇有能力的下游發送對應的數據,即以前的上游盲目push模式變成了下游基於credit的pull模式。
以下圖所示,上游定義了backlog概念表示sub partition中已經緩存的待發送buffer數量,至關於生產者的庫存狀況,這個信息做爲payload隨着現有的數據協議傳輸給下游,所以這部分的overhead能夠忽略;
下游定義了credit概念表示每一個input channel上可用的空閒buffer數量,每一個input channel都會獨佔有限個exclusive buffer,全部input channel共享同一個local buffer pool用來申請floating buffer,這種buffer類型的區分能夠保證每一個input既有最基本的資源保證不會資源搶佔致使的死鎖,又能夠根據backlog合理的搶佔全局floating資源。
下游的credit應該儘可能及時增量反饋,避免上游由於等待credit而延時發送數據。下游也會盡可能每次申請比backlog多一些overhead的credit,能夠保證上游新產出的數據不須要等待credit反饋而延時。新定義的credit反饋協議數據量很小,和正常的數據傳輸相比在網絡帶寬不是瓶頸的前提下,空間佔用基本能夠忽略。
新流控機制在某條鏈路出現反壓的場景下,能夠保證共享物理通道的其它鏈路正常傳輸數據。咱們用雙11大屏的一個典型業務驗證job總體throughput提高了20%(以下圖),對於這種keyby類型的上下游all-to-all模式,性能的提高比例取決於反壓後的數據分佈狀況。對於one-to-one模式的job,咱們實驗驗證在出現反壓場景下的性能提高能夠達到1倍以上。
新流控機制保證上游發送的數據都是下游能正常接收的,這樣數據再也不堵塞在網絡層,即netty buffer以及socket buffer中再也不殘留數據,至關於總體上in-flighting buffer比以前少了,這對於checkpoint的barrier對齊是有好處的。另外,基於新機制下每一個input channel都有exclusive buffer而不會形成資源死鎖,咱們能夠在下游接收端有傾向性的選擇不一樣channel優先讀取,這樣能夠保證barrier儘快對齊而觸發checkpoint流程,以下圖所示checkpoint對齊事件比以前明顯快了幾倍,這對於線上job的穩定性是相當重要的。
此外,基於新流控機制還能夠針對不少場景作優化,好比對於非keyby的rebalance模式,上游採用round-robin方式輪詢向不一樣下游產出數據,這種看似rebalance的作法在實際運行過程當中每每會帶來負載不均衡而觸發反壓,由於不一樣record的處理開銷不一樣,以及不一樣下游task的物理環境load也不一樣。經過backlog的概念,上游產出數據再也不按照簡單的round-robin,而是參考不一樣partition中的backlog大小,backlog越大說明庫存壓力越大,反映下游的處理能力不足,優先向backlog小的partition中產出數據,這種優化對於不少業務場景下帶來的收益很是大。新流控機制已經貢獻回社區1.5版本,參考[1]。
如開篇所列,整個shuffle過程涉及最多的就是數據序列化和內存拷貝,在op業務邏輯很輕的狀況下,這部分開銷佔總體比例是最大的,每每也是整個runtime的瓶頸所在,下面分別介紹這兩部分的優化。
Broadcast模式指上游同一份數據傳輸給下游全部的併發task節點,這種模式使用的場景也比較多,好比hash-join中build source端的數據就是經過broadcast分發的。
Flink爲每一個sub partition單首創建一個serializer,每一個serializer內部維護兩個臨時ByteBuffer,一個用來存儲record序列化後的長度信息,一個用來存儲序列化後的數據信息。op產出的record先序列化到兩個臨時ByteBuffer中,再從local buffer pool中申請flink buffer進行長度和數據信息拷貝,最後插入到sub partition隊列中。這種實現主要有兩個問題:
假設有n個sub partition對應n個併發下游,broadcast模式下一樣的數據要通過n次序列化轉化,再通過n次數據拷貝,當sub partition數量多時這個開銷很大;
Serializer數量和sub partition數量成正比,每一個serializer內部又須要維護兩個臨時數組,尤爲當record size比較大時,存儲數據的臨時數組膨脹會比較大,這部份內存overhead當sub partition數量多時不可忽視,容易產生oom。
針對上述問題,如上圖咱們從兩個方面進行了優化:
保留一個serializer服務於全部的sub partition,這樣大量減小了serializer內部臨時內存的overhead,serializer自己是無狀態的;
Broadcast場景下數據只序列化一次,序列化後的臨時結果只拷貝到一個flink buffer中,這個buffer會被插入到全部的sub partition隊列中,經過增長引用計數控制buffer的回收。
這樣上游數據產出的開銷下降到了原來的1/n,極大的提高了broadcast的總體性能,這部分工做正在貢獻回社區,參考[2]。
如前面流控中提到的,整個shuffle流程上下游網絡端flink buffer各會經歷兩次數據拷貝:
上游flink buffer插入到partition隊列後,先拷貝到netty ByteBuffer中,再拷貝到socket send buffer中;
下游從socket read buffer先拷貝到netty ByteBuffer中,再拷貝到flink buffer中。
Netty自身ByteBuffer pool的管理致使進程direct memory的使用沒法準確評估,在socket channel數量特別多的場景下,進程的maxDirectMemory配置不合理很容易出現oom形成failover,所以咱們打算讓netty直接使用flink buffer,屏蔽掉netty內部的ByteBuffer使用。
Flink的buffer數據結構從原有的heap bytes改用off-heap direct memory實現,而且繼承自netty內部的ByteBuffer;
上游netty線程從partition隊列取出buffer直接寫入到socket send buffer中,下游netty線程從socket read buffer直接申請local buffer pool接收數據,再也不通過中間的netty buffer拷貝。
通過上述優化,進程的direct memory使用大大下降了,從以前的默認320m配置調整爲80m,總體的tps和穩定性都有了提升,社區的相關工做參考[3]。
上面介紹的一系列優化對於streaming和batch job都是適用的,尤爲對於streaming job目前的shuffle系統優點很明顯,但對於batch job的場景還有不少侷限性:
Streaming job上下游以pipelined方式並行運行,batch job每每分stage串行運行,上游運行結束後再啓動下游拉數據,上游產出的數據會持久化輸出到本地文件。因爲上游的container進程承擔了shuffle service服務,即便上游op運行結束,在數據沒有徹底傳輸到下游前,container資源依然不能回收,若是這部分資源不能用於調度下游節點,會形成資源上的浪費;
Flink batch job只支持一種文件輸出格式,即每一個sub partition單獨生成一個文件,當sub partition數量特別多,單個partition數據量又特別小的場景下,一是形成file handle數量不可控,二是對磁盤io的讀寫不友好,性能比較低。
針對上述兩個問題,咱們對shuffle提出了兩方面改造,一是實現了external shuffle service把shuffle服務和運行op的container進程解耦,二是定義了插件化的shuffle manager interface,在保留flink現有實現的基礎上,擴展了新的文件存儲格式。
External shuffle service能夠運行在flink框架外的任何container容器中,好比yarn模式下的NodeManager進程中,這樣每臺機器部署一個shuffle service統一服務於這臺服務器上全部job的數據傳輸,對本地磁盤的讀取能夠更合理高效的全局控制。
咱們從flink內置的internal shuffle service中提取了網絡層的相關組件,主要包括result partition manager和transport layer,封裝到external shuffle service中,上面提到的流控機制以及網絡內存拷貝等優化一樣收益於external shuffle service。
上游result partition經過內置shuffle service與遠程external shuffle service進行通訊,把shuffle相關信息註冊給result partition manager;
下游input gate也經過內置shuffle service與遠程external shuffle service通訊請求partitoin數據,result partition manager根據上游註冊的shuffle信息能夠正確解析文件格式,並按照credit流控模式向下遊發送數據。
基於external shuffle service運行的batch job,上游結束後container資源能夠馬上回收,資源利用率更加合理,external shuffle service根據磁盤類型和負載,合理控制讀取充分發揮硬件性能。
爲了解決flink batch job單一文件存儲格式的侷限性,咱們定義了shuffle manager interface支持可擴展的上下游shuffle讀寫模式。job拓撲支持在邊上設置不一樣的shuffle manager實現,來定義每條邊的上下游之間如何shuffle數據。shuffle manager有三個功能接口:
getResultPartitionWriter用來定義上游如何寫數據,即描述輸出文件的存儲格式,同時result partition本身決定是否須要註冊到shuffle service中,讓shuffle service理解輸出文件進行數據傳輸;
getResultPartitionLocation用來定義上游的輸出地址,job master在調度下游時會把這個信息攜帶給下游描述中,這樣下游就能夠按照這個地址請求上游的輸出數據;
getInputGateReader用來定義下游如何讀取上游的數據。
基於上述interface,咱們在上游新實現了一種sort-merge輸出格式,即全部sub partition數據會先寫到一個文件中,最終再merge成有限個文件,經過index文件索引來識別讀取不一樣sub partition的數據。這種模式在某些場景下的表現會優於flink原有的單partition文件形式,也做爲線上默認使用的模式。總體的重構工做也正在貢獻回社區,參考[4]。
上述shuffle的相關工做集結了淘江、雲騫、北牧和成陽等同窗的付出和努力,將來shuffle工做在流上會追求更高的極致性能,如何用更少的資源跑出最好的效果,在批上充分利用現有流上積累的優點,更好的充分利用和發揮硬件的性能以及架構的統一。
[1] issues.apache.org/jira/browse…
[2] issues.apache.org/jira/browse…
[3] issues.apache.org/jira/browse…
[4] issues.apache.org/jira/browse…
更多資訊請訪問 Apache Flink 中文社區網站