Spark Operator淺析

本文做者: 林武康(花名:知瑕),阿里巴巴計算平臺事業部技術專家,Apache HUE Contributor, 參與了多個開源項目的研發工做,對於分佈式系統設計應用有較豐富的經驗,目前主要專一於EMR數據開發相關的產品的研發工做。java

本文介紹Spark Operator的設計和實現相關的內容.git

Spark運行時架構

通過近幾年的高速發展,分佈式計算框架的架構逐漸趨同. 資源管理模塊做爲其中最通用的模塊逐漸與框架解耦,獨立成通用的組件.目前大部分分佈式計算框架都支持接入多款不一樣的資源管理器. 資源管理器負責集羣資源的管理和調度,爲計算任務分配資源容器並保證資源隔離.Apache Spark做爲通用分佈式計算平臺,目前同時支持多款資源管理器,包括:github

  • YARN
  • Mesos
  • Kubernetes(K8s)
  • Spark Standalone(自帶的資源管理器)

Apache Spark的運行時框架以下圖所示, 其與各種資源調度器的交互流程比較相似.
1
圖1 Spark運行時框架(Client模式)
其中,Driver負責做業邏輯的調度和任務的監控, 資源管理器負責資源分配和監控.Driver根據部署模式的不一樣,啓動和運行的物理位置有所不一樣. 其中,Client模式下,Driver模塊運行在Spark-Submit進程中, Cluster模式下,Driver的啓動過程和Executor相似,運行在資源調度器分配的資源容器內.docker

K8s是Spark在2.3開始支持資源管理器,而相關討論早在2016年就已經開始展開(https://issues.apache.org/jira/browse/SPARK-18278). Spark對K8s的支持隨着版本的迭代也逐步深刻, 在即將發佈的3.0中,Spark on K8s提供了更好的Kerberos支持和資源動態支持的特性.apache

Spark on K8s

Kubernetes是由Google開源的一款面向應用的容器集羣部署和管理系統,近年來發展十分迅猛,相關生態已經日趨完善. 在Spark官方接入K8s前,社區一般經過在K8s集羣上部署一個Spark Standalone集羣的方式來實如今K8s集羣上運行Spark任務的目的.方案架構以下圖所示:
2
圖2 Spark Standalone on K8s
這個模式簡單易用,但存在至關大的缺陷:api

  • 沒法按需擴展, Spark Standalone部署後集羣規模固定,沒法根據做業需求自動擴展集羣;
  • 沒法利用K8s原生能力, Spark Standalone內建的資源調度器不支持擴展,難以接入K8s調度,沒法利用K8s提供的雲原生特性;
  • Spark Standalone集羣在多租戶資源隔離上天生存在短板;

爲此,Spark社區進行了深刻而普遍的討論,在2.3版本提供了對K8s的官方支持.Spark接入K8s的好處是十分明顯的:網絡

  • 直接和K8s對接,能夠更加高效和快捷的獲取集羣資源;
  • 利用K8s原生能力(如namespace等)能夠更好的實現資源隔離和管控.

Spark on K8s方案架構以下圖所示, 設計細節能夠參考:SPARK-18278
3
圖3 Spark on K8s (Native)
在這個方案中, 架構

  1. Spark-Submit經過調用K8s API在K8s集羣中啓動一個Spark Driver Pod;
  2. Driver經過調用K8s API啓動相應的Executor Pod, 組成一個Spark Application集羣,並指派做業任務到這些Executor中執行;
  3. 做業結束後,Executor Pod會被銷燬, 而Driver Pod會持久化相關日誌,並保持在'completed'狀態,直到用戶手清理或被K8s集羣的垃圾回收機制回收.

當前的方案已經解決了Spark Standalone on K8s方案的部分缺陷,然而,Spark Application的生命週期管理方式和調度方式與K8s內置的工做負載(如Deployments、DaemonSets、StatefulSets等)存在較大差別,在K8s上執行做業仍然存在很多問題:app

  1. Spark-submit在K8s集羣以外,使用非聲明式的提交接口;
  2. Spark Application之間沒有協同調度,在小集羣中很容易出現調度餓死的狀況;
  3. 須要手動配置網絡,來訪問WebUI;
  4. 任務監控比較麻煩,沒有接入Prometheus集羣監控;

固然Spark on K8s方案目前還在快速開發中,更多特性不久會發布出來,相信將來和K8s的集成會更加緊密和Native, 這些特性包括:框架

  • 動態資源分配和外部Shullfe服務
  • 本地文件依賴管理器
  • Spark Application管理器
  • 做業隊列和資源管理器

Spark Operator淺析

在分析Spark Operator的實現以前, 先簡單梳理下Kubernetes Operator的基本概念. Kubernetes Operator是由CoreOS開發的Kubernetes擴展特性, 目標是經過定義一系列CRD(自定義資源)和實現控制器,將特定領域的應用程序運維技術和知識(如部署方法、監控、故障恢復等)經過代碼的方式固化下來. Spark Operator是Google基於Operator模式開發的一款的工具(https://github.com/GoogleCloudPlatform/spark-on-k8s-operator), 用於經過聲明式的方式向K8s集羣提交Spark做業.使用Spark Operator管理Spark應用,能更好的利用K8s原生能力控制和管理Spark應用的生命週期,包括應用狀態監控、日誌獲取、應用運行控制等,彌補Spark on K8s方案在集成K8s上與其餘類型的負載之間存在的差距.
下面簡單分析下Spark Operator的實現細節.

系統架構

4
圖4 Spark Operator架構
能夠看出,Spark Operator相比Spark on K8s,架構上要複雜一些,實際上Spark Operator集成了Spark on K8s的方案,提供了更全面管控特性.經過Spark Operator,用戶可使用更加符合K8s理念的方式來控制Spark應用的生命週期.Spark Operator包括以下幾個組件:

  1. SparkApplication控制器, 該控制器用於建立、更新、刪除SparkApplication對象,同時控制器還會監控相應的事件,執行相應的動做;
  2. Submission Runner, 負責調用spark-submit提交Spark做業, 做業提交的流程徹底複用Spark on K8s的模式;
  3. Spark Pod Monitor, 監控Spark做業相關Pod的狀態,並同步到控制器中;
  4. Mutating Admission Webhook: 可選模塊,基於註解來實現Driver/Executor Pod的一些定製化需求;
  5. SparkCtl: 用於和Spark Operator交互的命令行工具

Spark Operator除了實現基本的做業提交外,還支持以下特性:

  • 聲明式的做業管理;
  • 支持更新SparkApplication對象後自動從新提交做業;
  • 支持可配置的重啓策略;
  • 支持失敗重試;
  • 集成prometheus, 能夠收集和轉發Spark應用級別的度量和Driver/Executor的度量到prometheus中.

工程結構

Spark Operator的項目是標準的K8s Operator結構, 其中最重要的包括:

  • manifest: 定義了Spark相關的CRD,包括:

    • ScheduledSparkApplication: 表示一個定時執行的Spark做業
    • SparkApplication: 表示一個Spark做業
  • pkg: 具體的Operator邏輯實現

    • api: 定義了Operator的多個版本的API
    • client: 用於對接的client-go SDK
    • controller: 自定義控制器的實現,包括:

      • ScheduledSparkApplication控制器
      • SparkApplication控制器
    • batchscheduler: 批處理調度器集成模塊, 目前已經集成了K8s volcano調度器
  • spark-docker: spark docker 鏡像
  • sparkctl: spark operator命令行工具

下面主要介紹下Spark Operator是如何管理Spark做業的.

Spark Application控制器

控制器的代碼主要位於"pkg/controller/sparkapplication/controller.go"中.

提交流程

提交做業的提交做業的主流程在submitSparkApplication方法中.

// controller.go
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1beta2.SparkApplication {
    if app.PrometheusMonitoringEnabled() {
        ...
        configPrometheusMonitoring(app, c.kubeClient)
    }

    // Use batch scheduler to perform scheduling task before submitting (before build command arguments).
    if needScheduling, scheduler := c.shouldDoBatchScheduling(app); needScheduling {
        newApp, err := scheduler.DoBatchSchedulingOnSubmission(app)
        ...
        //Spark submit will use the updated app to submit tasks(Spec will not be updated into API server)
        app = newApp
    }

    driverPodName := getDriverPodName(app)
    submissionID := uuid.New().String()
    submissionCmdArgs, err := buildSubmissionCommandArgs(app, driverPodName, submissionID)
    ...
    // Try submitting the application by running spark-submit.
    submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, app))
    ...
    app.Status = v1beta2.SparkApplicationStatus{
        SubmissionID: submissionID,
        AppState: v1beta2.ApplicationState{
            State: v1beta2.SubmittedState,
        },
        DriverInfo: v1beta2.DriverInfo{
            PodName: driverPodName,
        },
        SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
        ExecutionAttempts:         app.Status.ExecutionAttempts + 1,
        LastSubmissionAttemptTime: metav1.Now(),
    }
    c.recordSparkApplicationEvent(app)

    service, err := createSparkUIService(app, c.kubeClient)
    ...
    ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
    return app
}

提交做業的核心邏輯在submission.go這個模塊中:

// submission.go
func runSparkSubmit(submission *submission) (bool, error) {
    sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
    if !present {
        glog.Error("SPARK_HOME is not specified")
    }
    var command = filepath.Join(sparkHome, "/bin/spark-submit")

    cmd := execCommand(command, submission.args...)
    glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
    output, err := cmd.Output()
    glog.V(3).Infof("spark-submit output: %s", string(output))
    if err != nil {
        var errorMsg string
        if exitErr, ok := err.(*exec.ExitError); ok {
            errorMsg = string(exitErr.Stderr)
        }
        // The driver pod of the application already exists.
        if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
            glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
            return false, nil
        }
        if errorMsg != "" {
            return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
        }
        return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
    }

    return true, nil
}
func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName string, submissionID string) ([]string, error) {
    ...
    options, err := addDriverConfOptions(app, submissionID)
    ...
    options, err = addExecutorConfOptions(app, submissionID)
    ...
}
func getMasterURL() (string, error) {
    kubernetesServiceHost := os.Getenv(kubernetesServiceHostEnvVar)
    ...
    kubernetesServicePort := os.Getenv(kubernetesServicePortEnvVar)
    ...
    return fmt.Sprintf("k8s://https://%s:%s", kubernetesServiceHost, kubernetesServicePort), nil
}

能夠看出,

  1. 能夠配置控制器啓用Prometheus進行度量收集;
  2. Spark Operator經過拼裝一個spark-submit命令並執行,實現提交Spark做業到K8s集羣中的目的;
  3. 在每次提交前,Spark Operator都會生成一個UUID做爲Session Id,並經過Spark相關配置對driver/executor的pod進行標記.咱們可使用這個id來跟蹤和控制這個Spark做業;
  4. Controller經過監控相關做業的pod的狀態來更新SparkApplication的狀態,同時驅動SparkApplication對象的狀態流轉.
  5. 提交成功後,還會作以下幾件事情:

    1. 更新做業的狀態
    2. 啓動一個Service,並配置好Ingress,方便用戶訪問Spark WebUI

另外,若是對Spark on K8s的使用文檔比較困惑,這段代碼是比較好的一個示例.

狀態流轉控制

在Spark Operator中,Controller使用狀態機來維護Spark Application的狀態信息, 狀態流轉和Action的關係以下圖所示:
5
圖5 _State Machine for SparkApplication_
做業提交後,Spark Application的狀態更新都是經過getAndUpdateAppState()方法來實現的.

// controller.go
func (c *Controller) getAndUpdateAppState(app *v1beta2.SparkApplication) error {
    if err := c.getAndUpdateDriverState(app); err != nil {
        return err
    }
    if err := c.getAndUpdateExecutorState(app); err != nil {
        return err
    }
    return nil
}
// getAndUpdateDriverState finds the driver pod of the application
// and updates the driver state based on the current phase of the pod.
func (c *Controller) getAndUpdateDriverState(app *v1beta2.SparkApplication) error {
    // Either the driver pod doesn't exist yet or its name has not been updated.
    ...
    driverPod, err := c.getDriverPod(app)
    ...
    if driverPod == nil {
        app.Status.AppState.ErrorMessage = "Driver Pod not found"
        app.Status.AppState.State = v1beta2.FailingState
        app.Status.TerminationTime = metav1.Now()
        return nil
    }
    
    app.Status.SparkApplicationID = getSparkApplicationID(driverPod)
    ...
    newState := driverStateToApplicationState(driverPod.Status)
    // Only record a driver event if the application state (derived from the driver pod phase) has changed.
    if newState != app.Status.AppState.State {
        c.recordDriverEvent(app, driverPod.Status.Phase, driverPod.Name)
    }
    app.Status.AppState.State = newState

    return nil
}

// getAndUpdateExecutorState lists the executor pods of the application
// and updates the executor state based on the current phase of the pods.
func (c *Controller) getAndUpdateExecutorState(app *v1beta2.SparkApplication) error {
    pods, err := c.getExecutorPods(app)
    ...
    executorStateMap := make(map[string]v1beta2.ExecutorState)
    var executorApplicationID string
    for _, pod := range pods {
        if util.IsExecutorPod(pod) {
            newState := podPhaseToExecutorState(pod.Status.Phase)
            oldState, exists := app.Status.ExecutorState[pod.Name]
            // Only record an executor event if the executor state is new or it has changed.
            if !exists || newState != oldState {
                c.recordExecutorEvent(app, newState, pod.Name)
            }
            executorStateMap[pod.Name] = newState

            if executorApplicationID == "" {
                executorApplicationID = getSparkApplicationID(pod)
            }
        }
    }

    // ApplicationID label can be different on driver/executors. Prefer executor ApplicationID if set.
    // Refer https://issues.apache.org/jira/projects/SPARK/issues/SPARK-25922 for details.
    ...
    if app.Status.ExecutorState == nil {
        app.Status.ExecutorState = make(map[string]v1beta2.ExecutorState)
    }
    for name, execStatus := range executorStateMap {
        app.Status.ExecutorState[name] = execStatus
    }

    // Handle missing/deleted executors.
    for name, oldStatus := range app.Status.ExecutorState {
        _, exists := executorStateMap[name]
        if !isExecutorTerminated(oldStatus) && !exists {
            // If ApplicationState is SUCCEEDING, in other words, the driver pod has been completed
            // successfully. The executor pods terminate and are cleaned up, so we could not found
            // the executor pod, under this circumstances, we assume the executor pod are completed.
            if app.Status.AppState.State == v1beta2.SucceedingState {
                app.Status.ExecutorState[name] = v1beta2.ExecutorCompletedState
            } else {
                glog.Infof("Executor pod %s not found, assuming it was deleted.", name)
                app.Status.ExecutorState[name] = v1beta2.ExecutorFailedState
            }
        }
    }

    return nil
}

從這段代碼能夠看到, Spark Application提交後,Controller會經過監聽Driver Pod和Executor Pod狀態來計算Spark Application的狀態,推進狀態機的流轉.

度量監控

若是一個SparkApplication示例配置了開啓度量監控特性,那麼Spark Operator會在Spark-Submit提交參數中向Driver和Executor的JVM參數中添加相似"-javaagent:/prometheus/jmx_prometheus_javaagent-0.11.0.jar=8090:/etc/metrics/conf/prometheus.yaml"的JavaAgent參數來開啓SparkApplication度量監控,實現經過JmxExporter向Prometheus發送度量數據.
6
圖6 Prometheus架構

WebUI

在Spark on K8s方案中, 用戶須要經過kubectl port-forward命令創建臨時通道來訪問Driver的WebUI,這對於須要頻繁訪問多個做業的WebUI的場景來講很是麻煩. 在Spark Operator中,Spark Operator會在做業提交後,啓動一個Spark WebUI Service,並配置Ingress來提供更爲天然和高效的訪問途徑.

小結

本文總結了Spark計算框架的基礎架構,介紹了Spark on K8s的多種方案,着重介紹了Spark Operator的設計和實現.K8s Operator尊從K8s設計理念,極大的提升了K8s的擴展能力.Spark Operator基於Operator範式實現了更爲完備的管控特性,是對官方Spark on K8s方案的有力補充.隨着K8s的進一步完善和Spark社區的努力,能夠預見Spark on K8s方案會逐漸吸納Spark Operator的相關特性,進一步提高雲原生體驗.

參考資料:

[1] Kubernetes Operator for Apache Spark Design
[2] What is Prometheus?
[3] Spark on Kubernetes 的現狀與挑戰
[4] Spark in action on Kubernetes - Spark Operator的原理解析
[5] Operator pattern
[6] Custom Resources

 

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索