做者 | 阿里雲智能事業羣技術專家 莫源node
[](https://www.atatech.org/artic... Operator的內部實現
在深刻解析Spark Operator以前,咱們先補充一些關於kubernetes operator的知識。2018年能夠說是kubernetes operator氾濫的一年,各類operator如雨後春筍般出現。operator是擴展kubernetes以及與kubernetes集成的最佳方式之一。在kubernetes的設計理念中,有很重要的一條就是進行了抽象,好比對存儲進行抽象、對應用負載進行抽象、對接入層進行抽象等等。每一個抽象又對應了各自生命週期管理的controller,開發者提交的Yaml其實是對抽象終態的描述,而controller會監聽抽象的變化、解析並進行處理,最終嘗試將狀態修正到終態。api
那麼對於在kubernetes中未定義的抽象該如何處理呢,答案就是operator。一個標準operator一般包含以下幾個部分:1. CRD抽象的定義,負責描述抽象所能包含的功能。 2.CRD Controller ,負責解析CRD定義的內容以及生命週期的管理。3.clent-go的SDK,負責提供代碼集成時使用的SDK。app
有了這個知識儲備,那麼咱們回過頭來看Spark Operator的代碼,結構基本就比較明晰了。核心的代碼邏輯都在pkg下,其中apis下面主要是定義了不一樣版本的API;client目錄下主要是自動生成的client-go的SDK;crd目錄下主要是定義的兩個自定義資源sparkapplication和scheduledsparkapplication的結構。controller目錄下主要定義的就是這個operator的生命週期管理的邏輯;config目錄下主要處理spark config的轉換。瞭解一個Operator能力最快捷的方式,就是查看CRD的定義。在Spark Operator中定義了sparkapplication和scheduledsparkapplication兩個CRD,他們之間有什麼區別呢?
sparkapplication 是對常規spark任務的抽象,做業是單次運行的,做業運行完畢後,全部的Pod會進入Succeed或者Failed的狀態。而scheduledsparkapplication是對離線定時任務的一種抽象,開發者能夠在scheduledsparkapplication中定義相似crontab的任務,實現spark離線任務的週期性定時調度。ui
上面這張圖是Spark中kubernetes的集成圖,也就是說當咱們經過spark-submit提交做業的時候,會自動生成driver pod與exector pods。那麼引入了Spark Operator後,這個流程變成了什麼呢?this
func (c Controller) submitSparkApplication(app v1beta1.SparkApplication) *v1beta1.SparkApplication {阿里雲
// prometheus的監控指標的暴露 appToSubmit := app.DeepCopy() if appToSubmit.Spec.Monitoring != nil && appToSubmit.Spec.Monitoring.Prometheus != nil { if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err != nil { glog.Error(err) } }
// 將CRD中的定義轉變爲spark-submit的命令spa
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit) if err != nil { app.Status = v1beta1.SparkApplicationStatus{ AppState: v1beta1.ApplicationState{ State: v1beta1.FailedSubmissionState, ErrorMessage: err.Error(), }, SubmissionAttempts: app.Status.SubmissionAttempts + 1, LastSubmissionAttemptTime: metav1.Now(), } return app } // 在operator容器內經過spark-submit提交做業 submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit)) if err != nil { app.Status = v1beta1.SparkApplicationStatus{ AppState: v1beta1.ApplicationState{ State: v1beta1.FailedSubmissionState, ErrorMessage: err.Error(), }, SubmissionAttempts: app.Status.SubmissionAttempts + 1, LastSubmissionAttemptTime: metav1.Now(), } c.recordSparkApplicationEvent(app) glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err) return app } // 由於Pod的狀態也會被Spark Operator進行觀測,所以driver pod宕掉會被從新拉起 // 這是和直接跑spark-submit的一大區別,提供了故障恢復的能力。 if !submitted { // The application may not have been submitted even if err == nil, e.g., when some // state update caused an attempt to re-submit the application, in which case no // error gets returned from runSparkSubmit. If this is the case, we simply return. return app } glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name) app.Status = v1beta1.SparkApplicationStatus{ AppState: v1beta1.ApplicationState{ State: v1beta1.SubmittedState, }, SubmissionAttempts: app.Status.SubmissionAttempts + 1, ExecutionAttempts: app.Status.ExecutionAttempts + 1, LastSubmissionAttemptTime: metav1.Now(), } c.recordSparkApplicationEvent(app) // 經過service暴露spark-ui service, err := createSparkUIService(app, c.kubeClient) if err != nil { glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err) } else { app.Status.DriverInfo.WebUIServiceName = service.serviceName app.Status.DriverInfo.WebUIPort = service.nodePort // Create UI Ingress if ingress-format is set. if c.ingressURLFormat != "" { ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient) if err != nil { glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err) } else { app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL app.Status.DriverInfo.WebUIIngressName = ingress.ingressName } } } return app
}
其實到此,咱們就已經基本瞭解Spark Operator作的事情了,首先定義了兩種不一樣的CRD對象,分別對應普通的計算任務與定時週期性的計算任務,而後解析CRD的配置文件,拼裝成爲spark-submit的命令,經過prometheus暴露監控數據採集接口,建立Service提供spark-ui的訪問。而後經過監聽Pod的狀態,不斷回寫更新CRD對象,實現了spark做業任務的生命週期管理。設計
[](https://www.atatech.org/artic... Operator的任務狀態機
當咱們瞭解了Spark Operator的設計思路和基本流程後,還須要深刻了解的就是sparkapplication的狀態都包含哪些,他們之間是如何進行轉換的,由於這是Spark Operator對於生命週期管理加強最重要的部分。日誌
一個Spark的做業任務能夠經過上述的狀態機轉換圖進行表示,一個正常的做業任務經歷以下幾個狀態:code
New -> Submitted -> Running -> Succeeding -> Completed
而當任務失敗的時候會進行重試,若重試超過最大重試次數則會失敗。也就是說若是在任務的執行過程當中,因爲資源、調度等因素形成Pod被驅逐或者移除,Spark Operator都會經過自身的狀態機狀態轉換進行重試。
[](https://www.atatech.org/artic... Operator的狀態排查
咱們已經知道了Spark Operator最核心的功能就是將CRD的配置轉換爲spark-submit的命令,那麼當一個做業運行不預期的時候,咱們該如何判斷是哪一層出現的問題呢?首先咱們要判斷的就是spark-submit時所生成的參數是不是預期的,由於CRD的Yaml配置雖然能夠加強表達能力,可是提升了配置的難度與出錯的可能性。
func runSparkSubmit(submission *submission) (bool, error) {
sparkHome, present := os.LookupEnv(sparkHomeEnvVar) if !present { glog.Error("SPARK_HOME is not specified") } var command = filepath.Join(sparkHome, "/bin/spark-submit") cmd := execCommand(command, submission.args...) glog.V(2).Infof("spark-submit arguments: %v", cmd.Args) if _, err := cmd.Output(); err != nil { var errorMsg string if exitErr, ok := err.(*exec.ExitError); ok { errorMsg = string(exitErr.Stderr) } // The driver pod of the application already exists. if strings.Contains(errorMsg, podAlreadyExistsErrorCode) { glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name) return false, nil } if errorMsg != "" { return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg) } return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err) } return true, nil
}
默認狀況下Spark Operator會經過glog level=2等級對外輸出每次做業提交後轉換的提交命令。而默認狀況下,glog的level即爲2,所以經過檢查Spark Operator的Pod日誌能夠協助開發者快速排查問題。此外在sparkapplication上面也會經過event的方式進行狀態的記錄,上述狀態機之間的轉換都會經過event的方式體如今sparkapplication的對象上。掌握這兩種方式進行問題排查,能夠節省大量排錯時間。
[](https://www.atatech.org/artic...使用Spark Operator是在kubernetes上實踐spark的最佳方式,和傳統的spark-submit相比提供了更多的故障恢復與可靠性保障,而且提供了監控、日誌、UI等能力的集成與支持。在下一篇中,會爲你們介紹在kubernetes集羣中,提交spark做業時的如何使用外部存儲存儲的最佳實踐。