1. 安裝環境
CentOS-6.5 Python-2.7.12 setuptools-29.0.1 pip-9.0.1
2. 編譯 Python
# Build Python 2.7.12 from source on CentOS 6, then install setuptools, pip
# and MySQL-python against the new interpreter.

# Build dependencies.
sudo yum install -y gcc
sudo yum install -y gcc-c++
sudo yum install -y wget
sudo yum install -y mysql
sudo yum install -y mysql-devel
sudo yum install -y python-devel
sudo yum install -y zlib-devel
sudo yum install -y openssl-devel
sudo yum install -y sqlite-devel

# Download and build Python 2.7.12 into /usr/local/python27.
wget https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgz
sudo mkdir /usr/local/python27
sudo tar zxfv Python-2.7.12.tgz -C /usr/local/
cd /usr/local/Python-2.7.12/
./configure --prefix=/usr/local/python27
make
make install

# Keep the system Python (2.6) around for yum, and point "python" at the new build.
sudo mv /usr/bin/python /usr/bin/python2.6
# FIX: the original linked /usr/local/python/bin/python (a path that does not
# exist — the configure prefix above is /usr/local/python27), and after the
# `mv` above /usr/bin/python was left dangling. Link both names correctly:
sudo ln -sf /usr/local/python27/bin/python /usr/bin/python2.7
sudo ln -sf /usr/local/python27/bin/python /usr/bin/python

# yum only works with the system Python 2.6 — edit its shebang accordingly:
vim /usr/bin/yum
#   #!/usr/bin/python2.6

# Put the new interpreter on PATH for login shells.
# FIX: the original set PYTHON_HOME=/usr/bin/python2.6, which is a binary,
# not an install prefix, so $PYTHON_HOME/bin never existed.
vim /etc/profile
#   export PYTHON_HOME=/usr/local/python27
#   export PATH=$PYTHON_HOME/bin:$PATH

# Install setuptools.
wget https://pypi.python.org/packages/59/88/2f3990916931a5de6fa9706d6d75eb32ee8b78627bb2abaab7ed9e6d0622/setuptools-29.0.1.tar.gz#md5=28ecfd0f2574b489b9a18343879a7324
tar zxfv setuptools-29.0.1.tar.gz
cd setuptools-29.0.1
python setup.py install

# Install pip.
wget https://pypi.python.org/packages/11/b6/abcb525026a4be042b486df43905d6893fb04f05aac21c32c638e939e447/pip-9.0.1.tar.gz#md5=35f01da33009719497f01a4ba69d63c9
tar zxfv pip-9.0.1.tar.gz
cd pip-9.0.1
python setup.py install
pip install --upgrade pip

# Install the MySQL driver.
wget https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
unzip MySQL-python-1.2.5.zip
cd MySQL-python-1.2.5
python setup.py install

# Third-party packages land in /usr/local/python27/lib/python2.7/site-packages
3. 安裝 airflow
airflow 經過 pip 能夠方便的安裝到系統中。
# Install Airflow from PyPI and bring up the web UI.

# airflow needs a home; ~/airflow is the default, but any location works
# if AIRFLOW_HOME points at it (optional).
export AIRFLOW_HOME=/usr/local/airflow

# Install the core package plus the Hive extra via pip.
pip install airflow
pip install airflow[hive]

# Create the metadata database tables.
airflow initdb

# Start the web server; the default port is 8080.
airflow webserver -p 8080
4. 設置 mysql 爲元數據庫
# Use MySQL as Airflow's metadata store.

# First install the mysql client and headers.
sudo yum install -y mysql
sudo yum install -y mysql-devel

# Run these inside the mysql shell — create the airflow user and databases:
#   CREATE USER airflow;
#   CREATE DATABASE airflow;
#   CREATE DATABASE celery_result_airflow;
#   GRANT all privileges on airflow.* TO 'airflow'@'%' IDENTIFIED BY 'airflow';
#   GRANT all privileges on celery_result_airflow.* TO 'airflow'@'%' IDENTIFIED BY 'airflow';

# Install the MySQL module for Python.
wget https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
unzip MySQL-python-1.2.5.zip
cd MySQL-python-1.2.5
python setup.py install

# Configure MySQL as the metadata store in the Airflow config file.
sudo vi $AIRFLOW_HOME/airflow.cfg
# Change the database connection string:
#   sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
# Field meaning:
#   dialect+driver://username:password@host:port/database

# Initialize the metadata database.
airflow initdb
# Reset the metadata database (destroys existing metadata).
airflow resetdb
5. 安裝登陸模塊
# Enable password authentication for the Airflow web UI.

# Install the password extra.
pip install airflow[password]

# Turn on authentication in the Airflow config file.
sudo vi $AIRFLOW_HOME/airflow.cfg
#   [webserver]
#   authenticate = True
#   filter_by_owner = True
#   auth_backend = airflow.contrib.auth.backends.password_auth
運行如下代碼將用戶名密碼寫入元數據庫中
# Create a web-UI login account and persist it to the metadata database.
# Run inside an interactive `python` session (hence the trailing exit()).
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'quzhengpeng'
user.email = 'quzhengpeng@163.com'
user.password = 'quzhengpeng'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
6. 啓動守護進程
啓動後臺守護進程了以後,Airflow 才能實時監控任務的調度狀況。將任務腳本放到 ${AIRFLOW_HOME}/dags 下,在 web UI 就能看到任務執行狀況。
# Start the scheduler daemon so Airflow can trigger and monitor DAG runs.
airflow scheduler
7. 啓動 web 服務
# Start the web UI process and open up network access to it.
airflow webserver -p 8080

# CentOS 6: stop the firewall and disable SELinux enforcement.
sudo service iptables stop
setenforce 0

# CentOS 7: stop firewalld and keep it from starting at boot.
systemctl stop firewalld.service
systemctl disable firewalld.service
Celery+MySQL
# Celery docs: http://docs.jinkan.org/docs/celery/index.html
# Celery 4.0.0 has known problems with Airflow, so pin Celery 3.
pip install -U Celery==3.1.24
pip install airflow[celery]
修改配置文件
# Point Airflow at the Celery executor with MySQL as broker and result store.
vi airflow.cfg
#   [core]
#   executor = CeleryExecutor
#   [celery]
#   broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
#   celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
# NOTE(review): the celery_result_airflow database created earlier is not used
# here — presumably the result backend was meant to point at it; verify.
啓動airflow
# Bring up the full stack: web UI, scheduler, and a Celery worker.
airflow webserver -p 8080
airflow scheduler
# Run the worker as a non-root user.
airflow worker
# Start the Celery web UI (Flower) to inspect celery tasks,
# then browse to http://localhost:5555/
airflow flower
Celery+RabbitMQ
# Download the RabbitMQ server RPM.
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

# Install RabbitMQ's dependencies.
yum install erlang
yum install socat

# If a rabbitmq yum repository is configured:
sudo yum install -y rabbitmq-server
# Otherwise install the downloaded RPM directly:
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
啓動RabbitMQ服務
# Start the rabbitmq service.
sudo service rabbitmq-server start
# Or run the server directly:
sudo rabbitmq-server
# Add -detached to run it in the background (note: a single dash).
sudo rabbitmq-server -detached
# Start rabbitmq automatically at boot.
chkconfig rabbitmq-server on
# Never stop the RabbitMQ server with `kill`; always use rabbitmqctl:
sudo rabbitmqctl stop
設置RabbitMQ
# Create a RabbitMQ user for Airflow.
rabbitmqctl add_user airflow airflow
# Create a RabbitMQ virtual host.
rabbitmqctl add_vhost vairflow
# Give the user the admin role.
rabbitmqctl set_user_tags airflow admin
# Allow the user full configure/write/read access to the vhost.
rabbitmqctl set_permissions -p vairflow airflow ".*" ".*" ".*"
# (optional — "no usage" in the original) enable the management web UI.
rabbitmq-plugins enable rabbitmq_management
修改airflow配置文件支持Celery
# Switch Airflow to the Celery executor backed by RabbitMQ.
# FIX: the config file lives at $AIRFLOW_HOME/airflow.cfg — the original path
# $AIRFLOW_HOME/airflow/airflow.cfg contradicts every other step in this guide.
vi $AIRFLOW_HOME/airflow.cfg
# Change the executor to CeleryExecutor:
#   executor = CeleryExecutor
# Change broker_url:
#   broker_url = amqp://airflow:airflow@localhost:5672/vairflow
#   Format explanation: transport://userid:password@hostname:port/virtual_host
# Change celery_result_backend:
#   celery_result_backend = amqp://airflow:airflow@localhost:5672/vairflow
#   Format explanation: transport://userid:password@hostname:port/virtual_host
安裝airflow的celery和rabbitmq模塊
# Install Airflow's Celery and RabbitMQ extras.
pip install airflow[celery]
pip install airflow[rabbitmq]
airflow 使用 DAG(Directed Acyclic Graph,有向無環圖)來管理做業流。
# Build a DAG that runs daily at midnight, retrying failed tasks 3 times.
from datetime import datetime, timedelta
from airflow.models import DAG

# FIX: the original referenced `seven_days_ago` without defining it.
# Start the schedule at midnight seven days ago.
seven_days_ago = datetime.combine(
    datetime.today() - timedelta(days=7), datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    # FIX: Airflow's default_args key is `retry_delay`; the original
    # `retries_delay` is silently ignored.
    'retry_delay': timedelta(seconds=60),
    'depends_on_past': True,
}

dag = DAG(
    dag_id='dag',
    default_args=args,
    schedule_interval='0 0 * * *',   # cron: daily at 00:00
    dagrun_timeout=timedelta(minutes=60),
)
建立任務將任務添加到DAG中
# Define two tasks and attach them to the DAG created above.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# Placeholder task that performs no work.
demo = DummyOperator(
    task_id='demo',
    dag=dag,
)

# Shell task that runs `echo 1`.
last_execute = BashOperator(
    task_id='last_execute',
    bash_command='echo 1',
    dag=dag,
)
配置任務的依賴關係
# last_execute runs only after demo succeeds (demo -> last_execute).
demo.set_downstream(last_execute)
https://hub.docker.com/r/camil/airflow/
https://dwtobigdata.wordpress.com/2016/01/14/designing-workflow-with-airflow/
http://www.jianshu.com/p/59d69981658a
http://www.javashuo.com/article/p-fpqshddf-z.html
http://www.tuicool.com/articles/A3yIri6
http://ju.outofmemory.cn/entry/245373
http://blog.csdn.net/permike/article/details/51898213
http://www.cnblogs.com/harrychinese/p/airflow.html
http://stackoverflow.com/questions/19689510/celery-flower-security-in-production