airflow + CeleryExecutor 環境搭建

airflow整合環境搭建

1. 總體結構

mysql -> 後端數據庫html

redis -> 用於brokernode

CeleryExecutor -> 執行器python

2. 環境安裝

2.1,安裝python anaconda環境
添加py用戶
# useradd py
設置密碼
# passwd py
建立anaconda安裝路徑
# mkdir /anaconda
賦權
# chown -R py:py /anaconda

上傳anaconda安裝包並用py用戶運行安裝程序
$ chmod +x Anaconda3-5.1.0-Linux-x86_64.sh
$ ./Anaconda3-5.1.0-Linux-x86_64.sh
Welcome to Anaconda3 5.1.0
In order to continue the installation process, please review the license
......
- Press ENTER to confirm the location
- Press CTRL-C to abort the installation
- Or specify a different location below
[/home/py/anaconda3] >>> /anaconda/anaconda3      輸入自定義安裝路徑,若是用默認的話回車跳過
而後將anaconda加入環境變量,並使其生效

$ vi .bash_profile 
在最後一行加上以下配置:
export PATH=/anaconda/anaconda3/bin:$PATH
而後使其生效:
$source .bash_profile

檢測一下安裝結果,出現如下結果說明安裝成功:
$ python -V
Python 3.6.4 :: Anaconda, Inc.

配置pipy源:
$ mkdir ~/.pip
$ touch ~/.pip/pip.conf
$ echo '[global]' >> ~/.pip/pip.conf
$ echo 'trusted-host=mirrors.aliyun.com' >> ~/.pip/pip.conf
$ echo 'index-url=http://mirrors.aliyun.com/pypi/simple/' >> ~/.pip/pip.conf
2.2,安裝mysql相關依賴
去mysql官網下載mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar安裝包並上傳至服務器:
# tar xvf mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar
檢查服務器是否有舊版本依賴:
# rpm -qa|grep mysql-libs-5.1.73|wc -l
若是結果大於0則執行以下命令卸載舊依賴:
# rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64
若是等於0則不須要此操做.
而後依次安裝如下依賴:
# rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm
# rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm
# rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm
2.3,安裝airflow相關模塊
$ pip install apache-airflow[celery]
$ pip install apache-airflow[redis]
$ pip install apache-airflow[mysql]
檢測一下安裝結果:
$ airflow -h
若是顯示正常則表示安裝成功,而且用戶根目錄會出現airflow文件夾
2.4,安裝mysql
上傳mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar至須要安裝mysql的服務器上:
# tar xvf mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar
檢查服務器是否有舊版本依賴:
# rpm -qa|grep mysql-libs-5.1.73|wc -l
若是結果大於0則執行以下命令卸載舊依賴:
# rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64
若是等於0則不須要此操做.
而後依次安裝如下安裝:
# rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm 
# rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm 
# rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm 
# rpm -ivh mysql-community-client-5.7.22-1.el6.x86_64.rpm
# rpm -ivh mysql-community-server-5.7.22-1.el6.x86_64.rpm 

# vi /etc/my.cnf
尾部添加:
#關閉TIMESTAMP列自動生成值
explicit_defaults_for_timestamp=1
#跳過權限認證
skip-grant-tables

# service mysqld start
# mysql -u root
用於測試話劇因此密碼設置的比較簡單,僅供測試:
mysql> use mysql
mysql> update user set password_expired='N' where user='root';    
mysql> update user set authentication_string=password('123456') where user='root'; 

編輯/etc/my.cnf去掉skip-grant-tables 並重啓mysql:
# service mysqld restart
# mysql -u root -p
使用密碼123456登錄
#下降密碼複雜度要求僅僅用於測試
mysql> set global validate_password_policy=0; 
mysql> set global validate_password_length=4;
mysql> SET PASSWORD = PASSWORD('123456');
mysql> flush privileges;
針對airflow使用建立數據庫,添加用戶並受權
mysql> CREATE DATABASE airflow;
mysql> CREATE USER 'af'@'localhost' IDENTIFIED BY '123456';
mysql> GRANT all privileges on airflow.* TO 'af'@'localhost' IDENTIFIED BY '123456';
mysql> GRANT all privileges on airflow.* TO 'af'@'%' IDENTIFIED BY '123456';
mysql> flush privileges;
帳戶測試:
# mysql -u af -p
使用123456登錄
2.5,安裝redis(使用redis做爲broker)[方案一]
  • 安裝redis:
redis官網下載redis-4.0.9.tar.gz安裝包,並上傳至須要安裝redis的服務器:
$ tar zxvf redis-4.0.9.tar.gz
$ cd redis-4.0.9
$ make
$ cp redis.conf src/
$ cd src
編輯配置文件redis.conf將bind屬性改成bind 0.0.0.0
啓動redis
$ nohup ./redis-server redis.conf > output.log 2>&1 &
  • airflow對應配置:
若是執行過airflow -h命令後,則用戶目錄下面會出現一個airflow文件夾, airflow文件夾下面有個airflow.cfg
的文件,這個就是airflow的配置文件;
編輯airflow.cfg文件,修改一下內容,具體狀況根據實際狀況填寫[ip和端口]:
[core] 
#sql_alchemy_conn = mysql://[username]:[password]@[host]:[port]/airflow
sql_alchemy_conn = mysql://af:123456@localhost/airflow
executor = CeleryExecutor 
[celery] 
broker_url = redis://localhost:6379/0
celery_result_backend =  redis://localhost:6379/0

配置完成以後便可進行數據庫初始化:
$ airflow initdb
2.6,安裝rabbitmq(使用rabbitmq做爲broker)[方案二]
  • yum安裝:
安裝centos擴展源
# yum -y install epel-release
安裝erlang運行環境以及rabbitmq
# yum install erlang
# yum install rabbitmq-server
  • rpm手動安裝:

通常yum源安裝的erlang版本過低,能夠從erlang官網下載打包好的rpm包手動安裝,避免源碼編譯安裝:mysql

# wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_21.0-1~centos~6_amd64.rpm
# rpm -ivh esl-erlang_21.0-1~centos~6_amd64.rpm
# wget http://www.rabbitmq.com/releases/erlang/esl-erlang-compat-18.1-1.noarch.rpm
# rpm -ivh esl-erlang-compat-18.1-1.noarch.rpm
  • rabbitmq配置:

添加用戶並開啓遠程訪問:web

相關格式以下:redis

#下面建立用戶密碼的過程以該url爲示例
broker_url = 'pyamqp://myuser:mypassword@localhost:5672/myvhost'

如下過程爲建立用戶myuser設置密碼爲mypassword 添加一個virtual host並容許用戶訪問該virtual hostsql

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_user_tags myuser mytag
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

示例:shell

$ rabbitmqctl add_user cord 123456
$ rabbitmqctl set_user_tags cord administrator  #這裏指定爲管理員用戶
$ rabbitmqctl set_permissions -p / cord ".*" ".*" ".*"
  • airflow對應配置:

先安裝rabbitmq模塊:數據庫

$ pip install apache-airflow[rabbitmq]

而後修改配置文件:apache

編輯airflow.cfg文件,修改一下內容,具體狀況根據實際狀況填寫[ip和端口]:
[core] 
#sql_alchemy_conn = mysql://[username]:[password]@[host]:[port]/airflow
sql_alchemy_conn = mysql://af:123456@localhost/airflow
executor = CeleryExecutor 
[celery] 
注意:這裏使用pyamqp協議,而不是amqp協議,amqp使用的是librabbitmq2.0.0,這個和celery4.x集成有各類問題
broker_url = pyamqp://cord:123456@localhost:5672//
#celery_result_backend = rpc:// 使用rabbitmq做爲結果存儲
#這裏是使用mysql做爲結果存儲
celery_result_backend = db+mysql://af:123456@localhost/airflow

配置完成以後便可進行數據庫初始化:
$ airflow initdb
2.7,啓動airflow
關於啓動,這個要分應用節點和做業節點:
1) 應用節點:
$ airflow webserver -D
$ airflow scheduler -D
$ airflow worker -D (應用節點可不運行woker)
2) 做業節點:(做業節點只須要運行worker就行)
$ airflow worker -D

3.增長定時任務

  1. 在airflow文件夾下面新建dags文件夾用於存儲定時任務文件
  2. 建立以下的定時任務文件helloworld.py
from datetime import timedelta, datetime

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'jifeng.si',
    'depends_on_past': False,
    # 'depends_on_past': True,
    #'start_date': airflow.utils.dates.days_ago(2),
    'start_date': datetime(2018, 5, 2),
    'email': ['1219957063@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_hello_world_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval='*/25 * * * *',
    start_date=datetime(2018, 5, 28)
)

dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)

hello_operator = BashOperator(
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='echo `date` >> /home/py/test.txt',
    dag=dag
)

dummy_operator >> hello_operator
  1. 測試一下代碼的正確性:
$ python helloworld.py

若是沒出現異常則說明代碼無錯誤, 而且airflow環境正常.

將helloworld.py 放在 /home/py/airflow/dags下.

測試一下任務看任務是否能正常運行:

$ touch ~/test.txt 建立用於測試的文件
$ airflow run -A example_hello_world_dag sleep_task 20180528

若是運行正常,則能夠啓用該定時任務,啓用任務有兩種方式:

1) 經過命令啓動:

$ airflow unpause example_hello_world_dag

2) 經過界面啓動:
在airflow的web管理界面,將左邊的off按鈕改成on

而後觀察用戶路徑下的test.txt文件,若是運行正常的話會不斷增長時間信息:

$ cat test.txt
....
Thu May 31 15:55:10 CST 2018
Thu May 31 15:56:10 CST 2018
Thu May 31 15:57:09 CST 2018
Thu May 31 16:04:10 CST 2018
....

4.注意事項

  • airflow的cron定時器只能精確到分鐘,而不能精確到秒
  • airflow使用的是utc時區,正式使用的時候須要進行時區轉換

附錄:

附上一個環境初始化shell腳本:

#!/bin/sh

#拷貝mysql依賴
#scp命令必須手工輸入密碼確認過一次以後纔可保證sshpass能正常運行
sshpass -p '123456' scp -q root@127.0.0.1:/root/mysql-community-devel-5.7.22-1.el6.x86_64.rpm  /root/  &&
sshpass -p '123456' scp -q root@127.0.0.1:/root/mysql-community-libs-5.7.22-1.el6.x86_64.rpm  /root/  &&
sshpass -p '123456' scp -q root@127.0.0.1:/root/mysql-community-common-5.7.22-1.el6.x86_64.rpm  /root/ &&

#添加py用戶
password="py@123"
username="py"
pass=$(perl -e 'print crypt($ARGV[0], "password")' $password)
useradd -m -p $pass $username  &&

#添加anaconda安裝路徑
mkdir -p /anaconda  &&
chown -R py:py /anaconda &&

#設置pipy源信息
su - py -c  "mkdir ~/.pip && touch ~/.pip/pip.conf"
su - py -c  "echo '[global]' >> ~/.pip/pip.conf"
su - py -c  "echo 'trusted-host=pypi.douban.com/simple' >> ~/.pip/pip.conf"
su - py -c  "echo 'index-url=http://pypi.douban.com/simple' >> ~/.pip/pip.conf"

#安裝mysql依賴
old=$(rpm -qa|grep mysql-libs-5.1.73|wc -l)
if [ $old -gt 0 ]; then
    rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64
fi
rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm  &&
rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm &&
rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm &&

參考連接:

http://doc.okbase.net/permike/archive/245749.html

http://bubuko.com/infodetail-2284634.html

http://celery.readthedocs.io/en/latest/userguide/configuration.html#conf-rpc-result-backend

相關文章
相關標籤/搜索