近期將Flink Job從Standalone遷移至了OnYarn,隨後發現Job性能較以前有所下降:遷移前有8.3W+/S的數據消費速度,遷移到Yarn後分配一樣的資源但消費速度降爲7.8W+/S,且較以前的消費速度有輕微的抖動。通過緣由分析和測試驗證,最終採用了在保持分配給Job的資源不變的狀況下將總Container數量減半、每一個Container持有的資源從1C2G 1Slot
變動爲2C4G 2Slot
的方式,使該問題得以解決。html
經歷該問題後,發現深刻理解Slot和Flink Runtime Graph是十分必要的,因而撰寫了這篇文章。本文內容分爲兩大部分,第一部分詳細的分析Flink Slot與Job運行的關係,第二部詳細的介紹遇到的問題和解決方案。java
Flink集羣是由JobManager(JM)、TaskManager(TM)兩大組件組成的,每一個JM/TM都是運行在一個獨立的JVM進程中。JM至關於Master,是集羣的管理節點,TM至關於Worker,是集羣的工做節點,每一個TM最少持有1個Slot,Slot是Flink執行Job時的最小資源分配單位,在Slot中運行着具體的Task任務。apache
對TM而言:它佔用着必定數量的CPU和Memory資源,具體可經過taskmanager.numberOfTaskSlots
, taskmanager.heap.size
來配置,實際上taskmanager.numberOfTaskSlots
只是指定TM的Slot數量,並不能隔離指定數量的CPU給TM使用。在不考慮Slot Sharing(下文詳述)的狀況下,一個Slot內運行着一個SubTask(Task實現Runable,SubTask是一個執行Task的具體實例),因此官方建議taskmanager.numberOfTaskSlots
配置的Slot數量和CPU相等或成比例。api
固然,咱們能夠藉助Yarn等調度系統,用Flink On Yarn的模式來爲Yarn Container分配指定數量的CPU資源,以達到較嚴格的CPU隔離(Yarn採用Cgroup作基於時間片的資源調度,每一個Container內運行着一個JM/TM實例)。而taskmanager.heap.size
用來配置TM的Memory,若是一個TM有N個Slot,則每一個Slot分配到的Memory大小爲整個TM Memory的1/N,同一個TM內的Slots只有Memory隔離,CPU是共享的。網絡
對Job而言:一個Job所需的Slot數量大於等於Operator配置的最大Parallelism數,在保持全部Operator的slotSharingGroup
一致的前提下Job所需的Slot數量與Job中Operator配置的最大Parallelism相等。app
關於TM/Slot之間的關係能夠參考以下從官方文檔截取到的三張圖:函數
圖一:Flink On Yarn的Job提交過程,從圖中咱們能夠了解到每一個JM/TM實例都分屬於不一樣的Yarn Container,且每一個Container內只會有一個JM或TM實例;經過對Yarn的學習咱們能夠了解到,每一個Container都是一個獨立的進程,一臺物理機能夠有多個Container存在(多個進程),每一個Container都持有必定數量的CPU和Memory資源,並且是資源隔離的,進程間不共享,這就能夠保證同一臺機器上的多個TM之間是資源隔離的(Standalone模式下,同一臺機器下如有多個TM,是作不到TM之間的CPU資源隔離的)。 性能
圖一學習
圖二:Flink Job運行圖,圖中有兩個TM,各自有3個Slot,2個Slot內有Task在執行,1個Slot空閒。若這兩個TM在不一樣Container或容器上,則其佔用的資源是互相隔離的。在TM內多個Slot間是各自擁有 1/3 TM的Memory,共享TM的CPU、網絡(Tcp:ZK、 Akka、Netty服務等)、心跳信息、Flink結構化的數據集等。測試
圖二
圖三:Task Slot的內部結構圖,Slot內運行着具體的Task,它是在線程中執行的Runable對象(每一個虛線框表明一個線程),這些Task實例在源碼中對應的類是org.apache.flink.runtime.taskmanager.Task
。每一個Task都是由一組Operators Chaining在一塊兒的工做集合,Flink Job的執行過程可看做一張DAG圖,Task是DAG圖上的頂點(Vertex),頂點之間經過數據傳遞方式相互連接構成整個Job的Execution Graph。
圖三
Operator Chain是指將Job中的Operators按照必定策略(例如:single output operator能夠chain在一塊兒)連接起來並放置在一個Task線程中執行。Operator Chain默認開啓,可經過StreamExecutionEnvironment.disableOperatorChaining()
關閉,Flink Operator相似Storm中的Bolt,在Strom中上游Bolt到下游會通過網絡上的數據傳遞,而Flink的Operator Chain將多個Operator連接到一塊兒執行,減小了數據傳遞/線程切換等環節,下降系統開銷的同時增長了資源利用率和Job性能。實際開發過程當中須要開發者瞭解這些原理,並能合理分配Memory和CPU給到每一個Task線程。
注: 【一個須要注意的地方】Chained的Operators之間的數據傳遞默認須要通過數據的拷貝(例如:kryo.copy(...)),將上游Operator的輸出序列化出一個新對象並傳遞給下游Operator,能夠經過ExecutionConfig.enableObjectReuse()
開啓對象重用,這樣就關閉了這層copy操做,能夠減小對象序列化開銷和GC壓力等,具體源碼可閱讀org.apache.flink.streaming.runtime.tasks.OperatorChain
與org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput
。官方建議開發人員在徹底瞭解reuse內部機制後才使用該功能,冒然使用可能會給程序帶來bug。
Operator Chain效果可參考以下官方文檔截圖:
圖四:圖的上半部分是StreamGraph視角,有Task類別無並行度,如圖:Job Runtime時有三種類型的Task,分別是Source->Map
、keyBy/window/apply
、Sink
,其中Source->Map
是Source()
和Map()chaining
在一塊兒的Task;圖的下半部分是一個Job Runtime期的實際狀態,Job最大的並行度爲2,有5個SubTask(即5個執行線程)。若沒有Operator Chain,則Source()
和Map()
分屬不一樣的Thread,Task線程數會增長到7,線程切換和數據傳遞開銷等較以前有所增長,處理延遲和性能會較以前差。補充:在slotSharingGroup
用默認或相同組名時,當前Job運行需2個Slot(與Job最大Parallelism相等)。
圖四
Slot Sharing是指,來自同一個Job且擁有相同slotSharingGroup
(默認:default)名稱的不一樣Task的SubTask之間能夠共享一個Slot,這使得一個Slot有機會持有Job的一整條Pipeline,這也是上文提到的在默認slotSharing的條件下Job啓動所需的Slot數和Job中Operator的最大parallelism相等的緣由。經過Slot Sharing機制能夠更進一步提升Job運行性能,在Slot數不變的狀況下增長了Operator可設置的最大的並行度,讓相似window這種消耗資源的Task以最大的並行度分佈在不一樣TM上,同時像map、filter這種較簡單的操做也不會獨佔Slot資源,下降資源浪費的可能性。
具體Slot Sharing效果可參考以下官方文檔截圖:
圖五:圖的左下角是一個soure-map-reduce
模型的Job,source和map是4 parallelism
,reduce是3 parallelism
,總計11個SubTask;這個Job最大Parallelism是4,因此將這個Job發佈到左側上面的兩個TM上時獲得圖右側的運行圖,一共佔用四個Slot,有三個Slot擁有完整的source-map-reduce
模型的Pipeline,如右側圖所示;注:map
的結果會shuffle
到reduce
端,右側圖的箭頭只是說Slot內數據Pipline,沒畫出Job的數據shuffle
過程。
圖五
圖六:圖中包含source-map[6 parallelism]
、keyBy/window/apply[6 parallelism]
、sink[1 parallelism]
三種Task,總計佔用了6個Slot;由左向右開始第一個slot內部運行着3個SubTask[3 Thread],持有Job的一條完整pipeline;剩下5個Slot內分別運行着2個SubTask[2 Thread],數據最終經過網絡傳遞給Sink
完成數據處理。
圖六
Flink在默認狀況下有策略對Job進行Operator Chain 和 Slot Sharing的控制,好比:將並行度相同且連續的SingleOutputStreamOperator操做chain在一塊兒(chain的條件較苛刻,不止單一輸出這一條,具體可閱讀org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(...))
,Job的全部Task都採用名爲default的slotSharingGroup
作Slot Sharing。但在實際的需求場景中,咱們可能會遇到需人爲干預Job的Operator Chain 或 Slot Sharing策略的狀況,本段就重點關注下用於改變默認Chain 和 Sharing策略的API。
disableOperatorChaining
則source->map
會拆開爲source()
,map()
兩種Task,Job實際的Task數會增長到7。這個設置會下降Job性能,在非生產環境的測試或profiling時能夠藉助以更好分析問題,實際生產過程當中不建議使用。startNewChain()
是指從當前Operator[map]
開始一個新的chain,即:兩個map會chaining在一塊兒而filter不會(由於startNewChain的存在使得第一次map與filter斷開了chain)。disableChaining()
是指當前Operator[map]
禁用Operatordefault
,能夠經過slotSharingGroup()
進行自定義,Flink會將擁有相同slotGroup名稱的Operators運行在相同Slot內,不一樣slotGroup名稱的Operators運行在其餘Slot內。Operator Chain有三種策略ALWAYS
、NEVER
、HEAD
,詳細可查看org.apache.flink.streaming.api.operators.ChainingStrategy
。startNewChain()
對應的策略是ChainingStrategy.HEAD
(StreamOperator的默認策略),disableChaining()
對應的策略是ChainingStrategy.NEVER,ALWAYS
是儘量的將Operators chaining在一塊兒;在一般狀況下ALWAYS是效率最高,不少Operator會將默認策略覆蓋爲ALWAYS
,如filter、map、flatMap等函數。
JOB說明:
相似StreamETL,100 parallelism,即:一個流式的ETL Job,不包含window等操做,Job的並行度爲100;
環境說明:
10TMs * 10Slots-per-TM
,即:Job的Task運行在10個TM節點上,每一個TM上佔用10個Slot,每一個Slot可用1C2G資源,GCConf: -XX:+UseG1GC -XX:MaxGCPauseMillis=100
。100TMs*1Slot-per-TM
,即:Job的Task運行在100個Container上,每一個Container上的TM持有1個Slot,每一個Container分配1C2G
資源,GCConf:-XX:+UseG1GC
-XX:MaxGCPauseMillis=100
。50TMs*2Slot-per-TM
,即:Job的Task運行在50個Container上,每一個Container上的TM持有2個Slot,每一個Container分配2C4G資源,GCConfig:-XX:+UseG1GC
-XX:MaxGCPauseMillis=100
。注:OnYarn下使用了與Standalone一致的GC配置,當前Job在Standalone或OnYarn環境中運行時,YGC、FGC頻率基本相同,OnYarn下單個Container的堆內存較小使得單次GC耗時減小。生產環境中你們最好對比下CMS和G1,選擇更好的GC策略,當前上下文中暫時認爲GC對Job性能影響可忽略不計。
問題分析:
引發Job性能下降的緣由不難定位,從這張Container的線程圖(VisualVM中的截圖)可見:
圖七:在一個1C2G的Container內有126個活躍線程,守護線程78個。首先,在一個1C2G的Container中運行着126個活躍線程,頻繁的線程切換是會常常出現的,這讓原本就不充裕的CPU顯得更加的匱乏。其次,真正與數據處理相關的線程是紅色畫筆圈出的14條線程(2條Kafka Partition Consumer
、Consumers和Operators包含在這個兩個線程內;12條Kafka Producer
線程,將處理好的數據sink到Kafka Topic),這14條線程以外的大多數線程在相同TM、不一樣Slot間能夠共用,好比:ZK-Curator、Dubbo-Client、GC-Thread、Flink-Akka、Flink-Netty、Flink-Metrics等線程,徹底能夠經過增長TM下Slot數量達到多個SubTask共享的目的。
此時咱們會很天然的得出一個解決辦法:在Job使用資源不變的狀況下,在減小Container數量的同時增長單個Container持有的CPU、Memory、Slot數量,好比上文環境說明中從方案2調整到方案3,實際調整後的Job運行穩定了許多且消費速度與Standalone基本持平。
圖七
注:當前問題是內部遷移相似StreamETL的Job時遇到的,解決方案簡單但不具備普適性,對於帶有window算子的Job須要更仔細縝密的問題分析。目前Deploy到Yarn集羣的Job都配置了JMX/Prometheus兩種監控,單個Container下Slot數量越多、每次scrape的數據越多,實際生成環境中需觀測是否會影響Job正常運行,在測試時將Container配置爲3C6G 3Slot
時發現一次java.lang.OutOfMemoryError: Direct buffer memory
的異常,初步判斷與Prometheus Client相關,可適當調整JVM的MaxDirectMemorySize
來解決。
所出現異常如圖八:
圖八
Operator Chain是將多個Operator連接在一塊兒放置在一個Task中,只針對Operator;Slot Sharing是在一個Slot中執行多個Task,針對的是Operator Chain以後的Task。這兩種優化都充分利用了計算資源,減小了沒必要要的開銷,提高了Job的運行性能。此外,Operator Chain的源碼在streaming包下,只在流處理任務中有這個機制;Slot Sharing在flink-runtime包下,彷佛應用更普遍一些(具體還有待考究)。
最後,只有充分的瞭解Slot、Operator Chain、Slot Sharing是什麼,以及各自的做用和相互間的關係,才能編寫出優秀的代碼並高效的運行在集羣上。
參考資料:
https://ci.apache.org/project...
https://ci.apache.org/project...
https://ci.apache.org/project...
https://ci.apache.org/project...
https://ci.apache.org/project...
https://flink.apache.org/visu...
做者:TalkingData數據工程師 王成龍