本文由 Apache Flink Contributor 劉彪分享,本文對兩大問題進行了詳細的介紹,即什麼是 Metrics、如何使用 Metrics,並對 Metrics 監控實戰進行解釋說明。html
Flink 提供的 Metrics 能夠在 Flink 內部收集一些指標,經過這些指標讓開發人員更好地理解做業或集羣的狀態。因爲集羣運行後很難發現內部的實際情況,跑得慢或快,是否異常等,開發人員沒法實時查看全部的 Task 日誌,好比做業很大或者有不少做業的狀況下,該如何處理?此時 Metrics 能夠很好的幫助開發人員瞭解做業的當前情況。java
Metrics 的類型以下:sql
Metric 在 Flink 內部有多層結構,以 Group 的方式組織,它並非一個扁平化的結構,Metric Group + Metric Name 是 Metrics 的惟一標識。apache
Metric Group 的層級有 TaskManagerMetricGroup 和TaskManagerJobMetricGroup,每一個 Job 具體到某一個 task 的 group,task 又分爲 TaskIOMetricGroup 和 OperatorMetricGroup。Operator 下面也有 IO 統計和一些 Metrics,整個層級大概以下圖所示。Metrics 不會影響系統,它處在不一樣的組中,而且 Flink支持本身去加 Group,能夠有本身的層級。安全
•TaskManagerMetricGroup •TaskManagerJobMetricGroup •TaskMetricGroup •TaskIOMetricGroup •OperatorMetricGroup •${User-defined Group} / ${User-defined Metrics} •OperatorIOMetricGroup •JobManagerMetricGroup •JobManagerJobMetricGroup
JobManagerMetricGroup 相對簡單,至關於 Master,它的層級也相對較少。網絡
Metrics 定義仍是比較簡單的,即指標的信息能夠本身收集,本身統計,在外部系統可以看到 Metrics 的信息,並可以對其進行聚合計算。多線程
System Metrics,將整個集羣的狀態已經涵蓋得很是詳細。具體包括如下方面:架構
•CPU •Memory •Threads •Garbage Collection •Network •Classloader •Cluster •Availability •Checkpointing •StateBackend •IO •詳見: [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html)
除了系統的 Metrics 以外,Flink 支持自定義 Metrics ,即 User-defined Metrics。上文說的都是系統框架方面,對於本身的業務邏輯也能夠用 Metrics 來暴露一些指標,以便進行監控。併發
User-defined Metrics 如今說起的都是 datastream 的 API,table、sql 可能須要 context 協助,但若是寫 UDF,它們實際上是大同小異的。app
Datastream 的 API 是繼承 RichFunction ,繼承 RichFunction 才能夠有 Metrics 的接口。而後經過 RichFunction 會帶來一個 getRuntimeContext().getMetricGroup().addGroup(…) 的方法,這裏就是 User-defined Metrics 的入口。經過這種方式,能夠自定義 user-defined Metric Group。若是想定義具體的 Metrics,一樣須要用getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…) 方法,它會有相應的構造函數,能夠定義到本身的 Metrics 類型中。
繼承 RichFunction •Register user-defined Metric Group: getRuntimeContext().getMetricGroup().addGroup(…) •Register user-defined Metric: getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…)
下面經過一段簡單的例子說明如何使用 Metrics。好比,定義了一個 Counter 傳一個 name,Counter 默認的類型是 single counter(Flink 內置的一個實現),能夠對 Counter 進行 inc()操做,並在代碼裏面直接獲取。
Meter 也是這樣,Flink 有一個內置的實現是 Meterview,由於 Meter 是多長時間內發生事件的記錄,因此它是要有一個多長時間的窗口。日常用 Meter 時直接 markEvent(),至關於加一個事件不停地打點,最後用 getrate() 的方法直接把這一段時間發生的事件除一下給算出來。
Gauge 就比較簡單了,把當前的時間打出來,用 Lambda 表達式直接把 System::currentTimeMillis 打進去就能夠,至關於每次調用的時候都會去真正調一下系統當天時間進行計算。
Histogram 稍微複雜一點,Flink 中代碼提供了兩種實現,在此取一其中個實現,仍然須要一個窗口大小,更新的時候能夠給它一個值。
這些 Metrics 通常都不是線程安全的。若是想要用多線程,就須要加同步,更多詳情請參考下面連接。
•Counter processedCount = getRuntimeContext().getMetricGroup().counter("processed_count"); processedCount.inc(); •Meter processRate = getRuntimeContext().getMetricGroup().meter("rate", new MeterView(60)); processRate.markEvent(); •getRuntimeContext().getMetricGroup().gauge("current_timestamp", System::currentTimeMillis); •Histogram histogram = getRuntimeContext().getMetricGroup().histogram("histogram", new DescriptiveStatisticsHistogram(1000)); histogram.update(1024); •[https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#metric-types]
獲取 Metrics 有三種方法,首先能夠在 WebUI 上看到;其次能夠經過 RESTful API 獲取,RESTful API 對程序比較友好,好比寫自動化腳本或程序,自動化運維和測試,經過 RESTful API 解析返回的 Json 格式對程序比較友好;最後,還能夠經過 Metric Reporter 獲取,監控主要使用 Metric Reporter 功能。
獲取 Metrics 的方式在物理架構上是怎樣實現的?
瞭解背景和原理會對使用有更深入的理解。WebUI 和 RESTful API 是經過中心化節點按期查詢把各個組件中的 Metrics 拉上來的實現方式。其中,fetch 不必定是實時更新的,默認爲 10 秒,因此有可能在 WebUI 和 RESTful API 中刷新的數據不是實時想要獲得的數據;此外,fetch 有可能不一樣步,好比兩個組件,一邊在加另外一邊沒有動,多是因爲某種緣由超時沒有拉過來,這樣是沒法更新相關值的,它是 try best 的操做,因此有時咱們看到的指標有可能會延遲,或許等待後相關值就更新了。
紅色的路徑經過 MetricFetcher,會有一箇中心化的節點把它們聚合在一塊兒展現。而 MetricReporter 不同,每個單獨的點直接彙報,它沒有中心化節點幫助作聚合。若是想要聚合,須要在第三方系統中進行,好比常見的 TSDB 系統。固然,不是中心化結構也是它的好處,它能夠免去中心化節點帶來的問題,好比內存放不下等,MetricReporter 把原始數據直接 Reporter 出來,用原始數據作處理會有更強大的功能。
Flink 內置了不少 Reporter,對外部系統的技術選型能夠參考,好比 JMX 是 java 自帶的技術,不嚴格屬於第三方。還有InfluxDB、Prometheus、Slf4j(直接打 log 裏)等,調試時候很好用,能夠直接看 logger,Flink 自己自帶日誌系統,會打到 Flink 框架包裏面去。詳見:
•Flink 內置了不少 Reporter,對外部系統的技術選型能夠參考,詳見:[https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#reporter](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html) •Metric Reporter Configuration Example metrics.reporters: your_monitor,jmx metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter metrics.reporter.jmx.port: 1025-10000 metrics.reporter.your_monitor.class: com.your_company.YourMonitorClass metrics.reporter.your_monitor.interval: 10 SECONDS metrics.reporter.your_monitor.config.a: your_a_value metrics.reporter.your_monitor.config.b: your_b_value
Metric Reporter 是如何配置的?如上所示,首先 Metrics Reporters 的名字用逗號分隔,而後經過 metrics.reporter.jmx.class 的 classname 反射找 reporter,還須要拿到 metrics.reporter.jmx.port 的配置,好比像第三方系統經過網絡發送的比較多。但要知道往哪裏發,ip 地址、port 信息是比較常見的。此外還有 metrics.reporter.your_monitor.class 是必需要有的,能夠本身定義間隔時間,Flink 能夠解析,不須要自行去讀,而且還能夠寫本身的 config。
經常使用 Metrics 作自動化運維和性能分析。
自動化運維怎麼作?
性能分析通常遵循以下的流程:
首先從發現問題開始,若是有 Metrics 系統,再配上監控報警,就能夠很快定位問題。而後對問題進行剖析,大盤看問題會比較方便,經過具體的 System Metrics 分析,縮小範圍,驗證假設,找到瓶頸,進而分析緣由,從業務邏輯、JVM、 操做系統、State、數據分佈等多維度進行分析;若是還不能找到問題緣由,就只能藉助 profiling 工具了。
「任務慢,怎麼辦?」能夠稱之爲沒法解答的終極問題之一。
其緣由在於這種問題是系統框架問題,好比看醫生時告訴醫生身體不舒服,而後就讓醫生下結論。而一般醫生須要經過一系列的檢查來縮小範圍,肯定問題。同理,任務慢的問題也須要通過多輪剖析才能獲得明確的答案。
除了不熟悉 Flink 機制之外,大多數人的問題是對於整個系統跑起來是黑盒,根本不知道系統在如何運行,缺乏信息,沒法瞭解系統狀態。此時,一個有效的策略是求助 Metrics 來了解系統內部的情況,下面經過一些具體的例子來講明。
好比下圖 failover 指標,線上有一個不是 0,其它都是 0,此時就發現問題了。
再好比下圖 Input 指標正常都在4、五百萬,忽然跌成 0,這裏也存在問題。
業務延時問題以下圖,好比處理到的數據跟當前時間比對,發現處理的數據是一小時前的數據,平時都是處理一秒以前的數據,這也是有問題的。
當出現一個地方比較慢,可是不知道哪裏慢時,以下圖紅色部分,OUT_Q 併發值已經達到 100% 了,其它都還比較正常,甚至優秀。到這裏生產者消費者模型出現了問題,生產者 IN_Q 是滿的,消費者 OUT_Q 也是滿的,從圖中看出節點 4 已經很慢了,節點 1 產生的數據節點 4 處理不過來,而節點 5 的性能都很正常,說明節點 1 和節點 4 之間的隊列已經堵了,這樣咱們就能夠重點查看節點 1 和節點 4,縮小了問題範圍。
500 個 InBps 都具備 256 個 PARALLEL ,這麼多個點不可能一一去看,所以須要在聚合時把 index 是第幾個併發作一個標籤。聚合按着標籤進行劃分,看哪個併發是 100%。在圖中能夠劃分出最高的兩個線,即線 324 和線 115,這樣就又進一步的縮小了範圍。
利用 Metrics 縮小範圍的方式以下圖所示,就是用 Checkpoint Alignment 進行對齊,進而縮小範圍,但這種方法用的較少。
分析任務有時候爲何特別慢呢?
當定位到某一個 Task 處理特別慢時,須要對慢的因素作出分析。分析任務慢的因素是有優先級的,能夠從上向下查,由業務方面向底層系統。由於大部分問題都出如今業務維度上,好比查看業務維度的影響能夠有如下幾個方面,併發度是否合理、數據波峯波谷、數據傾斜;其次依次從 Garbage Collection、Checkpoint Alignment、State Backend 性能角度進行分析;最後從系統性能角度進行分析,好比 CPU、內存、Swap、Disk IO、吞吐量、容量、Network IO、帶寬等。
Metrics 是系統內部的監控,那是否能夠做爲 Flink 日誌分析的輸出?
能夠,可是沒有必要,都用 Flink 去處理其餘系統的日誌了,輸出或報警直接當作 sink 輸出就行了。由於 Metrics 是統計內部狀態,你這是處理正常輸入數據,直接輸出就能夠了
Reporter 是有專門的線程嗎?
每一個 Reporter 都有本身單獨的線程。在Flink的內部,線程其實仍是挺多的,若是跑一個做業,直接到 TaskManager 上,jstack 就能看到線程的詳情。
本文爲雲棲社區原創內容,未經容許不得轉載。