Apache Flink 進階(八):詳解 Metrics 原理與實戰

  • 做者:劉彪
  • 整理:毛鶴

本文由 Apache Flink Contributor 劉彪分享,本文對兩大問題進行了詳細的介紹,即什麼是 Metrics、如何使用 Metrics,並對 Metrics 監控實戰進行解釋說明。html

什麼是 Metrics?

Flink 提供的 Metrics 能夠在 Flink 內部收集一些指標,經過這些指標讓開發人員更好地理解做業或集羣的狀態。因爲集羣運行後很難發現內部的實際情況,跑得慢或快,是否異常等,開發人員沒法實時查看全部的 Task 日誌,好比做業很大或者有不少做業的狀況下,該如何處理?此時 Metrics 能夠很好的幫助開發人員瞭解做業的當前情況。java

Metric Types

Metrics 的類型以下:sql

  1. 首先,經常使用的如 Counter,寫過 mapreduce 做業的開發人員就應該很熟悉 Counter,其實含義都是同樣的,就是對一個計數器進行累加,即對於多條數據和多兆數據一直往上加的過程。
  2. 第二,Gauge,Gauge 是最簡單的 Metrics,它反映一個值。好比要看如今 Java heap 內存用了多少,就能夠每次實時的暴露一個 Gauge,Gauge 當前的值就是heap使用的量。
  3. 第三,Meter,Meter 是指統計吞吐量和單位時間內發生「事件」的次數。它至關於求一種速率,即事件次數除以使用的時間。
  4. 第四,Histogram,Histogram 比較複雜,也並不經常使用,Histogram 用於統計一些數據的分佈,好比說 Quantile、Mean、StdDev、Max、Min 等。

Metric Group

Metric 在 Flink 內部有多層結構,以 Group 的方式組織,它並非一個扁平化的結構,Metric Group + Metric Name 是 Metrics 的惟一標識。apache

Metric Group 的層級有 TaskManagerMetricGroup 和TaskManagerJobMetricGroup,每一個 Job 具體到某一個 task 的 group,task 又分爲 TaskIOMetricGroup 和 OperatorMetricGroup。Operator 下面也有 IO 統計和一些 Metrics,整個層級大概以下圖所示。Metrics 不會影響系統,它處在不一樣的組中,而且 Flink支持本身去加 Group,能夠有本身的層級。安全

•TaskManagerMetricGroup
    •TaskManagerJobMetricGroup
        •TaskMetricGroup
            •TaskIOMetricGroup
            •OperatorMetricGroup
                •${User-defined Group} / ${User-defined Metrics}
                •OperatorIOMetricGroup
•JobManagerMetricGroup
    •JobManagerJobMetricGroup

JobManagerMetricGroup 相對簡單,至關於 Master,它的層級也相對較少。網絡

Metrics 定義仍是比較簡單的,即指標的信息能夠本身收集,本身統計,在外部系統可以看到 Metrics 的信息,並可以對其進行聚合計算。多線程

如何使用 Metrics?

System Metrics

System Metrics,將整個集羣的狀態已經涵蓋得很是詳細。具體包括如下方面:架構

  • Master 級別和 Work 級別的 JVM 參數,如 load 和 time;其 Memory 劃分也很詳細,包括 heap 的使用狀況,non-heap 的使用狀況,direct 的使用狀況,以及 mapped 的使用狀況;Threads 能夠看到具體有多少線程;還有很是實用的 Garbage Collection。
  • Network 使用比較普遍,當須要解決一些性能問題的時候,Network 很是實用。Flink 不僅是網絡傳輸,仍是一個有向無環圖的結構,能夠看到它的每一個上下游都是一種簡單的生產者消費者模型。Flink 經過網絡至關於標準的生產者和消費者中間經過有限長度的隊列模型。若是想要評估定位性能,中間隊列會迅速縮小問題的範圍,可以很快的找到問題瓶頸。
•CPU
•Memory
•Threads
•Garbage Collection
•Network
•Classloader
•Cluster
•Availability
•Checkpointing
•StateBackend
•IO
•詳見: [https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html)
  • 運維集羣的人會比較關心 Cluster 的相關信息,若是做業太大,則須要很是關注 Checkpointing,它有可能會在一些常規的指標上沒法體現出潛在問題。好比 Checkpointing 長時間沒有工做,數據流看起來沒有延遲,此時可能會出現做業一切正常的假象。另外,若是進行了一輪 failover 重啓以後,由於 Checkpointing 長時間沒有工做,有可能會回滾到很長一段時間以前的狀態,整個做業可能就直接廢掉了。
  • RocksDB 是生產環境當中比較經常使用的 state backend 實現,若是數據量足夠大,就須要多關注 RocksDB 的 Metrics,由於它隨着數據量的增大,性能可能會降低。

User-defined Metrics

除了系統的 Metrics 以外,Flink 支持自定義 Metrics ,即 User-defined Metrics。上文說的都是系統框架方面,對於本身的業務邏輯也能夠用 Metrics 來暴露一些指標,以便進行監控。併發

User-defined Metrics 如今說起的都是 datastream 的 API,table、sql 可能須要 context 協助,但若是寫 UDF,它們實際上是大同小異的。app

Datastream 的 API 是繼承 RichFunction ,繼承 RichFunction 才能夠有 Metrics 的接口。而後經過 RichFunction 會帶來一個 getRuntimeContext().getMetricGroup().addGroup(…) 的方法,這裏就是 User-defined Metrics 的入口。經過這種方式,能夠自定義 user-defined Metric Group。若是想定義具體的 Metrics,一樣須要用getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…) 方法,它會有相應的構造函數,能夠定義到本身的 Metrics 類型中。

繼承 RichFunction
    •Register user-defined Metric Group: getRuntimeContext().getMetricGroup().addGroup(…)
    •Register user-defined Metric: getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…)

User-defined Metrics Example

下面經過一段簡單的例子說明如何使用 Metrics。好比,定義了一個 Counter 傳一個 name,Counter 默認的類型是 single counter(Flink 內置的一個實現),能夠對 Counter 進行 inc()操做,並在代碼裏面直接獲取。

Meter 也是這樣,Flink 有一個內置的實現是 Meterview,由於 Meter 是多長時間內發生事件的記錄,因此它是要有一個多長時間的窗口。日常用 Meter 時直接 markEvent(),至關於加一個事件不停地打點,最後用 getrate() 的方法直接把這一段時間發生的事件除一下給算出來。

Gauge 就比較簡單了,把當前的時間打出來,用 Lambda 表達式直接把 System::currentTimeMillis 打進去就能夠,至關於每次調用的時候都會去真正調一下系統當天時間進行計算。

Histogram 稍微複雜一點,Flink 中代碼提供了兩種實現,在此取一其中個實現,仍然須要一個窗口大小,更新的時候能夠給它一個值。

這些 Metrics 通常都不是線程安全的。若是想要用多線程,就須要加同步,更多詳情請參考下面連接。

•Counter processedCount = getRuntimeContext().getMetricGroup().counter("processed_count");
  processedCount.inc();
•Meter processRate = getRuntimeContext().getMetricGroup().meter("rate", new MeterView(60));
  processRate.markEvent();
•getRuntimeContext().getMetricGroup().gauge("current_timestamp", System::currentTimeMillis);
•Histogram histogram = getRuntimeContext().getMetricGroup().histogram("histogram", new DescriptiveStatisticsHistogram(1000));
  histogram.update(1024);
•[https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#metric-types]

獲取 Metrics

獲取 Metrics 有三種方法,首先能夠在 WebUI 上看到;其次能夠經過 RESTful API 獲取,RESTful API 對程序比較友好,好比寫自動化腳本或程序,自動化運維和測試,經過 RESTful API 解析返回的 Json 格式對程序比較友好;最後,還能夠經過 Metric Reporter 獲取,監控主要使用 Metric Reporter 功能。

獲取 Metrics 的方式在物理架構上是怎樣實現的?

瞭解背景和原理會對使用有更深入的理解。WebUI 和 RESTful API 是經過中心化節點按期查詢把各個組件中的 Metrics 拉上來的實現方式。其中,fetch 不必定是實時更新的,默認爲 10 秒,因此有可能在 WebUI 和 RESTful API 中刷新的數據不是實時想要獲得的數據;此外,fetch 有可能不一樣步,好比兩個組件,一邊在加另外一邊沒有動,多是因爲某種緣由超時沒有拉過來,這樣是沒法更新相關值的,它是 try best 的操做,因此有時咱們看到的指標有可能會延遲,或許等待後相關值就更新了。

紅色的路徑經過 MetricFetcher,會有一箇中心化的節點把它們聚合在一塊兒展現。而 MetricReporter 不同,每個單獨的點直接彙報,它沒有中心化節點幫助作聚合。若是想要聚合,須要在第三方系統中進行,好比常見的 TSDB 系統。固然,不是中心化結構也是它的好處,它能夠免去中心化節點帶來的問題,好比內存放不下等,MetricReporter 把原始數據直接 Reporter 出來,用原始數據作處理會有更強大的功能。

img1

Metric Reporter

Flink 內置了不少 Reporter,對外部系統的技術選型能夠參考,好比 JMX 是 java 自帶的技術,不嚴格屬於第三方。還有InfluxDB、Prometheus、Slf4j(直接打 log 裏)等,調試時候很好用,能夠直接看 logger,Flink 自己自帶日誌系統,會打到 Flink 框架包裏面去。詳見:

•Flink 內置了不少 Reporter,對外部系統的技術選型能夠參考,詳見:[https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#reporter](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html)
•Metric Reporter Configuration Example
 metrics.reporters: your_monitor,jmx
 metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
 metrics.reporter.jmx.port: 1025-10000
 metrics.reporter.your_monitor.class: com.your_company.YourMonitorClass
 metrics.reporter.your_monitor.interval: 10 SECONDS
 metrics.reporter.your_monitor.config.a: your_a_value
 metrics.reporter.your_monitor.config.b: your_b_value

Metric Reporter 是如何配置的?如上所示,首先 Metrics Reporters 的名字用逗號分隔,而後經過 metrics.reporter.jmx.class 的 classname 反射找 reporter,還須要拿到 metrics.reporter.jmx.port 的配置,好比像第三方系統經過網絡發送的比較多。但要知道往哪裏發,ip 地址、port 信息是比較常見的。此外還有 metrics.reporter.your_monitor.class 是必需要有的,能夠本身定義間隔時間,Flink 能夠解析,不須要自行去讀,而且還能夠寫本身的 config。

實戰:利用 Metrics 監控

經常使用 Metrics 作自動化運維和性能分析。

自動化運維

img2

自動化運維怎麼作?

  • 首先,收集一些關鍵的 Metrics 做爲決策依據,利用 Metric Reporter 收集 Metrics 到存儲/分析系統 (例如 TSDB),或者直接經過 RESTful API 獲取。
  • 有了數據以後,能夠定製監控規則,關注關鍵指標,Failover、Checkpoint,、業務 Delay 信息。定製規則用途最廣的是能夠用來報警,省去不少人工的工做,而且能夠定製 failover 多少次時須要人爲介入。
  • 當出現問題時,有釘釘報警、郵件報警、短信報警、電話報警等通知工具。
  • 自動化運維的優點是能夠經過大盤、報表的形式清晰的查看數據,經過大盤時刻了解做業整體信息,經過報表分析優化。

性能分析

性能分析通常遵循以下的流程:

img3

首先從發現問題開始,若是有 Metrics 系統,再配上監控報警,就能夠很快定位問題。而後對問題進行剖析,大盤看問題會比較方便,經過具體的 System Metrics 分析,縮小範圍,驗證假設,找到瓶頸,進而分析緣由,從業務邏輯、JVM、 操做系統、State、數據分佈等多維度進行分析;若是還不能找到問題緣由,就只能藉助 profiling 工具了。

實戰:「個人任務慢,怎麼辦」

「任務慢,怎麼辦?」能夠稱之爲沒法解答的終極問題之一。

其緣由在於這種問題是系統框架問題,好比看醫生時告訴醫生身體不舒服,而後就讓醫生下結論。而一般醫生須要經過一系列的檢查來縮小範圍,肯定問題。同理,任務慢的問題也須要通過多輪剖析才能獲得明確的答案。

除了不熟悉 Flink 機制之外,大多數人的問題是對於整個系統跑起來是黑盒,根本不知道系統在如何運行,缺乏信息,沒法瞭解系統狀態。此時,一個有效的策略是求助 Metrics 來了解系統內部的情況,下面經過一些具體的例子來講明。

發現問題

好比下圖 failover 指標,線上有一個不是 0,其它都是 0,此時就發現問題了。

img4

再好比下圖 Input 指標正常都在4、五百萬,忽然跌成 0,這裏也存在問題。

img5

業務延時問題以下圖,好比處理到的數據跟當前時間比對,發現處理的數據是一小時前的數據,平時都是處理一秒以前的數據,這也是有問題的。

img6

縮小範圍,定位瓶頸

當出現一個地方比較慢,可是不知道哪裏慢時,以下圖紅色部分,OUT_Q 併發值已經達到 100% 了,其它都還比較正常,甚至優秀。到這裏生產者消費者模型出現了問題,生產者 IN_Q 是滿的,消費者 OUT_Q 也是滿的,從圖中看出節點 4 已經很慢了,節點 1 產生的數據節點 4 處理不過來,而節點 5 的性能都很正常,說明節點 1 和節點 4 之間的隊列已經堵了,這樣咱們就能夠重點查看節點 1 和節點 4,縮小了問題範圍。

img7

500 個 InBps 都具備 256 個 PARALLEL ,這麼多個點不可能一一去看,所以須要在聚合時把 index 是第幾個併發作一個標籤。聚合按着標籤進行劃分,看哪個併發是 100%。在圖中能夠劃分出最高的兩個線,即線 324 和線 115,這樣就又進一步的縮小了範圍。

img8

利用 Metrics 縮小範圍的方式以下圖所示,就是用 Checkpoint Alignment 進行對齊,進而縮小範圍,但這種方法用的較少。

img9

多維度分析

分析任務有時候爲何特別慢呢?

當定位到某一個 Task 處理特別慢時,須要對慢的因素作出分析。分析任務慢的因素是有優先級的,能夠從上向下查,由業務方面向底層系統。由於大部分問題都出如今業務維度上,好比查看業務維度的影響能夠有如下幾個方面,併發度是否合理、數據波峯波谷、數據傾斜;其次依次從 Garbage Collection、Checkpoint Alignment、State Backend 性能角度進行分析;最後從系統性能角度進行分析,好比 CPU、內存、Swap、Disk IO、吞吐量、容量、Network IO、帶寬等。

Q&A

Metrics 是系統內部的監控,那是否能夠做爲 Flink 日誌分析的輸出?

能夠,可是沒有必要,都用 Flink 去處理其餘系統的日誌了,輸出或報警直接當作 sink 輸出就行了。由於 Metrics 是統計內部狀態,你這是處理正常輸入數據,直接輸出就能夠了

Reporter 是有專門的線程嗎?

每一個 Reporter 都有本身單獨的線程。在Flink的內部,線程其實仍是挺多的,若是跑一個做業,直接到 TaskManager 上,jstack 就能看到線程的詳情。

 

 

 

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索