airflow是Airbnb公司於2014年開始開發的一個工做流調度器.不一樣於其它調度器使用XML或者text文件方式定義工做流,airflow經過python文件做流,用戶能夠經過代碼徹底自定義本身的工做流。airflow的主要功能:工做流定義、任務調度、任務依賴、變量、池、分佈式執行任務等。html
web server是airflow的顯示與管理工具,在頁面中能看到任務及執行狀況,還能配置變量、池等
node
調度器用來監控任務執行時間並提交任務給worker執行。在airflow中scheduler作爲獨立的服務來啓動。python
工做進程,負責任務的的執行。worker進程會建立SequentialExecutor、LocalExecutor、CeleryExecutor之一來執行任務。在airflow中做爲獨立服務啓動。mysql
celery flower用來監控celery executor的信息。
url:http://host:5555
web
主dag
即有向無圖,至關於azkban中的project。dag中定義的了任務類型、任務依賴、調度週期等.dag由task組中,task定義了任務的類型、任務腳本等,dag定義task之間的依賴。airflow中的任務表現爲一個個的dag.此外還有subdag,在dag中嵌套一個dag(具體做用需進一步研究)。
sql
subdag
至關於azkban中project 中的flow.將dag中的某些task合併到一個子dag中,將這個子dag作爲一個執行單元。
apache
使用subdag時要注意:
1)by convention, a SubDAG’s dag_id should be prefixed by its parent and a dot. As in 'parent.child' 。
引用子dag時要加上父dag前綴,parent.child編程
2)share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
經過向子dag的operator傳入參數來實如今父dag和子dag信息共享。json
3)SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
子dag必需要設置scheduler,若是沒有設置或者設置爲@once,則子dag直接返回執行成功,可是不會執行任務操做api
4)clearing a SubDagOperator also clears the state of the tasks within
清除子dag(的狀態?)也會清除其中的task狀態
5)marking success on a SubDagOperator does not affect the state of the tasks within
將子dag的狀態標記爲success不會影響所包含的task的狀態
6)refrain from using depends_on_past=True! in tasks within the SubDAG as this can be confusing
不要在dag中使用depends_on_past=True!
7)it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot
使用SequentialExecutor來運行子dag,其它的executor執行子dag會出問題
task定義任務的類型、任務內容、任務所依賴的dag等。dag中每一個task都要有不一樣的task_id.
dag = DAG('testFile', default_args=default_args) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( #任務類型是bash task_id='echoDate', #任務id bash_command='echo date > /home/datefile', #任務命令 dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3,[]() dag=dag) t2.set_upstream(t1) #定義任務信賴,任務2依賴於任務1
任務之間經過task.set_upstream\task.set_downstream來設置依賴,也能夠用位運算:
t1>>t2<<t3 表示t2依賴於t1和t3.不建議用該種方式。
操做器,定義任務該以哪一種方式執行。airflow有多種operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社區貢獻的operator等,其中BaseOperator是全部operator的基礎operator。
BaseOperator | 基礎operator,設置baseoperator會影響全部的operator |
BashOperator | executes a bash command |
DummyOperator | 空操做 |
PythonOperator | calls an arbitrary Python function |
EmailOperator | sends an email |
HTTPOperator | sends an HTTP request |
SqlOperator | executes a SQL command |
Sensor | waits for a certain time, file, database row, S3 key, etc… |
t1 = BashOperator( #任務類型是bash task_id='echoDate', #任務id bash_command='echo date > /home/datefile', #任務命令 dag=dag)
scheduler監控dag的狀態,啓動知足條件的dag,並將任務提交給具體的executor執行。dag經過scheduler來設置執行週期。
1.什麼時候執行
注意:當使用schedule_interval
來調度一個dag,假設執行週期爲1天,startdate=2016-01-01,則會在2016-01-01T23:59後執行這個任務。 airflow只會在執行週期的結尾執行任務。
2.設置dag執行週期
在dag中設置schedule_interval
來定義調度週期。該參數能夠接收cron 表達式
和datetime.timedelta
對象,另外airflow還預置了一些調度週期。
preset | Run once a year at midnight of January 1 | cron |
---|---|---|
None |
Don’t schedule, use for exclusively 「externally triggered」 DAGs | |
@once |
Schedule once and only once | |
@hourly |
Run once an hour at the beginning of the hour | 0 * * * * |
@daily |
Run once a day at midnight | 0 0 * * * |
@weekly |
Run once a week at midnight on Sunday morning | 0 0 * * 0 |
@monthly |
Run once a month at midnight of the first day of the month | 0 0 1 * * |
@yearly |
Run once a year at midnight of January 1 | 0 0 1 1 * |
3.backfill和catchup
backfill:填充任務,手動重跑過去失敗的任務(指定日期)。
catchup:若是歷史任務出錯,調度器嘗試按調度順序重跑歷史任務(而不是按照當前時間執行當前任務)。能夠在dag中設置dag.catchup = False
或者參數文件中設置catchup_by_default = False
來禁用這個功能。
4.External Triggers
我還沒整明白(等我翻下書再告訴你啊~)
worker指工做節點,相似於yarn中的nodemanager。work負責啓動機器上的executor來執行任務。使用celeryExecutor後能夠在多個機器上部署worker服務。
執行任務的進程,dag中的task由executor來執行。有三個executor:SequentialExecutor(順序執行)、LocalExecutor(本地執行)、CeleryExecutor(遠程執行)。
dag中被實例化的任務。
池用來控制同個pool的task並行度。
aggregate_db_message_job = BashOperator( task_id='aggregate_db_message_job', execution_timeout=timedelta(hours=3), pool='ep_data_pipeline_db_msg_agg', bash_command=aggregate_db_message_job_cmd, dag=dag) aggregate_db_message_job.set_upstream(wait_for_empty_queue)
上例中,aggregate_db_message_job設置了pool,若是pool的最大並行度爲1,當其它任務也設置該池時,若是aggregate_db_message_job在運行,則其它任務必須等待。
定義對airflow以外的鏈接,如對mysql hive hdfs等工具的鏈接。airflow中預置了一些鏈接類型,如mysql hive hdfs postgrey等。
Hooks 是對外的connection接口,經過自定義hooks實現connection中不支持的鏈接。
airflow中的隊列嚴格來講不叫Queues,叫"lebal"更爲合適。在operator中,能夠設置queue參數如queue=spark,而後在啓動worker時:airflow worker -q spark,那麼該worker只會執行spark任務。至關於節點標籤。、
默認狀況下,dag與dag之間 、task與task之間信息是沒法共享的。若是想在dag、task之間實現信息共享,要使用XComs,經過設置在一個dag(task)中設置XComs參數在另外一箇中讀取來實現信息共享。
在airflow中能夠設置一些變量,在dag和task中能夠引用這些變量:
from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True)
設置變量:
此外,airflow預置了一些變量:
具體參考:http://airflow.incubator.apache.org/code.html#macros
dag中的任務能夠選擇分支! BranchPythonOperator容許用戶經過函數返回下一步要執行的task的id,從而根據條件選擇執行的分支。azkaban沒有該功能。注意,BranchPythonOperator下級task是被"selected"或者"skipped"的分支。
SLAs指在一段時間內應該徹底的操做,好比在一個小時內dag應該執行成功,若是達不目標能夠執行其它任務好比發郵件發短信等。
Trigger Rules定義了某個task在何種狀況下執行。默認狀況下,某個task是否執行,依賴於其父task(直接上游任務)所有執行成功。airflow容許建立更復雜的依賴。經過設置operator中的trigger_rule參數來控制:
all_success
: (default) all parents have succeeded 父task全failed
all_failed
: all parents are in a failed
or upstream_failed
state 父task全failed
或者upstream_failed
狀態all_done
: all parents are done with their execution 父task全執行過,無論success or failedone_failed
: fires as soon as at least one parent has failed, it does not wait for all parents to be done 當父task中有一個是failed
狀態時執行,沒必要等到全部的父task都執行one_success
: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 當父task中有一個是success
狀態時執行,沒必要等到全部的父task都執行dummy
: dependencies are just for show, trigger at will 無條件執行該參數能夠和depends_on_past
結合使用,當設置爲true時,若是上一次沒有執行成功,這一次不管如何都不會執行。
airflow中內置了一些宏,能夠在代碼中引用。
通用宏:
airflow特定的宏:
airflow.macros.ds_add(ds, days) |
airflow.macros.ds_format(ds, input_format, output_format) |
airflow.macros.random() → x in the interval [0, 1) |
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default') |
airflow.macros.hive.max_partition(table, schema='default', field=None, filter=None, metastore_conn_id='metastore_default') |
詳細說明:
http://airflow.incubator.apache.org/code.html#macros
airflow支持jinja2語法。Jinja2是基於python的模板引擎,功能比較相似於於PHP的smarty,J2ee的Freemarker和velocity。關於jinja2:
http://10.32.1.149:7180/cmf/login
這個太複雜,待近一步研究
airflow命令的語法結構:
airflow 子命令 [參數1][參數2]….
如 airflow test example_dag print_date 2017-05-06
子命令
子命令包括:
resetdb | Burn down and rebuild the metadata database |
render | Render a task instance’s template(s) |
variables | CRUD operations on variables |
connections | List/Add/Delete connections |
pause | Pause a DAG |
task_failed_deps | Returns the unmet dependencies for a task instance from the perspective of the scheduler |
version | Show the version |
trigger_dag | Trigger a DAG run |
initdb | Initialize the metadata database |
test | Test a task instance. This will run a task without checking for dependencies or recording it’s state in the database. |
unpause | Resume a paused DAG |
dag_state | Get the status of a dag run |
run | Run a single task instance |
list_tasks | List the tasks within a DAG |
backfill | Run subsections of a DAG for a specified date range |
list_dags | List all the DAGs |
kerberos | Start a kerberos ticket renewer |
worker | Start a Celery worker node |
webserver | Start a Airflow webserver instance |
flower | Start a Celery Flower |
scheduler | Start a scheduler instance |
task_state | Get the status of a task instance |
pool | CRUD operations on pools |
serve_logs | Serve logs generate by worker |
clear | Clear a set of task instance, as if they never ran |
upgradedb | Upgrade the metadata database to latest version |
使用:
[bqadm@sitbqbm1~]$ airflow webserver -p 8080
詳細命令參考:
http://airflow.incubator.apache.org/cli.html#
airflow的api分爲Operator、Macros、Modles、Hooks、Executors幾個部分,主要關注Operator、Modles這兩部分
詳細API文檔:
http://airflow.incubator.apache.org/code.html
1.建立一個pthon文件testBashOperator.py:
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'yangxw', 'depends_on_past': False, 'start_date': datetime(2017, 5, 9), 'email': ['xiaowen.yang@bqjr.cn'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('printDate', default_args=default_args,schedule_interval='*/1 * * * *') # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='datefile', bash_command='date > /home/bqadm/datefile', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) t2.set_upstream(t1)
2.編譯該文件
把文件放到$AIRFLOW_HIME/dags下,而後執行:
[bqadm@bqdpsit1 dags]$ python testFile.py [2017-05-18 10:04:17,422] {__init__.py:57} INFO - Using executor CeleryExecutor
這樣dag就被建立了
3.啓動dag
在web上,點擊最左邊按鈕,將off切換爲on
這樣dag就啓動了。dag啓後,會根據自生的調度狀況執行。上列中的dag每分鐘執行一次,將時間寫入/home/bqadm/datafile裏。
若是執行出錯還會發郵件通知:
airflow內置了16個示例dag,經過學習這些dag的源碼可掌握operator、調度、任務依賴的知識,能快速入門。
airflow是功能強大而且極其靈活的pipeline工具,經過python腳本能控制ETL中各個環節,其缺點是使用比較複雜,須要必定的編程水平。此外,當一個dag中有數十個task時,python文件將變的很是長致使維護不便
。airflow在國內並未普遍使用,面臨必定的技術風險
。