airflow已經成爲了任務編排系統的事實標準,使用和terraform同樣的代碼及配置的任務開發方式。html
airflow使用python做爲開發語言,很是簡單易學、容易上手。python
完整案例代碼已上傳github:github.com/neatlife/my…git
可使用docker一鍵啓動github
git clone https://github.com/puckel/docker-airflow
cd docker-airflow
docker-compose -f docker-compose-LocalExecutor.yml up -d
複製代碼
訪問ip:8080查看效果 web
能夠看到airflow已經可用了docker
這個dag文件就是用來定義任務和任務之間的前後、依賴關係的。shell
參考:airflow.apache.org/tutorial.ht…apache
其中幾個比較重要的參數以下:vim
參數名 | 做用 |
---|---|
start_date | 任務開始時間 |
end_date | 任務結束時間,不填表明永遠 |
retries | 任務執行失敗重試次數 |
retry_delay | 重試的間隔時間 |
代碼實現以下 vim mydag.py
api
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"start_date": datetime(2019, 7, 10),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("mydag", default_args=default_args, schedule_interval=timedelta(1))
複製代碼
上面的"schedule_interval=timedelta(1)"表明一天觸發一次這個任務,也可使用crontab的語法,參考:airflow.apache.org/scheduler.h…
這個新的dag文件須要使用python執行這個腳本導入,可使用下面的命令進行導入
docker-compose -f docker-compose-LocalExecutor.yml exec webserver bash
python dags/mydag.py
複製代碼
刷新這個web界面,就能夠看到這個新加的mydag任務了
這裏定義三個任務,分別echo出1, 2, 3,關係以下
代碼實現以下
t1 = BashOperator(task_id="echo1", bash_command="echo 1", dag=dag)
t2 = BashOperator(task_id="echo2", bash_command="echo 2", dag=dag)
t3 = BashOperator(task_id="echo3", bash_command="echo 3", dag=dag)
複製代碼
上面使任務使用airflow執行的bash,airflow還能夠執行不少輸入,完整列表參考:airflow.apache.org/_api/airflo…
t2.set_upstream(t1)
t3.set_upstream(t1)
複製代碼
首次修改這個dag文件,airflow回自動加載,點擊"Refresh"按鈕能夠手動加載這個配置文件
操做效果以下
能夠看到這個echo1, echo2, echo3的依賴關係定義成功了
先啓用這個任務,而後點擊執行按鈕,操做效果以下
也能夠只啓用這個任務,而後等這個airflow按照設定的時間,自動觸發,手動觸發能夠快速驗證
這個任務的執行狀態用不一樣的顏色進行表示
單擊 "Browser" > "Task Instance"能夠查看全部被執行的任務列表
單擊具體的task id就能夠進入到task執行狀況詳情頁面了,單擊詳情頁的"Log"就能夠看到任務的日誌輸出了,操做效果以下:
若是使用了這個airflow的模板功能,能夠在任務執行詳情頁面,查看這個被執行任務的模板渲染結果,操做效果以下:
須要先刪除dag定義文件,好比 rm dags/mydag.py,而後在web界面上刪除
這個airflow依賴很是的多,由於沒到1.0版本,存在一些bug,因此建議使用docker啓動,主要的dag任務編排功能使用是ok的,其它功能能夠先了解下
airflow可使用模板功能生成腳本,參考:airflow.apache.org/tutorial.ht…
jinjia模板語法參考:jinja.pocoo.org/docs/dev/
airflow默認以utc時區運行,若是須要計算正確的時間,須要把時間進行時區轉換,核心代碼以下
#將本地時間轉換爲utc時間,再設置爲start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
複製代碼