The server runs CentOS. Make sure pip and setuptools are installed first, and pay attention to which Python version you are installing against.
The following references were used to get Airflow installed:
Airflow 1.8 workflow platform setup: http://blog.csdn.net/kk185800961/article/details/78431484
Minimal Airflow installation on CentOS 6.5: http://blog.csdn.net/Excaliburace/article/details/53818530
We use MySQL as the backing database; Airflow uses SQLite by default.
1. Create the database
# create the database and account
mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;
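Before moving on, it is worth confirming that the new account can actually log in; a quick sanity check, assuming MySQL is running locally with the credentials above:
mysql -u airflow -pairflow airflow -e "select 1;"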
2. Install Airflow. When you need environment isolation, create an isolated environment with virtualenv ./env (see the sketch after the install path below).
pip install apache-airflow
3. Installing with pip places Airflow in the site-packages folder under the Python path; after running the command above you can find it at:
~/anaconda2/lib/python2.7/site-packages/airflow
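If you opted for isolation in step 2, the flow might look like this sketch (the ./env path is only an example; adjust to taste):
virtualenv ./env
source ./env/bin/activate
pip install apache-airflow
# airflow then lives under ./env/lib/python2.7/site-packages instead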
4. Add the following to /etc/profile, then source it:
# Airflow
export AIRFLOW_HOME=/home/lintong/software/airflow
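To make the variable visible in the current shell and verify it took effect, something like:
source /etc/profile
echo $AIRFLOW_HOME    # should print /home/lintong/software/airflow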
5. Initialize the database. This step creates the airflow folder at the path above, along with some files inside it.
airflow initdb
Check whether the installation succeeded:
airflow version
6. Configure the metadata database address
cd /home/lintong/software/airflow
vim airflow.cfg
Modify the following setting:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
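Moving off SQLite also unlocks parallel task execution if you want it: the default SequentialExecutor runs one task at a time, and SQLite supports nothing else, whereas with MySQL you can optionally switch the executor in the same file (optional, not required for this walkthrough):
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow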
7. Install the Python MySQL driver
pip install mysql-python
Then initialize the database again:
airflow initdb
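If the connection string is correct, the Airflow tables now live in MySQL rather than SQLite; to confirm, using the account from step 1:
mysql -u airflow -pairflow airflow -e "show tables;"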
8. Start the web UI
airflow webserver -p 8080
Then visit http://localhost:8080/admin/ in a browser.
9. Create a dags folder under the airflow path and add a DAG Python script there. Reference: [AirFlow] AirFlow usage guide part 3, the first DAG example.
# -*- coding: utf-8 -*-
import airflow
import os
import time
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

# -------------------------------------------------------------------------------
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0),
    'email': ['xxxxxxx'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

# -------------------------------------------------------------------------------
# dag
dag = DAG(
    'example_print_dag',
    default_args=default_args,
    description='my first DAG',
    schedule_interval=timedelta(days=1))


def each_content(content, path):
    # build a shell command that appends the content plus a timestamp to a file
    return ("echo \"" + content + " " +
            time.asctime(time.localtime(time.time())) + "\" >> " + path)


# -------------------------------------------------------------------------------
# first operator
# print1_operator = PythonOperator(
#     task_id='print1_task',
#     python_callable=each_content("1", "/home/lintong/桌面/test.txt"),
#     dag=dag)
print1_operator = BashOperator(
    task_id='print1_task',
    bash_command='echo 1 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# second operator
print2_operator = BashOperator(
    task_id='print2_task',
    bash_command='echo 2 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# third operator
print3_operator = BashOperator(
    task_id='print3_task',
    bash_command='echo 3 >> /home/lintong/test.txt',
    dag=dag)

# -------------------------------------------------------------------------------
# dependencies: print1 runs first, then print2 and print3
# each_content("1", "/home/lintong/桌面/test.txt")
print2_operator.set_upstream(print1_operator)
print3_operator.set_upstream(print1_operator)

# if __name__ == "__main__":
#     dag.cli()
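Before involving the scheduler, you can check that the file parses and exercise a single task from the CLI; both subcommands exist in Airflow 1.x, and the date below is just a sample execution date:
airflow list_dags
airflow test example_print_dag print1_task 2018-01-01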
10. Check the web UI to see whether this DAG shows up.
11. When it shows up, the DAG is off by default; switch it to on, then click the green triangle run button next to it.
12. Start the scheduler
airflow scheduler
13. Check the file test.txt: 1 2 3 or 1 3 2 will appear in order. (print2_task and print3_task both depend only on print1_task, so 1 always comes first, but the relative order of 2 and 3 is not fixed.)
After installation, use the shell script below to start or stop Airflow; the web UI runs on port 8080.
#!/bin/bash
#set -x
#set -e
set -u

# Usage: ./start_airflow.sh "stop_all" or "start_all"
if [ $1 == "stop_all" ]; then
    # list all processes, keep the Airflow ones, drop the grep itself,
    # extract the PIDs and kill them
    ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs kill
fi

if [ $1 == "start_all" ]; then
    cd /home/lintong/software/airflow/logs
    nohup airflow webserver >>webserver.log 2>&1 &
    nohup airflow worker >>worker.log 2>&1 &
    nohup airflow scheduler >>scheduler.log 2>&1 &
    echo "Started Airflow in the background"
fi
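A usage example. Note that the airflow worker line only does useful work when airflow.cfg is configured for CeleryExecutor with a message broker; under the Sequential or Local executors, the webserver and scheduler are enough:
chmod +x start_airflow.sh
./start_airflow.sh start_all
./start_airflow.sh stop_all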
Python script for adding a web UI user:
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'lintong'
user.email = 'xxxxx@gmail.com'
user.password = 'XXXXX'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
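For this script to matter, password authentication has to be enabled for the web UI. In Airflow 1.8 that means installing the password extra (e.g. pip install apache-airflow[password]) and turning it on in the [webserver] section of airflow.cfg:
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth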