Airflow在Kubernetes上的操做器

Airflow在Kubernetes (第一部分): 一種不一樣類型的Operator

做者: Daniel Imberman (Bloomberg LP)html

介紹

做爲Bloomberg’s 持續提交來開發Kubernetes ecosystem,咱們高興地宣佈 Kubernetes Airflow Operator的誕生。做爲一種 Apache Airflow運行機制,一個流行的工做流程整合框架,能夠原生地使用Kubernetes API來任意調用Kubernetes Pods。python

什麼是Airflow?

Apache Airflow 是 DevOps 的 「Configuration As Code.」方法論的實現之一。Airflow容許用戶多個步驟的流水線,使用簡單的Python object DAG (Directed Acyclic Graph)來實現。你能夠定義dependencies,經過程序來構建複雜的workflows,而後監控調度執行的任務,具備易於查看的UI。git

列表方式呈現的工做流程:github

Airflow DAGs 

圖形方式呈現的工做流程:web

Airflow UI

爲何將 Airflow 運行在 Kubernetes之上?

由於從一開始,Airflow的最大優點就是其靈活性。Airflow 提供了很是普遍的服務的整合,包括Spark 和 HBase, 以及其它的不一樣的雲服務提供者。Airflow 也經過器插件框架提供了很是好的擴展能力。可是,其限制在於Airflow users 被限制在其worker執行的框架和客戶端。一個組織可能有不一樣的Airflow workflows,從數據科學分析流程到應用開發。這些應用場景給依賴管理帶來問題,由於團隊可能須要在不一樣的流程中使用不一樣的支持庫。apache

爲了聚焦於該問題,咱們優化了Kubernetes容許用戶執行任意的 Kubernetes pods 和 configurations。Airflow 的用戶如今能夠得到全面能力,來使用運行環境、資源以及安全設置,從而將 Airflow 轉變爲一個能夠運行任何工做負載的workflow orchestrator。ubuntu

Kubernetes 的 Operator

在開始下一步以前,咱們限制在Airflow中的 Operator 是一個任務的定義。 當用戶建立一個 DAG,他將使用一個 operator,如 「SparkSubmitOperator」 或者 「PythonOperator」 來分別提交/監控一個 Spark job 或者 Python函數。 Airflow 帶有框架內置的operators,如 Apache Spark, BigQuery, Hive, 和 EMR。而且提供了Plugins entrypoint,容許DevOps 工程師開發本身的鏈接器。安全

Airflow 用戶一直在尋找使開發和 ETL 流水線管理更簡單的方法。任何解耦流水線步驟,增長可監控性,都能減小未來的中斷和救火問題。下面是Airflow Kubernetes Operator提供的好處:架構

  • 對開發過程增長的靈活性:Airflow’s plugin API 提供顯著的特性來幫助須要在DAGs上測試新功能的工程師。當開發者想要建立一個新的 operator,他們不得不開發一個完整的新 plugin.。如今,任何任務均可以運行在 Docker 容器之中,使用一致的operator來訪問,沒有額外的 Airflow 代碼須要維護。框架

  • 配置和依賴關係的靈活性:對於運行在靜態的Airflow workers的operators,依賴管理變得至關的困難。若是開發者想要運行一個要求依賴 SciPy 的任務,而另一個要求 NumPy, 開發者就不得不維護兩者的依賴庫,使其可以適應全部的Airflow workers,或者將其分離到外部的機器。自定義的Docker鏡像容許用戶 確保運行環境、配置和依賴是徹底等價的。

  • 使用 kubernetes secrets添加安全性:處理敏感數據是任何DevOps 工程都須要面對的職責所在。Airflow 用戶但願隔離API keys, database passwords, 以及 login credentials在一個嚴格的「僅需知道」 的環境中。經過Kubernetes operator,用戶可使用 Kubernetes Vault 技術來存儲全部敏感數據。這意味着Airflow workers永遠不會存取這些信息,只須要簡單地請求 pods,僅提供須要的 secrets 數據。

架構

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python Client 來提交請求,而後由 APIServer (1)處理。而後Kubernetes使用你定義的參數來啓動你的 pod(2). 容器鏡像將會在須要時被載入,並賦予須要的環境變量, secrets 和 dependencies, enacting a single command。一旦 job啓動,operator 只須要監控logs (3)。用戶能夠收集本地調度器的logs或者分佈式的 logging service(已經部署在 Kubernetes 集羣之中)。

使用 Kubernetes Operator

基本例程

下面的 DAG 是一個簡單的例子,用來演示 Kubernetes Operator 如何工做。該 DAG 建立了兩個 pods 到 Kubernetes:一個帶Python的Linux distro和一個Ubuntu distro(不帶python)。 該 Python pod 將正常運行 Python 請求, 沒有Python的將會報告一個失敗信息。若是Operator 正常工做,該 passing-task pod 將完成而 failing-task pod 將返回 failure 到Airflow webserver。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

顯示的任務圖以下:

Basic DAG Run

如何結合到 workflow?

鈣離子只用了兩個 images,Docker的神奇之處在於一樣的DAG能夠工做在任何image/command,只要你願意。下面是一些 CI/CD 流程,在Airflow DAG中輸出產品級的代碼。

1: PR in github

使用 Travis 或者 Jenkins 來運行 unit 和 integration 測試,而後合併到 master 分之,觸發自動化的 CI build過程。

2: CI/CD via Jenkins -> Docker Image

Generate your Docker images and bump release version within your Jenkins build.

3: Airflow launches task

最終,更新DAGs 來反映新版本的變化!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

啓動 test deployment

由於Kubernetes Operator尚未發佈,咱們尚未推出一個正式的 helm chart 和 operator (都在進行之中)。下面包含了一些基本的步驟,來實驗這些新的特徵。

Step 1: Set your kubeconfig to point to a kubernetes cluster

Step 2: Clone the Airflow Repo:

運行 git clone https://github.com/apache/incubator-airflow.git 來複制Airflow代碼倉庫。

Step 3: Run

爲了運行基本的開發過程, 咱們採用一些腳原本驅動當前的 Kubernetes Executor (將在下一篇文章中介紹). 運行下面三個命令:

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在進一步開始以前,先討論一下這些命令的做用:

sed -ie 「s/KubernetesExecutor/LocalExecutor/g」 scripts/ci/kubernetes/kube/configmaps.yaml

該 Kubernetes Executor是另一個Airflow 的特徵,容許等價pods的動態分配。我切換到 LocalExecutor的緣由是爲了更簡單地介紹這些特徵。你能夠跳過這些,若是願意嘗試一下 Kubernetes Executor, 但這將在將來的文章中講述。

./scripts/ci/kubernetes/Docker/build.sh

該 script 將包裝 Airflow master 源碼,構建一個Docker container爲Airflow distribution。

./scripts/ci/kubernetes/kube/deploy.sh

最後,咱們將在集羣中建立一個完整的Airflow分發。包括 Airflow configs, 一個postgres backend,  webserver + scheduler, 以及全部須要的服務。須要注意的一個事是 role binding 是 cluster-admin, 所以若是沒有集羣的權限, 你能夠修改 scripts/ci/kubernetes/kube/airflow.yaml。

Step 4: Log into your webserver

如今Airflow 實例已經運行,咱們看一下 UI。UI 服務於Airflow pod端口 8080 ,簡單地運行:

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

如今Airflow UI 將服務於 http://localhost:8080。爲了登陸,簡單地進入 airflow/airflow就能夠訪問 Airflow web UI。

Step 5: Upload a test document

爲了修改/添加本身的 DAGs, 你可使用 kubectl cp 上載本地文件到Airflow scheduler的DAG folder。Airflow將讀取新的DAG 而後自動上載本身的系統。下面的命令自動上載本地溫江到正確的目錄:

kubectl cp <local file> <namespace>/<pod>:/root/airflow/dags -c scheduler

Step 6: Enjoy!

 

何時能夠用呢?

這個特徵功能還在早起開發階段,咱們但願下幾個月內就能發佈一些版本,能夠更普遍地使用。

參與進來

該特徵是促進Apache Airflow 集成進的 Kubernetes的諸多努力的一個開始。該 Kubernetes Operator 已經合併進 1.10 release branch of Airflow (executor在體驗模式), 完整的 k8s 原生調度器稱爲 Kubernetes Executor。

若是感興趣加入,建議先了解一下下面的信息:

  • 加入airflow-dev的郵件列表 dev@airflow.apache.org。
  • 提交問題到 Apache Airflow JIRA。
  • 加入咱們的 SIG-BigData 會議,在 Wednesdays at 10am PST。
  • 經過slack聯繫:#sig-big-data 在 kubernetes.slack.com。

特別感謝Apache Airflow 和 Kubernetes 社區,尤爲是 Grant Nicholas, Ben Goldberg, Anirudh Ramanathan, Fokko Dreisprong, 和 Bolke de Bruin。

相關文章
相關標籤/搜索