對於一個系統而言,首先考慮要知足一些業務場景,並實現功能。隨着系統功能愈來愈多,代碼量級愈來愈高,系統的可維護性、可測試性、性能都會成爲新的挑戰,這時監控功能就變得愈來愈重要了。在國內,絕大多數IT公司的項目都以業務爲導向,以完成功能爲目標,這些項目在立項、設計、開發、上線的各個階段,不多有人會考慮到監控的問題。在國內,開發人員可以認真的在代碼段落中打印日誌,就已經屬於最優秀的程序員了。然而,在國外的不少項目則不會這樣,看看久負盛名的Hadoop的監控系統就可見一斑,尤爲是在Facebook,更是把功能、日誌以及監控列爲同等重要,做爲一個合格工程師的三駕馬車。html
Spark做爲優秀的開源系統,在監控方面也有本身的一整套體系。一個系統有了監控功能後將收穫諸多益處,如可測試性、性能優化、運維評估、數據統計等。Spark的度量系統使用codahale提供的第三方度量倉庫Metrics,本節將着重介紹Spark基於Metrics構建度量系統的原理與實現。對於Metrics感興趣的讀者,能夠參考閱讀《附錄D Metrics簡介》中的內容。前端
Spark的度量系統中有三個概念:程序員
爲了更加直觀的表現上述概念,咱們以圖1來表示Spark中度量系統的工做流程。django
圖1 度量系統的工做流程瀏覽器
任何監控都離不開度量數據的採集,離線的數據採集很容易作到和被採集模塊之間的解耦,可是對於實時度量數據,尤爲是那些內存中數據的採集就很難解耦。這就相似於網頁監控數據的埋點同樣,你要在網頁中加入一段額外的js代碼(例如Google分析,即使你只是引入一個js文件,這很難讓前端工程師感到開心)。還有一類監控,好比在Java Web中增長一個負責監控的Servlet或者一個基於Spring3.0的攔截器,這種方式雖然將耦合度從代碼級別下降到配置級別,但卻沒法有效的對內存中的數據結構進行監控。Spark的度量系統對系統功能來講是在代碼層面耦合的,這種犧牲對於可以換取對實時的、處於內存中的數據進行更有效的監控是值得的。性能優化
Spark將度量來源抽象爲Source,其定義見代碼清單1。服務器
代碼清單1 度量源的定義前端工程師
private[spark] trait Source { def sourceName: String def metricRegistry: MetricRegistry }
Spark中有不少Source的具體實現,能夠經過圖2來了解。數據結構
圖2 Source的繼承體系架構
爲了說明Source該如何實現,咱們選擇ApplicationSource(也是由於其實現簡單明瞭,足以說明問題)爲例,其實現見代碼清單2。
代碼清單2 ApplicationSource的實現
private[master] class ApplicationSource(val application: ApplicationInfo) extends Source { override val metricRegistry = new MetricRegistry() override val sourceName = "%s.%s.%s".format("application", application.desc.name, System.currentTimeMillis()) metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] { override def getValue: String = application.state.toString }) metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] { override def getValue: Long = application.duration }) metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] { override def getValue: Int = application.coresGranted }) }
望文生義,ApplicationSource用於採集Spark應用程序相關的度量。代碼清單2中ApplicationSource重載了metricRegistry和sourceName,而且向自身的註冊表註冊了status(即應用狀態,包括:WAITING, RUNNING, FINISHED, FAILED, KILLED, UNKNOWN)、runtime_ms(運行持續時長)、cores(受權的內核數)等度量。這三個度量的取值分別來自於ApplicationInfo的state、duration和coresGranted三個屬性。這三個度量都由Gauge的匿名內部類實現,Gauge是Metrics提供的用於估計度量值的特質。有關Gauge、MetricRegistry、MetricRegistry註冊度量的方法register及命名方法name的更詳細介紹請閱讀《附錄D Metrics簡介》。
Source準備好度量數據後,咱們就須要考慮如何輸出和使用的問題。這裏介紹一些常見的度量輸出方式:阿里數據部門採用的一種度量使用方式就是輸出到日誌;在命令行運行過Hadoop任務(例如:mapreduce)的使用者也會發現控制檯打印的內容中也包含度量信息;用戶可能但願將有些度量信息保存到文件(例如CSV),以便未來可以查看;若是以爲使用CSV或者控制檯等方式不夠直觀,還能夠將採集到的度量數據輸出到專用的監控系統界面。這些最終對度量數據的使用,或者說是輸出方式,Spark將它們統一抽象爲Sink。Sink的定義見代碼清單3。
代碼清單3 度量輸出的定義
private[spark] trait Sink { def start(): Unit def stop(): Unit def report(): Unit }
從代碼清單3能夠看到Sink是一個特質,包含三個接口方法:
從這三個方法的解釋來看,很難讓讀者得到更多的信息。咱們先把這些困惑放在一邊,來看看Spark中Sink的類繼承體系,如圖3所示。
圖3 Sink的類繼承體系
圖3中展現了6種Sink的具體實現。
瞭解了Sink的類繼承體系,咱們挑選Slf4jSink做爲Spark中Sink實現類的例子,來了解Sink具體該如何實現。Slf4jSink的實現見代碼清單4。
代碼清單4 Slf4jSink的實現
private[spark] class Slf4jSink( val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink { val SLF4J_DEFAULT_PERIOD = 10 val SLF4J_DEFAULT_UNIT = "SECONDS" val SLF4J_KEY_PERIOD = "period" val SLF4J_KEY_UNIT = "unit" val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { case Some(s) => s.toInt case None => SLF4J_DEFAULT_PERIOD } val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) } MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() override def start() { reporter.start(pollPeriod, pollUnit) } override def stop() { reporter.stop() } override def report() { reporter.report() } }
從Slf4jSink的實現能夠看到Slf4jSink的start、stop及report實際都是代理了Metrics庫中的Slf4jReporter的start、stop及report方法。Slf4jReporter的start方法實際是其父類ScheduledReporter的start實現。而傳遞的兩個參數pollPeriod和pollUnit,正是被ScheduledReporter使用做爲定時器獲取數據的週期和時間單位。有關ScheduledReporter中start、stop及Slf4jReporter的report方法的實現能夠參閱《附錄D Metrics簡介》。
通過近一年的準備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
紙質版售賣連接以下: