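1) The Spark Web UI is built mainly on Jetty, a popular servlet container;
2) Before Spark 2.3, the Spark Web UI was the front end that displayed runtime status, resource state and monitoring metrics, all of which were collected by the metrics system (MetricsSystem);
3) Since Spark 2.3, the data shown by the Spark Web UI is closely tied to the event bus (listener bus) and ElementTrackingStore, while MetricsSystem exists to expose metrics to external systems.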
Details of the changes to how the Spark UI stores its data can be found in the following Spark issues:
Key-value store abstraction and implementation for storing application data
Use key-value store to keep History Server application listing
Hook up Spark UI to the new key-value store backend
Implement listener for saving application status data in key-value store
Make Environment page use new app state store
Make Executors page use new app state store
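The Spark UI can contain the following tabs: Jobs, Stages, Storage, Environment, Executors, SQL.
Next, let's go through the source code to see how the Spark UI (HTTP server) is started and where the data on its pages comes from.
The HTTP server used by the Spark UI is Jetty. Jetty is written in Java and is a solid servlet engine and HTTP server that can be embedded in and run inside a user program, instead of running in a separate JVM process the way Tomcat or JBoss do.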
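To make the idea of an in-process server concrete, here is a minimal sketch that embeds an HTTP server in an ordinary Scala program. It deliberately swaps in the JDK's built-in com.sun.net.httpserver.HttpServer instead of Jetty so that it runs without extra dependencies; the object name, the /jobs context and the response text are illustrative assumptions, not Spark code.

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.{InetSocketAddress, URL}
import java.nio.charset.StandardCharsets
import scala.io.Source

// Sketch only: the JDK's built-in HTTP server stands in for Jetty here.
object EmbeddedServerSketch {
  /** Starts an HTTP server inside the current JVM on a free local port. */
  def start(): HttpServer = {
    // Port 0 lets the OS pick any free port.
    val server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0)
    // One context, playing the role of a single UI page.
    server.createContext("/jobs", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val body = "jobs page".getBytes(StandardCharsets.UTF_8)
        exchange.sendResponseHeaders(200, body.length.toLong)
        val out = exchange.getResponseBody
        try out.write(body) finally out.close()
      }
    })
    server.start() // requests are handled by the server's own thread in this same JVM
    server
  }

  /** Issues a GET against the running server and returns the response body. */
  def get(server: HttpServer, path: String): String = {
    val port = server.getAddress.getPort
    val source = Source.fromURL(new URL(s"http://127.0.0.1:$port$path"), "UTF-8")
    try source.mkString finally source.close()
  }

  def main(args: Array[String]): Unit = {
    val server = start()
    try println(get(server, "/jobs"))
    finally server.stop(0) // the host program decides when the server goes away
  }
}
```

The point is that the server's lifecycle (start, serve, stop) is controlled by the host program, much as SparkContext controls the Jetty server behind the Spark UI.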
The Spark UI (HTTP server) is created when SparkContext is initialized:
...
private var _listenerBus: LiveListenerBus = _
private var _statusStore: AppStatusStore = _
...
private[spark] def ui: Option[SparkUI] = _ui
...
_listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
...
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
...
_ui.foreach(_.setAppId(_applicationId))
...
Here, _statusStore is the AppStatusStore instance; internally it wraps a KVStore and an AppStatusListener.
_env.securityManager is the security manager created in SparkEnv.
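A minimal sketch of that division of labour, assuming heavily simplified stand-ins: InMemoryStore plays the KVStore, StatusListener plays AppStatusListener, and StatusStore plays AppStatusStore. All class names and the two event types are hypothetical. In Spark the listener is registered on the listener bus via listenerBus.addToStatusQueue(...), as in the code above; here events are fed to it directly.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's status-store classes.
final case class ExecutorInfo(id: String, isActive: Boolean)

sealed trait Event
final case class ExecutorAdded(id: String) extends Event
final case class ExecutorRemoved(id: String) extends Event

/** In-memory key-value store keyed by executor id (plays the KVStore role). */
final class InMemoryStore {
  private val data = mutable.LinkedHashMap[String, ExecutorInfo]()
  def write(info: ExecutorInfo): Unit = data.update(info.id, info)
  def read(id: String): Option[ExecutorInfo] = data.get(id)
  def values: Seq[ExecutorInfo] = data.values.toList
}

/** Write side: turns bus events into store records (plays the AppStatusListener role). */
final class StatusListener(store: InMemoryStore) {
  def onEvent(event: Event): Unit = event match {
    case ExecutorAdded(id) => store.write(ExecutorInfo(id, isActive = true))
    case ExecutorRemoved(id) =>
      // Keep the record but mark it inactive, so dead executors can still be shown.
      store.read(id).foreach(e => store.write(e.copy(isActive = false)))
  }
}

/** Read side used by the UI (plays the AppStatusStore role). */
final class StatusStore(val store: InMemoryStore, val listener: Option[StatusListener]) {
  def executors: Seq[ExecutorInfo] = store.values
}

object StatusStore {
  /** Loosely mirrors the idea of createLiveStore: a store plus a listener that feeds it. */
  def createLive(): StatusStore = {
    val kv = new InMemoryStore
    new StatusStore(kv, Some(new StatusListener(kv)))
  }
}

object StatusStoreDemo {
  def main(args: Array[String]): Unit = {
    val status = StatusStore.createLive()
    val listener = status.listener.get
    listener.onEvent(ExecutorAdded("1"))
    listener.onEvent(ExecutorRemoved("1"))
    println(status.executors)
  }
}
```

The UI never talks to the listener; it only reads through the store, which is why the same pages can later be served from a store rebuilt from event logs.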
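SparkContext calls the create() method of the SparkUI companion object, which directly instantiates SparkUI with new, and then calls bind() to bind the SparkUI to the Jetty server.
create() constructs a SparkUI object, and while that object is being constructed, SparkUI's initialize() method is called: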
private[spark] class SparkUI private (
    val store: AppStatusStore,
    val sc: Option[SparkContext],
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
    conf, basePath, "SparkUI")
  with Logging
  with UIRoot {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None

  /** Initialize all components of the server. */
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }
  ...
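The SparkUI class has three fields: killEnabled (whether jobs and stages can be killed from the UI; it reads spark.ui.killEnabled, default true, and is false when there is no SparkContext), appId (the application ID, set later through setAppId()) and streamingJobProgressListener (an optional listener, None by default).
In the initialize() method:
a) the Jobs, Stages, Storage, Environment and Executors tabs are created and registered with attachTab();
b) addStaticHandler() serves the static resources under SparkUI.STATIC_RESOURCE_DIR;
c) a redirect handler sends "/" to "/jobs/";
d) ApiRootResource.getServletHandler(this) mounts the REST API;
e) two redirect handlers for "/jobs/job/kill" and "/stages/stage/kill" call the tabs' handleKillRequest and accept both GET and POST, because the YARN AM proxy won't proxy POSTs.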
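Note: ServletContextHandler is a full-featured Jetty handler that receives and processes HTTP requests and dispatches them to servlets.
Besides the classes that render their pages, JobsTab, StagesTab, StorageTab, EnvironmentTab and ExecutorsTab also rely on static resources: HTML, JS, CSS and other files such as images (https://github.com/apache/spark/tree/branch-2.4/core/src/main/resources/org/apache/spark/ui/static).
After SparkContext initialization creates the SparkUI object as shown above, it calls bind() to bind the SparkUI to the Jetty server. bind() is a method of WebUI, the parent class of SparkUI. First, WebUI's fields and getters: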
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
  conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers
def getSecurityManager: SecurityManager = securityManager
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
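WebUI has six fields: tabs (the attached WebUITabs), handlers (all ServletContextHandlers), pageToHandlers (maps each WebUIPage to its handlers), serverInfo (the started Jetty server; None until bind() runs), publicHostName (SPARK_PUBLIC_DNS if set, otherwise the driver host address) and className (the formatted class name used in log messages).
It has four getters: getBasePath, getTabs, getHandlers and getSecurityManager.
The attach/detach methods come in pairs, three pairs in total: attachTab/detachTab, attachPage/detachPage and attachHandler/detachHandler (detachHandler also has an overload that takes a path):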
/** Attaches a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(attachPage)
  tabs += tab
}

/** Detaches a tab from this UI, along with all of its attached pages. */
def detachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(detachPage)
  tabs -= tab
}

/** Detaches a page from this UI, along with all of its attached handlers. */
def detachPage(page: WebUIPage): Unit = {
  pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}

/** Attaches a page to this UI. */
def attachPage(page: WebUIPage): Unit = {
  val pagePath = "/" + page.prefix
  val renderHandler = createServletHandler(pagePath,
    (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
  val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
    (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
  attachHandler(renderHandler)
  attachHandler(renderJsonHandler)
  val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
  handlers += renderHandler
}

/** Attaches a handler to this UI. */
def attachHandler(handler: ServletContextHandler): Unit = {
  handlers += handler
  serverInfo.foreach(_.addHandler(handler))
}

/** Detaches a handler from this UI. */
def detachHandler(handler: ServletContextHandler): Unit = {
  handlers -= handler
  serverInfo.foreach(_.removeHandler(handler))
}

def detachHandler(path: String): Unit = {
  handlers.find(_.getContextPath() == path).foreach(detachHandler)
}

def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
  attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
}
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
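The attach/detach methods only touch the running server through serverInfo.foreach(...), so handlers attached before bind() are merely collected (and then handed to startJettyServer), while handlers attached afterwards are added to the live server. The sketch below models this with hypothetical FakeServer and MiniWebUI classes in which a handler is just its context path; it illustrates the pattern and is not Spark's implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ServerInfo: the set of handlers the running server knows.
final class FakeServer(initial: Seq[String]) {
  val live: ArrayBuffer[String] = ArrayBuffer.empty[String] ++= initial
  def addHandler(h: String): Unit = live += h
  def removeHandler(h: String): Unit = live -= h
}

class MiniWebUI {
  protected val handlers = ArrayBuffer[String]()
  protected var serverInfo: Option[FakeServer] = None

  def attachHandler(h: String): Unit = {
    handlers += h                       // always remembered locally
    serverInfo.foreach(_.addHandler(h)) // pushed to the server only if it is running
  }

  def detachHandler(h: String): Unit = {
    handlers -= h
    serverInfo.foreach(_.removeHandler(h))
  }

  /** Like WebUI.bind(): allowed once, and hands over every handler collected so far. */
  def bind(): Unit = {
    assert(serverInfo.isEmpty, "Attempted to bind more than once!")
    serverInfo = Some(new FakeServer(handlers.toList))
  }

  def liveHandlers: Seq[String] = serverInfo.map(_.live.toList).getOrElse(Nil)
}

object MiniWebUIDemo {
  def main(args: Array[String]): Unit = {
    val ui = new MiniWebUI
    ui.attachHandler("/jobs")   // before bind: only recorded
    ui.bind()
    ui.attachHandler("/stages") // after bind: added to the live server
    println(ui.liveHandlers)
  }
}
```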
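The flow of attachPage():
1) build the page path "/" + page.prefix;
2) create renderHandler, a servlet handler at that path that calls page.render(request);
3) create renderJsonHandler at the same path with "/json" appended, which calls page.renderJson(request);
4) register both handlers through attachHandler();
5) record renderHandler in pageToHandlers so that detachPage() can remove it later.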
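The path logic of attachPage() can be sketched without Jetty, assuming a handler is represented as a (contextPath, kind) pair and a page as a hypothetical PageRef holding its already tab-prefixed prefix. Like the Spark 2.4 code quoted above, the sketch records only the HTML handler in pageToHandlers.

```scala
import scala.collection.mutable

// Hypothetical stand-in for WebUIPage: only the prefix matters here.
final case class PageRef(prefix: String)

class PageRegistry {
  // (contextPath, kind) pairs stand in for Jetty ServletContextHandlers.
  val handlers = mutable.ArrayBuffer[(String, String)]()
  val pageToHandlers = mutable.HashMap[PageRef, mutable.ArrayBuffer[(String, String)]]()

  def attachPage(page: PageRef): Unit = {
    val pagePath = "/" + page.prefix
    val renderHandler = (pagePath, "render")                                    // HTML view
    val renderJsonHandler = (pagePath.stripSuffix("/") + "/json", "renderJson") // JSON view
    handlers += renderHandler
    handlers += renderJsonHandler
    // Mirrors the quoted code: only renderHandler is remembered per page.
    pageToHandlers.getOrElseUpdate(page, mutable.ArrayBuffer.empty[(String, String)]) +=
      renderHandler
  }

  def detachPage(page: PageRef): Unit =
    pageToHandlers.remove(page).foreach(_.foreach(h => handlers -= h))
}

object PageRegistryDemo {
  def main(args: Array[String]): Unit = {
    val reg = new PageRegistry
    reg.attachPage(PageRef("executors/threadDump"))
    reg.handlers.foreach(println)
  }
}
```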
The bind() method:
/** Binds to the HTTP server behind this web interface. */
def bind(): Unit = {
  assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
  try {
    val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
    serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
    logInfo(s"Bound $className to $host, and started at $webUrl")
  } catch {
    case e: Exception =>
      logError(s"Failed to bind $className", e)
      System.exit(1)
  }
}
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
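Key points of this bind() method:
a) It calls startJettyServer(...), a method in JettyUtils.scala, which again shows that the Spark UI runs on Jetty.
b) It passes host and port to startJettyServer(...). These are the IP address and port at which the Spark UI is reached, so let's see where each of them is configured.
c) It may be called only once (the assert on serverInfo.isEmpty), and if the server fails to start it logs the error and terminates the JVM with System.exit(1).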
√ Code that obtains the host:
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
spark-env.sh documents the SPARK_LOCAL_IP and SPARK_PUBLIC_DNS variables:
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
https://github.com/apache/spark/blob/branch-2.4/conf/spark-env.sh.template
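√ The code that obtains the port is in the SparkUI companion object: getUIPort() reads spark.ui.port and falls back to DEFAULT_PORT (4040).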
private[spark] object SparkUI {
  val DEFAULT_PORT = 4040
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
  val DEFAULT_POOL_NAME = "default"

  def getUIPort(conf: SparkConf): Int = {
    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  }

  /**
   * Create a new UI backed by an AppStatusStore.
   */
  def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {

    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
  }
}
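Putting the two lookups together, the address resolution can be sketched as two small functions. To keep it testable, the sketch reads plain Maps instead of System.getenv and SparkConf (an assumption); the defaults "0.0.0.0" and 4040 come from the code above.

```scala
// Sketch of the host/port lookups done by WebUI.bind() and SparkUI.getUIPort().
object UiAddressSketch {
  val DefaultPort = 4040 // same value as SparkUI.DEFAULT_PORT

  /** SPARK_LOCAL_IP if set, otherwise bind to all interfaces. */
  def host(env: Map[String, String]): String =
    env.getOrElse("SPARK_LOCAL_IP", "0.0.0.0")

  /** spark.ui.port if set, otherwise the default UI port. */
  def port(conf: Map[String, String]): Int =
    conf.get("spark.ui.port").map(_.toInt).getOrElse(DefaultPort)

  def main(args: Array[String]): Unit =
    println(s"${host(sys.env)}:${port(Map.empty)}")
}
```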
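The Spark Web UI is in fact a three-level tree: the root is WebUI, the middle nodes are WebUITab, and the leaves are WebUIPage.
The UI is rendered mainly through WebUITab and WebUIPage. In the Spark UI, a tab (WebUITab) can contain one or more pages (WebUIPage), and the tab level is optional: a page can also be attached directly to a WebUI.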
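The three-level tree and its prefix rule can be modeled in a few lines. Page, Tab and Root are hypothetical simplified stand-ins for WebUIPage, WebUITab and WebUI; only the prefix rule in Tab.attachPage is taken from WebUITab.attachPage in the Spark 2.4 source.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for WebUIPage, WebUITab and WebUI.
class Page(var prefix: String)

class Tab(val prefix: String) {
  val pages = ArrayBuffer[Page]()
  def attachPage(page: Page): Unit = {
    // Same rule as WebUITab.attachPage: prepend the tab prefix, drop a trailing "/".
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }
}

class Root {
  val pagePaths = ArrayBuffer[String]()
  // A page may hang directly off the root (no tab) ...
  def attachPage(page: Page): Unit = pagePaths += "/" + page.prefix
  // ... or be attached through a tab, which registers each of its pages.
  def attachTab(tab: Tab): Unit = tab.pages.foreach(attachPage)
}

object TreeDemo {
  def main(args: Array[String]): Unit = {
    val root = new Root
    val tab = new Tab("executors")
    tab.attachPage(new Page(""))           // leaf page at the tab's own path
    tab.attachPage(new Page("threadDump")) // leaf page below the tab
    root.attachTab(tab)
    println(root.pagePaths)
  }
}
```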
WebUI definition: covered above, so the code is not repeated here: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
WebUITab definition:
/**
 * A tab that represents a collection of pages.
 * The prefix is appended to the parent address to form a full path, and must not contain slashes.
 */
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage) {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}
Defined in https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala.
WebUIPage definition:
/**
 * A page that represents the leaf node in the UI hierarchy.
 *
 * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
 * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
 * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
 * to form a relative path. The prefix must not contain slashes.
 */
private[spark] abstract class WebUIPage(var prefix: String) {
  def render(request: HttpServletRequest): Seq[Node]
  def renderJson(request: HttpServletRequest): JValue = JNothing
}
Defined in https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala.
WebUITab and WebUIPage each have a family of implementation classes; for example, see: https://github.com/apache/spark/tree/branch-2.4/core/src/main/scala/org/apache/spark/ui/exec
Take the Executors tab as an example, because it is representative: a single tab shows two pages (ExecutorsPage and ExecutorThreadDumpPage).
In the Spark UI, the pages under this tab are:
executors -> ExecutorsPage
http://ip:8088/proxy/application_1558494459870_0005/executors/
executors -> ExecutorThreadDumpPage
http://ip:8088/proxy/application_1558494459870_0005/executors/threadDump/?executorId=[executorId or driver]
First, the ExecutorsTab code:
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {

  init()

  private def init(): Unit = {
    val threadDumpEnabled =
      parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

    attachPage(new ExecutorsPage(this, threadDumpEnabled))
    if (threadDumpEnabled) {
      attachPage(new ExecutorThreadDumpPage(this, parent.sc))
    }
  }
}
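SparkUITab is a thin wrapper around WebUITab that adds the application name and Spark version properties. ExecutorsTab defines an init() method and calls it from its constructor. init() uses the attachPage(...) method inherited from WebUITab to add ExecutorsPage and, when threadDumpEnabled is true (a SparkContext is present and spark.ui.threadDumpsEnabled, default true, has not been turned off), also adds ExecutorThreadDumpPage.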
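The page-selection rule in init() can be sketched as a pure function. Here hasSparkContext stands in for parent.sc.isDefined and a Map stands in for SparkConf (both simplifications); the page names are returned as strings purely for illustration.

```scala
// Sketch of the decision made in ExecutorsTab.init().
object ExecutorsTabSketch {
  def pagesFor(hasSparkContext: Boolean, conf: Map[String, String]): Seq[String] = {
    // spark.ui.threadDumpsEnabled defaults to true when it is not set.
    val threadDumpEnabled =
      hasSparkContext && conf.get("spark.ui.threadDumpsEnabled").forall(_.toBoolean)
    if (threadDumpEnabled) Seq("ExecutorsPage", "ExecutorThreadDumpPage")
    else Seq("ExecutorsPage")
  }

  def main(args: Array[String]): Unit = {
    println(pagesFor(hasSparkContext = true, Map.empty))
    println(pagesFor(hasSparkContext = false, Map.empty)) // e.g. a UI created with sc = None
  }
}
```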
Next, the ExecutorsPage code:
private[ui] class ExecutorsPage(
    parent: SparkUITab,
    threadDumpEnabled: Boolean)
  extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <div>
        {
          <div id="active-executors" class="row-fluid"></div> ++
          <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
          <script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
          <script>setThreadDumpEnabled({threadDumpEnabled})</script>
        }
      </div>

    UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
  }
}
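The render() method renders the page content. Its flow is:
1) build the page skeleton (content): an empty active-executors container plus the utils.js and executorspage.js scripts;
2) call UIUtils.headerSparkPage() to wrap content in the standard Spark page and return it to the browser;
3) while loading the page, the browser runs executorspage.js, which calls the REST API for the current applicationId to fetch the allexecutors data and renders it into the executors page using the executorspage-template.html template.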
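For reference, here is a hypothetical helper that builds the kind of URL such a request ends up hitting. It assumes the documented REST route /api/v1/applications/[app-id]/allexecutors and a base URI such as the YARN proxy address shown earlier; the helper itself is illustrative and is not taken from executorspage.js (which, for example, also has to handle attempt IDs in the History Server).

```scala
// Hypothetical helper: builds the allexecutors REST URL for one application.
object ExecutorsEndpointSketch {
  def allExecutorsUrl(baseUri: String, appId: String): String =
    s"${baseUri.stripSuffix("/")}/api/v1/applications/$appId/allexecutors"

  def main(args: Array[String]): Unit =
    println(allExecutorsUrl("http://localhost:4040", "app-20190522000000-0000"))
}
```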
The Spark REST service is implemented under: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/status/api/v1/
The method that returns the allexecutors data:
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
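The store here is exactly SparkUI's store. This method shows that the data served by the Spark REST service actually lives in the AppStatusStore object (store) created by SparkContext. The false argument is activeOnly, so allexecutors returns every executor, both active and dead.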
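A sketch of that read path, assuming ExecutorRow as a hypothetical stand-in for ExecutorSummary and a simple filter in place of the KVStore index lookup that AppStatusStore actually performs. allexecutors passes activeOnly = false; a call with activeOnly = true (which, as far as we know, backs the executors endpoint) would return only live executors.

```scala
// Hypothetical stand-in for ExecutorSummary.
final case class ExecutorRow(id: String, isActive: Boolean)

object ExecutorListSketch {
  /** Filter-based stand-in for AppStatusStore.executorList(activeOnly). */
  def executorList(rows: Seq[ExecutorRow], activeOnly: Boolean): Seq[ExecutorRow] =
    if (activeOnly) rows.filter(_.isActive) else rows

  /** What allExecutorList() asks for: every executor, including dead ones. */
  def allExecutorList(rows: Seq[ExecutorRow]): Seq[ExecutorRow] =
    executorList(rows, activeOnly = false)

  /** Only the executors that are still alive. */
  def activeExecutorList(rows: Seq[ExecutorRow]): Seq[ExecutorRow] =
    executorList(rows, activeOnly = true)

  def main(args: Array[String]): Unit = {
    val rows = Seq(ExecutorRow("driver", true), ExecutorRow("1", false))
    println(allExecutorList(rows))
    println(activeExecutorList(rows))
  }
}
```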
Finally, the ExecutorThreadDumpPage code:
private[ui] class ExecutorThreadDumpPage(
    parent: SparkUITab,
    sc: Option[SparkContext]) extends WebUIPage("threadDump") {

  // stripXSS is called first to remove suspicious characters used in XSS attacks
  def render(request: HttpServletRequest): Seq[Node] = {
    val executorId =
      Option(UIUtils.stripXSS(request.getParameter("executorId"))).map { executorId =>
        UIUtils.decodeURLParameter(executorId)
      }.getOrElse {
        throw new IllegalArgumentException(s"Missing executorId parameter")
      }
    val time = System.currentTimeMillis()
    val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

    val content = maybeThreadDump.map { threadDump =>
      val dumpRows = threadDump.map { thread =>
        val threadId = thread.threadId
        val blockedBy = thread.blockedByThreadId match {
          case Some(_) =>
            <div>
              Blocked by <a href={s"#${thread.blockedByThreadId}_td_id"}>
              Thread {thread.blockedByThreadId} {thread.blockedByLock}</a>
            </div>
          case None => Text("")
        }
        val heldLocks = thread.holdingLocks.mkString(", ")

        <tr id={s"thread_${threadId}_tr"} class="accordion-heading"
            onclick={s"toggleThreadStackTrace($threadId, false)"}
            onmouseover={s"onMouseOverAndOut($threadId)"}
            onmouseout={s"onMouseOverAndOut($threadId)"}>
          <td id={s"${threadId}_td_id"}>{threadId}</td>
          <td id={s"${threadId}_td_name"}>{thread.threadName}</td>
          <td id={s"${threadId}_td_state"}>{thread.threadState}</td>
          <td id={s"${threadId}_td_locking"}>{blockedBy}{heldLocks}</td>
          <td id={s"${threadId}_td_stacktrace"} class="hidden">{thread.stackTrace.html}</td>
        </tr>
      }

    <div class="row-fluid">
      <p>Updated at {UIUtils.formatDate(time)}</p>
      {
        // scalastyle:off
        <p><a class="expandbutton" onClick="expandAllThreadStackTrace(true)">
          Expand All
        </a></p>
        <p><a class="expandbutton hidden" onClick="collapseAllThreadStackTrace(true)">
          Collapse All
        </a></p>
        <div class="form-inline">
        <div class="bs-example" data-example-id="simple-form-inline">
          <div class="form-group">
            <div class="input-group">
              Search: <input type="text" class="form-control" id="search" oninput="onSearchStringChange()"></input>
            </div>
          </div>
        </div>
        </div>
        <p></p>
        // scalastyle:on
      }
      <table class={UIUtils.TABLE_CLASS_STRIPED + " accordion-group" + " sortable"}>
        <thead>
          <th onClick="collapseAllThreadStackTrace(false)">Thread ID</th>
          <th onClick="collapseAllThreadStackTrace(false)">Thread Name</th>
          <th onClick="collapseAllThreadStackTrace(false)">Thread State</th>
          <th onClick="collapseAllThreadStackTrace(false)">Thread Locks</th>
        </thead>
        <tbody>{dumpRows}</tbody>
      </table>
    </div>
    }.getOrElse(Text("Error fetching thread dump"))
    UIUtils.headerSparkPage(request, s"Thread dump for executor $executorId", content, parent)
  }
}
This page shows the state of the threads currently running in the selected executor (or the driver).
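The executorId handling at the top of render() can be sketched on its own. This version takes a Map instead of an HttpServletRequest, omits stripXSS, and approximates UIUtils.decodeURLParameter by URL-decoding until the value stops changing; that last part is our reading of its behaviour and should be treated as an assumption.

```scala
import java.net.URLDecoder

// Sketch of the executorId parameter handling in ExecutorThreadDumpPage.render().
object ThreadDumpParamSketch {
  /** Decodes repeatedly so that double-encoded values (e.g. "%2531") end up as "1". */
  def decodeUntilStable(value: String): String = {
    var current = value
    var decoded = URLDecoder.decode(current, "UTF-8")
    while (current != decoded) {
      current = decoded
      decoded = URLDecoder.decode(current, "UTF-8")
    }
    current
  }

  /** Returns the decoded executorId, or fails like the page does when it is missing. */
  def executorId(params: Map[String, String]): String =
    params.get("executorId")
      .map(decodeUntilStable)
      .getOrElse(throw new IllegalArgumentException("Missing executorId parameter"))

  def main(args: Array[String]): Unit =
    println(executorId(Map("executorId" -> "driver")))
}
```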
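References:
"Apache Spark Source Code Walkthrough 21: Analysis of Web UI and Metrics Initialization and the Data Update Process"
"Spark Structured Streaming Source Code Analysis (4): ProgressReporter's Per-Stream Progress Computation and StreamQueryManager's Management of Running Streams"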