Setting Up an Airflow Data Pipeline Scheduler

The server runs CentOS. Make sure pip and setuptools are installed, and keep the installed Python version up to date.

Next, install Airflow, with reference to:

Airflow 1.8 workflow platform setup: http://blog.csdn.net/kk185800961/article/details/78431484
Minimal Airflow installation on CentOS 6.5: http://blog.csdn.net/Excaliburace/article/details/53818530

 

We will use MySQL as the metadata database; by default Airflow uses SQLite.

1. Create the database

# Create the database and account
mysql> create database airflow default charset utf8 collate utf8_general_ci;  
mysql> create user airflow@'localhost' identified by 'airflow';  
mysql> grant all on airflow.* to airflow@'localhost';  
mysql> flush privileges;  

2. Install Airflow. If you need an isolated environment, create one first with virtualenv ./env

pip install apache-airflow

3. When installed with pip, the package lands in the site-packages folder under your Python path; running the command above once more will show the location

~/anaconda2/lib/python2.7/site-packages/airflow
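You can also ask Python directly where a package was installed. A minimal sketch (Python 3 syntax; the `json` call below is just a stand-in so the snippet runs even without Airflow installed):

```python
import importlib.util
import os

def find_package_dir(name):
    """Return the directory a package was installed into, or None if absent."""
    spec = importlib.util.find_spec(name)
    if spec is None or spec.origin is None:
        return None
    return os.path.dirname(spec.origin)

# For Airflow this prints something like .../site-packages/airflow,
# or None when the package is not installed.
print(find_package_dir("airflow"))
print(find_package_dir("json"))  # stdlib stand-in, always resolvable
```

Note that the environment shown above is Python 2.7 (anaconda2); `importlib.util.find_spec` needs Python 3, and on Python 2 `import airflow; print(airflow.__file__)` does the same job.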

4. Add the following to /etc/profile, then source it

#Airflow
export AIRFLOW_HOME=/home/lintong/software/airflow
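Airflow resolves its home directory from this environment variable, falling back to ~/airflow when it is unset. The lookup can be sketched as:

```python
import os

def resolve_airflow_home():
    """Mimic how Airflow picks its home directory: the AIRFLOW_HOME
    environment variable if set, otherwise ~/airflow."""
    return os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))

os.environ["AIRFLOW_HOME"] = "/home/lintong/software/airflow"
print(resolve_airflow_home())  # /home/lintong/software/airflow
```

This is why sourcing /etc/profile before running any airflow command matters: without the variable, initdb would create its files under ~/airflow instead.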

5. Initialize the database. This step creates the airflow folder at the path above, along with its initial files

airflow initdb

Check that the installation succeeded:

airflow version

6. Configure the metadata database connection

cd /home/lintong/software/airflow
vim airflow.cfg

Modify the following setting:

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
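The sql_alchemy_conn value is a standard SQLAlchemy URL of the form driver://user:password@host:port/database. A quick stdlib check (a sketch, not part of Airflow) that each part of the URL above is what you intend:

```python
from urllib.parse import urlsplit

conn = "mysql://airflow:airflow@localhost:3306/airflow"
url = urlsplit(conn)

print(url.scheme)            # mysql      -> driver
print(url.username)          # airflow    -> user
print(url.password)          # airflow    -> password
print(url.hostname)          # localhost  -> host
print(url.port)              # 3306       -> port
print(url.path.lstrip("/"))  # airflow    -> database
```

Each field here must match the database, user, and grant created in step 1, or initdb will fail to connect.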

7. Install the Python MySQL driver

pip install mysql-python

Then initialize the database again:

airflow initdb

8. Start the web UI

airflow webserver -p 8080
http://localhost:8080/admin/

9. Create a dags folder under the airflow path and add a DAG Python script in it. Reference: [AirFlow]AirFlow使用指南三 第一個DAG示例

# -*- coding: utf-8 -*-

import airflow
import os
import time
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['xxxxxxx'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

# -------------------------------------------------------------------------------
# dag

dag = DAG(
    'example_print_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))


def each_content(content, path):
    # Build a bash command string that appends the content plus a timestamp to path
    return ("echo \"" + content + " " + time.asctime(time.localtime(time.time())) + "\" >> " + path)

# -------------------------------------------------------------------------------
# first operator

# Note: python_callable expects a callable, not the result of calling one,
# so the commented-out version below would not work as written.
#print1_operator = PythonOperator(
#    task_id='print1_task',
#    python_callable=each_content("1", "/home/lintong/桌面/test.txt"),
#    dag=dag)

print1_operator = BashOperator(
    task_id='print1_task',
    bash_command='echo 1 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# second operator

print2_operator = BashOperator(
    task_id='print2_task',
    bash_command='echo 2 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# third operator

print3_operator = BashOperator(
    task_id='print3_task',
    bash_command='echo 3 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# dependencies
#each_content("1", "/home/lintong/桌面/test.txt")
print2_operator.set_upstream(print1_operator)
print3_operator.set_upstream(print1_operator)

#if __name__ == "__main__":
#    dag.cli()

10. Check the web UI to confirm that the DAG appears

11. When it appears, the DAG's toggle is Off; switch it to On, then click the green triangle (run) button next to it

12. Start the scheduler

airflow scheduler

13. Check the file test.txt; its lines will read 1 2 3 or 1 3 2, in that order
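Both orders are correct because print1_task is the only declared upstream dependency: it must run before print2_task and print3_task, which have no ordering between themselves. A small checker for that invariant (illustrative only, not part of Airflow):

```python
def is_valid_order(lines):
    """Check that '1' comes before both '2' and '3' — the only
    dependencies the DAG declares; '2' and '3' may appear in either order."""
    position = {value: index for index, value in enumerate(lines)}
    return position["1"] < position["2"] and position["1"] < position["3"]

print(is_valid_order(["1", "2", "3"]))  # True
print(is_valid_order(["1", "3", "2"]))  # True
print(is_valid_order(["2", "1", "3"]))  # False
```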

 

After installation, use the shell script below to start Airflow; the web UI listens on port 8080

#!/bin/bash

#set -x
#set -e
set -u

# Usage: ./start_airflow.sh "stop_all" or "start_all"
if [ "$1" = "stop_all" ]; then
    # List all processes, keep those matching airflow, drop the grep line,
    # take the PID column, and kill them
    ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs kill
fi

if [ "$1" = "start_all" ]; then
    cd /home/lintong/software/airflow/logs
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
    echo "Started Airflow in the background"
fi
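The kill pipeline in stop_all can also be mimicked in Python if you prefer. This sketch filters sample ps -ef lines the same way the shell pipeline does: match 'airflow' case-insensitively, drop the grep line itself, and take the PID column:

```python
def airflow_pids(ps_lines):
    """Mimic: ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}'"""
    pids = []
    for line in ps_lines:
        if "airflow" in line.lower() and "grep" not in line:
            pids.append(int(line.split()[1]))  # column 2 of ps -ef is the PID
    return pids

sample = [
    "lintong  1234     1  0 10:00 ?  00:00:01 airflow webserver -p 8080",
    "lintong  1235     1  0 10:00 ?  00:00:01 airflow scheduler",
    "lintong  1300  1001  0 10:05 ?  00:00:00 grep -Ei airflow",
    "root        1     0  0 09:00 ?  00:00:02 /sbin/init",
]
print(airflow_pids(sample))  # [1234, 1235]
```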

Python script to add a web UI user:

from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'lintong'
user.email = 'xxxxx@gmail.com'
user.password = 'XXXXX'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()