3.Airflow使用

1. airflow簡介

airflow是Airbnb公司於2014年開始開發的一個工做流調度器.不一樣於其它調度器使用XML或者text文件方式定義工做流,airflow經過python文件做流,用戶能夠經過代碼徹底自定義本身的工做流。airflow的主要功能:工做流定義、任務調度、任務依賴、變量、池、分佈式執行任務等。html

2. 相關概念

2.1 服務進程

2.1.1. web server

web server是airflow的顯示與管理工具,在頁面中能看到任務及執行狀況,還能配置變量、池等
node

2.1.2. scheduler

調度器用來監控任務執行時間並提交任務給worker執行。在airflow中scheduler作爲獨立的服務來啓動。python

2.1.3. worker

工做進程,負責任務的的執行。worker進程會建立SequentialExecutor、LocalExecutor、CeleryExecutor之一來執行任務。在airflow中做爲獨立服務啓動。mysql

2.1.4. celery flower

celery flower用來監控celery executor的信息。
url:http://host:5555
web

2.2 相關概念

2.2.1. dag

  • 主dag
    即有向無圖,至關於azkban中的project。dag中定義的了任務類型、任務依賴、調度週期等.dag由task組中,task定義了任務的類型、任務腳本等,dag定義task之間的依賴。airflow中的任務表現爲一個個的dag.此外還有subdag,在dag中嵌套一個dag(具體做用需進一步研究)。
    sql

  • subdag
    至關於azkban中project 中的flow.將dag中的某些task合併到一個子dag中,將這個子dag作爲一個執行單元。
    apache

使用subdag時要注意:
1)by convention, a SubDAG’s dag_id should be prefixed by its parent and a dot. As in 'parent.child' 。
引用子dag時要加上父dag前綴,parent.child編程

2)share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)
經過向子dag的operator傳入參數來實如今父dag和子dag信息共享。json

3)SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
子dag必需要設置scheduler,若是沒有設置或者設置爲@once,則子dag直接返回執行成功,可是不會執行任務操做api

4)clearing a SubDagOperator also clears the state of the tasks within
清除子dag(的狀態?)也會清除其中的task狀態

5)marking success on a SubDagOperator does not affect the state of the tasks within
將子dag的狀態標記爲success不會影響所包含的task的狀態

6)refrain from using depends_on_past=True! in tasks within the SubDAG as this can be confusing
不要在dag中使用depends_on_past=True!

7)it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot
使用SequentialExecutor來運行子dag,其它的executor執行子dag會出問題

2.2.2.task

task定義任務的類型、任務內容、任務所依賴的dag等。dag中每一個task都要有不一樣的task_id.

dag = DAG('testFile', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(   #任務類型是bash
    task_id='echoDate', #任務id
    bash_command='echo date > /home/datefile', #任務命令
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,[]()
    dag=dag)

t2.set_upstream(t1) #定義任務信賴,任務2依賴於任務1

任務之間經過task.set_upstream\task.set_downstream來設置依賴,也能夠用位運算:
t1>>t2<<t3 表示t2依賴於t1和t3.不建議用該種方式。

2.2.3.Operator

操做器,定義任務該以哪一種方式執行。airflow有多種operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社區貢獻的operator等,其中BaseOperator是全部operator的基礎operator。

BaseOperator 基礎operator,設置baseoperator會影響全部的operator
BashOperator executes a bash command
DummyOperator 空操做
PythonOperator calls an arbitrary Python function
EmailOperator sends an email
HTTPOperator sends an HTTP request
SqlOperator executes a SQL command
Sensor waits for a certain time, file, database row, S3 key, etc…
t1 = BashOperator(   #任務類型是bash
    task_id='echoDate', #任務id
    bash_command='echo date > /home/datefile', #任務命令
    dag=dag)

2.2.4 scheduler

scheduler監控dag的狀態,啓動知足條件的dag,並將任務提交給具體的executor執行。dag經過scheduler來設置執行週期。

1.什麼時候執行
注意:當使用schedule_interval來調度一個dag,假設執行週期爲1天,startdate=2016-01-01,則會在2016-01-01T23:59後執行這個任務。 airflow只會在執行週期的結尾執行任務。

2.設置dag執行週期
在dag中設置schedule_interval來定義調度週期。該參數能夠接收cron 表達式datetime.timedelta對象,另外airflow還預置了一些調度週期。

preset Run once a year at midnight of January 1 cron
None Don’t schedule, use for exclusively 「externally triggered」 DAGs
@once Schedule once and only once
@hourly Run once an hour at the beginning of the hour 0 * * * *
@daily Run once a day at midnight 0 0 * * *
@weekly Run once a week at midnight on Sunday morning 0 0 * * 0
@monthly Run once a month at midnight of the first day of the month 0 0 1 * *
@yearly Run once a year at midnight of January 1 0 0 1 1 *

3.backfill和catchup
backfill:填充任務,手動重跑過去失敗的任務(指定日期)。
catchup:若是歷史任務出錯,調度器嘗試按調度順序重跑歷史任務(而不是按照當前時間執行當前任務)。能夠在dag中設置dag.catchup = False或者參數文件中設置catchup_by_default = False來禁用這個功能。

4.External Triggers
我還沒整明白(等我翻下書再告訴你啊~)

2.2.5.worker

worker指工做節點,相似於yarn中的nodemanager。work負責啓動機器上的executor來執行任務。使用celeryExecutor後能夠在多個機器上部署worker服務。

2.2.6.executor

執行任務的進程,dag中的task由executor來執行。有三個executor:SequentialExecutor(順序執行)、LocalExecutor(本地執行)、CeleryExecutor(遠程執行)。

2.2.7.Task Instances

dag中被實例化的任務。

2.2.8.pool

池用來控制同個pool的task並行度。

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

上例中,aggregate_db_message_job設置了pool,若是pool的最大並行度爲1,當其它任務也設置該池時,若是aggregate_db_message_job在運行,則其它任務必須等待。

2.2.9.connection

定義對airflow以外的鏈接,如對mysql hive hdfs等工具的鏈接。airflow中預置了一些鏈接類型,如mysql hive hdfs postgrey等。

2.2.10.Hooks

Hooks 是對外的connection接口,經過自定義hooks實現connection中不支持的鏈接。

2.2.11.Queues

airflow中的隊列嚴格來講不叫Queues,叫"lebal"更爲合適。在operator中,能夠設置queue參數如queue=spark,而後在啓動worker時:airflow worker -q spark,那麼該worker只會執行spark任務。至關於節點標籤。、

2.2.12.XComs

默認狀況下,dag與dag之間 、task與task之間信息是沒法共享的。若是想在dag、task之間實現信息共享,要使用XComs,經過設置在一個dag(task)中設置XComs參數在另外一箇中讀取來實現信息共享。

2.2.13.Variables

在airflow中能夠設置一些變量,在dag和task中能夠引用這些變量:

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)

設置變量:

此外,airflow預置了一些變量:

具體參考:http://airflow.incubator.apache.org/code.html#macros

2.2.14.Branching

dag中的任務能夠選擇分支! BranchPythonOperator容許用戶經過函數返回下一步要執行的task的id,從而根據條件選擇執行的分支。azkaban沒有該功能。注意,BranchPythonOperator下級task是被"selected"或者"skipped"的分支。

2.2.15.SLAs (Service Level Agreements)

SLAs指在一段時間內應該徹底的操做,好比在一個小時內dag應該執行成功,若是達不目標能夠執行其它任務好比發郵件發短信等。

2.2.16.Trigger Rules

Trigger Rules定義了某個task在何種狀況下執行。默認狀況下,某個task是否執行,依賴於其父task(直接上游任務)所有執行成功。airflow容許建立更復雜的依賴。經過設置operator中的trigger_rule參數來控制:

  • all_success: (default) all parents have succeeded 父task全failed
  • all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed狀態
  • all_done: all parents are done with their execution 父task全執行過,無論success or failed
  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 當父task中有一個是failed狀態時執行,沒必要等到全部的父task都執行
  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 當父task中有一個是success狀態時執行,沒必要等到全部的父task都執行
  • dummy: dependencies are just for show, trigger at will 無條件執行

該參數能夠和depends_on_past結合使用,當設置爲true時,若是上一次沒有執行成功,這一次不管如何都不會執行。

2.2.17 宏

airflow中內置了一些宏,能夠在代碼中引用。
通用宏:

airflow特定的宏:

airflow.macros.ds_add(ds, days)
airflow.macros.ds_format(ds, input_format, output_format)
airflow.macros.random() → x in the interval [0, 1)
airflow.macros.hive.closest_ds_partition(table, ds, before=True, schema='default', metastore_conn_id='metastore_default')
airflow.macros.hive.max_partition(table, schema='default', field=None, filter=None, metastore_conn_id='metastore_default')

詳細說明:
http://airflow.incubator.apache.org/code.html#macros

2.2.18 jinja2

airflow支持jinja2語法。Jinja2是基於python的模板引擎,功能比較相似於於PHP的smarty,J2ee的Freemarker和velocity。關於jinja2:
http://10.32.1.149:7180/cmf/login

2.2.19 Latest Run Only

這個太複雜,待近一步研究

3. 命令行

  • airflow命令的語法結構:

    airflow 子命令 [參數1][參數2]….
    如 airflow test example_dag print_date 2017-05-06

  • 子命令
    子命令包括:

resetdb Burn down and rebuild the metadata database
render Render a task instance’s template(s)
variables CRUD operations on variables
connections List/Add/Delete connections
pause Pause a DAG
task_failed_deps Returns the unmet dependencies for a task instance from the perspective of the scheduler
version Show the version
trigger_dag Trigger a DAG run
initdb Initialize the metadata database
test Test a task instance. This will run a task without checking for dependencies or recording it’s state in the database.
unpause Resume a paused DAG
dag_state Get the status of a dag run
run Run a single task instance
list_tasks List the tasks within a DAG
backfill Run subsections of a DAG for a specified date range
list_dags List all the DAGs
kerberos Start a kerberos ticket renewer
worker Start a Celery worker node
webserver Start a Airflow webserver instance
flower Start a Celery Flower
scheduler Start a scheduler instance
task_state Get the status of a task instance
pool CRUD operations on pools
serve_logs Serve logs generate by worker
clear Clear a set of task instance, as if they never ran
upgradedb Upgrade the metadata database to latest version

使用:

[bqadm@sitbqbm1~]$ airflow webserver -p 8080

詳細命令參考:
http://airflow.incubator.apache.org/cli.html#

4. API

airflow的api分爲Operator、Macros、Modles、Hooks、Executors幾個部分,主要關注Operator、Modles這兩部分

詳細API文檔:
http://airflow.incubator.apache.org/code.html

5. 使用

5.1 建立dag

1.建立一個pthon文件testBashOperator.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 9),
    'email': ['xiaowen.yang@bqjr.cn'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('printDate', default_args=default_args,schedule_interval='*/1 * * * *')

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='datefile',
    bash_command='date > /home/bqadm/datefile',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

2.編譯該文件
把文件放到$AIRFLOW_HIME/dags下,而後執行:

[bqadm@bqdpsit1 dags]$ python testFile.py 
[2017-05-18 10:04:17,422] {__init__.py:57} INFO - Using executor CeleryExecutor

這樣dag就被建立了

3.啓動dag
在web上,點擊最左邊按鈕,將off切換爲on

這樣dag就啓動了。dag啓後,會根據自生的調度狀況執行。上列中的dag每分鐘執行一次,將時間寫入/home/bqadm/datafile裏。

若是執行出錯還會發郵件通知:

5.2 示例dag

airflow內置了16個示例dag,經過學習這些dag的源碼可掌握operator、調度、任務依賴的知識,能快速入門。

6. 總結

airflow是功能強大而且極其靈活的pipeline工具,經過python腳本能控制ETL中各個環節,其缺點是使用比較複雜,須要必定的編程水平。此外,當一個dag中有數十個task時,python文件將變的很是長致使維護不便。airflow在國內並未普遍使用,面臨必定的技術風險



相關文章
相關標籤/搜索