Getting Started with Airflow

What Airflow can do


Airflow is a workflow scheduling and management system. It manages task pipelines as directed acyclic graphs (DAGs), letting you define task dependencies and time-based schedules.

Airflow is independent of the tasks we want to run: we only need to give Airflow the task's name and how to run it, and it becomes an Airflow task.
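
For orientation only (this sketch is not part of the original setup; it simply restates the idea above), a task handed to Airflow in the 1.x style used throughout this guide can be as small as:

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime

    # The DAG is just the container: a unique id, a start date and a schedule.
    dag = DAG('hello_airflow', start_date=datetime(2016, 5, 29),
              schedule_interval='@once')

    # The task itself: a name (task_id) and how to run it (bash_command).
    t1 = BashOperator(task_id='say_hello', bash_command='echo hello', dag=dag)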

Installation and usage

Minimal installation

Run the following commands in a Linux terminal (Python 2.x and pip must already be installed):

pip install airflow
pip install "airflow[crypto, password]"

Once installation succeeds, run the following three steps and Airflow is ready to use. The default executor is SequentialExecutor, which can only run tasks one after another.

  • Initialize the database: airflow initdb [required step]
  • Start the web server: airflow webserver -p 8080 [handy for managing DAGs visually]
  • Start the scheduler: airflow scheduler [once the scheduler is running, the DAGs in the DAG directory are launched according to their configured schedules]
  • You can also test a single task directly, e.g. for the DAG at the end of this article: airflow test ct1 print_date 2016-05-14

The latest version of Airflow can be downloaded from https://github.com/apache/incubator-airflow; unpack it and install it the same way as any other Python package.

Configure MySQL to enable LocalExecutor and CeleryExecutor

  • Install MySQL database support

    yum install mysql mysql-server
    pip install airflow[mysql]
  • Set the MySQL root user's password

    ct@server:~/airflow: mysql -uroot   # log in to MySQL as root; no password by default
    mysql> SET PASSWORD=PASSWORD("passwd");
    mysql> FLUSH PRIVILEGES;            # note the semicolon at the end of each SQL statement
  • Create a user and a database

    # Create a database named `airflow`
    mysql> CREATE DATABASE airflow;

    # Create user `ct` with password `152108`; the user has full privileges on the `airflow` database
    mysql> GRANT all privileges on airflow.* TO 'ct'@'localhost' IDENTIFIED BY '152108';
    mysql> FLUSH PRIVILEGES;
  • Modify the Airflow configuration file to use MySQL

    • The airflow.cfg file is usually in the ~/airflow directory
    • Change the database connection string:

      sql_alchemy_conn = mysql://ct:152108@localhost/airflow
      # The fields correspond to: dialect+driver://username:password@host:port/database
    • Initialize the database: airflow initdb

    • After the database has been initialized successfully, you can log in to MySQL and inspect the newly created tables.

      ct@server:~/airflow: mysql -uct -p152108
      mysql> USE airflow;
      mysql> SHOW TABLES;
      +-------------------+
      | Tables_in_airflow |
      +-------------------+
      | alembic_version   |
      | chart             |
      | connection        |
      | dag               |
      | dag_pickle        |
      | dag_run           |
      | import_error      |
      | job               |
      | known_event       |
      | known_event_type  |
      | log               |
      | sla_miss          |
      | slot_pool         |
      | task_instance     |
      | users             |
      | variable          |
      | xcom              |
      +-------------------+
      17 rows in set (0.00 sec)
  • CentOS 7 uses MariaDB in place of MySQL, but all the commands are executed the same way:

    yum install mariadb mariadb-server
    systemctl start mariadb             ==> start MariaDB
    systemctl enable mariadb            ==> start MariaDB at boot
    mysql_secure_installation           ==> set the root password and related options
    mysql -uroot -p123456               ==> test the login

Configure LocalExecutor

Note: this step can be skipped for testing; the final production setup uses CeleryExecutor. If CeleryExecutor is inconvenient to configure, LocalExecutor can be used instead.

The database has already been configured above, so to use LocalExecutor you only need to change the Airflow configuration file. airflow.cfg is usually in the ~/airflow directory; open it and set executor = LocalExecutor, and the configuration is done.

Copy a few of the DAG files from the TASK section at the end of this article into ~/airflow/dags, run the following commands in order, and then open http://127.0.0.1:8080 to monitor task activity in real time:

ct@server:~/airflow: airflow initdb     (skip if already run above)
ct@server:~/airflow: airflow webserver --debug &
ct@server:~/airflow: airflow scheduler

Configure CeleryExecutor (with RabbitMQ support)

  • Install Airflow's Celery and RabbitMQ components

    pip install airflow[celery]
    pip install airflow[rabbitmq]
  • Install Erlang and RabbitMQ

    • If you can install them directly with yum or apt-get, you are all set.
    • On the CentOS 6 system I used that was not possible, so a bit of extra work was needed:
    # (CentOS 6; REF: http://www.rabbitmq.com/install-rpm.html)
    wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_18.3-1~centos~6_amd64.rpm
    yum install esl-erlang_18.3-1~centos~6_amd64.rpm
    wget https://github.com/jasonmcintosh/esl-erlang-compat/releases/download/1.1.1/esl-erlang-compat-18.1-1.noarch.rpm
    yum install esl-erlang-compat-18.1-1.noarch.rpm
    wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
    yum install rabbitmq-server-3.6.1-1.noarch.rpm
  • Configure RabbitMQ

    • Start RabbitMQ: rabbitmq-server -detached
    • Start RabbitMQ at boot: chkconfig rabbitmq-server on
    • Configure RabbitMQ (REF):

      rabbitmqctl add_user ct 152108
      rabbitmqctl add_vhost ct_airflow
      rabbitmqctl set_user_tags ct airflow
      rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
      rabbitmq-plugins enable rabbitmq_management   # optional; not used here
  • Modify the Airflow configuration file to use Celery

    • The airflow.cfg file is usually in the ~/airflow directory

    • Change the executor: executor = CeleryExecutor

    • Change broker_url:

      broker_url = amqp://ct:152108@localhost:5672/ct_airflow
      # Format explanation: transport://userid:password@hostname:port/virtual_host
    • Change celery_result_backend:

      # May be the same as broker_url
      celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
      # Format explanation: transport://userid:password@hostname:port/virtual_host
  • Testing

    • Start the web server: airflow webserver --debug
    • Start a Celery worker (must not be run as root): airflow worker
    • Start the scheduler: airflow scheduler
    • Tips:
      • While testing, watch the logs printed in the three windows running the three commands above.
      • If something inexplicable happens, consider wiping the Airflow backend database; airflow resetdb will clear it.
      • After deleting a DAG file, the webserver may still show its information; restart the webserver and refresh the page.
      • Stop the webserver: ps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}

A script to start and restart the Airflow system

#!/bin/bash

#set -x
#set -e
set -u

usage()
{
cat <<EOF
${txtcyn}
Usage:

$0 options${txtrst}

${bldblu}Function${txtrst}:

This script is used to start or restart webserver service.

${txtbld}OPTIONS${txtrst}:
    -S  Start airflow system            [${bldred}Default FALSE${txtrst}]
    -s  Restart airflow server only     [${bldred}Default FALSE${txtrst}]
    -a  Restart all airflow programs including webserver, worker and
        scheduler.                      [${bldred}Default FALSE${txtrst}]
EOF
}

start_all=
server_only=
all=

while getopts "hs:S:a:" OPTION
do
    case $OPTION in
        h)
            usage
            exit 1
            ;;
        S)
            start_all=$OPTARG
            ;;
        s)
            server_only=$OPTARG
            ;;
        a)
            all=$OPTARG
            ;;
        ?)
            usage
            exit 1
            ;;
    esac
done

if [ -z "$server_only" ] && [ -z "$all" ] && [ -z "${start_all}" ]; then
    usage
    exit 1
fi

if [ "$server_only" == "TRUE" ]; then
    ps -ef | grep -Ei '(airflow-webserver)' | grep master | \
        awk '{print $2}' | xargs -i kill {}
    cd ~/airflow/
    nohup airflow webserver >webserver.log 2>&1 &
fi

if [ "$all" == "TRUE" ]; then
    ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}
    cd ~/airflow/
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
fi

if [ "${start_all}" == "TRUE" ]; then
    cd ~/airflow/
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
fi

Other airflow.cfg settings

  • dags_folder

    The dags_folder directory supports subdirectories and symbolic links, so different DAGs can be stored and organized by category.

  • Configure the email (SMTP) service

    smtp_host = smtp.163.com
    smtp_starttls = True
    smtp_ssl = False
    smtp_user = username@163.com
    smtp_port = 25
    smtp_password = userpasswd
    smtp_mail_from = username@163.com
  • Multi-user login (apparently supported only with CeleryExecutor)

    • Change the following three lines in airflow.cfg:
    authenticate = True
    auth_backend = airflow.contrib.auth.backends.password_auth
    filter_by_owner = True
    • Add a user (run in Python on the server where Airflow is installed):
    import airflow
    from airflow import models, settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser

    user = PasswordUser(models.User())
    user.username = 'ehbio'
    user.email = 'mail@ehbio.com'
    user.password = 'ehbio'
    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()
    exit()

TASK

  • Parameter notes

    • depends_on_past

      Airflow assumes idempotent tasks that operate on immutable data
      chunks. It also assumes that all task instances (each task for each
      schedule) need to run.

      If your tasks need to be executed sequentially, you need to
      tell Airflow: use the depends_on_past=True flag on the tasks
      that require sequential execution.

      If a task that should have run did not run, or when schedule_interval is @once, depends_on_past=False is recommended. When running DAGs I have sometimes seen an upstream task finish while its downstream task never started, leaving the whole DAG stuck; setting depends_on_past=False resolves this kind of problem.

    • Timestamps are given in a format like 2016-01-01T00:03:00

    • When a command called by a task fails, it has to be restarted manually by clicking Run in the Graph View of the web UI.
      To make it easier for a task to run smoothly after it has been fixed, a workable compromise is:

      • Set email_on_retry: True
      • Set a fairly long retry_delay, so there is time to react after receiving the email
      • Then change retry_delay back to a shorter value so the task restarts quickly
  • After writing a task/DAG file, always check it for syntax errors first: python dag.py

  • Test file 1: ct1.py

    from airflow import DAG
    from airflow.operators import BashOperator, MySqlOperator
    from datetime import datetime, timedelta

    one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                                   datetime.min.time())

    default_args = {
        'owner': 'airflow',
        # For easier testing, the start time is usually the current time minus schedule_interval
        'start_date': datetime(2016, 5, 29, 8, 30),
        'email': ['chentong_biology@163.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        #'queue': 'bash_queue',
        #'pool': 'backfill',
        #'priority_weight': 10,
        #'end_date': datetime(2016, 5, 29, 11, 30),
    }

    # The DAG id 'ct1' must be unique within Airflow; it is usually the same as the file name.
    # With multiple users, the user name can be added as a marker.
    dag = DAG('ct1', default_args=default_args,
              schedule_interval="@once")

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

    # cmd = "/home/test/test.bash "   # note the trailing space
    t2 = BashOperator(
        task_id='echo',
        bash_command='echo "test" ',
        retries=3,
        dag=dag)

    templated_command = """
    {% for i in range(2) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': "Parameter I passed in"},
        dag=dag)

    # This means that t2 will depend on t1 running successfully to run.
    # It is equivalent to t1.set_downstream(t2)
    t2.set_upstream(t1)

    t3.set_upstream(t1)

    # all of this is equivalent to
    # dag.set_dependency('print_date', 'sleep')
    # dag.set_dependency('print_date', 'templated')
  • Test file 2: ct2.py

    from airflow import DAG
    from airflow.operators import BashOperator
    from datetime import datetime, timedelta

    one_min_ago = datetime.combine(datetime.today() - timedelta(minutes=1),
                                   datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': True,
        'start_date': one_min_ago,
        'email': ['chentong_biology@163.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 5,
        'retry_delay': timedelta(hours=30),
        #'queue': 'bash_queue',
        #'pool': 'backfill',
        #'priority_weight': 10,
        #'end_date': datetime(2016, 5, 29, 11, 30),
    }

    dag = DAG('ct2', default_args=default_args,
              schedule_interval="@once")

    t1 = BashOperator(
        task_id='run1',
        bash_command='(cd /home/ct/test; bash run1.sh -f ct_t1) ',
        dag=dag)

    t2 = BashOperator(
        task_id='run2',
        bash_command='(cd /home/ct/test; bash run2.sh -f ct_t1) ',
        dag=dag)

    t2.set_upstream(t1)
  • run1.sh

    #!/bin/bash

    #set -x
    set -e
    set -u

    usage()
    {
    cat <<EOF
    ${txtcyn}
    Usage:

    $0 options${txtrst}

    ${bldblu}Function${txtrst}:

    This script is used to do ********************.

    ${txtbld}OPTIONS${txtrst}:
        -f  Data file ${bldred}[NECESSARY]${txtrst}
        -z  Is there a header [${bldred}Default TRUE${txtrst}]
    EOF
    }

    file=
    header='TRUE'

    while getopts "hf:z:" OPTION
    do
        case $OPTION in
            h)
                usage
                exit 1
                ;;
            f)
                file=$OPTARG
                ;;
            z)
                header=$OPTARG
                ;;
            ?)
                usage
                exit 1
                ;;
        esac
    done

    if [ -z $file ]; then
        usage
        exit 1
    fi

    cat <<END >$file
    A
    B
    C
    D
    E
    F
    G
    END

    sleep 20s
  • run2.sh

    #!/bin/bash

    #set -x
    set -e
    set -u

    usage()
    {
    cat <<EOF
    ${txtcyn}
    Usage:

    $0 options${txtrst}

    ${bldblu}Function${txtrst}:

    This script is used to do ********************.

    ${txtbld}OPTIONS${txtrst}:
        -f  Data file ${bldred}[NECESSARY]${txtrst}
    EOF
    }

    file=
    header='TRUE'

    while getopts "hf:z:" OPTION
    do
        case $OPTION in
            h)
                usage
                exit 1
                ;;
            f)
                file=$OPTARG
                ;;
            ?)
                usage
                exit 1
                ;;
        esac
    done

    if [ -z $file ]; then
        usage
        exit 1
    fi

    awk 'BEGIN{OFS=FS="\t"}{print $0, "53"}' $file >${file}.out

Other issues

  • The DagRun object has room for a conf parameter that gets exposed
    in the "context" (templates, operators, ...). That is the place
    where you would associate parameters to a specific run. For now this
    is only possible in the context of an externally triggered DAG run.
    The way the TriggerDagRunOperator works, you can fill in the conf
    param during the execution of the callable that you pass to the
    operator.

    If you are looking to change the shape of your DAG through parameters,
    we recommend doing that using "singleton" DAGs (using a "@once"
    schedule_interval), meaning that you would write a
    Python program that generates multiple dag_ids, one of each run,
    probably based on metadata stored in a config file or elsewhere.

    The idea is that if you use parameters to alter the shape of your
    DAG, you break some of the assumptions around continuity of the
    schedule. Things like visualizing the tree view or how to perform a
    backfill becomes unclear and mushy. So if the shape of your DAG
    changes radically based on parameters, we consider those to be
    different DAGs, and you generate each one in your pipeline file.

    (A sketch of this TriggerDagRunOperator pattern is given at the end
    of this list.)

  • Completely remove all information about a DAG

    set @dag_id = 'BAD_DAG';
    delete from airflow.xcom where dag_id = @dag_id;
    delete from airflow.task_instance where dag_id = @dag_id;
    delete from airflow.sla_miss where dag_id = @dag_id;
    delete from airflow.log where dag_id = @dag_id;
    delete from airflow.job where dag_id = @dag_id;
    delete from airflow.dag_run where dag_id = @dag_id;
    delete from airflow.dag where dag_id = @dag_id;
  • Manage the Airflow processes automatically with supervisord
    [program:airflow_webserver]
    command=/usr/local/bin/python2.7 /usr/local/bin/airflow webserver
    user=airflow
    environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
    stderr_logfile=/var/log/airflow-webserver.err.log
    stdout_logfile=/var/log/airflow-webserver.out.log

    [program:airflow_worker]
    command=/usr/local/bin/python2.7 /usr/local/bin/airflow worker
    user=airflow
    environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
    stderr_logfile=/var/log/airflow-worker.err.log
    stdout_logfile=/var/log/airflow-worker.out.log

    [program:airflow_scheduler]
    command=/usr/local/bin/python2.7 /usr/local/bin/airflow scheduler
    user=airflow
    environment=AIRFLOW_HOME="/home/airflow/airflow", PATH="/usr/local/bin:%(ENV_PATH)s"
    stderr_logfile=/var/log/airflow-scheduler.err.log
    stdout_logfile=/var/log/airflow-scheduler.out.log
  • In certain situations, after modifying a DAG, you can use backfill to fill in the tasks for a specific time range while avoiding runs of tasks dated before the current date:

    • airflow backfill -s START -e END --mark_success DAG_ID
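
  • The following sketch is not from the original article; it illustrates, assuming the 1.x TriggerDagRunOperator API referred to in the quote above, how the conf of a triggered run can be filled in from the callable passed to the operator (the DAG id 'ct1' is just the example DAG from earlier; the other names are hypothetical):

    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from datetime import datetime

    dag = DAG('trigger_ct1', start_date=datetime(2016, 5, 29),
              schedule_interval='@once')

    def set_conf(context, dag_run_obj):
        # Whatever is placed in payload becomes the conf of the triggered
        # DagRun and is exposed in its context (templates, operators, ...).
        dag_run_obj.payload = {'my_param': 'value chosen at trigger time'}
        return dag_run_obj          # returning None would skip the trigger

    t = TriggerDagRunOperator(
        task_id='trigger_ct1_run',
        trigger_dag_id='ct1',       # the DAG to trigger
        python_callable=set_conf,
        dag=dag)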

Port forwarding

  • All of the previous configuration was done on an internal server that only exposes SSH port 22.
    I therefore tried using the same configuration on another machine and set up port forwarding,
    mapping the external server's RabbitMQ port 5672 to the corresponding port on the internal
    server, and then started Airflow to connect.

    • ssh -v -4 -NF -R 5672:127.0.0.1:5672 aliyun
    • The format of the command above is

      ssh -R <local port>:<remote host>:<remote port> <SSH hostname>

      Here, local port means the port on the SSH hostname.

      Remote connections from LOCALHOST:5672 forwarded to local address 127.0.0.1:5672

    • -v: turn on while testing

    • -4: if the error "bind: Cannot assign requested address" appears, force the
      ssh client to use IPv4
    • If "Warning: remote port forwarding failed for listen port 52698" appears,
      close the other ssh tunnels.

Using Airflow on different machines

  • On the external server (used as the task-distribution server), configure the same Airflow modules as on the internal server
  • Use the port forwarding described above so the external server can reach RabbitMQ on port 5672 behind the internal server's firewall.
  • With airflow webserver and airflow scheduler started on the external server and
    airflow worker started on the internal server, I found that task execution state was lost. I am still studying Celery to solve this problem.

Installing Redis (not used in the end)

Possible reasons why a task does not run as expected

  • Check whether start_date and end_date fall within a suitable time range
  • Check the output of airflow worker, airflow scheduler and
    airflow webserver --debug to see whether any task is behaving abnormally
  • Check the log output under the logs folder in the Airflow configuration path
  • If none of the above reveals a problem, consider data conflicts; solutions include clearing
    the database or giving the current DAG a new dag_id


Notice

This article was originally published at http://blog.genesino.com/2016/05/airflow/. Please cite the source when reposting.
