本文做者: 林武康(花名:知瑕),阿里巴巴計算平臺事業部技術專家,Apache HUE Contributor, 參與了多個開源項目的研發工做,對於分佈式系統設計應用有較豐富的經驗,目前主要專一於EMR數據開發相關的產品的研發工做。java
本文介紹Spark Operator的設計和實現相關的內容.git
通過近幾年的高速發展,分佈式計算框架的架構逐漸趨同. 資源管理模塊做爲其中最通用的模塊逐漸與框架解耦,獨立成通用的組件.目前大部分分佈式計算框架都支持接入多款不一樣的資源管理器. 資源管理器負責集羣資源的管理和調度,爲計算任務分配資源容器並保證資源隔離.Apache Spark做爲通用分佈式計算平臺,目前同時支持多款資源管理器,包括:github
Apache Spark的運行時框架以下圖所示, 其與各種資源調度器的交互流程比較相似.
圖1 Spark運行時框架(Client模式)
其中,Driver負責做業邏輯的調度和任務的監控, 資源管理器負責資源分配和監控.Driver根據部署模式的不一樣,啓動和運行的物理位置有所不一樣. 其中,Client模式下,Driver模塊運行在Spark-Submit進程中, Cluster模式下,Driver的啓動過程和Executor相似,運行在資源調度器分配的資源容器內.docker
K8s是Spark在2.3開始支持資源管理器,而相關討論早在2016年就已經開始展開(https://issues.apache.org/jira/browse/SPARK-18278). Spark對K8s的支持隨着版本的迭代也逐步深刻, 在即將發佈的3.0中,Spark on K8s提供了更好的Kerberos支持和資源動態支持的特性.apache
Kubernetes是由Google開源的一款面向應用的容器集羣部署和管理系統,近年來發展十分迅猛,相關生態已經日趨完善. 在Spark官方接入K8s前,社區一般經過在K8s集羣上部署一個Spark Standalone集羣的方式來實如今K8s集羣上運行Spark任務的目的.方案架構以下圖所示:
圖2 Spark Standalone on K8s
這個模式簡單易用,但存在至關大的缺陷:api
爲此,Spark社區進行了深刻而普遍的討論,在2.3版本提供了對K8s的官方支持.Spark接入K8s的好處是十分明顯的:網絡
Spark on K8s方案架構以下圖所示, 設計細節能夠參考:SPARK-18278
圖3 Spark on K8s (Native)
在這個方案中, 架構
當前的方案已經解決了Spark Standalone on K8s方案的部分缺陷,然而,Spark Application的生命週期管理方式和調度方式與K8s內置的工做負載(如Deployments、DaemonSets、StatefulSets等)存在較大差別,在K8s上執行做業仍然存在很多問題:app
固然Spark on K8s方案目前還在快速開發中,更多特性不久會發布出來,相信將來和K8s的集成會更加緊密和Native, 這些特性包括:框架
在分析Spark Operator的實現以前, 先簡單梳理下Kubernetes Operator的基本概念. Kubernetes Operator是由CoreOS開發的Kubernetes擴展特性, 目標是經過定義一系列CRD(自定義資源)和實現控制器,將特定領域的應用程序運維技術和知識(如部署方法、監控、故障恢復等)經過代碼的方式固化下來. Spark Operator是Google基於Operator模式開發的一款的工具(https://github.com/GoogleCloudPlatform/spark-on-k8s-operator), 用於經過聲明式的方式向K8s集羣提交Spark做業.使用Spark Operator管理Spark應用,能更好的利用K8s原生能力控制和管理Spark應用的生命週期,包括應用狀態監控、日誌獲取、應用運行控制等,彌補Spark on K8s方案在集成K8s上與其餘類型的負載之間存在的差距.
下面簡單分析下Spark Operator的實現細節.
圖4 Spark Operator架構
能夠看出,Spark Operator相比Spark on K8s,架構上要複雜一些,實際上Spark Operator集成了Spark on K8s的方案,提供了更全面管控特性.經過Spark Operator,用戶可使用更加符合K8s理念的方式來控制Spark應用的生命週期.Spark Operator包括以下幾個組件:
Spark Operator除了實現基本的做業提交外,還支持以下特性:
Spark Operator的項目是標準的K8s Operator結構, 其中最重要的包括:
manifest: 定義了Spark相關的CRD,包括:
pkg: 具體的Operator邏輯實現
controller: 自定義控制器的實現,包括:
下面主要介紹下Spark Operator是如何管理Spark做業的.
控制器的代碼主要位於"pkg/controller/sparkapplication/controller.go"中.
提交做業的提交做業的主流程在submitSparkApplication方法中.
// controller.go // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit. func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1beta2.SparkApplication { if app.PrometheusMonitoringEnabled() { ... configPrometheusMonitoring(app, c.kubeClient) } // Use batch scheduler to perform scheduling task before submitting (before build command arguments). if needScheduling, scheduler := c.shouldDoBatchScheduling(app); needScheduling { newApp, err := scheduler.DoBatchSchedulingOnSubmission(app) ... //Spark submit will use the updated app to submit tasks(Spec will not be updated into API server) app = newApp } driverPodName := getDriverPodName(app) submissionID := uuid.New().String() submissionCmdArgs, err := buildSubmissionCommandArgs(app, driverPodName, submissionID) ... // Try submitting the application by running spark-submit. submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, app)) ... app.Status = v1beta2.SparkApplicationStatus{ SubmissionID: submissionID, AppState: v1beta2.ApplicationState{ State: v1beta2.SubmittedState, }, DriverInfo: v1beta2.DriverInfo{ PodName: driverPodName, }, SubmissionAttempts: app.Status.SubmissionAttempts + 1, ExecutionAttempts: app.Status.ExecutionAttempts + 1, LastSubmissionAttemptTime: metav1.Now(), } c.recordSparkApplicationEvent(app) service, err := createSparkUIService(app, c.kubeClient) ... ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient) return app }
提交做業的核心邏輯在submission.go這個模塊中:
// submission.go func runSparkSubmit(submission *submission) (bool, error) { sparkHome, present := os.LookupEnv(sparkHomeEnvVar) if !present { glog.Error("SPARK_HOME is not specified") } var command = filepath.Join(sparkHome, "/bin/spark-submit") cmd := execCommand(command, submission.args...) glog.V(2).Infof("spark-submit arguments: %v", cmd.Args) output, err := cmd.Output() glog.V(3).Infof("spark-submit output: %s", string(output)) if err != nil { var errorMsg string if exitErr, ok := err.(*exec.ExitError); ok { errorMsg = string(exitErr.Stderr) } // The driver pod of the application already exists. if strings.Contains(errorMsg, podAlreadyExistsErrorCode) { glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name) return false, nil } if errorMsg != "" { return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg) } return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err) } return true, nil } func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName string, submissionID string) ([]string, error) { ... options, err := addDriverConfOptions(app, submissionID) ... options, err = addExecutorConfOptions(app, submissionID) ... } func getMasterURL() (string, error) { kubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar) ... kubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar) ... return fmt.Sprintf("k8s://https://%s:%s", kubernetesServiceHost, kubernetesServicePort), nil }
能夠看出,
提交成功後,還會作以下幾件事情:
另外,若是對Spark on K8s的使用文檔比較困惑,這段代碼是比較好的一個示例.
在Spark Operator中,Controller使用狀態機來維護Spark Application的狀態信息, 狀態流轉和Action的關係以下圖所示:
圖5 _State Machine for SparkApplication_
做業提交後,Spark Application的狀態更新都是經過getAndUpdateAppState()方法來實現的.
// controller.go func (c *Controller) getAndUpdateAppState(app *v1beta2.SparkApplication) error { if err := c.getAndUpdateDriverState(app); err != nil { return err } if err := c.getAndUpdateExecutorState(app); err != nil { return err } return nil } // getAndUpdateDriverState finds the driver pod of the application // and updates the driver state based on the current phase of the pod. func (c *Controller) getAndUpdateDriverState(app *v1beta2.SparkApplication) error { // Either the driver pod doesn't exist yet or its name has not been updated. ... driverPod, err := c.getDriverPod(app) ... if driverPod == nil { app.Status.AppState.ErrorMessage = "Driver Pod not found" app.Status.AppState.State = v1beta2.FailingState app.Status.TerminationTime = metav1.Now() return nil } app.Status.SparkApplicationID = getSparkApplicationID(driverPod) ... newState := driverStateToApplicationState(driverPod.Status) // Only record a driver event if the application state (derived from the driver pod phase) has changed. if newState != app.Status.AppState.State { c.recordDriverEvent(app, driverPod.Status.Phase, driverPod.Name) } app.Status.AppState.State = newState return nil } // getAndUpdateExecutorState lists the executor pods of the application // and updates the executor state based on the current phase of the pods. func (c *Controller) getAndUpdateExecutorState(app *v1beta2.SparkApplication) error { pods, err := c.getExecutorPods(app) ... executorStateMap := make(map[string]v1beta2.ExecutorState) var executorApplicationID string for _, pod := range pods { if util.IsExecutorPod(pod) { newState := podPhaseToExecutorState(pod.Status.Phase) oldState, exists := app.Status.ExecutorState[pod.Name] // Only record an executor event if the executor state is new or it has changed. if !exists || newState != oldState { c.recordExecutorEvent(app, newState, pod.Name) } executorStateMap[pod.Name] = newState if executorApplicationID == "" { executorApplicationID = getSparkApplicationID(pod) } } } // ApplicationID label can be different on driver/executors. Prefer executor ApplicationID if set. // Refer https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25922 for details. ... if app.Status.ExecutorState == nil { app.Status.ExecutorState = make(map[string]v1beta2.ExecutorState) } for name, execStatus := range executorStateMap { app.Status.ExecutorState[name] = execStatus } // Handle missing/deleted executors. for name, oldStatus := range app.Status.ExecutorState { _, exists := executorStateMap[name] if !isExecutorTerminated(oldStatus) && !exists { // If ApplicationState is SUCCEEDING, in other words, the driver pod has been completed // successfully. The executor pods terminate and are cleaned up, so we could not found // the executor pod, under this circumstances, we assume the executor pod are completed. if app.Status.AppState.State == v1beta2.SucceedingState { app.Status.ExecutorState[name] = v1beta2.ExecutorCompletedState } else { glog.Infof("Executor pod %s not found, assuming it was deleted.", name) app.Status.ExecutorState[name] = v1beta2.ExecutorFailedState } } } return nil }
從這段代碼能夠看到, Spark Application提交後,Controller會經過監聽Driver Pod和Executor Pod狀態來計算Spark Application的狀態,推進狀態機的流轉.
若是一個SparkApplication示例配置了開啓度量監控特性,那麼Spark Operator會在Spark-Submit提交參數中向Driver和Executor的JVM參數中添加相似"-javaagent:/prometheus/jmx_prometheus_javaagent-0.11.0.jar=8090:/etc/metrics/conf/prometheus.yaml"的JavaAgent參數來開啓SparkApplication度量監控,實現經過JmxExporter向Prometheus發送度量數據.
圖6 Prometheus架構
在Spark on K8s方案中, 用戶須要經過kubectl port-forward
命令創建臨時通道來訪問Driver的WebUI,這對於須要頻繁訪問多個做業的WebUI的場景來講很是麻煩. 在Spark Operator中,Spark Operator會在做業提交後,啓動一個Spark WebUI Service,並配置Ingress來提供更爲天然和高效的訪問途徑.
本文總結了Spark計算框架的基礎架構,介紹了Spark on K8s的多種方案,着重介紹了Spark Operator的設計和實現.K8s Operator尊從K8s設計理念,極大的提升了K8s的擴展能力.Spark Operator基於Operator範式實現了更爲完備的管控特性,是對官方Spark on K8s方案的有力補充.隨着K8s的進一步完善和Spark社區的努力,能夠預見Spark on K8s方案會逐漸吸納Spark Operator的相關特性,進一步提高雲原生體驗.
[1] Kubernetes Operator for Apache Spark Design
[2] What is Prometheus?
[3] Spark on Kubernetes 的現狀與挑戰
[4] Spark in action on Kubernetes - Spark Operator的原理解析
[5] Operator pattern
[6] Custom Resources
本文爲雲棲社區原創內容,未經容許不得轉載。