airflow 1.10.0html
官方:http://airflow.apache.org/node
Airflow is a platform to programmatically author, schedule and monitor workflows.python
Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.mysql
When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.linux
airflow是一個能夠經過python代碼來編排、調度和監控工做流的平臺;工做流是一系列task的dag(directed acyclic graphs,有向無環圖);git
web server 使用 gunicorn 服務器,經過airflow.cfg中workers配置併發進程數;github
The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it spins up a subprocess, which monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) collects DAG parsing results and inspects active tasks to see whether they can be triggered.web
四種Executor:SequentialExecutor、LocalExecutor、CeleryExecutor、MesosExecutor:redis
1)Airflow uses a sqlite database, which you should outgrow fairly quickly since no parallelization is possible using this database backend. It works in conjunction with the SequentialExecutor which will only run task instances sequentially.
2)LocalExecutor, tasks will be executed as subprocesses;
3)CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to setup a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.sql
broker_url = amqp://guest:guest@rabbitmq_server:5672/
broker_url = redis://$redis_server:6379/0
4)MesosExecutor allows you to schedule airflow tasks on a Mesos cluster.
[mesos]
master = localhost:5050
SequentialExecutor搭配sqlite庫使用,LocalExecutor使用子進程來執行任務,CeleryExecutor須要依賴backend執行(好比RabbitMQ或Redis),MesosExecutor會提交任務到mesos集羣;
In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
dag是一系列task的集合按照依賴關係組織成有向無環圖,至關於workflow;
An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators. The DAG will make sure that operators run in the correct certain order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.
operator描述了工做流中的一個task,是一個抽象的概念,至關於抽象task定義;
Once an operator is instantiated, it is referred to as a 「task」. The instantiation defines specific values when calling the abstract operator, and the parameterized task becomes a node in a DAG.
operator實例化(構造函數)以後成爲task,task是一個具體的概念,做爲dag的一部分;
A DAG Run is an object representing an instantiation of the DAG in time.
dag run是一個dag的實例對象,至關於workflow instance;
A task instance represents a specific run of a task and is characterized as the combination of a dag, a task, and a point in time. Task instances also have an indicative state, which could be 「running」, 「success」, 「failed」, 「skipped」, 「up for retry」, etc.
task每次執行都會生成一個task instance,每一個task instance都有狀態,好比running、success、failed等;
詳見:http://www.javashuo.com/article/p-wmhbqyhj-ks.html
詳見:http://www.javashuo.com/article/p-dqpudpgh-gx.html
# python --version
# curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
# python get-pip.py
pip is already installed if you are using Python 2 >=2.7.9 or Python 3 >=3.4 downloaded from python.org
# pip install apache-airflow
1)若是報錯:
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-install-xR3O9b/apache-airflow/setup.py", line 394, in <module>
do_setup()
File "/tmp/pip-install-xR3O9b/apache-airflow/setup.py", line 259, in do_setup
verify_gpl_dependency()
File "/tmp/pip-install-xR3O9b/apache-airflow/setup.py", line 49, in verify_gpl_dependency
raise RuntimeError("By default one of Airflow's dependencies installs a GPL "
RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE
----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-xR3O9b/apache-airflow/
須要設置環境變量
# export SLUGIFY_USES_TEXT_UNIDECODE=yes
2)若是報錯:
psutil/_psutil_linux.c:12:20: fatal error: Python.h: No such file or directory
#include <Python.h>
^
compilation terminated.
error: command 'gcc' failed with exit status 1
----------------------------------------
Command "/bin/python -u -c "import setuptools, tokenize;__file__='/tmp/pip-install-v4aq0G/psutil/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-record-2jrZ_B/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-install-v4aq0G/psutil/
須要安裝
# yum install python-devel
# export AIRFLOW_HOME=/path/to/airflow
默認在 /usr/local/airflow
# whereis airflow
airflow: /usr/bin/airflow# airflow version
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.10.1
自動建立$AIRFLOW_HOME/airflow.cfg
$AIRFLOW_HOME/airflow.cfg
修改以下配置
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engine, more information
# their website
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = LocalExecutor# Default timezone in case supplied date times are naive
# can be utc (default), system, or any IANA timezone string (e.g. Europe/Amsterdam)
default_timezone = Asia/Shanghai
修改sql_alchemy_conn爲mysql或postgres鏈接串,同時將executor改成LocalExecutor
# airflow initdb
# airflow -h
若是報錯
No handlers could be found for logger "airflow.logging_config"
Traceback (most recent call last):
File "/usr/bin/airflow", line 21, in <module>
from airflow import configuration
File "/usr/lib/python2.7/site-packages/airflow/__init__.py", line 36, in <module>
from airflow import settings
File "/usr/lib/python2.7/site-packages/airflow/settings.py", line 229, in <module>
configure_logging()
File "/usr/lib/python2.7/site-packages/airflow/logging_config.py", line 71, in configure_logging
raise e
ValueError: Unable to configure handler 'task': Cannot resolve 'airflow.utils.log.file_task_handler.FileTaskHandler': cannot import name UnrewindableBodyError
重裝urllib3
# pip uninstall urllib3
# pip install urllib3
若是還有問題,重裝chardet、idna、urllib3
dag示例:
from datetime import timedelta, datetime import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator default_args = { 'owner': 'www', 'depends_on_past': False, 'start_date': datetime(2019, 1, 25), 'email': ['test@cdp.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'hello_dag', default_args=default_args, description='hello world DAG', schedule_interval='*/5 * * * *' ) start_operator = DummyOperator(task_id='start_task', dag=dag) sh_hello_operator = BashOperator( task_id='sh_hello_task', depends_on_past=False, bash_command='echo "hello {{ params.p }} : "`date` >> /tmp/test.txt', params={'p':'world'}, dag=dag ) def print_hello(): return 'Hello world!' py_hello_operator = PythonOperator( task_id='py_hello_task', python_callable=print_hello, dag=dag) start_operator >> sh_hello_operator sh_hello_operator >> py_hello_operator
示例dag中包含經常使用的BashOperator和PythonOperator,以及task之間的依賴關係
頁面上看起來是這樣的
Airflow Python script is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks.
People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.
airflow的python腳本只是定義dag的結構,實際執行時每一個task都會在不一樣的worker或者不一樣的context下執行,因此不要在腳本中傳遞變量或者執行實際業務邏輯,腳本會被scheduler按期執行來刷新dag;
Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.
dag腳本中支持jinja模板,jinja模板詳見:http://jinja.pocoo.org/docs/dev/api/
參考:http://airflow.apache.org/tutorial.html#it-s-a-dag-definition-file
Time to run some tests. First let’s make sure that the pipeline parses. Let’s assume we’re saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.
# test your code without syntax error
# python ~/airflow/dags/$dag.py# print the list of active DAGs
# airflow list_dags# prints the list of tasks the dag_id
airflow list_tasks $dag_id# prints the hierarchy of tasks in the DAG
airflow list_tasks $dag_id --tree# test your task instance
# airflow test $dag_id $task_id 2015-01-01# run your task instance
# airflow run $dag_id $task_id 2015-01-01# get the status of task
# airflow task_state $dag_id $task_id 2015-01-01# trigger a dag run
# airflow trigger_dag $dag_id 2015-01-01# get the status of dag
# airflow dag_state $dag 2015-01-01# run a backfill over 2 days
# airflow backfill $dag_id -s 2015-01-01 -e 2015-01-02
airflow run|test 均可以執行task,區別是run會進行不少檢查,好比:
dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).
dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
執行task以後日誌位於~/airflow/logs/$dag_id/$task_id/下;
# start the web server, default port is 8080
airflow webserver -p 8080# start the scheduler
airflow scheduler# visit localhost:8080 in the browser and enable the example dag in the home page
將定義dag的py文件拷貝到$AIRFLOW_HOME/dags/目錄下,scheduler會自動發現和加載,日誌位於$AIRFLOW_HOME/logs/$dag_id/$task_id/目錄下,airflow會按期從dags目錄加載dag
[2019-03-01 03:20:39,174] {{models.py:273}} INFO - Filling up the DagBag from /usr/local/airflow/dags
訪問 http://$server_ip:8080/admin/
airflow中web server和worker均可以啓動多個,可是scheduler只能啓動一個,這樣形成了airflow的單點,目前已經有第三方開源方案來解決這個問題:
Airflow Scheduler Failover Controller
地址:https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
實現原理
The Airflow Scheduler Failover Controller (ASFC) is a mechanism that ensures that only one Scheduler instance is running in an Airflow Cluster at a time. This way you don't come across the issues we described in the "Motivation" section above.
You will first need to startup the ASFC on each of the instances you want the scheduler to be running on. When you start up multiple instances of the ASFC one of them takes on the Active state and the other takes on a Standby state. There is a heart beat mechanism setup to track if the Active ASFC is still active. If the Active ASFC misses multiple heart beats, the Standby ASFC becomes active.
The Active ASFC will poll every 10 seconds to see if the scheduler is running on the desired node. If it is not, the ASFC will try to restart the daemon. If the scheduler daemons still doesn't startup, the daemon is started on another node in the cluster.
安裝
# git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
# cd airflow-scheduler-failover-controller
# pip install -e .
報錯
Collecting airflow>=1.7.0 (from scheduler-failover-controller==1.0.1)
Could not find a version that satisfies the requirement airflow>=1.7.0 (from scheduler-failover-controller==1.0.1) (from versions: 0.6)
No matching distribution found for airflow>=1.7.0 (from scheduler-failover-controller==1.0.1)
查看
# vi setup.py
install_requires=[
'airflow>=1.7.0',
'kazoo>=2.2.1',
'coverage>=4.2',
'eventlet>=0.9.7',
],# pip list|grep airflow
apache-airflow 1.10.0
須要將setup.py中airflow改成apache-airflow,安裝以後啓動
# scheduler_failover_controller -h
會報錯
pkg_resources.ContextualVersionConflict: (Flask-Login 0.2.11 (/usr/lib64/python2.7/site-packages), Requirement.parse('Flask-Login<0.5,>=0.3'), set(['flask-appbuilder']))
重裝Flask-Login
# pip uninstall Flask-Login
# pip install Flask-Login
重裝以後是Flask-Login 0.4.1,知足要求,可是又會報錯
apache-airflow 1.10.0 has requirement flask-login==0.2.11, but you'll have flask-login 0.4.1 which is incompatible.
因此Airflow Scheduler Failover Controller和airflow1.10.0不兼容;