簡介: 目前有贊實時計算平臺對於 Flink 任務資源優化探索已經走出第一步。
隨着 Flink K8s 化以及實時集羣遷移完成,有贊愈來愈多的 Flink 實時任務運行在 K8s 集羣上,Flink K8s 化提高了實時集羣在大促時彈性擴縮容能力,更好的下降大促期間機器擴縮容的成本。同時,因爲 K8s 在公司內部有專門的團隊進行維護, Flink K8s 化也可以更好的減低公司的運維成本。java
不過當前 Flink K8s 任務資源是用戶在實時平臺端進行配置,用戶自己對於實時任務具體配置多少資源經驗較少,因此存在用戶資源配置較多,但實際使用不到的情形。好比一個 Flink 任務實際上 4 個併發可以知足業務處理需求,結果用戶配置了 16 個併發,這種狀況會致使實時計算資源的浪費,從而對於實時集羣資源水位以及底層機器成本,都有必定影響。基於這樣的背景,本文從 Flink 任務內存以及消息能力處理方面,對 Flink 任務資源優化進行探索與實踐。web
1.1 Flink 計算資源類型算法
一個 Flink 任務的運行,所須要的資源我認爲可以分爲 5 類:性能優化
目前 Flink 任務使用最主要的仍是內存和 CPU 資源,本地磁盤、依賴的外部存儲資源以及網卡資源通常都不會是瓶頸,因此本文咱們是從 Flink 任務的內存和 CPU 資源,兩個方面來對 Flink 實時任務資源進行優化。多線程
1.2 Flink 實時任務資源優化思路併發
對於 Flink 實時任務資源分析思路,咱們認爲主要包含兩點:運維
以後再結合實時任務內存分析所得相關指標、實時任務併發度的合理性,得出一個實時任務資源預設值,在和業務方充分溝通後,調整實時任務資源,最終達到實時任務資源配置合理化的目的,從而更好的下降機器使用成本。工具
■ 1.2.1 任務內存視角性能
那麼如何分析 Flink 任務的堆內存呢?這裏咱們是結合 Flink 任務 GC 日誌來進行分析。GC 日誌包含了每次 GC 堆內不一樣區域內存的變化和使用狀況。同時根據 GC 日誌,也可以獲取到一個 Taskmanager 每次 Full GC 後,老年代剩餘空間大小。能夠說,獲取實時任務的 GC 日誌,使咱們進行實時任務內存分析的前提。優化
GC 日誌內容分析,這裏咱們藉助開源的 GC Viewer 工具來進行具體分析,每次分析完,咱們可以獲取到 GC 相關指標,下面是經過 GC Viewer 分析一次 GC 日誌的部分結果:
上面經過 GC 日誌分析出單個 Flink Taskmanager 堆總大小、年輕代、老年代分配的內存空間、Full GC 後老年代剩餘大小等,固然還有不少其餘指標,相關指標定義能夠去 Github 具體查看。
這裏最重要的仍是Full GC 後老年代剩餘大小這個指標,按照《Java 性能優化權威指南》這本書 Java 堆大小計算法則,設 Full GC 後老年代剩餘大小空間爲 M,那麼堆的大小建議 3 ~ 4倍 M,新生代爲 1 ~ 1.5 倍 M,老年代應爲 2 ~ 3 倍 M,固然,真實對內存配置,你能夠按照實際狀況,將相應比例再調大些,用以防止流量暴漲情形。
因此經過 Flink 任務的 GC 日誌,咱們能夠計算出實時任務推薦的堆內存總大小,當發現推薦的堆內存和實際實時任務的堆內存大小相差過大時,咱們就認爲可以去下降業務方實時任務的內存配置,從而下降機器內存資源的使用。
■ 1.2.2 任務消息處理能力視角
對於 Flink 任務消息處理能力分析,咱們主要是看實時任務消費的數據源單位時間的輸入,和實時任務各個 Operator / Task 消息處理能力是否匹配。Operator 是 Flink 任務的一個算子,Task 則是一個或者多個算子 Chain 起來後,一塊兒執行的物理載體。
數據源咱們內部通常使用 Kafka,Kafka Topic 的單位時間輸入能夠經過調用 Kafka Broker JMX 指標接口進行獲取,固然你也能夠調用 Flink Rest Monitoring 相關 API 獲取實時任務全部 Kafka Source Task 單位時間輸入,而後相加便可。不過因爲反壓可能會對 Source 端的輸入有影響,這裏咱們是直接使用 Kafka Broker 指標 JMX 接口獲取 Kafka Topic 單位時間輸入。
在獲取到實時任務 Kafka Topic 單位時間輸入後,下面就是判斷實時任務的消息處理能力是否與數據源輸入匹配。一個實時任務總體的消息處理能力,會受處處理最慢的 Operator / Task 的影響。打個比方,Flink 任務消費的 Kafka Topic 輸入爲 20000 Record / S,可是有一個 Map 算子,其併發度爲 10 ,Map 算子中業務方調用了 Dubbo,一個 Dubbo 接口從請求到返回爲 10 ms,那麼 Map 算子處理能力 1000 Record / S (1000 ms / 10 ms * 10 ),從而實時任務處理能力會降低爲 1000 Record / S。
因爲一條消息記錄的處理會在一個 Task 內部流轉,因此咱們試圖找出一個實時任務中,處理最慢的 Task 邏輯。若是 Source 端到 Sink 端所有 Chain 起來的話,咱們則是會找出處理最慢的 Operator 的邏輯。在源碼層,咱們針對 Flink Task 以及 Operator 增長了單條記錄處理時間的自定義 Metric,以後該 Metric 能夠經過 Flink Rest API 獲取。咱們會遍歷一個 Flink 任務中全部的 Task , 查詢處理最慢的 Task 所在的 JobVertex(JobGraph 的點),而後獲取到該 JobVertex 全部 Task 的總輸出,最終會和 Kafka Topic 單位時間輸入進行比對,判斷實時任務消息處理能力是否合理。
設實時任務 Kafka Topic 單位時間的輸入爲 S,處理最慢的 Task 表明的 JobVertex 的併發度爲 P,處理最慢的 Task 所在的 JobVertex 單位時間輸出爲 O,處理最慢的 Task 的最大消息處理時間爲 T,那麼經過下面邏輯進行分析:
目前主要是 1 這種狀況在 CPU 使用方面不合理,固然,因爲不一樣時間段,實時任務的流量不一樣,因此咱們會有一個週期性檢測的的任務,若是檢測到某個實時任務連續屢次都符合 1 這種狀況時,會自動報警提示平臺管理員進行資源優化調整。
下圖是從 Flink 任務的內存以及消息處理能力兩個視角分析資源邏輯圖:
2.1 Flink 任務垃圾回收器選擇
Flink 任務本質仍是一個 Java 任務,因此也就會涉及到垃圾回收器的選擇。選擇垃圾回收器通常須要從兩個角度進行參考:
Flink 任務我認爲仍是偏重吞吐量的一類 Java 任務,因此會從吞吐量角度進行更多的考量。固然並非說徹底不考慮延遲,畢竟 JobManager、TaskManager、ResourceManager 之間存在心跳,延遲過大,可能會有心跳超時的可能性。
目前咱們 JDK 版本爲內部 JDK 1.8 版本,新生代垃圾回收器使用 Parallel Scavenge,那麼老年代垃圾回收器只能從 Serial Old 或者 Parallel Old 中選擇。因爲咱們 Flink k8s 任務每一個 Pod 的 CPU 限制爲 0.6 - 1 core ,最大也只能使用 1 個 core,因此老年代的垃圾回收器咱們使用的是 Serial Old ,多線程垃圾回收在單 Core 之間,可能會有線程切換的消耗。
2.2 實時任務 GC 日誌獲取
設置完垃圾回收器後,下一步就是獲取 Flink 任務的 GC 日誌。Flink 任務構成通常是單個 JobManager + 多個 TaskManger ,這裏須要獲取到 TaskManager 的 GC 日誌進行分析。那是否是要對全部 TaskManager 進行獲取呢。這裏咱們按照 TaskManager 的 Young GC 次數,按照次數大小進行排序,取排名前 16 的 TaskManager 進行分析。YoungGC 次數能夠經過 Flink Rest API 進行獲取。
Flink on Yarn 實時任務的 GC 日誌,直接點開 TaskManager 的日誌連接就可以看到,而後經過 HTTP 訪問,就能下載到本地。Flink On k8s 任務的 GC 日誌,會先寫到 Pod 所掛載的雲盤,基於 k8s hostpath volume 進行掛載。咱們內部使用 Filebeat 進行日誌文件變動監聽和採集,最終輸出到下游的 Kafka Topic。咱們內部會有自定義日誌服務端,它會消費 Kafka 的日誌記錄,自動進行落盤和管理,同時向外提供日誌下載接口。經過日誌下載的接口,便可以下載到須要分析的 TaskManager 的 GC 日誌。
2.3 基於 GC Viewer 分析 Flink 任務內存
GC Viewer 是一個開源的 GC 日誌分析工具。使用 GC Viewer 以前,須要先把 GC Viewer 項目代碼 clone 到本地,而後進行編譯打包,就可使用其功能。
在對一個實時任務堆內存進行分析時,先把 Flink TaskManager 的日誌下載到本地,而後經過 GC Viewer 對日誌進行。若是你以爲多個 Taskmanager GC 日誌分析較慢時,可使用多線程。上面全部這些操做,能夠將其代碼化,自動化產出分析結果。下面是經過 GC Viewer 分析的命令行:
java -jar gcviewer-1.37-SNAPSHOT.jar gc.log summary.csv
上面參數 gc.log 表示一個 Taskmanager 的 GC 日誌文件名稱,summary.csv 表示日誌分析的結果。下面是咱們平臺對於某個實時任務內存分析的結果:
下面是上面截圖中,部分參數說明:
上述大部份內存分析結果,經過 GC Viewer 分析都能獲得,不過推薦堆大小、推薦新生代堆大小、推薦老年代堆大小則是根據 1.2.1 小節的內存優化規則來設置。
3.1 實時任務 Kafka Topic 單位時間輸入獲取
想要對 Flink 任務的消息處理能力進行分析,第一步即是獲取該實時任務的 Kafka 數據源 Topic,目前若是數據源不是 Kafka 的話,咱們不會進行分析。Flink 任務整體分爲兩類:Flink Jar 任務和 Flink SQL 任務。Flink SQL 任務獲取 Kafka 數據源比較簡單,直接解析 Flink SQL 代碼,而後獲取到 With 後面的參數,在過濾掉 Sink 表以後,若是 SQLCreateTable 的 Conector 類型爲 Kafka,就可以經過 SQLCreateTable with 後的參數,拿到具體 Kafka Topic。
Flink Jar 任務的 Kafka Topic 數據源獲取相對繁瑣一些,咱們內部有一個實時任務血緣解析服務,經過對 Flink Jar 任務自動構建其 PackagedProgram,PackagedProgram 是 Flink 內部的一個類,而後經過 PackagedProgram ,咱們能夠獲取一個 Flink Jar 任務的 StreamGraph,StreamGraph 裏面有 Source 和 Sink 的全部 StreamNode,經過反射,咱們能夠獲取 StreamNode 裏面具體的 Source Function,若是是 Kafka Source Sunction,咱們就會獲取其 Kafka Topic。下面是 StreamGraph 類截圖:
獲取到 Flink 任務的 Kafka Topic 數據源以後,下一步即是獲取該 Topic 單位時間輸入的消息記錄數,這裏能夠經過 Kafka Broker JMX Metric 接口獲取,咱們則是經過內部 Kafka 管理平臺提供的外部接口進行獲取。
3.2 自動化檢測 Flink 消息處理最慢 Task
首先,咱們在源碼層增長了 Flink Task 單條記錄處理時間的 Metric,這個 Metric 能夠經過 Flink Rest API 獲取。接下來就是藉助 Flink Rest API,遍歷要分析的 Flink 任務的全部的 Task。Flink Rest Api 有這樣一個接口:
base_flink_web_ui_url/jobs/:jobid
這個接口可以獲取一個任務的全部 Vertexs,一個 Vertex 能夠簡單理解爲 Flink 任務 JobGraph 裏面的一個 JobVertex。JobVertex 表明着實時任務中一段執行邏輯。
獲取完 Flink 任務全部的 Vertex 以後,接下來就是獲取每一個 Vertex 具體 Task 處理單條記錄的 metric,可使用下面的接口:
須要在上述 Rest API 連接 metrics 以後添加 ?get=(具體meitric ),好比:metrics?get=0.Filter.numRecordsOut,0 表示該 Vertex Task 的 id,Filter.numRecordsOut 則表示具體的指標名稱。咱們內部使用 taskOneRecordDealTime 表示Task 處理單條記錄時間 Metric,而後用 0.taskOneRecordDealTime 去獲取某個 Task 的單條記錄處理時間的指標。上面接口支持多個指標查詢,即 get 後面使用逗號隔開便可。
最終自動化檢測 Flink 消息處理最慢 Task 總體步驟以下:
下面是咱們實時平臺對於一個 Flink 實時任務分析的結果:
既然 Flink 任務的內存以及消息處理能力分析的方式已經有了,那接下來就是在實時平臺端進行具體實踐。咱們實時平臺天天會定時掃描全部正在運行的 Flink 任務,在任務內存方面,咱們可以結合 實時任務 GC 日誌,同時根據內存優化規則,計算出 Flink 任務推薦的堆內存大小,並與實際分配的 Flink 任務的堆內存進行比較,若是二者相差的倍數過大時,咱們認爲 Flink 任務的內存配置存在浪費的狀況,接下來咱們會報警提示到平臺管理員進行優化。
平臺管理員再收到報警提示後,同時也會斷定實時任務消息能力是否合理,若是消息處理最慢的 Vertex (某段實時邏輯),其全部 Task 單位時間處理消息記錄數的總和約等於實時任務消費的 Kafka Topic 單位時間的輸入,但經過 Vertex 的併發度,以及單條消息處理 Metric ,算出該 Vertex 單位時間處理的消息記錄數遠大於 Kafka Topic 的單位輸入時,則認爲 Flink 任務能夠適當調小併發度。具體調整多少,會和業務方溝通以後,在進行調整。總體 Flink 任務資源優化操做流程以下:
目前有贊實時計算平臺對於 Flink 任務資源優化探索已經走出第一步。經過自動化發現可以優化的實時任務,而後平臺管理員介入分析,最終判斷是否可以調整 Flink 任務的資源。在整個實時任務資源優化的鏈路中,目前仍是不夠自動化,由於在後半段還須要人爲因素。將來咱們計劃 Flink 任務資源的優化所有自動化,會結合實時任務歷史不一樣時段的資源使用狀況,自動化推測和調整實時任務的資源配置,從而達到提高整個實時集羣資源利用率的目的。
同時將來也會和元數據平臺的同窗進行合做,一塊兒從更多方面來分析實時任務是否存在資源優化的可能性,他們在原來離線任務資源方面積攢了不少優化經驗,將來也能夠參考和借鑑,應用到實時任務資源的優化中。
固然,最理想化就是實時任務的資源使用可以本身自動彈性擴縮容,以前聽到過社區同窗有這方面的聲音,同時也歡迎你可以和我一塊兒探討。
做者:沈磊
原文連接本文爲阿里雲原創內容,未經容許不得轉載