使用 Airflow 替代你的 crontab

Airflow 是什麼

Airflow 是 Airbnb 開發的用於工做流管理的開源項目,自帶 web UI 和調度。如今 Apache 下作孵化,地址是 github.com/apache/incu…python

airflow
airflow

Airflow 解決什麼問題

Airflow 主要解決的問題能夠參考 Airbnb 官方的博客: airflow-a-workflow-management-platform,簡單來講就是管理和調度各類離線定時 Job ,能夠替代 crontab。mysql

當 cron job 規模達到數百上千時,其對人的要求將會很是高的,若是你的團隊經歷過這樣的事情,應該能體會其中痛苦,因此使用相似 airflow 這樣的工具代替 cron 來作定時任務將會極大提升工做效率。git

開始使用 airflow 以前須要知道和準備的

Airflow 在 pip 上已經改名爲 apache-airflow,下載最新版請使用後者 pip install apache-airflowgithub

Airflow 1.8 版本依賴的是 MySQL 5.6 以上,5.7 如下報 1071, u'Specified key was too long; max key length is 767 bytes,若是你使用 MySQL 做爲你的 airflow backend 請升級你的 MySQL 到最新版。web

MySQL 5.6 升級到 5.7 在使用 airflow 時會報 1146, u"Table 'performance_schema.session_variables' doesn't exist",執行 mysql_upgrade -u root -p --force 解決。sql

Airflow 的 mysql driver 使用的是 mysqlclient mysql://root:@127.0.0.1/sqlalchemy_lab?charset=utf8,若是使用其餘 driver 將報 syntax error。shell

基礎概念

Airflow 中最基本的兩個概念是:DAG 和 task。DAG 的全稱是 Directed Acyclic Graph 是全部你想執行的任務的集合,在這個集合中你定義了他們的依賴關係,一個 DAG 是指一個 DAG object,一個 DAG object 能夠在 Python 腳本中配置完成。數據庫

好比一個簡單的的 DAG 包含三個 task:A、B、C,A 執行成功以後 B 才能執行,C 不依賴 A 和 B 便可執行。在這個簡單的 DAG 中 A B C 能夠是任何你想要執行的任務。apache

DAG 的定義使用 Python 完成的,其實就是一個 Python 文件,存放在 DAG 目錄,Airflow 會動態的從這個目錄構建 DAG object,每一個 DAG object 表明了一個 workflow,每一個 workflow 均可以包含任意個 task。bash

安裝和使用

Airflow 是基於 Python 構建的,能夠很容易用 pip 安裝使用,pip install apache-airflow,默認狀況下 airflow 會在 ~/airflow 目錄存放相關配置。

Airflow 提供了一些列命令來完成 airflow 的初始化工做來和它的正確使用。

# 在 airflow 目錄初始化數據庫和 airflow 配置
airflow initdb
# 啓動 airflow web
airflow webserver
# 開始調度
airflow scheduler複製代碼

更詳細的信息請參考文檔 airflow.incubator.apache.org/

第一個 DAG

DAG 的配置用 Python 完成像這樣:

""" Code that goes along with the Airflow tutorial located at: https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py """
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1) # t2 依賴 t1
t3.set_upstream(t1)複製代碼

DAG 腳本的目的只是定義 DAG 的配置,並不包含任何的數據處理,在這裏 operator 就是 task。

DAG 的實例化

一個 DAG 腳本是由 DAG object 的實例化和對應的 operator 組成的,除此以外咱們還能夠定義默認的參數提供給每一個任務。

DAG 對象實例化能夠根據咱們的須要提供對應的初始化參數,實例化 DAG 對象須要提供惟一的 dag_id:

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))複製代碼

Task 的實例化

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)複製代碼

task 對象的定義的就是 operator 的實例化,operator 有 task_id,用來區分任務,能夠按照須要定製 bash_command,也能夠傳遞參數等。

Task 的依賴

Task 之間是能相互創建依賴的,形如:

t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')複製代碼

Airflow 會自動檢測環形依賴以防止 task 沒法工做的狀況出現,更復雜的狀況請參考文檔。

執行和測試

和 airflow.cfg 同級目錄下創建 dag 目錄,用來存放第一個 DAG 腳本,而後執行 python tutorial.py ,若是沒有報錯說明 tutorial 創建成功了。

Airflow 的命令行

Airflow 提供了一些列的命令行用來查看 DAG 和 task

# print the list of active DAGs
airflow list_dags
 # prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
 # prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree複製代碼

測試任務的執行

執行任務很簡單,指定 DAG 並去指定 task 和執行的日期

# command layout: command subcommand dag_id task_id date
 # testing print_date
airflow test tutorial print_date 2015-06-01
 # testing sleep
airflow test tutorial sleep 2015-06-01複製代碼

test 命令會執行任務而且輸出到控制檯,不會把任務的執行狀態進行持久化

執行任務和並記錄狀態

執行任務在 Airflow 中稱之爲 backfill,以 backfill 執行會真正開始追蹤任務的執行狀態和依賴,而且會記錄日誌

# optional, start a web server in debug mode in the background
# airflow webserver --debug &
 # start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07複製代碼

更多關於 DAG 和 operator

DAG 的 scope

Airflow 會默認加載任意它能導入到餓 DAG object,這就意味着只要是全局的 DAG object 均可以被導入,可是有時候爲了讓 DAG 不被導入,好比 SubDagOperator 就可使用 local 的做用域。

dag_1 = DAG('this_dag_will_be_discovered')

def my_function()
    dag_2 = DAG('but_this_dag_will_not')

my_function()複製代碼

DAG 能夠指定默認的參數

DAG 的默認參數會應用到全部的 operator 中。

default_args=dict(
    start_date=datetime(2016, 1, 1),
    owner='Airflow')

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow複製代碼

擴展性極強的 operator

Airflow operator 很容易擴展,這也是 airflow 幾乎支持任何形式 task 重要緣由。雖然 Airflow 支持不一樣的 task 能夠傳輸數據,可是若是你的兩個 task 之間確實須要共享數據,最好的辦法是把他們寫在一塊兒。

參考資料

更多原創文章,關注 wecatch 公衆號

相關文章
相關標籤/搜索