Airflow is a workflow scheduling and management system. It manages task pipelines as directed acyclic graphs (DAGs), in which you define task dependencies and time-based schedules.

Airflow is independent of the tasks it runs: you only need to give Airflow the name of a task and the way to run it, and it is managed as an Airflow task.
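For a first impression, here is a minimal sketch of what a DAG definition looks like; the dag_id, commands, and dates are only placeholders, and complete working examples are given in the TASK section below:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator

# A DAG groups tasks and carries the schedule.
dag = DAG('sketch_dag', start_date=datetime(2016, 5, 1), schedule_interval='@daily')

t1 = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
t2 = BashOperator(task_id='load', bash_command='echo load', dag=dag)

# 'load' runs only after 'extract' has succeeded.
t2.set_upstream(t1)
```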
Run the following commands in a Linux terminal (Python 2.x and pip must already be installed):
```
pip install airflow
pip install "airflow[crypto, password]"
```
After a successful installation, run the following three steps and Airflow is ready to use. The default executor is the SequentialExecutor, which can only run tasks one after another.
```
airflow initdb             # required step
airflow webserver -p 8080  # convenient web UI for managing DAGs
airflow scheduler          # once the scheduler is running, DAGs in the DAG folder start on their configured schedules
airflow test ct1 print_date 2016-05-14
```
The latest version of Airflow can also be downloaded from https://github.com/apache/incubator-airflow; unpack it and install it the same way as any other Python package.
Configure MySQL to enable the LocalExecutor and CeleryExecutor

Install MySQL and Airflow's MySQL support
```
yum install mysql mysql-server
pip install airflow[mysql]
```
Set the password of the MySQL root user
```
ct@server:~/airflow: mysql -uroot       # log in to MySQL as root; no password by default
mysql> SET PASSWORD=PASSWORD("passwd");
mysql> FLUSH PRIVILEGES;                # note the semicolon at the end of each SQL statement
```
Create the user and the database
```
# create a database named `airflow`
mysql> CREATE DATABASE airflow;
# create user `ct` with password `152108` and give it full privileges on the `airflow` database
mysql> GRANT all privileges on airflow.* TO 'ct'@'localhost' IDENTIFIED BY '152108';
mysql> FLUSH PRIVILEGES;
```
Modify the Airflow configuration file to use MySQL
The `airflow.cfg` file is usually in the `~/airflow` directory. Change the database connection string:

```
sql_alchemy_conn = mysql://ct:152108@localhost/airflow
```

The fields correspond to `dialect+driver://username:password@host:port/database`.
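Optionally, the connection string can be sanity-checked outside Airflow with SQLAlchemy (the library Airflow uses for `sql_alchemy_conn`). This is only a quick check, reusing the user, password, and database created above:

```python
from sqlalchemy import create_engine

# Same URI as sql_alchemy_conn: dialect+driver://username:password@host:port/database
engine = create_engine('mysql://ct:152108@localhost/airflow')

# Opens a connection and runs a trivial query; raises if the URI or the grants are wrong.
with engine.connect() as conn:
    print(conn.execute('SELECT 1').scalar())
```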
Initialize the database: `airflow initdb`
Once the database has been initialized successfully, you can log in to MySQL and look at the newly created tables.
```
ct@server:~/airflow: mysql -uct -p152108
mysql> USE airflow;
mysql> SHOW TABLES;
+-------------------+
| Tables_in_airflow |
+-------------------+
| alembic_version   |
| chart             |
| connection        |
| dag               |
| dag_pickle        |
| dag_run           |
| import_error      |
| job               |
| known_event       |
| known_event_type  |
| log               |
| sla_miss          |
| slot_pool         |
| task_instance     |
| users             |
| variable          |
| xcom              |
+-------------------+
17 rows in set (0.00 sec)
```
CentOS 7 uses MariaDB instead of MySQL, but all the commands are identical:
```
yum install mariadb mariadb-server
systemctl start mariadb        # start mariadb
systemctl enable mariadb       # start at boot
mysql_secure_installation      # set the root password and related options
mysql -uroot -p123456          # test the login
```
Note: this step can be skipped for testing; the final production environment uses the CeleryExecutor. If the CeleryExecutor is inconvenient to configure, the LocalExecutor can be used instead.
Since the database has already been configured above, to use the LocalExecutor you only need to modify the Airflow configuration file. The `airflow.cfg` file is usually in the `~/airflow` directory; open it, change the executor to `executor = LocalExecutor`, and the configuration is done.
Copy a few of the DAG files from the TASK section later in this article into the `~/airflow/dags` directory, run the following commands in order, and then open http://127.0.0.1:8080 to monitor task status in real time:
```
ct@server:~/airflow: airflow initdb            # skip if already run above
ct@server:~/airflow: airflow webserver --debug &
ct@server:~/airflow: airflow scheduler
```
Install Airflow's Celery and RabbitMQ components
```
pip install airflow[celery]
pip install airflow[rabbitmq]
```
Install Erlang and RabbitMQ
If they can be installed directly with yum or apt-get, you are all set. Otherwise, install from the RPM packages:

```
# (CentOS 6; REF: http://www.rabbitmq.com/install-rpm.html)
wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_18.3-1~centos~6_amd64.rpm
yum install esl-erlang_18.3-1~centos~6_amd64.rpm
wget https://github.com/jasonmcintosh/esl-erlang-compat/releases/download/1.1.1/esl-erlang-compat-18.1-1.noarch.rpm
yum install esl-erlang-compat-18.1-1.noarch.rpm
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
yum install rabbitmq-server-3.6.1-1.noarch.rpm
```
Configure RabbitMQ

Start the RabbitMQ server and enable it at boot:

```
rabbitmq-server -detached
chkconfig rabbitmq-server on
```

Add the Airflow user, vhost, and permissions (REF):

```
rabbitmqctl add_user ct 152108
rabbitmqctl add_vhost ct_airflow
rabbitmqctl set_user_tags ct airflow
rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
rabbitmq-plugins enable rabbitmq_management   # optional management plugin; not used here
```
Modify the Airflow configuration file for Celery

The `airflow.cfg` file is usually in the `~/airflow` directory. Change the executor:

```
executor = CeleryExecutor
```
Set the broker URL and the Celery result backend (the result backend can be the same as `broker_url`). The format is `transport://userid:password@hostname:port/virtual_host`:

```
broker_url = amqp://ct:152108@localhost:5672/ct_airflow

# can be the same as broker_url
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
```
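Optionally, you can confirm that the broker URL is reachable before starting any workers. A minimal sketch using kombu (the messaging library underneath Celery), assuming the RabbitMQ user and vhost created earlier:

```python
from kombu import Connection

# transport://userid:password@hostname:port/virtual_host
with Connection('amqp://ct:152108@localhost:5672/ct_airflow') as conn:
    conn.connect()  # raises if RabbitMQ, the vhost, or the credentials are wrong
    print('broker reachable:', conn.connected)
```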
Test

```
airflow webserver --debug
airflow worker
airflow scheduler
```
The database behind the Airflow backend can be emptied with `airflow resetdb`.

Kill the running webserver master process:

```
ps -ef | grep -Ei '(airflow-webserver)' | grep master | awk '{print $2}' | xargs -i kill {}
```

The following script can be used to start or restart the Airflow services:
```bash
#!/bin/bash

#set -x
#set -e
set -u

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to start or restart webserver service.

${txtbld}OPTIONS${txtrst}:
	-S	Start airflow system [${bldred}Default FALSE${txtrst}]
	-s	Restart airflow server only [${bldred}Default FALSE${txtrst}]
	-a	Restart all airflow programs including webserver, worker and
		scheduler. [${bldred}Default FALSE${txtrst}]
EOF
}

start_all=
server_only=
all=

while getopts "hs:S:a:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		S)
			start_all=$OPTARG
			;;
		s)
			server_only=$OPTARG
			;;
		a)
			all=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then
	usage
	exit 1
fi

if [ "$server_only" == "TRUE" ]; then
	ps -ef | grep -Ei '(airflow-webserver)' | grep master | \
		awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >webserver.log 2>&1 &
fi

if [ "$all" == "TRUE" ]; then
	ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi

if [ "${start_all}" == "TRUE" ]; then
	cd ~/airflow/
	nohup airflow webserver >>webserver.log 2>&1 &
	nohup airflow worker >>worker.log 2>&1 &
	nohup airflow scheduler >>scheduler.log 2>&1 &
fi
```
dags_folder

The `dags_folder` directory supports subdirectories and symbolic links, so different DAGs can be stored in separate, well-organized folders.
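To see which DAG files Airflow actually picks up from `dags_folder` (including subdirectories and symlinks), a small sketch using `DagBag`:

```python
from airflow.models import DagBag

# DagBag scans dags_folder (as set in airflow.cfg) and imports every DAG file it finds.
dagbag = DagBag()

print('DAGs found:', sorted(dagbag.dags.keys()))
print('files with import errors:', dagbag.import_errors)
```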
Configure the mail (SMTP) service
```
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
smtp_user = username@163.com
smtp_port = 25
smtp_password = userpasswd
smtp_mail_from = username@163.com
```
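With SMTP configured, Airflow sends failure and retry mails according to `email_on_failure` / `email_on_retry` in `default_args`; you can also send a mail explicitly from a task. A sketch assuming the SMTP settings above are valid (recipient and dag_id are placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators import EmailOperator

dag = DAG('smtp_test', start_date=datetime(2016, 5, 29), schedule_interval='@once')

# Sends one mail through the smtp_* settings in airflow.cfg.
notify = EmailOperator(
    task_id='notify',
    to='chentong_biology@163.com',
    subject='Airflow SMTP test',
    html_content='The smtp_* settings in airflow.cfg are working.',
    dag=dag)
```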
Multi-user login setup (apparently only supported with the CeleryExecutor)

Set the following three lines in `airflow.cfg`:

```
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
filter_by_owner = True
```

Then create a user in a Python shell on the Airflow server:
```python
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'ehbio'
user.email = 'mail@ehbio.com'
user.password = 'ehbio'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
```
Parameter explanations

depends_on_past

Airflow assumes idempotent tasks that operate on immutable data chunks. It also assumes that all task instances (each task for each schedule) need to run. If your tasks need to be executed sequentially, you need to tell Airflow: use the `depends_on_past=True` flag on the tasks that require sequential execution.
If a task that should have run did not run, or the `schedule_interval` is set to `@once`, it is recommended to use `depends_on_past=False`. When running DAGs I sometimes saw that an upstream task had clearly finished but the downstream task never started, and the whole DAG got stuck. Setting `depends_on_past=False` resolves this kind of problem.
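To make the two settings concrete, a small sketch (dag_id and command are placeholders):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 1),
    # True:  a task instance only runs after the same task succeeded in the
    #        previous scheduled run (strictly sequential execution).
    # False: every scheduled run is independent; recommended for @once DAGs
    #        or when a skipped run should not block later runs.
    'depends_on_past': False,
}

dag = DAG('depends_on_past_demo', default_args=default_args,
          schedule_interval=timedelta(days=1))

t1 = BashOperator(task_id='daily_job', bash_command='date', dag=dag)
```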
timestamp: in a format like `2016-01-01T00:03:00`
If a command called by a task fails, you need to open the Graph view in the web UI and click run to restart it manually.
To make it easier for a modified task to run smoothly, a compromise approach is (sketched in code after this list):

- set `email_on_retry: True`
- set a fairly long `retry_delay`, so that after the mail arrives there is time to deal with the problem
- then switch to a short `retry_delay`, so the task restarts quickly

After writing a task DAG, always remember to check it for syntax errors first: `python dag.py`
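Expressed as `default_args`, the compromise above looks roughly like this (values are only illustrative):

```python
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 5, 29),
    'email': ['chentong_biology@163.com'],
    'email_on_retry': True,             # mail on every retry
    'retries': 3,
    'retry_delay': timedelta(hours=1),  # long delay: time to fix the task after the mail arrives
    # once fixed, shorten it so the next retry fires quickly, e.g.
    # 'retry_delay': timedelta(minutes=1),
}

dag = DAG('retry_demo', default_args=default_args, schedule_interval='@once')
```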
Test file 1: ct1.py
```python
from airflow import DAG
from airflow.operators import BashOperator, MySqlOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    # for convenient testing, start_date is usually the current time minus schedule_interval
    'start_date': datetime(2016, 5, 29, 8, 30),
    'email': ['chentong_biology@163.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

# The DAG id 'ct1' must be unique within Airflow; it is usually the same as the file name.
# With multiple users, the user name can be added as a prefix to tell DAGs apart.
dag = DAG('ct1', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

#cmd = "/home/test/test.bash "  # note the trailing space
t2 = BashOperator(
    task_id='echo',
    bash_command='echo "test" ',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(2) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7) }}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': "Parameter I passed in"},
    dag=dag)

# This means that t2 will depend on t1 running successfully to run
# It is equivalent to t1.set_downstream(t2)
t2.set_upstream(t1)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
```
Test file 2: ct2.py
```python
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                               datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': one_min_ago,
    'email': ['chentong_biology@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 5,
    'retry_delay': timedelta(hours=30),
    #'queue': 'bash_queue',
    #'pool': 'backfill',
    #'priority_weight': 10,
    #'end_date': datetime(2016, 5, 29, 11, 30),
}

dag = DAG('ct2', default_args=default_args,
          schedule_interval="@once")

t1 = BashOperator(
    task_id='run1',
    bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ',
    dag=dag)

t2 = BashOperator(
    task_id='run2',
    bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ',
    dag=dag)

t2.set_upstream(t1)
```
run1.sh
```bash
#!/bin/bash

#set -x
set -e
set -u

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
	-z	Is there a header[${bldred}Default TRUE${txtrst}]
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		z)
			header=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z $file ]; then
	usage
	exit 1
fi

cat <<END >$file
A
B
C
D
E
F
G
END

sleep 20s
```
run2.sh
```bash
#!/bin/bash

#set -x
set -e
set -u

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to do ********************.

${txtbld}OPTIONS${txtrst}:
	-f	Data file ${bldred}[NECESSARY]${txtrst}
EOF
}

file=
header='TRUE'

while getopts "hf:z:" OPTION
do
	case $OPTION in
		h)
			usage
			exit 1
			;;
		f)
			file=$OPTARG
			;;
		?)
			usage
			exit 1
			;;
	esac
done

if [ -z $file ]; then
	usage
	exit 1
fi

awk 'BEGIN{OFS=FS="\t"}{print $0, "53"}' $file >${file}.out
```
The DagRun object has room for a `conf` parameter that gets exposed in the "context" (templates, operators, ...). That is the place where you would associate parameters to a specific run. For now this is only possible in the context of an externally triggered DAG run. The way the `TriggerDagRunOperator` works, you can fill in the `conf` param during the execution of the callable that you pass to the operator.
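A sketch of that pattern, loosely modeled on the trigger example shipped with Airflow at the time; the dag_ids, payload key, and callable names are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator

controller = DAG('conf_controller', start_date=datetime(2016, 5, 29),
                 schedule_interval='@once')


def fill_conf(context, dag_run_obj):
    # Whatever is placed in payload becomes the conf of the triggered DagRun.
    dag_run_obj.payload = {'message': 'hello from the controller'}
    return dag_run_obj


TriggerDagRunOperator(
    task_id='trigger_target',
    trigger_dag_id='conf_target',
    python_callable=fill_conf,
    dag=controller)

target = DAG('conf_target', start_date=datetime(2016, 5, 29),
             schedule_interval=None)


def read_conf(**kwargs):
    # The conf set above is exposed through the context of the triggered run.
    print(kwargs['dag_run'].conf.get('message'))


PythonOperator(
    task_id='read_conf',
    python_callable=read_conf,
    provide_context=True,
    dag=target)
```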
If you are looking to change the shape of your DAG through parameters, we recommend doing that using "singleton" DAGs (using a "@once" `schedule_interval`), meaning that you would write a Python program that generates multiple dag_ids, one of each run, probably based on metadata stored in a config file or elsewhere. The idea is that if you use parameters to alter the shape of your DAG, you break some of the assumptions around continuity of the schedule. Things like visualizing the tree view or how to perform a backfill becomes unclear and mushy. So if the shape of your DAG changes radically based on parameters, we consider those to be different DAGs, and you generate each one in your pipeline file.
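A sketch of the "one dag_id per run" idea, generating several DAGs from a small in-file config (the config dict, dag_ids, and commands are placeholders; real metadata would come from a config file):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator

# In practice this metadata would come from a config file or a database.
RUNS = {
    'sample_a': 'run1.sh -f sample_a',
    'sample_b': 'run1.sh -f sample_b',
}

for name, script in RUNS.items():
    dag_id = 'pipeline_%s' % name
    dag = DAG(dag_id, start_date=datetime(2016, 5, 29), schedule_interval='@once')

    BashOperator(task_id='run',
                 bash_command='(cd /home/ct/test; bash %s) ' % script,
                 dag=dag)

    # Only module-level names are scanned, so register each generated DAG here.
    globals()[dag_id] = dag
```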
Completely removing all information about a DAG
```sql
set @dag_id = 'BAD_DAG';
delete from airflow.xcom where dag_id = @dag_id;
delete from airflow.task_instance where dag_id = @dag_id;
delete from airflow.sla_miss where dag_id = @dag_id;
delete from airflow.log where dag_id = @dag_id;
delete from airflow.job where dag_id = @dag_id;
delete from airflow.dag_run where dag_id = @dag_id;
delete from airflow.dag where dag_id = @dag_id;
```
Managing the processes automatically with supervisord
```
[program:airflow_webserver]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow webserver
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-webserver.err.log
stdout_logfile=/var/log/airflow-webserver.out.log

[program:airflow_worker]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow worker
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-worker.err.log
stdout_logfile=/var/log/airflow-worker.out.log

[program:airflow_scheduler]
command=/usr/local/bin/python2.7 /usr/local/bin/airflow scheduler
user=airflow
environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
stderr_logfile=/var/log/airflow-scheduler.err.log
stdout_logfile=/var/log/airflow-scheduler.out.log
```
In certain situations after modifying a DAG, to avoid running tasks for dates before the current date, you can use `backfill` to fill in the tasks for a specific time range:

```
airflow backfill -s START -e END --mark_success DAG_ID
```
The configuration so far was all done on an internal server that only has SSH port 22 open. So I tried the same configuration on another machine and set up port forwarding, mapping port 5672 of RabbitMQ on the external server to the corresponding port of the internal server, and then started Airflow to connect.
```
ssh -v -4 -Nf -R 5672:127.0.0.1:5672 aliyun
```
The command above follows the format

```
ssh -R <local port>:<remote host>:<remote port> <SSH hostname>
```

where `local port` is the port on the SSH hostname. The ssh debug output confirms the tunnel: `Remote connections from LOCALHOST:5672 forwarded to local address 127.0.0.1:5672`
-v: turn this on during testing so the forwarding details are printed.

After forwarding the RabbitMQ port 5672 and starting the webserver and scheduler, with `airflow worker` running on the internal server, I found that task execution state was lost. I will keep studying Celery to solve this problem.

Unpack, build, and start Redis:

```
tar xvzf redis-3.2.0.tar.gz
cd redis-3.2.0 && make
redis-server                  # start redis
ps -ef | grep 'redis'         # check that the background process exists
netstat -lntp | grep 6379     # check that the port is listening
```
When a task does not run as expected, check the following:

- whether `start_date` and `end_date` are within a suitable time range
- the output of `airflow worker`, `airflow scheduler`, and `airflow webserver --debug` for any task that failed
- the log output under the `logs` folder
- if all of the above look fine, try giving the DAG a new `dag_id` (the sketch after this list shows how to inspect task instance states in the backend directly)
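When digging further, the backend database can also be queried directly with Airflow's own session and models to see the state of each task instance; a sketch using the `ct1` test DAG from above:

```python
from airflow import models, settings

session = settings.Session()

# Show the state of every task instance recorded for the 'ct1' DAG.
for ti in session.query(models.TaskInstance).filter_by(dag_id='ct1'):
    print(ti.task_id, ti.execution_date, ti.state)

session.close()
```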
This article was originally written at http://blog.genesino.com/2016/05/airflow/. Please credit the source when reposting.