三分鐘上手apache頂級任務編排系統airflow

airflow已經成爲了任務編排系統的事實標準,使用和terraform同樣的代碼及配置的任務開發方式。html

airflow使用python做爲開發語言,很是簡單易學、容易上手。python

完整案例代碼已上傳github:github.com/neatlife/my…git

獲取airflow實例

可使用docker一鍵啓動github

git clone https://github.com/puckel/docker-airflow
cd docker-airflow
docker-compose -f docker-compose-LocalExecutor.yml up -d
複製代碼

訪問ip:8080查看效果 web

能夠看到airflow已經可用了docker

編輯dag文件

這個dag文件就是用來定義任務和任務之間的前後、依賴關係的。shell

參數設置

參考:airflow.apache.org/tutorial.ht…apache

其中幾個比較重要的參數以下:vim

參數名 做用
start_date 任務開始時間
end_date 任務結束時間,不填表明永遠
retries 任務執行失敗重試次數
retry_delay 重試的間隔時間

代碼實現以下 vim mydag.pyapi

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "start_date": datetime(2019, 7, 10),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG("mydag", default_args=default_args, schedule_interval=timedelta(1))
複製代碼

上面的"schedule_interval=timedelta(1)"表明一天觸發一次這個任務,也可使用crontab的語法,參考:airflow.apache.org/scheduler.h…

這個新的dag文件須要使用python執行這個腳本導入,可使用下面的命令進行導入

docker-compose -f docker-compose-LocalExecutor.yml exec webserver bash
python dags/mydag.py
複製代碼

刷新這個web界面,就能夠看到這個新加的mydag任務了

這個airflow的任務加載比較慢,若是顯示狀態和上面不一樣須要多等待一下子

定義案例任務

這裏定義三個任務,分別echo出1, 2, 3,關係以下

代碼實現以下

定義任務執行內容

t1 = BashOperator(task_id="echo1", bash_command="echo 1", dag=dag)
t2 = BashOperator(task_id="echo2", bash_command="echo 2", dag=dag)
t3 = BashOperator(task_id="echo3", bash_command="echo 3", dag=dag)
複製代碼

上面使任務使用airflow執行的bash,airflow還能夠執行不少輸入,完整列表參考:airflow.apache.org/_api/airflo…

定義任務間關係

t2.set_upstream(t1)                                                                                                                                                                                 
t3.set_upstream(t1)
複製代碼

加載修改過的airflow任務編排文件

首次修改這個dag文件,airflow回自動加載,點擊"Refresh"按鈕能夠手動加載這個配置文件

操做效果以下

能夠看到這個echo1, echo2, echo3的依賴關係定義成功了

啓動任務

先啓用這個任務,而後點擊執行按鈕,操做效果以下

也能夠只啓用這個任務,而後等這個airflow按照設定的時間,自動觸發,手動觸發能夠快速驗證

能夠看到任務已經開始執行了

查看任務執行狀態

這個任務的執行狀態用不一樣的顏色進行表示

能夠看到全部的任務都已經成功執行完成了

查看任務執行輸出

單擊 "Browser" > "Task Instance"能夠查看全部被執行的任務列表

單擊具體的task id就能夠進入到task執行狀況詳情頁面了,單擊詳情頁的"Log"就能夠看到任務的日誌輸出了,操做效果以下:

查看jinjia模板渲染結果

若是使用了這個airflow的模板功能,能夠在任務執行詳情頁面,查看這個被執行任務的模板渲染結果,操做效果以下:

刪除任務

須要先刪除dag定義文件,好比 rm dags/mydag.py,而後在web界面上刪除

一些注意的點

這個airflow依賴很是的多,由於沒到1.0版本,存在一些bug,因此建議使用docker啓動,主要的dag任務編排功能使用是ok的,其它功能能夠先了解下

airflow可使用模板功能生成腳本,參考:airflow.apache.org/tutorial.ht…

jinjia模板語法參考:jinja.pocoo.org/docs/dev/

airflow默認以utc時區運行,若是須要計算正確的時間,須要把時間進行時區轉換,核心代碼以下

#將本地時間轉換爲utc時間,再設置爲start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
複製代碼

參考資料

  1. www.infoq.cn/article/WdJ…
  2. www.cnblogs.com/cord/p/9450…
  3. airflow.apache.org
  4. yangcongchufang.com/airflow/air…
相關文章
相關標籤/搜索