做者: Daniel Imberman (Bloomberg LP)html
做爲Bloomberg’s 持續提交來開發Kubernetes ecosystem,咱們高興地宣佈 Kubernetes Airflow Operator的誕生。做爲一種 Apache Airflow運行機制,一個流行的工做流程整合框架,能夠原生地使用Kubernetes API來任意調用Kubernetes Pods。python
Apache Airflow 是 DevOps 的 「Configuration As Code.」方法論的實現之一。Airflow容許用戶多個步驟的流水線,使用簡單的Python object DAG (Directed Acyclic Graph)來實現。你能夠定義dependencies,經過程序來構建複雜的workflows,而後監控調度執行的任務,具備易於查看的UI。git
列表方式呈現的工做流程:github
圖形方式呈現的工做流程:web
由於從一開始,Airflow的最大優點就是其靈活性。Airflow 提供了很是普遍的服務的整合,包括Spark 和 HBase, 以及其它的不一樣的雲服務提供者。Airflow 也經過器插件框架提供了很是好的擴展能力。可是,其限制在於Airflow users 被限制在其worker執行的框架和客戶端。一個組織可能有不一樣的Airflow workflows,從數據科學分析流程到應用開發。這些應用場景給依賴管理帶來問題,由於團隊可能須要在不一樣的流程中使用不一樣的支持庫。apache
爲了聚焦於該問題,咱們優化了Kubernetes容許用戶執行任意的 Kubernetes pods 和 configurations。Airflow 的用戶如今能夠得到全面能力,來使用運行環境、資源以及安全設置,從而將 Airflow 轉變爲一個能夠運行任何工做負載的workflow orchestrator。ubuntu
在開始下一步以前,咱們限制在Airflow中的 Operator 是一個任務的定義。 當用戶建立一個 DAG,他將使用一個 operator,如 「SparkSubmitOperator」 或者 「PythonOperator」 來分別提交/監控一個 Spark job 或者 Python函數。 Airflow 帶有框架內置的operators,如 Apache Spark, BigQuery, Hive, 和 EMR。而且提供了Plugins entrypoint,容許DevOps 工程師開發本身的鏈接器。安全
Airflow 用戶一直在尋找使開發和 ETL 流水線管理更簡單的方法。任何解耦流水線步驟,增長可監控性,都能減小未來的中斷和救火問題。下面是Airflow Kubernetes Operator提供的好處:架構
對開發過程增長的靈活性:Airflow’s plugin API 提供顯著的特性來幫助須要在DAGs上測試新功能的工程師。當開發者想要建立一個新的 operator,他們不得不開發一個完整的新 plugin.。如今,任何任務均可以運行在 Docker 容器之中,使用一致的operator來訪問,沒有額外的 Airflow 代碼須要維護。框架
配置和依賴關係的靈活性:對於運行在靜態的Airflow workers的operators,依賴管理變得至關的困難。若是開發者想要運行一個要求依賴 SciPy 的任務,而另一個要求 NumPy, 開發者就不得不維護兩者的依賴庫,使其可以適應全部的Airflow workers,或者將其分離到外部的機器。自定義的Docker鏡像容許用戶 確保運行環境、配置和依賴是徹底等價的。
使用 kubernetes secrets添加安全性:處理敏感數據是任何DevOps 工程都須要面對的職責所在。Airflow 用戶但願隔離API keys, database passwords, 以及 login credentials在一個嚴格的「僅需知道」 的環境中。經過Kubernetes operator,用戶可使用 Kubernetes Vault 技術來存儲全部敏感數據。這意味着Airflow workers永遠不會存取這些信息,只須要簡單地請求 pods,僅提供須要的 secrets 數據。
Kubernetes Operator 使用 Kubernetes Python Client 來提交請求,而後由 APIServer (1)處理。而後Kubernetes使用你定義的參數來啓動你的 pod(2). 容器鏡像將會在須要時被載入,並賦予須要的環境變量, secrets 和 dependencies, enacting a single command。一旦 job啓動,operator 只須要監控logs (3)。用戶能夠收集本地調度器的logs或者分佈式的 logging service(已經部署在 Kubernetes 集羣之中)。
下面的 DAG 是一個簡單的例子,用來演示 Kubernetes Operator 如何工做。該 DAG 建立了兩個 pods 到 Kubernetes:一個帶Python的Linux distro和一個Ubuntu distro(不帶python)。 該 Python pod 將正常運行 Python 請求, 沒有Python的將會報告一個失敗信息。若是Operator 正常工做,該 passing-task
pod 將完成而 failing-task
pod 將返回 failure 到Airflow webserver。
from airflow import DAG from datetime import datetime, timedelta from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow.operators.dummy_operator import DummyOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.utcnow(), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10)) start = DummyOperator(task_id='run_this_first', dag=dag) passing = KubernetesPodOperator(namespace='default', image="Python:3.6", cmds=["Python","-c"], arguments=["print('hello world')"], labels={"foo": "bar"}, name="passing-test", task_id="passing-task", get_logs=True, dag=dag ) failing = KubernetesPodOperator(namespace='default', image="ubuntu:1604", cmds=["Python","-c"], arguments=["print('hello world')"], labels={"foo": "bar"}, name="fail", task_id="failing-task", get_logs=True, dag=dag ) passing.set_upstream(start) failing.set_upstream(start)
顯示的任務圖以下:
鈣離子只用了兩個 images,Docker的神奇之處在於一樣的DAG能夠工做在任何image/command,只要你願意。下面是一些 CI/CD 流程,在Airflow DAG中輸出產品級的代碼。
使用 Travis 或者 Jenkins 來運行 unit 和 integration 測試,而後合併到 master 分之,觸發自動化的 CI build過程。
Generate your Docker images and bump release version within your Jenkins build.
最終,更新DAGs 來反映新版本的變化!
production_task = KubernetesPodOperator(namespace='default', # image="my-production-job:release-1.0.1", <-- old release image="my-production-job:release-1.0.2", cmds=["Python","-c"], arguments=["print('hello world')"], name="fail", task_id="failing-task", get_logs=True, dag=dag )
由於Kubernetes Operator尚未發佈,咱們尚未推出一個正式的 helm chart 和 operator (都在進行之中)。下面包含了一些基本的步驟,來實驗這些新的特徵。
運行 git clone https://github.com/apache/incubator-airflow.git
來複制Airflow代碼倉庫。
爲了運行基本的開發過程, 咱們採用一些腳原本驅動當前的 Kubernetes Executor (將在下一篇文章中介紹). 運行下面三個命令:
sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml ./scripts/ci/kubernetes/Docker/build.sh ./scripts/ci/kubernetes/kube/deploy.sh
在進一步開始以前,先討論一下這些命令的做用:
該 Kubernetes Executor是另一個Airflow 的特徵,容許等價pods的動態分配。我切換到 LocalExecutor的緣由是爲了更簡單地介紹這些特徵。你能夠跳過這些,若是願意嘗試一下 Kubernetes Executor, 但這將在將來的文章中講述。
該 script 將包裝 Airflow master 源碼,構建一個Docker container爲Airflow distribution。
最後,咱們將在集羣中建立一個完整的Airflow分發。包括 Airflow configs, 一個postgres backend, webserver + scheduler, 以及全部須要的服務。須要注意的一個事是 role binding 是 cluster-admin, 所以若是沒有集羣的權限, 你能夠修改 scripts/ci/kubernetes/kube/airflow.yaml。
如今Airflow 實例已經運行,咱們看一下 UI。UI 服務於Airflow pod端口 8080 ,簡單地運行:
WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1) kubectl port-forward $WEB 8080:8080
如今Airflow UI 將服務於 http://localhost:8080。爲了登陸,簡單地進入 airflow
/airflow
就能夠訪問 Airflow web UI。
爲了修改/添加本身的 DAGs, 你可使用 kubectl cp
上載本地文件到Airflow scheduler的DAG folder。Airflow將讀取新的DAG 而後自動上載本身的系統。下面的命令自動上載本地溫江到正確的目錄:
kubectl cp <local file> <namespace>/<pod>:/root/airflow/dags -c scheduler
這個特徵功能還在早起開發階段,咱們但願下幾個月內就能發佈一些版本,能夠更普遍地使用。
該特徵是促進Apache Airflow 集成進的 Kubernetes的諸多努力的一個開始。該 Kubernetes Operator 已經合併進 1.10 release branch of Airflow (executor在體驗模式), 完整的 k8s 原生調度器稱爲 Kubernetes Executor。
若是感興趣加入,建議先了解一下下面的信息:
特別感謝Apache Airflow 和 Kubernetes 社區,尤爲是 Grant Nicholas, Ben Goldberg, Anirudh Ramanathan, Fokko Dreisprong, 和 Bolke de Bruin。