教程:https://airflow.apache.org/docs/stable/index.htmlhtml
官網: http://airflow.incubator.apache.org/index.htmlpython
airflow源碼:https://github.com/apache/incubator-airflowmysql
參考資料:http://www.open-open.com/lib/view/open1452002876105.htmlgit
簡介:http://www.cnblogs.com/xianzhedeyu/p/8047828.htmlgithub
重要參數介紹:http://www.cnblogs.com/skyrim/p/7456166.htmlweb
http://blog.csdn.net/permike/article/details/52184621redis
FAQ :http://blog.csdn.net/yingkongshi99/article/details/52658660sql
容器:docker pull puckel/docker-airflowdocker
啓動dag調度器, 注意啓動調度器, 並不意味着dag會被立刻觸發, dag觸發須要符合它本身的schedule規則數據庫
若是缺省了END_DATE參數, END_DATE等同於START_DATE.
使用 DummyOperator 來匯聚分支
使用 ShortCircuitOperator/BranchPythonOperator 作分支
使用 SubDagOperator 嵌入一個子dag
使用 TriggerDagRunOperator 直接trigger 另外一個dag
在建立MyBashOperator的實例時候, 爲on_failure_callback和on_success_callback參數設置兩個回調函數, 咱們在回調函數中, 將success或failed狀態記錄到本身的表中.
DAG的schedule_interval參數設置成None, 代表這個DAG始終是由外部觸發。
若是將default_args
字典傳遞給DAG,DAG將會將字典應用於其內部的任何Operator上。這很容易的將經常使用參數應用於多個Operator,而無需屢次鍵入。
default_args=dict( start_date=datetime(2016, 1, 1), owner='Airflow') dag = DAG('my_dag', default_args=default_args) op = DummyOperator(task_id='dummy', dag=dag) print(op.owner) # Airflow
initdb,初始化元數據DB,元數據包括了DAG自己的信息、運行信息等;
resetdb,清空元數據DB;
list_dags,列出全部DAG;
list_tasks,列出某DAG的全部task;
test,測試某task的運行情況;
backfill,測試某DAG在設定的日期區間的運行情況;
webserver,開啓webserver服務;
scheduler,用於監控與觸發DAG。
$ cd ${AIRFLOW_HOME}/dags $ python test_import.py # 保證代碼無語法錯誤 $ airflow list_dags # 查看dag是否成功加載 airflow list_tasks test_import_dag –tree # 查看dag的樹形結構是否正確 $ airflow test test_import_dag \ test_import_task 2016-3-7 # 測試具體的dag的某個task在某個時間的運行是否正常 $ airflow backfill test_import_dag -s 2016-3-4 \ -e 2016-3-7 # 對dag進行某段時間內的完整測試
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
請注意,airflow test命令在本地運行任務實例,將其日誌輸出到stdout(屏幕上),不會影響依賴關係,而且不會將狀態(運行,成功,失敗,...)發送到數據庫。 它只是容許簡單的測試單個任務實例。
若是使用depends_on_past = True,則單個任務實例將取決於上一個任務實例的成功與否,若是指定自己的start_date,則忽略此依賴關係
# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
使用Xcom在task之間傳參
能夠直接使用jinja模板語言,在{{}}中調用ti的xcom_push和xcom_pull方法,下面的例子爲t1使用xcom_push推出了一個kv,t2經過taskid和key來接收
dag = DAG( dag_id='xcomtest', default_args=default_args, schedule_interval='*/2 * ** *') t1 = BashOperator( task_id='xcom', bash_command='''''{{ ti.xcom_push(key='aaa', value='bbb') }}''', dag=dag) t2 = BashOperator( task_id='xcom2', bash_command='''''echo"{{ ti.xcom_pull(key='aaa', task_ids='xcom') }}" ''', dag=dag) t2.set_upstream(t1)
airflow提供了不少Macros Variables,能夠直接使用jinja模板語言調用宏變量
execution_date並非task的真正執行時間,而是上一週期task的執行時間。
咱們在airflow上看到一個任務是6am執行的,並且interval=4hours,那麼execution_date的值是2am,而不是6am
暫時沒法hold或pause某個task,只支持以dag爲單位pause
當使用BashOperator時,command須要調用腳本時,腳本後須要有個空格,不然報錯,暫時不清楚緣由,但加空格後能夠正常執行,以下例,run.sh後需加空格
Airflow爲Operator提供許多常見任務,包括:
BashOperator - 執行bash命令
PythonOperator - 調用任意的Python函數
EmailOperator - 發送郵件
HTTPOperator - 發送 HTTP 請求
SqlOperator - 執行 SQL 命令
Sensor - 等待必定時間,文件,數據庫行,S3鍵等...
除了這些基本的構建塊以外,還有更多的特定Operator:DockerOperator,HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator,SlackOperator
使用supervisord進行deamon
airflow自己沒有deamon模式,因此直接用supervisord就ok了,咱們只要寫4行代碼
[program:airflow_web] command=/home/kimi/env/athena/bin/airflow webserver -p 8080 [program:airflow_scheduler] command=/home/kimi/env/athena/bin/airflow scheduler 做者:yin1941 連接:https://www.jianshu.com/p/59d69981658a 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。
airflow 執行的命令或這種消息是支持 jinja2 模板語言;{{ ds }}是一種宏,表示當前的日期,
形如2016-12-16,支持的宏在
https://airflow.incubator.apache.org/code.html#macros
test: 用於測試特定的某個task,不須要依賴知足
run: 用於執行特定的某個task,須要依賴知足
backfill: 執行某個DAG,會自動解析依賴關係,按依賴順序執行
unpause: 將一個DAG啓動爲例行任務,默認是關的,因此編寫完DAG文件後必定要執行這和要命令,相反命令爲pause
scheduler: 這是整個 airflow 的調度程序,通常是在後臺啓動
clear: 清除一些任務的狀態,這樣會讓scheduler來執行重跑
============================
前面的腳本里用到了{{ ds }}
變量,每一個DAG在執行時都會傳入一個具體的時間(datetime對象), 這個ds
就會在 render 命令時被替換成對應的時間。這裏要特別強調一下, 對於週期任務,airflow傳入的時間是上一個週期的時間(劃重點),好比你的任務是天天執行, 那麼今天傳入的是昨天的日期,若是是周任務,那傳入的是上一週今天的值
==========================
executor
SequentialExecutor:表示單進程順序執行,一般只用於測試
LocalExecutor:表示多進程本地執行,它用python的多進程庫從而達到多進程跑任務的效果。
CeleryExecutor:表示使用celery做爲執行器,只要配置了celery,就能夠分佈式地多機跑任務,通常用於生產環境。
sql_alchemy_conn :這個配置讓你指定 airflow 的元信息用何種方式存儲,默認用sqlite,若是要部署到生產環境,推薦使用 mysql。
smtp :若是你須要郵件通知或用到 EmailOperator 的話,須要配置發信的 smtp 服務器
======================
觸發條件有兩個維度, 以T1&T2->T3 這樣的dag爲例:
一個維度是: 要根據dag上次運行T3的狀態肯定本次T3是否被調用, 由DAG的default_args.depends_on_past參數控制, 爲True時, 只有上次T3運行成功, 此次T3纔會被觸發
另外一個維度是: 要根據前置T1和T2的狀態肯定本次T3是否被調用, 由T3.trigger_rule參數控制, 有下面6種情形, 缺省是all_success.
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
dummy: dependencies are just for show, trigger at will
========================
airflow有兩個基於PythonOperator的Operator來支持dag分支功能.
ShortCircuitOperator, 用來實現流程的判斷. Task須要基於ShortCircuitOperator, 若是本Task返回爲False的話, 其下游Task將被skip; 若是爲True的話, 其下游Task將會被正常執行. 尤爲適合用在其下游都是單線節點的場景.
BranchPythonOperator, 用來實現Case分支. Task須要基於BranchPythonOperator, airflow會根據本task的返回值(返回值是某個下游task的id),來肯定哪一個下游Task將被執行, 其餘下游Task將被skip.
======================
connection 表:
咱們的Task每每須要經過jdbc/ftp/http/webhdfs方式訪問其餘資源, 通常地訪問資源時候都須要一些簽證, airflow容許咱們將這些connection以及鑑證存放在connection表中. 能夠如今WebUI的Admin->Connections管理這些鏈接, 在代碼中使用這些鏈接.
MySQL 應該使用 mysqlclient 包, 我簡單試了mysql-connector-python 有報錯
LocalExecutor 和 CeleryExecutor 均可用於生產環境, CeleryExecutor 將使用 Celery 做爲Task執行的引擎, 擴展性很好, 固然配置也更復雜, 須要先setup Celery的backend(包括RabbitMQ, Redis)等. 其實真正要求擴展性的場景並很少, 因此LocalExecutor 是一個很不錯的選擇了.
1. 配置OS環境變量 AIRFLOW_HOME, AIRFLOW_HOME缺省爲 ~/airflow
2. 運行下面命令初始化一個Sqlite backend DB, 並生成airflow.cfg文件
your_python ${AIRFLOW_HOME}\bin\airflow initdb
3. 若是須要修改backend DB類型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn後, 而後從新運行 airflow initdb .
官方推薦使用MySQL/PostgreSQL作DB Server.
有下面3個參數用於控制Task的併發度,
parallelism, 一個Executor同時運行task實例的個數
dag_concurrency, 一個dag中某個task同時運行的實例個數
max_active_runs_per_dag: 一個dag同時啓動的實例個數
start_date 有點特別,若是你設置了這個參數,那麼airflow就會從start_date開始以 schedule_interval 的規則開始執行,例如設置成3天前每小時執行一次,那麼在調度正常啓動時,就會當即調度 24*3 次,但注意,腳本執行環境的時間仍是當前的系統時間,而不會說真是把系統時間模擬成3天前,因此感受這個功能應用場景比較好限。
===========================
dags_folder目錄支持子目錄和軟鏈接,所以不一樣的dag能夠分門別類的存儲起來
schedule_interval=timedelta(minutes=1) 或者 crontab格式
crontab格式的介紹:https://www.cnblogs.com/chenshishuo/p/5152068.html http://blog.csdn.net/liguohanhaha/article/details/52261192
sql_alchemy_conn = mysql://ct:152108@localhost/airflow
對應字段解釋以下: dialect+driver://username:password@host:port/database
當遇到不符合常理的狀況時考慮清空 airflow backend的數據庫, 可以使用airflow resetdb清空。
刪除dag文件後,webserver中可能還會存在相應信息,這時須要重啓webserver並刷新網頁。
關閉webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}
界面的時候看起來比較蛋疼, utc-0的時間,
修改.../python2.7/site-packages/airflow/www/templates/admin/master.html以下(註釋掉UCTSeconds,新增一行UTCSeconds), 這樣時間就是本地時間了。
驗證腳本是否有問題:python xxx.py
看是否能查詢出新增的dags嗎:airflow list_dags
啓動schedule :airflow scheduler
這裏有的 start_date 有點特別,若是你設置了這個參數,那麼airflow就會從start_date開始以 schedule_interval 的規則開始執行,例如設置成3天前每小時執行一次,那麼在調度正常啓動時,就會當即調度 24*3 次,但注意,腳本執行環境的時間仍是當前的系統時間,而不會說真是把系統時間模擬成3天前,因此感受這個功能應用場景比較好限
在centos6.8上裝特別順利(運行時貌似一切都正常,就是任務一直處於running狀態---debug了一番源代碼, 發現內存要必需夠大,發現必需用非root身份運行airflow worker, 務必保證核數夠用,不然須要調低dag_concurrency, max_active_runs_per_dag,max_threads,parallelism, 不然worker出現莫名其妙的問題)
airflow跑着跑着就掛了,一看內存還夠用(可能須要不要錢的加內存),若是你處處找不到想要的錯誤日誌。那麼看看AIRFLOW_HOME下面是否是莫名其妙的多了幾個 .err/.out 的文件,進去看看會有收穫。
在須要運行做業的機器上的安裝airflow airflow[celery] celery[redis] 模塊後,啓動airflow worker便可.這樣做業就能運行在多個節點上.
安裝主模塊[airflow@airflow ~]$ pip install airflow2.4.2 安裝數據庫模塊、密碼模塊[airflow@airflow ~]$ pip install "airflow[postgres,password]"