任何系統都須要提供監控功能,不然在運行期間發生一些異常時,咱們將會一籌莫展。也許有人說,能夠增長日誌來解決這個問題。日誌只能解決你的程序邏輯在運行期的監控,進而發現Bug,以及提供對業務有幫助的調試信息。當你的JVM進程奔潰或者程序響應速度很慢時,這些日誌將毫無用處。好在JVM提供了jstat、jstack、jinfo、jmap、jhat等工具幫助咱們分析,更有VisualVM的可視化界面以更加直觀的方式對JVM運行期的情況進行監控。此外,像Tomcat、Hadoop等服務都提供了基於Web的監控頁面,用瀏覽器能訪問具備樣式及佈局,並提供豐富監控數據的頁面無疑是一種簡單、高效的方式。css
Spark天然也提供了Web頁面來瀏覽監控數據,並且Master、Worker、Driver根據自身功能提供了不一樣內容的Web監控頁面。不管是Master、Worker,仍是Driver,它們都使用了統一的Web框架WebUI。Master、Worker及Driver分別使用MasterWebUI、WorkerWebUI及SparkUI提供的Web界面服務,後三者都繼承自WebUI,並增長了個性化的功能。此外,在Yarn或Mesos模式下還有WebUI的另外一個擴展實現HistoryServer。HistoryServer將會展示已經運行完成的應用程序信息。本章以SparkUI爲例,並深刻分析WebUI的框架體系。html
在大型分佈式系統中,採用事件監聽機制是最多見的。爲何要使用事件監聽機制?假如Spark UI採用Scala的函數調用方式,那麼隨着整個集羣規模的增長,對函數的調用會愈來愈多,最終會受到Driver所在JVM的線程數量限制而影響監控數據的更新,甚至出現監控數據沒法及時顯示給用戶的狀況。因爲函數調用多數狀況下是同步調用,這就致使線程被阻塞,在分佈式環境中,還可能由於網絡問題,致使線程被長時間佔用。將函數調用更換爲發送事件,事件的處理是異步的,當前線程能夠繼續執行後續邏輯進而被快速釋放。線程池中的線程還能夠被重用,這樣整個系統的併發度會大大增長。發送的事件會存入緩存,由定時調度器取出後,分配給監聽此事件的監聽器對監控數據進行更新。Spark UI就是這樣的服務,它的構成如圖1所示。web
圖1 SparkUI的組成apache
圖1展現了SparkUI中的各個組件,這裏對這些組件做簡單介紹:json
Spark UI構建在WebUI的框架體系之上,所以應當首先了解WebUI。WebUI定義了一種Web界面展示的框架,並提供返回Json格式數據的Web服務。WebUI用於展現一組標籤頁,WebUITab定義了標籤頁的規範。每一個標籤頁中包含着一組頁面,WebUIPage定義了頁面的規範。咱們將首先了解WebUIPage和WebUITab,最後從總體來看WebUI。後端
任何的Web界面每每由多個頁面組成,每一個頁面都將提供不一樣的內容展現。WebUIPage是WebUI框架體系的頁節點,定義了全部頁面應當遵循的規範。抽象類WebUIPage的定義見代碼清單1。api
代碼清單1 WebUIPage的定義數組
private[spark] abstract class WebUIPage(var prefix: String) { def render(request: HttpServletRequest): Seq[Node] def renderJson(request: HttpServletRequest): JValue = JNothing }
WebUIPage定義了兩個方法。瀏覽器
WebUIPage在WebUI框架體系中的上一級節點(也能夠稱爲父親)能夠是WebUI或者WebUITab,其成員屬性prefix將與上級節點的路徑一塊兒構成當前WebUIPage的訪問路徑。緩存
有時候Web界面須要將多個頁面做爲一組內容放置在一塊兒,這時候標籤頁是常見的展示形式。標籤頁WebUITab定義了全部標籤頁的規範,並用於展示一組WebUIPage。抽象類WebUITab的定義見代碼清單2。
代碼清單2 WebUITab的定義
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { val pages = ArrayBuffer[WebUIPage]() val name = prefix.capitalize def attachPage(page: WebUIPage) { page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") pages += page } def headerTabs: Seq[WebUITab] = parent.getTabs def basePath: String = parent.getBasePath }
根據代碼清單2,能夠看到WebUITab有四個成員屬性:
此外,WebUITab還有三個成員方法,下面介紹它們的做用:
WebUI是Spark實現的用於提供Web界面展示的框架,凡是須要頁面展示的地方均可以繼承它來完成。WebUI定義了WebUI框架體系的規範。爲便於理解,首先明確WebUI中各個成員屬性的含義:
瞭解了WebUI的成員屬性,如今就能夠理解其提供的各個方法了。WebUI提供的方法有:
代碼清單3 attachHandler的實現
def attachHandler(handler: ServletContextHandler) { handlers += handler serverInfo.foreach(_.addHandler(handler)) }
代碼清單4 detachHandler的實現
def detachHandler(handler: ServletContextHandler) { handlers -= handler serverInfo.foreach(_.removeHandler(handler)) }
代碼清單5 attachPage的實現
def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix val renderHandler = createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath) val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json", (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath) attachHandler(renderHandler) attachHandler(renderJsonHandler) val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) handlers += renderHandler }
代碼清單6 detachPage的實現
def detachPage(page: WebUIPage) { pageToHandlers.remove(page).foreach(_.foreach(detachHandler)) }
代碼清單7 attachTab的實現
def attachTab(tab: WebUITab) { tab.pages.foreach(attachPage) tabs += tab }
代碼清單8 detachTab的實現
def detachTab(tab: WebUITab) { tab.pages.foreach(detachPage) tabs -= tab }
代碼清單9 addStaticHandler的實現
def addStaticHandler(resourceBase: String, path: String): Unit = { attachHandler(JettyUtils.createStaticHandler(resourceBase, path)) }
代碼清單10 removeStaticHandler的實現
def removeStaticHandler(path: String): Unit = { handlers.find(_.getContextPath() == path).foreach(detachHandler) }
代碼清單11 bind的實現
def bind() { assert(!serverInfo.isDefined, s"Attempted to bind $className more than once!") try { val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)) logInfo(s"Bound $className to $host, and started at $webUrl") } catch { case e: Exception => logError(s"Failed to bind $className", e) System.exit(1) } }
def webUrl: String = shttp://$publicHostName:$boundPort
def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
代碼清單12 stop方法的實現
def stop() { assert(serverInfo.isDefined, s"Attempted to stop $className before binding to a server!") serverInfo.get.stop() }
在SparkContext的初始化過程當中,會建立SparkUI。有了對WebUI的整體認識,如今是時候瞭解SparkContext是如何構造SparkUI的了。SparkUI是WebUI框架的使用範例,瞭解了SparkUI的建立過程,讀者對MasterWebUI、WorkerWebUI及HistoryServer的建立過程也必然瞭然於心。建立SparkUI的代碼以下:
_statusTracker = new SparkStatusTracker(this) _progressBar = if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) { Some(new ConsoleProgressBar(this)) } else { None } _ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI None } _ui.foreach(_.bind())
這段代碼的執行步驟以下。
1) 建立Spark狀態跟蹤器SparkStatusTracker。
2) 建立ConsoleProgressBar。能夠配置spark.ui.showConsoleProgress屬性爲false取消對ConsoleProgressBar的建立,此屬性默認爲true。
3) 調用SparkUI的createLiveUI方法建立SparkUI。
4) 給SparkUI綁定端口。SparkUI繼承自WebUI,所以調用了代碼清單4-12中WebUI的bind方法啓動SparkUI底層的Jetty服務。
上述步驟中,第1)、2)、4)步都很簡單,因此着重來分析第3)步。SparkUI的createLiveUI的實現以下。
def createLiveUI( sc: SparkContext, conf: SparkConf, listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String, startTime: Long): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, jobProgressListener = Some(jobProgressListener), startTime = startTime) }
能夠看到SparkUI的createLiveUI方法中調用了create方法。create的實現以下。
private def create( sc: Option[SparkContext], conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener } val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) }
能夠看到create方法裏除了JobProgressListener是外部傳入的以外,又增長了一些SparkListener,例如用於對JVM參數、Spark屬性、Java系統屬性、classpath等進行監控的EnvironmentListener;用於維護Executor的存儲狀態的StorageStatusListener;用於準備將Executor的信息展現在ExecutorsTab的ExecutorsListener;用於準備將Executor相關存儲信息展現在BlockManagerUI的StorageListener;用於構建RDD的DAG(有向無關圖)的RDDOperationGraphListener等。這5個SparkListener的實現添加到listenerBus的監聽器列表中。最後使用SparkUI的構造器建立SparkUI。
調用SparkUI的構造器建立SparkUI,實際也是對SparkUI的初始化過程。在介紹初始化以前,先來看看SparkUI中的兩個成員屬性。
SparkUI的構造過程當中會執行initialize方法,其實現見代碼清單13。
代碼清單13 SparkUI的初始化
def initialize() { val jobsTab = new JobsTab(this) attachTab(jobsTab) val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) // These should be POST only, but, the YARN AM proxy won't proxy POSTs attachHandler(createRedirectHandler( "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST"))) attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST"))) } initialize()
根據代碼清單13,SparkUI的初始化步驟以下。
1) 構建頁面佈局並給每一個WebUITab中的全部WebUIPage建立對應的ServletContextHandler。這一步使用了代碼清單4-8中展現的attachTab方法。
2) 調用JettyUtils的createStaticHandler方法建立對靜態目錄org/apache/spark/ui/static提供文件服務的ServletContextHandler,並使用attachHandler方法追加到SparkUI的服務中。
3) 調用JettyUtils的createRedirectHandler方法建立幾個將用戶對源路徑的請求重定向到目標路徑的ServletContextHandler。例如,將用戶對根路徑"/"的請求重定向到目標路徑"/jobs/"的ServletContextHandler。
SparkUI到底是如何實現頁面佈局及展現的? 因爲全部標籤頁都繼承了SparkUITab,因此咱們先來看看SparkUITab的實現:
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) extends WebUITab(parent, prefix) { def appName: String = parent.getAppName }
根據上述代碼,咱們知道SparkUITab繼承了WebUITab,並在實現中增長了一個用於獲取當前應用名稱的方法appName。EnvironmentTab是用於展現JVM、Spark屬性、系統屬性、類路徑等相關信息的標籤頁,因爲其實現簡單且能說明問題,因此本節挑選EnvironmentTab做爲示例解答本節一開始提出的問題。
EnvironmentTab的實現見代碼清單14。
代碼清單14 EnvironmentTab的實現
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") { val listener = parent.environmentListener attachPage(new EnvironmentPage(this)) }
根據代碼清單14,咱們知道EnvironmentTab引用了SparkUI的environmentListener(類型爲EnvironmentListener),而且包含EnvironmentPage這個頁面。EnvironmentTab經過調用attachPage方法將EnvironmentPage與Jetty服務關聯起來。根據代碼清單5中attachPage的實現,建立的renderHandler將採用偏函數(request: HttpServletRequest) => page.render(request) 處理請求,於是會調用EnvironmentPage的render方法。EnvironmentPage的render方法將會渲染頁面元素。EnvironmentPage的實現見代碼清單15。
代碼清單15 EnvironmentPage的實現
private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { private val listener = parent.listener private def removePass(kv: (String, String)): (String, String) = { if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) { (kv._1, "******") } else kv } def render(request: HttpServletRequest): Seq[Node] = { // 調用UIUtils的listingTable方法生成JVM運行時信息、Spark屬性信息、系統屬性信息、類路徑信息的表格 val runtimeInformationTable = UIUtils.listingTable( propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true) val sparkPropertiesTable = UIUtils.listingTable( propertyHeader, propertyRow, listener.sparkProperties.map(removePass), fixedWidth = true) val systemPropertiesTable = UIUtils.listingTable( propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true) val classpathEntriesTable = UIUtils.listingTable( classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true) val content = <span> <h4>Runtime Information</h4> {runtimeInformationTable} <h4>Spark Properties</h4> {sparkPropertiesTable} <h4>System Properties</h4> {systemPropertiesTable} <h4>Classpath Entries</h4> {classpathEntriesTable} </span> // 調用UIUtils的headerSparkPage方法封裝好css、js、header及頁面佈局等 UIUtils.headerSparkPage("Environment", content, parent) } // 定義JVM運行時信息、Spark屬性信息、系統屬性信息的表格頭部propertyHeader和類路徑信息的表格頭部 // classPathHeaders private def propertyHeader = Seq("Name", "Value") private def classPathHeaders = Seq("Resource", "Source") // 定義JVM運行時信息的表格中每行數據的生成方法jvmRow private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr> private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr> private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr> }
根據代碼清單15,EnvironmentPage的render方法利用從父節點EnvironmentTab中獲得的EnvironmentListener中的統計監控數據生成JVM運行時、Spark屬性、系統屬性以及類路徑等狀態的摘要信息。以JVM運行時爲例,頁面渲染的步驟以下:
1) 定義JVM運行時信息、Spark屬性信息、系統屬性信息的表格頭部propertyHeader和類路徑信息的表格頭部classPathHeaders。
2) 定義JVM運行時信息的表格中每行數據的生成方法jvmRow。
3) 調用UIUtils的listingTable方法生成JVM運行時信息、Spark屬性信息、系統屬性信息、類路徑信息的表格。
4) 調用UIUtils的headerSparkPage方法封裝好css、js、header及頁面佈局等。
UIUtils工具類的實現細節留給感興趣的讀者自行查閱,本文很少贅述。
[1]本節內容用到JettyUtils中的不少方法,讀者能夠在附錄C中找到相應的實現與說明。
通過近一年的準備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
紙質版售賣連接以下: