airflow安裝與使用

1.安裝環境

CentOS-6.5
Python-2.7.12
setuptools-29.0.1
pip-9.0.1

2.編譯Python

sudo yum install -y gcc
sudo yum install -y gcc-c++
sudo yum install -y wget
sudo yum install -y mysql
sudo yum install -y mysql-devel
sudo yum install -y python-devel
sudo yum install -y zlib-devel
sudo yum install -y openssl-devel
sudo yum install -y sqlite-devel
wget https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgz

sudo mkdir /usr/local/python27
sudo tar zxfv Python-2.7.12.tgz -C /usr/local/
cd /usr/local/Python-2.7.12/
./configure --prefix=/usr/local/python27
make
make install
sudo mv /usr/bin/python /usr/bin/python2.6
sudo ln -sf /usr/local/python27/bin/python /usr/bin/python2.7

vim /usr/bin/yum
#!/usr/bin/python2.6

vim /etc/profile
export PYTHON_HOME=/usr/local/python27
export PATH=$PYTHON_HOME/bin:$PATH

wget https://pypi.python.org/packages/59/88/2f3990916931a5de6fa9706d6d75eb32ee8b78627bb2abaab7ed9e6d0622/setuptools-29.0.1.tar.gz#md5=28ecfd0f2574b489b9a18343879a7324
tar zxfv setuptools-29.0.1.tar.gz
cd setuptools-29.0.1
python setup.py install

wget https://pypi.python.org/packages/11/b6/abcb525026a4be042b486df43905d6893fb04f05aac21c32c638e939e447/pip-9.0.1.tar.gz#md5=35f01da33009719497f01a4ba69d63c9
tar zxfv pip-9.0.1.tar.gz
cd pip-9.0.1
python setup.py install
pip install --upgrade pip

wget https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
unzip MySQL-python-1.2.5.zip
cd MySQL-python-1.2.5
python setup.py install

#第三方包 /usr/local/python27/lib/python2.7/site-packages

3.安裝mysql

    airflow經過pip能夠方便的安裝到系統中。

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=/usr/local/airflow

# install from pypi using pip
pip install airflow
pip install airflow[hive]

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

4.設置mysql爲元數據庫

#首先要安裝mysql客戶端
sudo yum install -y mysql
sudo yum install -y mysql-devel

CREATE USER airflow;
CREATE DATABASE airflow;
CREATE DATABASE celery_result_airflow;

GRANT all privileges on airflow.* TO 'airflow'@'%' IDENTIFIED BY 'airflow';
GRANT all privileges on celery_result_airflow.* TO 'airflow'@'%' IDENTIFIED BY 'airflow';

#安裝mysql模塊
wget https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
unzip MySQL-python-1.2.5.zip
cd MySQL-python-1.2.5
python setup.py install

#在airflow的配置文件中配置mysql爲元數據的存儲庫
sudo vi $AIRFLOW_HOME/airflow.cfg

#更改數據庫連接:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

#對應字段解釋以下:
dialect+driver://username:password@host:port/database

#初始化元數據庫
airflow initdb

#重置元數據庫
airflow resetdb

5.安裝登陸模塊

#安裝password模塊
pip install airflow[password]

#在airflow的配置文件中修改須要認證
sudo vi $AIRFLOW_HOME/airflow.cfg
[webserver]
authenticate = True
filter_by_owner = True
auth_backend = airflow.contrib.auth.backends.password_auth

運行如下代碼將用戶名密碼寫入元數據庫中

# Create an Airflow web UI account by inserting a user row into the
# metadata database (run inside an interactive `python` session).
# NOTE: the original `import airflow` line was unused and has been removed;
# all names come from the explicit imports below.
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

# Wrap the ORM User model so the password is hashed before being stored.
user = PasswordUser(models.User())
user.username = 'quzhengpeng'
user.email = 'quzhengpeng@163.com'
user.password = 'quzhengpeng'

# Persist the new user through a SQLAlchemy session bound to the metadata DB.
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

6.啓動守護進程

    啓動後臺守護進程了以後,Airflow才能實時監控任務的調度狀況。將任務腳本放到${AIRFLOW_HOME}/dags下在web UI 就能看到任務執行狀況。

airflow scheduler

7.啓動web服務

#啓動web進程
airflow webserver -p 8080

#關閉CentOS6的防火牆
sudo service iptables stop

#關閉CentOS6的SELinux
setenforce 0

#關閉CentOS7的防火牆
systemctl stop firewalld.service

#禁止firewall開機啓動
systemctl disable firewalld.service

 

Celery+MySQL

#Celery文檔 http://docs.jinkan.org/docs/celery/index.html
#Celery4.0.0在airflow中有一些問題,因此安裝Celery3
pip install -U Celery==3.1.24
pip install airflow[celery]

修改配置文件

vi airflow.cfg

[core]
executor = CeleryExecutor

[celery]
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow

celery_result_backend = db+mysql://airflow:airflow@localhost:3306/celery_result_airflow

啓動airflow

airflow webserver -p 8080

airflow scheduler

#以非root用戶運行
airflow worker

#啓動Celery WebUI 查看celery任務
airflow flower 
http://localhost:5555/

 

Celery+RabbitMQ

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

#安裝RabbitMQ的依賴包
yum install erlang

yum install socat

#若是下載了rabbitmq的yum源 sudo yum install -y rabbitmq-server
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

啓動RabbitMQ服務

#啓動rabbitmq服務
sudo service rabbitmq-server start 
#或者
sudo rabbitmq-server

#添加 -detached 屬性來讓它在後臺運行(注意:只有一個破折號)
sudo rabbitmq-server -detached

#設置開機啓動rabbitmq服務
chkconfig rabbitmq-server on

#永遠不要用 kill 中止 RabbitMQ 服務器,而是應該用 rabbitmqctl 命令
sudo rabbitmqctl stop

設置RabbitMQ

#建立一個RabbitMQ用戶
rabbitmqctl add_user airflow airflow

#建立一個RabbitMQ虛擬主機
rabbitmqctl add_vhost vairflow

#將這個用戶賦予admin的角色
rabbitmqctl set_user_tags airflow admin

#容許這個用戶訪問這個虛擬主機
rabbitmqctl set_permissions -p vairflow airflow ".*" ".*" ".*"

# no usage
rabbitmq-plugins enable rabbitmq_management

修改airflow配置文件支持Celery

vi $AIRFLOW_HOME/airflow.cfg

#更改Executor爲CeleryExecutor
executor = CeleryExecutor

#更改broker_url
broker_url = amqp://airflow:airflow@localhost:5672/vairflow
Format explanation: transport://userid:password@hostname:port/virtual_host

#更改celery_result_backend
celery_result_backend = amqp://airflow:airflow@localhost:5672/vairflow
Format explanation: transport://userid:password@hostname:port/virtual_host

安裝airflow的celery和rabbitmq模塊

pip install airflow[celery]
pip install airflow[rabbitmq]

 

airflow使用DAG(Directed Acyclic Graph,有向無環圖)來管理做業流

#建立DAG
# Define the DAG and the default arguments inherited by all of its tasks.
from datetime import datetime, timedelta
from airflow.models import DAG

# Midnight seven days ago. The original snippet referenced `seven_days_ago`
# without ever defining it, which would raise NameError at parse time.
seven_days_ago = datetime.combine(
    datetime.today() - timedelta(days=7), datetime.min.time()
)

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    # FIX: Airflow's argument is 'retry_delay'; the original 'retries_delay'
    # is not a recognised key and would be silently ignored.
    'retry_delay': timedelta(seconds=60),
    'depends_on_past': True
}

dag = DAG(
    dag_id='dag',
    default_args=args,
    schedule_interval='0 0 * * *',   # cron syntax: daily at midnight
    dagrun_timeout=timedelta(minutes=60)
)

建立任務將任務添加到DAG中

# Each operator instance is one task; passing `dag=dag` attaches the task
# to the DAG defined earlier in this tutorial.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# A no-op placeholder task.
demo = DummyOperator(task_id='demo', dag=dag)

# A task that executes a shell command.
last_execute = BashOperator(
    task_id='last_execute', bash_command='echo 1', dag=dag
)

配置任務的依賴關係

# Declare the dependency: `last_execute` runs after `demo` (demo -> last_execute).
demo.set_downstream(last_execute)

 

https://hub.docker.com/r/camil/airflow/

https://dwtobigdata.wordpress.com/2016/01/14/designing-workflow-with-airflow/

http://www.jianshu.com/p/59d69981658a

http://www.javashuo.com/article/p-fpqshddf-z.html

http://www.tuicool.com/articles/A3yIri6

http://ju.outofmemory.cn/entry/245373

http://blog.csdn.net/permike/article/details/51898213

http://www.cnblogs.com/harrychinese/p/airflow.html

http://stackoverflow.com/questions/37785061/unable-to-start-airflow-worker-flower-and-need-clarification-on-airflow-architec?rq=1

http://stackoverflow.com/questions/19689510/celery-flower-security-in-production

相關文章
相關標籤/搜索