Flink Slot詳解與Job Execution Graph優化

前言

近期將Flink Job從Standalone遷移至了OnYarn,隨後發現Job性能較以前有所下降:遷移前有8.3W+/S的數據消費速度,遷移到Yarn後分配一樣的資源但消費速度降爲7.8W+/S,且較以前的消費速度有輕微的抖動。通過緣由分析和測試驗證,最終採用了在保持分配給Job的資源不變的狀況下將總Container數量減半、每一個Container持有的資源從1C2G 1Slot變動爲2C4G 2Slot的方式,使該問題得以解決。html

經歷該問題後,發現深刻理解Slot和Flink Runtime Graph是十分必要的,因而撰寫了這篇文章。本文內容分爲兩大部分,第一部分詳細的分析Flink Slot與Job運行的關係,第二部詳細的介紹遇到的問題和解決方案。java

Flink Slot

Flink集羣是由JobManager(JM)、TaskManager(TM)兩大組件組成的,每一個JM/TM都是運行在一個獨立的JVM進程中。JM至關於Master,是集羣的管理節點,TM至關於Worker,是集羣的工做節點,每一個TM最少持有1個Slot,Slot是Flink執行Job時的最小資源分配單位,在Slot中運行着具體的Task任務。apache

對TM而言:它佔用着必定數量的CPU和Memory資源,具體可經過taskmanager.numberOfTaskSlots, taskmanager.heap.size來配置,實際上taskmanager.numberOfTaskSlots只是指定TM的Slot數量,並不能隔離指定數量的CPU給TM使用。在不考慮Slot Sharing(下文詳述)的狀況下,一個Slot內運行着一個SubTask(Task實現Runable,SubTask是一個執行Task的具體實例),因此官方建議taskmanager.numberOfTaskSlots配置的Slot數量和CPU相等或成比例。api

固然,咱們能夠藉助Yarn等調度系統,用Flink On Yarn的模式來爲Yarn Container分配指定數量的CPU資源,以達到較嚴格的CPU隔離(Yarn採用Cgroup作基於時間片的資源調度,每一個Container內運行着一個JM/TM實例)。而taskmanager.heap.size用來配置TM的Memory,若是一個TM有N個Slot,則每一個Slot分配到的Memory大小爲整個TM Memory的1/N,同一個TM內的Slots只有Memory隔離,CPU是共享的。網絡

對Job而言:一個Job所需的Slot數量大於等於Operator配置的最大Parallelism數,在保持全部Operator的slotSharingGroup一致的前提下Job所需的Slot數量與Job中Operator配置的最大Parallelism相等。app

關於TM/Slot之間的關係能夠參考以下從官方文檔截取到的三張圖:函數

圖一:Flink On Yarn的Job提交過程,從圖中咱們能夠了解到每一個JM/TM實例都分屬於不一樣的Yarn Container,且每一個Container內只會有一個JM或TM實例;經過對Yarn的學習咱們能夠了解到,每一個Container都是一個獨立的進程,一臺物理機能夠有多個Container存在(多個進程),每一個Container都持有必定數量的CPU和Memory資源,並且是資源隔離的,進程間不共享,這就能夠保證同一臺機器上的多個TM之間是資源隔離的(Standalone模式下,同一臺機器下如有多個TM,是作不到TM之間的CPU資源隔離的)。 性能

圖一圖片描述學習

圖二:Flink Job運行圖,圖中有兩個TM,各自有3個Slot,2個Slot內有Task在執行,1個Slot空閒。若這兩個TM在不一樣Container或容器上,則其佔用的資源是互相隔離的。在TM內多個Slot間是各自擁有 1/3 TM的Memory,共享TM的CPU、網絡(Tcp:ZK、 Akka、Netty服務等)、心跳信息、Flink結構化的數據集等。測試

圖二圖片描述

圖三:Task Slot的內部結構圖,Slot內運行着具體的Task,它是在線程中執行的Runable對象(每一個虛線框表明一個線程),這些Task實例在源碼中對應的類是org.apache.flink.runtime.taskmanager.Task。每一個Task都是由一組Operators Chaining在一塊兒的工做集合,Flink Job的執行過程可看做一張DAG圖,Task是DAG圖上的頂點(Vertex),頂點之間經過數據傳遞方式相互連接構成整個Job的Execution Graph。

圖三圖片描述

Operator Chain

Operator Chain是指將Job中的Operators按照必定策略(例如:single output operator能夠chain在一塊兒)連接起來並放置在一個Task線程中執行。Operator Chain默認開啓,可經過StreamExecutionEnvironment.disableOperatorChaining()關閉,Flink Operator相似Storm中的Bolt,在Strom中上游Bolt到下游會通過網絡上的數據傳遞,而Flink的Operator Chain將多個Operator連接到一塊兒執行,減小了數據傳遞/線程切換等環節,下降系統開銷的同時增長了資源利用率和Job性能。實際開發過程當中須要開發者瞭解這些原理,並能合理分配Memory和CPU給到每一個Task線程。

注: 【一個須要注意的地方】Chained的Operators之間的數據傳遞默認須要通過數據的拷貝(例如:kryo.copy(...)),將上游Operator的輸出序列化出一個新對象並傳遞給下游Operator,能夠經過ExecutionConfig.enableObjectReuse()開啓對象重用,這樣就關閉了這層copy操做,能夠減小對象序列化開銷和GC壓力等,具體源碼可閱讀org.apache.flink.streaming.runtime.tasks.OperatorChainorg.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。官方建議開發人員在徹底瞭解reuse內部機制後才使用該功能,冒然使用可能會給程序帶來bug。

Operator Chain效果可參考以下官方文檔截圖:

圖四:圖的上半部分是StreamGraph視角,有Task類別無並行度,如圖:Job Runtime時有三種類型的Task,分別是Source->MapkeyBy/window/applySink,其中Source->MapSource()Map()chaining在一塊兒的Task;圖的下半部分是一個Job Runtime期的實際狀態,Job最大的並行度爲2,有5個SubTask(即5個執行線程)。若沒有Operator Chain,則Source()Map()分屬不一樣的Thread,Task線程數會增長到7,線程切換和數據傳遞開銷等較以前有所增長,處理延遲和性能會較以前差。補充:在slotSharingGroup用默認或相同組名時,當前Job運行需2個Slot(與Job最大Parallelism相等)。

圖四圖片描述

Slot Sharing

Slot Sharing是指,來自同一個Job且擁有相同slotSharingGroup(默認:default)名稱的不一樣Task的SubTask之間能夠共享一個Slot,這使得一個Slot有機會持有Job的一整條Pipeline,這也是上文提到的在默認slotSharing的條件下Job啓動所需的Slot數和Job中Operator的最大parallelism相等的緣由。經過Slot Sharing機制能夠更進一步提升Job運行性能,在Slot數不變的狀況下增長了Operator可設置的最大的並行度,讓相似window這種消耗資源的Task以最大的並行度分佈在不一樣TM上,同時像map、filter這種較簡單的操做也不會獨佔Slot資源,下降資源浪費的可能性。

具體Slot Sharing效果可參考以下官方文檔截圖:

圖五:圖的左下角是一個soure-map-reduce模型的Job,source和map是4 parallelism,reduce是3 parallelism,總計11個SubTask;這個Job最大Parallelism是4,因此將這個Job發佈到左側上面的兩個TM上時獲得圖右側的運行圖,一共佔用四個Slot,有三個Slot擁有完整的source-map-reduce模型的Pipeline,如右側圖所示;注:map的結果會shufflereduce端,右側圖的箭頭只是說Slot內數據Pipline,沒畫出Job的數據shuffle過程。

圖五圖片描述

圖六:圖中包含source-map[6 parallelism]keyBy/window/apply[6 parallelism]sink[1 parallelism]三種Task,總計佔用了6個Slot;由左向右開始第一個slot內部運行着3個SubTask[3 Thread],持有Job的一條完整pipeline;剩下5個Slot內分別運行着2個SubTask[2 Thread],數據最終經過網絡傳遞給Sink完成數據處理。

圖六圖片描述

Operator Chain & Slot Sharing API

Flink在默認狀況下有策略對Job進行Operator Chain 和 Slot Sharing的控制,好比:將並行度相同且連續的SingleOutputStreamOperator操做chain在一塊兒(chain的條件較苛刻,不止單一輸出這一條,具體可閱讀org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(...)),Job的全部Task都採用名爲default的slotSharingGroup作Slot Sharing。但在實際的需求場景中,咱們可能會遇到需人爲干預Job的Operator Chain 或 Slot Sharing策略的狀況,本段就重點關注下用於改變默認Chain 和 Sharing策略的API。

  • StreamExecutionEnvironment.disableOperatorChaining():關閉整個Job的Operator
    Chain,每一個Operator獨自佔有一個Task,如上圖四所描述的Job,若是disableOperatorChaining
    source->map會拆開爲source(),
    map()兩種Task,Job實際的Task數會增長到7。這個設置會下降Job性能,在非生產環境的測試或profiling時能夠藉助以更好分析問題,實際生產過程當中不建議使用。
  • someStream.filter(...).map(...).startNewChain().map():startNewChain()是指從當前Operator[map]開始一個新的chain,即:兩個map會chaining在一塊兒而filter不會(由於startNewChain的存在使得第一次map與filter斷開了chain)。
  • someStream.map(...).disableChaining():disableChaining()是指當前Operator[map]禁用Operator
    Chain,即:Operator[map]會獨自佔用一個Task。
  • someStream.map(...).slotSharingGroup("name"):默認狀況下全部Operator的slotGroup都爲default,能夠經過slotSharingGroup()進行自定義,Flink會將擁有相同slotGroup名稱的Operators運行在相同Slot內,不一樣slotGroup名稱的Operators運行在其餘Slot內。

Operator Chain有三種策略ALWAYSNEVERHEAD,詳細可查看org.apache.flink.streaming.api.operators.ChainingStrategystartNewChain()對應的策略是ChainingStrategy.HEAD(StreamOperator的默認策略),disableChaining()對應的策略是ChainingStrategy.NEVER,ALWAYS是儘量的將Operators chaining在一塊兒;在一般狀況下ALWAYS是效率最高,不少Operator會將默認策略覆蓋爲ALWAYS,如filter、map、flatMap等函數。

遷移OnYarn後Job性能降低的問題

JOB說明:
相似StreamETL,100 parallelism,即:一個流式的ETL Job,不包含window等操做,Job的並行度爲100;

環境說明:

  1. Standalone下的Job Execution Graph:10TMs * 10Slots-per-TM,即:Job的Task運行在10個TM節點上,每一個TM上佔用10個Slot,每一個Slot可用1C2G資源,GCConf: -XX:+UseG1GC -XX:MaxGCPauseMillis=100
  2. OnYarn下初始狀態的Job Execution Graph:100TMs*1Slot-per-TM,即:Job的Task運行在100個Container上,每一個Container上的TM持有1個Slot,每一個Container分配1C2G資源,GCConf:-XX:+UseG1GC -XX:MaxGCPauseMillis=100
  3. OnYarn下調整後的Job Execution Graph:50TMs*2Slot-per-TM,即:Job的Task運行在50個Container上,每一個Container上的TM持有2個Slot,每一個Container分配2C4G資源,GCConfig:-XX:+UseG1GC -XX:MaxGCPauseMillis=100

注:OnYarn下使用了與Standalone一致的GC配置,當前Job在Standalone或OnYarn環境中運行時,YGC、FGC頻率基本相同,OnYarn下單個Container的堆內存較小使得單次GC耗時減小。生產環境中你們最好對比下CMS和G1,選擇更好的GC策略,當前上下文中暫時認爲GC對Job性能影響可忽略不計。

問題分析:
引發Job性能下降的緣由不難定位,從這張Container的線程圖(VisualVM中的截圖)可見:

圖七:在一個1C2G的Container內有126個活躍線程,守護線程78個。首先,在一個1C2G的Container中運行着126個活躍線程,頻繁的線程切換是會常常出現的,這讓原本就不充裕的CPU顯得更加的匱乏。其次,真正與數據處理相關的線程是紅色畫筆圈出的14條線程(2條Kafka Partition Consumer、Consumers和Operators包含在這個兩個線程內;12條Kafka Producer線程,將處理好的數據sink到Kafka Topic),這14條線程以外的大多數線程在相同TM、不一樣Slot間能夠共用,好比:ZK-Curator、Dubbo-Client、GC-Thread、Flink-Akka、Flink-Netty、Flink-Metrics等線程,徹底能夠經過增長TM下Slot數量達到多個SubTask共享的目的。

此時咱們會很天然的得出一個解決辦法:在Job使用資源不變的狀況下,在減小Container數量的同時增長單個Container持有的CPU、Memory、Slot數量,好比上文環境說明中從方案2調整到方案3,實際調整後的Job運行穩定了許多且消費速度與Standalone基本持平。

圖七圖片描述

注:當前問題是內部遷移相似StreamETL的Job時遇到的,解決方案簡單但不具備普適性,對於帶有window算子的Job須要更仔細縝密的問題分析。目前Deploy到Yarn集羣的Job都配置了JMX/Prometheus兩種監控,單個Container下Slot數量越多、每次scrape的數據越多,實際生成環境中需觀測是否會影響Job正常運行,在測試時將Container配置爲3C6G 3Slot時發現一次java.lang.OutOfMemoryError: Direct buffer memory的異常,初步判斷與Prometheus Client相關,可適當調整JVM的MaxDirectMemorySize來解決。

所出現異常如圖八:

圖八圖片描述

總結

Operator Chain是將多個Operator連接在一塊兒放置在一個Task中,只針對Operator;Slot Sharing是在一個Slot中執行多個Task,針對的是Operator Chain以後的Task。這兩種優化都充分利用了計算資源,減小了沒必要要的開銷,提高了Job的運行性能。此外,Operator Chain的源碼在streaming包下,只在流處理任務中有這個機制;Slot Sharing在flink-runtime包下,彷佛應用更普遍一些(具體還有待考究)。

最後,只有充分的瞭解Slot、Operator Chain、Slot Sharing是什麼,以及各自的做用和相互間的關係,才能編寫出優秀的代碼並高效的運行在集羣上。


參考資料:
https://ci.apache.org/project...
https://ci.apache.org/project...
https://ci.apache.org/project...
https://ci.apache.org/project...
https://ci.apache.org/project...
https://flink.apache.org/visu...

做者:TalkingData數據工程師 王成龍

相關文章
相關標籤/搜索