Flink系列之Metrics

 Flink是一個針對流數據和批處理的分佈式處理引擎,近兩年才真正的頻繁出如今數據處理領域 。其實Flink在2014年就已經成爲ASF(Apache Software Foundation)的頂級項目之一,也許以前是被spark掩蓋了光芒,spark在數據處理上的優點不能否認,可是我的通過對spark和flink的源碼研讀和項目實戰後,更偏心flink一些。在實時計算方面,相對於spark的微批處理(micro batch),flink的數據處理方式更真正稱得上流處理,不久前release的spark 2.3目前也已經提供了相似flink流處理的方式,可是目前尚未通過大型互聯網公司的實戰驗證,不知道其線上表現如何,咱們能夠拭目以待;除了流處理的方式之外,flink在內存管理,網絡傳輸方面也頗有其獨特之處,另外spark SQL和flink SQL相比,在代碼層面上,flink作了簡單的封裝後直接利用了calcite的API,讓SQL變的再也不那麼的神祕,更便於咱們自定義語義,定製咱們本身的SQL語句;Flink基於其狀態機制提供的CEP(Complex Event Processing)Library可讓咱們在流處理過程匹配出咱們定義的事件組合。這些我以後都會在flink系列裏一一作其原理說明和代碼解讀。
 
回到此篇文章的標題:flink-metrics。爲何要以這篇文章做爲系列的開頭,開頭不該該是flink的原理和組件說明此類的文章嗎?是的,本應該是從淺入深的去開啓flink系列,可是個人考慮有兩點:其一,此文章是以試水爲目的,想了解一下你們的興趣點在哪,若是你們大部分是身經百戰的流計算實踐者,那flink小白入門篇的文章貌似不用再去贅述了。但若是你們對flink很感興趣,可是對其基本原理和概念不是很瞭解,那我係列第二篇會補上。其二,目前我本身的項目須要,對metrics看的多了些,想記錄一下,也算是作個回顧吧。

 

Flink  Metrics指任務在flink集羣中運行過程當中的各項指標,包括機器系統指標:Hostname,CPU,Memory,Thread,GC,NetWork,IO 和 任務運行組件指標:JobManager,TaskManager,Job, Task,Operater相關指標。Flink提供metrics的目的有兩點:第一,實時採集metrics的數據供flink UI進行數據展現,用戶能夠在頁面上看到本身提交任務的狀態,延遲等信息。第二,對外提供metrics收集接口,用戶能夠將整個fllink集羣的metrics經過MetricsReport上報至第三方系統進行存儲,展現和監控。第二種對大型的互聯網公司頗有用,通常他們的集羣規模比較大,不可能經過flink UI進行全部任務的展現,因此就經過metrics上報的方式進行dashboard的展現,同時存儲下來的metrics能夠用於監控報警,更進一步來講,能夠用歷史數據進行數據挖掘產生更大的價值。Flink原生的提供了幾種主流的第三方上報方式:JMXReporter,GangliaReport,GraphiteReport等,用戶能夠直接配置使用。

 

Flink Metrics是經過引入com.codahale.metrics包實現的,它將收集的metrics分爲四大類:Counter,Gauge,Histogram和Meter下面分別說明:
  • Counter  計數器 ,用來統計一個metrics的總量。拿flink中的指標來舉例,像Task/Operator中的numRecordsIn(此task或者operator接收到的record總量)和numRecordsOut(此task或者operator發送的record總量)就屬於Counter。
  • Gauge   指標值  , 用來記錄一個metrics的瞬間值。拿flink中的指標舉例,像JobManager或者TaskManager中的JVM.Heap.Used就屬於Gauge,記錄某個時刻JobManager或者TaskManager所在機器的JVM堆使用量。
  • Histogram  直方圖, 有的時候咱們不知足於只拿到metrics的總量或者瞬時值,當想獲得metrics的最大值,最小值,中位數等信息時,咱們就能用到Histogram了。Flink中屬於Histogram的指標不多,可是最重要的一個是屬於operator的latency。此項指標會記錄數據處理的延遲信息,對任務監控起到很重要的做用。
  • Meter  平均值, 用來記錄一個metrics某個時間段內平均值。flink中相似指標有task/operator中的numRecordsInPerSecond,字面意思就能夠理解,指的是此task或者operator每秒接收的記錄數。
 
Metrics代碼解析
那Flink代碼中是怎樣對metrics進行收集的呢(具體代碼在flink-runtime的metrics包裏)。下面咱們就來按步驟說明:
  1. flink中先會定義好ScopeFormat,scopeFormat定義了各種組件metrics_group的範圍,而後各個組件(JobManager,TaskManager,Operator等)都會繼承ScopeFormat類進行各自的format實現。
  2. 然後開始定義各個組件的metricsGroup。每一個group中定義屬於這個組件中全部的metrics。好比TaskIOMetricGroup類,就定義了task執行過程當中有關IO的metrics。
  3. 定義好各個metricsGroup後,在初始化各個組件的時候,會將相應的metricsGroup當作參數放入構造函數中進行初始化。咱們拿JobManager爲例來講:                             class JobManager(protected val flinkConfiguration: Configuration,
    protected val futureExecutor: ScheduledExecutorService, protected val ioExecutor: Executor, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, protected val blobServer: BlobServer, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, protected val restartStrategyFactory: RestartStrategyFactory, protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, protected val jobRecoveryTimeout: FiniteDuration, protected val jobManagerMetricGroup: JobManagerMetricGroup, protected val optRestAddress: Option[String])
    初始化JobManager的時候帶上了JobManagerMetricGroup,後面此類在preStart()方法中調用了instantiateMetrics(jobManagerMetricGroup),咱們再看instantiateMetrics方法內容:   
  1.  private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
    jobManagerMetricGroup.gauge[Long, Gauge[Long]]("taskSlotsAvailable", new Gauge[Long] { override def getValue: Long = JobManager.this.instanceManager.getNumberOfAvailableSlots }) jobManagerMetricGroup.gauge[Long, Gauge[Long]]("taskSlotsTotal", new Gauge[Long] { override def getValue: Long = JobManager.this.instanceManager.getTotalNumberOfSlots }) jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRegisteredTaskManagers", new Gauge[Long] { override def getValue: Long = JobManager.this.instanceManager.getNumberOfRegisteredTaskManagers }) jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] { override def getValue: Long = JobManager.this.currentJobs.size }) } 在instantiateMetrics方法內,把相應的metrics都加入到了jobManagerMetricGroup中,這樣就創建了metrics和metrics_group的映射關係。
  2.  隨後,在各個組件中實例化MetricRegistryImpl,而後利用MetricRegistry的startQueryService方法啓動metrics查詢服務(本質上是啓動相應的Akka Actor)
  3. 最後,利用flink的原生reporter(主要是上文介紹的三種方式)和MetricRegistry創建聯繫,這樣report裏就能夠拿出全部採集到的metrics,進而將metrics發往第三方系統。
 
Metrics配置
當咱們瞭解了flink metrics的具體實現步驟後,那就是上手操做了,怎樣配置才能讓metrics生效呢?接下來就介紹一下配置步驟:
  • flink目錄下有一個conf的文件夾,conf下有一個flink-conf.yaml文件,全部的flink有關配置都在這裏進行。
  • 配置metrics_scope,metrics_scope指定metrics上報時的組合方式。一共有6個scope須要配置:                                                    metrics.scope.jm      配置JobManager相關metrics,默認格式爲 <host>.jobmanager                                                   metrics.scope.jm.job   配置JobManager上Job的相關metrics,默認格式爲 <host>.jobmanager.<job_name>
               metrics.scope.tm         配置TaskManager上相關metrics,默認格式爲  <host>.taskmanager.<tm_id>     
               metrics.scope.tm.job   配置TaskManager上Job相關metrics,默認格式爲 <host>.taskmanager.<tm_id>.<job_name>
               metrics.scope.task   配置Task相關metrics,默認爲 <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
              metrics.scope.operator  配置Operator相關metrics,默認格式 爲 <host>.taskmanager.<tm_id>.<job_name>.            <operator_name>.<subtask_index>
          以上6種scope能夠根據用戶意願改變組合方式,例如 metrics.scope.operator,我能夠改爲  <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,修改後,接收到的operator的metrics就會是以下格式:<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>.xxx =  xxxx(若是所有用默認,則能夠不須要在文件裏配置,源碼裏已經指定了默認值)
  • 配置Report,Report相關配置根據其不一樣的實現類有所不一樣,我就用項目目前使用的GraphiteReport爲例來講明:
          metrics.reporters: grph
          metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
          metrics.reporter.grph.host: xxx
          metrics.reporter.grph.port: xxx
          metrics.reporter.grph.protocol: TCP/UDP
      metrics.reporters指定report的名稱,metrics.reporter.grph.class指定具體的MetricsReport實現類,metrics.reporter.grph.host指定遠端graphite主機ip,metrics.reporter.grph.port指定遠端graphite監聽端口,metrics.reporter.grph.protocol指定graphite利用的協議。
  • 最後保存文件,重啓flink集羣便可生效
若是咱們不使用flink原生的MetricsReport,想本身實現定製的Report能夠嗎?答案是確定的,用戶能夠參照GraphiteReporter類,自定義類繼承 ScheduledDropwizardReporter類,重寫report方法便可。咱們如今除了利用GraphiteReport,也本身定義了KafkaReport上報定製的metrics來知足更多的用戶需求。
 
總結
Flink的metrics在整個項目裏是不可或缺的,社區裏Jira也常常有人提出各類improvement,要加入這樣那樣的metrics,但最終社區接受的不多,由於這些都是對我的公司定製化的代碼改動,meger進master的意義不大。合理的利用metrics能夠及時的發現集羣和任務的情況,從而進行相應的對策,能夠保證集羣的穩定性,避免沒必要要的損失。
相關文章
相關標籤/搜索