1. Environment preparation
1.1 Installation environment
1.2 Create a user
2. Install airflow
2.1 Install python
2.2 Install pip
2.3 Install the database
2.4 Install airflow
2.4.1 Install the core module
2.4.2 Install the database and password modules
2.5 Configure airflow
2.5.1 Set environment variables
2.5.2 Edit the configuration file
3. Start airflow
3.1 Initialize the database
3.2 Create a user
3.3 Start airflow
4. Run a task
5. Install celery
5.1 Install the celery module
5.2 Install a celery broker
5.2.1 Use RabbitMQ as the broker
5.2.2 Use Redis as the broker
5.3 Edit the airflow config file to enable celery
5.4 Test celery
5.5 Deploy multiple workers
6. Problems
Official documentation:
http://airflow.incubator.apache.org/project.html
1. Environment preparation
1.1 Installation environment
- centos 6.7 (docker)
- python 2.7.13
docker run --name airflow -h airflow -dti --net hadoopnet --ip=172.18.0.20 -p 10131:22 -v /dfs/centos/airflow/home:/home -v /dfs/centos/airflow/opt:/opt yangxw/centos:6.7
1.2 Create a user
[root@airflow ~]# groupadd airflow
[root@airflow ~]# useradd airflow -g airflow
2. Install airflow
2.1 Install python
The official site only provides a source package, so Python would normally have to be compiled from source.
Reference: compiling and installing python 2.7.13
Compiling Python requires upgrading gcc, which in turn would have to be built from source; that is too much work, so simply download the Anaconda Python distribution instead:
wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/
2.2 Install pip
Anaconda already bundles pip, so it can be used directly.
2.3 Install the database
Airflow supports MySQL, PostgreSQL, Oracle and others. PostgreSQL is used here; install it with yum (yum install postgresql-server).
2.4 Install airflow
Airflow components can be installed modularly: install only the components you actually use, for example:
2.4.1 Install the core module
Install the core module:
[airflow@airflow ~]$ pip install airflow
2.4.2 Install the database and password modules
[airflow@airflow ~]$ pip install "airflow[postgres,password]"
2.5 Configure airflow
2.5.1 Set environment variables
First set the $AIRFLOW_HOME environment variable. The first time the airflow command is executed, it creates the configuration file airflow.cfg under $AIRFLOW_HOME.
[airflow@airflow ~]$ vi .bashrc
export AIRFLOW_HOME=/home/airflow/airflow01

[airflow@airflow ~]$ airflow
[2017-05-08 02:00:04,677] {__init__.py:57} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               {resetdb,render,variables,connections,pause,task_failed_deps,version,trigger_dag,initdb,test,unpause,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,upgradedb}
               …
airflow: error: too few arguments
[airflow@airflow ~]$ ll airflow01/
total 16
-rw-rw-r-- 1 airflow airflow 11418 May  8 02:00 airflow.cfg
-rw-rw-r-- 1 airflow airflow  1549 May  8 02:00 unittests.cfg
2.5.2 Edit the configuration file
Looking at airflow.cfg, the file is divided into the sections core, cli, api, operators, webserver, email, smtp, celery, scheduler, mesos, kerberos, github_enterprise and admin.
Change a few of the parameters and keep the defaults for everything else:
[core]
airflow_home = /home/airflow/airflow01
dags_folder = /home/airflow/airflow01/dags          # directory holding the DAG python files
executor = LocalExecutor                            # use local mode for now
base_log_folder = /home/airflow/airflow01/logs      # main log directory
sql_alchemy_conn = postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen
load_examples = True
default_impersonation = xiaowen.yang

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth   # the 1.8.1 cfg file does not ship with this parameter; it must be added, otherwise you get "airflow.exceptions.AirflowException: Failed to import authentication backend"
filter_by_owner = true
web_server_host = XXX.XXX.XXX.XXX                   # web server host IP
base_url = http://XXX.XXX.XXX.XXX:8080              # web server host IP:PORT

[smtp]
smtp_host = smtp.exmail.qq.com
smtp_user = bd-no-reply@bqjr.cn
smtp_password = BQJRbd@2016
smtp_mail_from = bd-no-reply@bqjr.cn
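Before running initdb it can be worth verifying that the sql_alchemy_conn string above actually works. The snippet below is only a small sketch, not part of airflow itself; it reuses the placeholder credentials from the config (replace them with your own) and relies on sqlalchemy and psycopg2, both pulled in by airflow[postgres].

# check_db_conn.py - sketch: verify the sql_alchemy_conn string before "airflow initdb";
# host, user and password are the placeholder values from airflow.cfg above.
from sqlalchemy import create_engine

conn_str = "postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen"
engine = create_engine(conn_str)
print(engine.execute("SELECT version()").scalar())   # fails immediately if the database is unreachable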
3. Start airflow
3.1 Initialize the database
[airflow@airflow ~]$ airflow initdb
3.2 Create a user
$ python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
3.3 Start airflow
[airflow@airflow ~]$ airflow webserver -p 8080
[airflow@airflow ~]$ airflow scheduler
If no errors appear, airflow started successfully.
You can now open the airflow web UI in a browser.
4. Run a task
Tasks in airflow are defined as Python programs. Below, a simple Python DAG file is created.
Create the dags and logs directories under $AIRFLOW_HOME.
vi testBashOperator.py

#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'yangxw',
    'depends_on_past': False,
    'start_date': datetime(2017, 5, 9),
    'email': ['xiaowen.yang@bqjr.cn'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testBashOperator', default_args=default_args)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

airflow webserver --debug=True
Run python testBashOperator.py to make sure the file compiles, then run airflow run testBashOperator print_date 2017-05-09 to execute the task; the DAG information will show up in the web UI.
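If the DAG does not show up, a quick way to check whether airflow can even import the file is to load it through a DagBag. This is only a small diagnostic sketch; it assumes $AIRFLOW_HOME is exported as above so that the configured dags_folder is picked up.

# check_dags.py - diagnostic sketch: load the configured dags_folder and show
# which DAGs were found and which files failed to import.
from airflow.models import DagBag

dagbag = DagBag()             # reads dags_folder from airflow.cfg
print(dagbag.dags.keys())     # should contain 'testBashOperator'
print(dagbag.import_errors)   # non-empty if a DAG file raised an error on import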
5. Install celery
Celery is a distributed task queue. With the CeleryExecutor, airflow can add workers dynamically and execute tasks on remote machines. The CeleryExecutor is the recommended setup for production.
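To make the broker/worker idea concrete before wiring it into airflow, here is a tiny stand-alone Celery example. It is only a sketch and is independent of airflow; the module name tasks.py, the add task and the Redis URL are illustrative and assume a local Redis such as the one installed in 5.2.2.

# tasks.py - stand-alone Celery sketch, independent of airflow:
# the broker queues task messages, the result backend stores the return values.
from celery import Celery

app = Celery('tasks',
             broker='redis://localhost:6379/0',    # where task messages are queued
             backend='redis://localhost:6379/0')   # where task results are stored

@app.task
def add(x, y):
    return x + y

Start a worker with celery -A tasks worker, then call add.delay(2, 3).get() from a Python shell; it should return 5. Airflow's CeleryExecutor pushes its "airflow run ..." commands through exactly this kind of queue.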
5.1 Install the celery module
pip install airflow[celery]
5.2 Install a celery broker
Celery needs a broker and a result backend (the same service can be used for both) to store its messages. Celery supports several brokers:
5.2.1 Use RabbitMQ as the broker
- Install the airflow RabbitMQ module
Celery can use RabbitMQ, Redis and others as its broker, and even some experimental backends (such as databases supported by SQLAlchemy); RabbitMQ is the default.
pip install airflow[rabbitmq]
- Install rabbitmq-server
yum install rabbitmq-server
(it pulls in more than 160 dependency packages!)
Then start it: service rabbitmq-server start
- Configure rabbitmq
http://blog.csdn.net/qazplm12_3/article/details/53065654
rabbitmqctl add_user ct 152108
rabbitmqctl add_vhost ct_airflow
rabbitmqctl set_user_tags ct airflow
rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
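A quick way to confirm that the user, password and vhost created above are usable is to open a connection with kombu, which is installed together with celery. This is just a sketch and assumes RabbitMQ is running on localhost with the credentials from the commands above.

# check_broker.py - sketch: verify the RabbitMQ user/vhost before pointing airflow at it
from kombu import Connection

conn = Connection('amqp://ct:152108@localhost:5672/ct_airflow')
conn.ensure_connection(max_retries=3)   # raises if the host, credentials or vhost are wrong
print('broker reachable')
conn.release()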
5.2.2 Use Redis as the broker
- Install the celery redis module
pip install -U "celery[redis]"
- Install the redis database
yum install redis
- Start redis
service redis start
- Edit the airflow configuration file:
broker_url = redis://localhost:6379/0
celery_result_backend = redis://localhost:6379/0
5.3 Edit the airflow config file to enable celery
Edit airflow.cfg:
[core]
executor = CeleryExecutor
[celery]
broker_url = amqp://ct:152108@localhost:5672/ct_airflow
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
5.4 Test celery
[airflow@airflow ~]$ airflow webserver -p 8100
[airflow@airflow ~]$ airflow scheduler
[airflow@airflow ~]$ airflow worker    # starts a celery worker for the CeleryExecutor
You can watch the CeleryExecutor start up. Then run airflow run testBashOperator print_date 2017-05-09 again and check how the CeleryExecutor handles it.
5.5 Deploy multiple workers
On every machine that should run jobs, install the airflow, airflow[celery] and celery[redis] modules and start airflow worker; jobs can then run across multiple nodes.
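With several workers it is sometimes useful to pin a task to a particular node via a queue. The sketch below is illustrative only: the DAG id, task id and queue name spark_queue are made up, and the task will only run on a worker started with airflow worker -q spark_queue.

# queue_routing.py - sketch: route one task to a dedicated worker queue
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('testQueueRouting',
          start_date=datetime(2017, 5, 9),
          schedule_interval=None)

t1 = BashOperator(
    task_id='print_host',
    bash_command='hostname',
    queue='spark_queue',   # picked up only by workers listening on this queue
    dag=dag)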
6. Problems
The following error occurred inside docker; it went away after switching to a physical machine.
[2017-05-10 09:14:59,777: ERROR/Worker-1] Command 'airflow run testFile echoDate 2017-05-10T00:00:00 --local -sd DAGS_FOLDER/testFile.py' returned non-zero exit status 1
[2017-05-10 09:14:59,783: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c5d5ea39-0141-46bb-b33a-06a924c07508] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/anaconda2/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 59, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed
References:
http://airflow.incubator.apache.org
http://www.javashuo.com/article/p-xypmfobn-nu.html
http://blog.csdn.net/qazplm12_3/article/details/53065654
http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
http://www.rabbitmq.com/install-rpm.html