Installing Airflow


Official documentation:
http://airflow.incubator.apache.org/project.html

 

1. Environment Preparation

1.1 Installation environment

  • centos 6.7 (docker)
  • python 2.7.13

docker run --name airflow -h airflow -dti --net hadoopnet --ip=172.18.0.20 -p 10131:22 -v /dfs/centos/airflow/home:/home -v /dfs/centos/airflow/opt:/opt yangxw/centos:6.7

1.2 Create the airflow user

[root@airflow ~]# groupadd airflow
[root@airflow ~]# useradd airflow -g airflow

2. Installing Airflow

2.1 Install Python

The official site only provides a source package, so Python would normally have to be compiled and installed.
Reference: compiling and installing Python 2.7.13.
However, compiling Python requires upgrading gcc, which in turn means building gcc from source; that is too complicated, so simply download the Anaconda Python distribution from the Tsinghua mirror instead:
wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/
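
For example, a minimal install sketch; the installer filename below is one of the versions published on that mirror, and the /opt/anaconda2 prefix matches the paths that appear in the error logs later in this article, so adjust both to your own choice:

wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda2-4.3.1-Linux-x86_64.sh
bash Anaconda2-4.3.1-Linux-x86_64.sh -b -p /opt/anaconda2    # -b: batch (no prompts), -p: install prefix
echo 'export PATH=/opt/anaconda2/bin:$PATH' >> /home/airflow/.bashrc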

2.2 Install pip

Anaconda already bundles pip, so it can be used directly.
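
To confirm that the shell now picks up Anaconda's interpreter and pip rather than the system ones, a quick check:

which python pip
pip -V    # should report a path under the Anaconda install directory, e.g. /opt/anaconda2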

2.3 Install a database

Airflow supports MySQL, PostgreSQL, Oracle, and others. PostgreSQL is used here; install it with yum install postgresql-server.
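
A minimal setup sketch for CentOS 6; the role, password, and database name are the ones used in the sql_alchemy_conn example later in this article:

yum install postgresql-server
service postgresql initdb      # create the initial cluster
service postgresql start
# create the role and database that Airflow will connect to
su - postgres -c "psql -c \"CREATE USER yangxiaowen WITH PASSWORD 'yangxiaowen';\""
su - postgres -c "createdb -O yangxiaowen yangxiaowen"

For connections from another host you will also need an md5 entry in pg_hba.conf and listen_addresses = '*' in postgresql.conf.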

2.4 Install Airflow

Airflow components can be installed modularly; install only the components you actually use, as shown below.

2.4.1 Install the core module

Install the core module:

[airflow@airflow ~]$ pip install airflow

2.4.2 Install the database and password modules

[airflow@airflow ~]$ pip install "airflow[postgres,password]"
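
A quick way to check what was actually installed (pip show prints the installed package metadata):

pip show airflow
pip show psycopg2    # pulled in by the postgres extra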

2.5 Configure Airflow

2.5.1 Set environment variables

First set the $AIRFLOW_HOME environment variable. The first time the airflow command is executed, it creates the configuration file airflow.cfg under $AIRFLOW_HOME.

[airflow@airflow ~]$ vi .bashrc
export AIRFLOW_HOME=/home/airflow/airflow01
[airflow@airflow ~]$ airflow
[2017-05-08 02:00:04,677] {__init__.py:57} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               {resetdb,render,variables,connections,pause,task_failed_deps,version,trigger_dag,initdb,test,unpause,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,upgradedb}
               …
airflow: error: too few arguments
[airflow@airflow ~]$ ll airflow01/
total 16
-rw-rw-r-- 1 airflow airflow 11418 May  8 02:00 airflow.cfg
-rw-rw-r-- 1 airflow airflow  1549 May  8 02:00 unittests.cfg

2.5.2 Edit the configuration file

The airflow.cfg file is divided into several sections: core, cli, api, operators, webserver, email, smtp, celery, scheduler, mesos, kerberos, github_enterprise, and admin.
Change the parameters below and keep everything else at its default value:

[core]
airflow_home = /home/airflow/airflow01
dags_folder = /home/airflow/airflow01/dags # directory holding the DAG .py files
executor = LocalExecutor # start with the LocalExecutor
base_log_folder = /home/airflow/airflow01/logs # main log directory
sql_alchemy_conn = postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen
load_examples = True
default_impersonation = xiaowen.yang
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth # not present in the 1.8.1 cfg template; it must be added, otherwise you get "airflow.exceptions.AirflowException: Failed to import authentication backend"
filter_by_owner = true
web_server_host = XXX.XXX.XXX.XXX  # web server host IP
base_url = http://XXX.XXX.XXX.XXX:8080  # web server IP:PORT
[smtp]
smtp_host = smtp.exmail.qq.com
smtp_user = bd-no-reply@bqjr.cn
smtp_password = BQJRbd@2016
smtp_mail_from = bd-no-reply@bqjr.cn
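
Before initializing the database it can be worth confirming that the sql_alchemy_conn string is reachable. A minimal sketch using SQLAlchemy (installed as an Airflow dependency), with the connection values from the config above:

# quick connectivity check for the Airflow metadata database
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen")
print(engine.execute("SELECT 1").scalar())   # prints 1 if the database accepts connections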

3. Starting Airflow

3.1 Initialize the database

[airflow@airflow ~]$ airflow initdb

3.2 Create a user

$ python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

3.3 Start Airflow

[airflow@airflow ~]$ airflow webserver -p 8080

[airflow@airflow ~]$ airflow scheduler

If no errors appear, the startup succeeded and the Airflow web UI can be opened in a browser.
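
To keep both processes alive after the shell exits, a simple nohup-based sketch (the log file locations are arbitrary choices):

nohup airflow webserver -p 8080 > $AIRFLOW_HOME/webserver.out 2>&1 &
nohup airflow scheduler > $AIRFLOW_HOME/scheduler.out 2>&1 &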

4. Running Tasks

Tasks in Airflow are Python programs. Below we create a simple one.
First create the dags and logs directories under $AIRFLOW_HOME.
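
The paths follow the airflow.cfg values set above:

mkdir -p $AIRFLOW_HOME/dags $AIRFLOW_HOME/logs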

vi testBashOperator.py
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 9),
    'email': ['xiaowen.yang@bqjr.cn'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testBashOperator', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)


Run python testBashOperator.py to check that the file compiles, then run airflow run testBashOperator print_date 2017-05-09 to execute the task; the DAG information can be seen in the web UI.
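
A single task can also be exercised without involving the scheduler, using subcommands visible in the usage output shown earlier; airflow test runs one task instance without recording state in the metadata database:

airflow list_dags                                      # the new DAG should appear in the list
airflow list_tasks testBashOperator                    # shows print_date and sleep
airflow test testBashOperator print_date 2017-05-09    # run a single task in isolation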

5. Installing Celery

Celery is a distributed task queue. In Airflow, the CeleryExecutor makes it possible to add workers dynamically and to run tasks on remote machines. The CeleryExecutor is the recommended executor for production.

5.1 Install the Celery module

pip install airflow[celery]

5.2 Install a Celery broker

Celery needs a broker and a result backend (the same service can serve both) to pass and store messages. Celery supports several brokers:

5.2.1 Using RabbitMQ as the broker

  1. Install Airflow's RabbitMQ module
    Celery can use RabbitMQ, Redis, or even some experimental backends (such as databases supported by SQLAlchemy) as its broker; RabbitMQ is the default.
    pip install airflow[rabbitmq]
  2. Install rabbitmq-server
    yum install rabbitmq-server
    (it pulls in more than 160 dependency packages!)
    Then start it: service rabbitmq-server start
  3. Configure RabbitMQ
    http://blog.csdn.net/qazplm12_3/article/details/53065654
rabbitmqctl add_user ct 152108
rabbitmqctl add_vhost ct_airflow
rabbitmqctl set_user_tags ct airflow
rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"

5.2.2 Using Redis as the broker

  1. Install the Celery Redis module
    pip install -U "celery[redis]"
  2. Install the Redis database
    yum install redis
  3. Start Redis
    service redis start
  4. Modify the Airflow configuration file
    broker_url = redis://localhost:6379/0
    celery_result_backend = redis://localhost:6379/0

5.3 Modify the Airflow configuration file to enable Celery

Edit the airflow.cfg file:
[core] 
executor = CeleryExecutor 
[celery] 
broker_url = amqp://ct:152108@localhost:5672/ct_airflow 
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow

5.4 Test Celery

[airflow@airflow ~]$ airflow webserver -p 8100
[airflow@airflow ~]$ airflow scheduler
[airflow@airflow ~]$ airflow worker  # start a Celery worker

You can watch the CeleryExecutor start up. Run airflow run testBashOperator print_date 2017-05-09 again to see how the CeleryExecutor handles the task.

5.5 Deploy multiple workers

On every machine that should run jobs, install the airflow, airflow[celery], and celery[redis] modules and start airflow worker. Jobs can then run across multiple nodes, as sketched below.
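
As a sketch of routing work to a particular node, a hypothetical queue name spark_node is used below; airflow worker accepts -q to subscribe to specific queues:

airflow worker -q spark_node    # on the remote node: consume only the spark_node queue

In the DAG file, pass queue='spark_node' to the operator (or set it in default_args, where queue is already shown commented out) so the scheduler sends that task to the spark_node queue.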

6. Issues

The following error occurred when running inside Docker; it went away after switching to a physical machine:

[2017-05-10 09:14:59,777: ERROR/Worker-1] Command 'airflow run testFile echoDate 2017-05-10T00:00:00 --local -sd DAGS_FOLDER/testFile.py' returned non-zero exit status 1
[2017-05-10 09:14:59,783: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c5d5ea39-0141-46bb-b33a-06a924c07508] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 59, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed

References:

http://airflow.incubator.apache.org 
http://www.javashuo.com/article/p-xypmfobn-nu.html 
http://blog.csdn.net/qazplm12_3/article/details/53065654 
http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html 
http://www.rabbitmq.com/install-rpm.html 
