最近工做任務須要把原來使用Kettle的ETL流程遷移到Hadoop平臺上,就須要找一個替代Kettle工做流部分的工具。在大數據環境下,經常使用的無非是Oozie,Airflow或者Azkaban。通過簡單的評估以後,咱們選擇了輕量化的Airflow做爲咱們的工做流工具。python
Airflow是一個工做流分配管理系統,經過有向非循環圖的方式管理任務流程,設置任務依賴關係和時間調度。Airflow獨立於咱們要運行的任務,只須要把任務的名字和運行方式提供給Airflow做爲一個task就能夠。mysql
本次安裝Airflow 1.8 ,而不是最新版的apache-airflow 1.9,主要緣由是1.9版本的全部運行都是基於UTC時間的,這樣致使在配置調度信息的時候不夠直觀。目前開發中的2.0版本已經能夠設置本地時區,可是尚未公開發布。web
Python 3.5 :Anaconda 4.2環境sql
MySQL 5.6 :使用LocalExcutor模式,全部DAG信息保存在後端數據庫中。shell
OS 用戶:etl數據庫
後端使用MySQL數據庫來保存任務信息,先在數據庫中創建database和user。以下apache
create database airflow;
grant all privileges on airflow.* to 'airflow'@'%' identified by 'airflow';
flush privileges;
複製代碼
Airflow在運行過程當中會使用全局環境變量,因此必須先在~/.bash_profile 中增長變量以下後端
export AIRFLOW_HOME=/home/etl/airflow
複製代碼
使用pip安裝airflow以及依賴的數據庫驅動以後,須要進行初始化。這個過程會生成默認的配置文件ariflow.cfg,後續的配置修改就經過這個文件進行。bash
# 默認安裝1.8版本,由於1.9版本的名字變成了apache-airflow
pip install airflow
# 由於須要鏈接MySQL數據庫,因此須要安裝驅動
pip install airflow[mysql]
# 初始化數據庫,這一步是必須的,不然沒法生成默認配置文件
airflow initdb
# 建立須要的文件夾,不然運行時會報錯找不到默認文件夾
mkdir dags
mkdir logs
複製代碼
默認狀況下airflow的web管理臺是沒有用戶密碼的,在遷移到正式環境以前,咱們須要啓用權限機制。session
在airflow.cfg中設置以下選項
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
複製代碼
啓用權限以後,在第一次登陸以前必須手動經過python REPL來設置初始用戶
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'airflow@xxx.com'
user.password = 'airflow'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
複製代碼
由於使用MySQL做爲元數據庫,因此還須要配置數據庫的鏈接參數。在airflow.cfg中設置以下選項
[core]
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow:airflow@192.168.100.57:3306/airflow?charset=utf8
複製代碼
更改數據庫鏈接方式以後,須要從新執行一次初始化操做。
airflow initdb
複製代碼
還有一些零散的配置很差歸類,就統一記錄在這裏。
任務成功,失敗或重試後發送郵件通知的配置
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = smtp.mxhichina.com
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = bialert@xxx.com
smtp_password = ******
smtp_port = 25
smtp_mail_from = bialert@xxx.com
複製代碼
默認狀況下,Web界面會把樣例DAG都顯示出來很是混亂。除了在數據庫中刪除樣例DAG以外,也能夠經過配置不顯示這部分樣例。
# 不顯示樣例DAG
load_examples = False
複製代碼
Airflow的catchup機制,會在你啓動一個DAG的時候,把當前時間以前未執行的job依次執行一次。這個好處是能夠把遺漏的調度任務進行補足,可是在不少時候咱們並不須要這個特性。經過修改配置,能夠禁止catchup,以下
[scheduler]
# 避免執行catchup,即避免把當前時間以前未執行的job都執行一次
catchup_by_default = False
複製代碼
在默認的8080端口頁面上,能夠對DAG進行平常操做,包括但不限於啓動,中止,查看日誌等。界面以下圖
當前版本Airflow沒有提供關閉腳本,也沒有提供一個便捷的辦法來完全刪除DAG。爲了方便測試,我寫了一個管理腳原本處理相關的任務。
腳本調用方式以下
$ ./airflow_util.py -h
usage: airflow_util.py [-h] [-k] [-s] [--clear CLEAR] [--delete DELETE]
optional arguments:
-h, --help show this help message and exit
-k, --kill 關閉Airflow
-s, --start 啓動Airflow
--clear CLEAR 刪除歷史日誌
--delete DELETE 提供須要刪除的DAG ID
複製代碼
管理腳本源代碼以下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import pymysql
import subprocess
import time
#鏈接配置信息
config = {
'host':'127.0.0.1',
'port':3306,
'user':'airflow',
'password':'airflow',
'db':'airflow',
'charset':'utf8',
}
# 刪除歷史日誌
def clear_log(num):
print("Clear logs before {0} days ...".format(num))
cmd = "find %s -maxdepth 1 -type d -mtime +%d | xargs -i rm -rf {}"
subprocess.call(cmd % ('./logs',num), shell=True)
subprocess.call(cmd % ('./logs/scheduler',num), shell=True)
# 經過殺掉後臺進程來關閉Airflow
def kill_airflow():
print("Stoping Airflow ...")
# exclude current file in case the file name contains keyword 'airflow'
cmd = "ps -ef | grep -Ei 'airflow' | grep -v 'grep' | grep -v '%s' | awk '{print $2}' | xargs -i kill -9 {}" % (__file__.split('/')[-1])
subprocess.call(cmd, shell=True)
# 啓動Airflow
def start_airflow():
kill_airflow()
time.sleep(3)
print("Starting Airflow Webserver ...")
subprocess.call("rm logs/webserver.log", shell=True)
subprocess.call("nohup airflow webserver >>logs/webserver.log 2>&1 &", shell=True)
print("Starting Airflow Scheduler ...")
subprocess.call("rm logs/scheduler.log", shell=True)
subprocess.call("nohup airflow scheduler >>logs/scheduler.log 2>&1 &", shell=True)
# 刪除指定DAG ID在數據庫中的所有信息。
# PS:由於SubDAG的命名方式爲 parent_id.child_id ,因此也會把符合這種規則的SubDAG刪除!
def delete_dag(dag_id):
# 建立鏈接
connection = pymysql.connect(**config)
cursor = connection.cursor()
sql="select dag_id from airflow.dag where (dag_id like '{}.%' and is_subdag=1) or dag_id='{}'".format(dag_id, dag_id)
cursor.execute(sql)
rs = cursor.fetchall()
dags = [r[0] for r in rs ]
for dag in dags:
for tab in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag_stats", "dag" ]:
sql="delete from airflow.{} where dag_id='{}'".format(tab, dag)
print(sql)
cursor.execute(sql)
connection.commit()
connection.close()
#
def main_process():
parser = argparse.ArgumentParser()
parser.add_argument("-k", "--kill", help="關閉Airflow", action='store_true')
parser.add_argument("-s", "--start", help="啓動Airflow", action='store_true')
parser.add_argument("--clear", help="刪除歷史日誌", type=int)
parser.add_argument("--delete", help="提供須要刪除的DAG ID")
args = parser.parse_args()
if args.kill:
kill_airflow()
if args.start:
start_airflow()
if args.clear:
clear_log(args.clear)
if args.delete:
delete_dag(args.delete)
if __name__ == '__main__':
main_process()
複製代碼
原生Airflow的工做流經過簡單的python腳原本進行定義(有一些第三方擴展能夠實現拖放模式的定義)。
對於task不是特別多的場景,把全部task都定義在同一個py文件裏面便可。以下,定義了4個task
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
template_caller = "sh /home/etl/jupyter_home/etl_script/spark_scheduler/subdir/caller_spark.sh -m {0} -f {1} "
template_file = '/home/etl/jupyter_home/etl_script/spark_scheduler/subdir/{0}'
default_spark_master = 'spark://192.168.100.51:7077'
#-------------------------------------------------------------------------------
default_args = {
'owner': '測試',
'depends_on_past': False,
'start_date': datetime(2018,4,24,14,0,0),
'email': ['xxx@xxx.com'],
'email_on_failure': True,
}
#-------------------------------------------------------------------------------
dag = DAG(
'demo_spark_normal',
default_args=default_args,
description='測試-調用Spark',
schedule_interval='*/20 * * * *')
#-------------------------------------------------------------------------------
# spark operator
cmd = template_caller.format(default_spark_master, template_file.format('hive_rw.ipynb'))
t1 = BashOperator( task_id='spark_hive', bash_command=cmd , dag=dag)
cmd = template_caller.format(default_spark_master, template_file.format('jdbc_rw.ipynb'))
t2 = BashOperator( task_id='spark_jdbc', bash_command=cmd , dag=dag)
cmd = template_caller.format(default_spark_master, template_file.format('csv_relative.py'))
t3 = BashOperator( task_id='spark_csv', bash_command=cmd , dag=dag)
cmd = template_caller.format(default_spark_master, template_file.format('pure_sql.sql'))
t4 = BashOperator( task_id='spark_sql', bash_command=cmd , dag=dag)
#-------------------------------------------------------------------------------
# dependencies
t1 >> t2 >> t4
t1 >> t3 >> t4
複製代碼
當一個工做流裏面的task過多,UI顯示會比較擁擠,這種場景下能夠經過把task分類到不一樣SubDAG中的辦法來實現。在具體編寫上,又能夠分爲單一py文件和多個py文件的方案。
這種狀況下,咱們把DAG和SubDAG都寫在一個py文件裏面。優勢是隻有一個文件易於編寫,缺點是若是task比較多的話,文件不易管理。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
PARENT_DAG_NAME = 'atest_04'
#CHILD_DAG_NAME = 'child_dag'
default_args = {
'owner': '測試',
'depends_on_past': False,
}
main_dag = DAG(
dag_id=PARENT_DAG_NAME,
default_args=default_args,
description='測試-內嵌SubDAG',
start_date=datetime(2018,4,21,16,0,0),
schedule_interval='*/30 * * * *'
)
# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
)
t1 = BashOperator(
task_id='print_{}'.format(child_dag_name),
bash_command='echo sub key_{} `date` >> /home/etl/airflow/test.log'.format(child_dag_name),
dag=dag)
return dag
#
sub_dag_1 = SubDagOperator(
subdag=sub_dag(PARENT_DAG_NAME, 'child_01', main_dag.start_date, main_dag.schedule_interval),
task_id='child_01',
dag=main_dag,
)
sub_dag_2 = SubDagOperator(
subdag=sub_dag(PARENT_DAG_NAME, 'child_02', main_dag.start_date, main_dag.schedule_interval),
task_id='child_02',
dag=main_dag,
)
#
sub_dag_1 >> sub_dag_2
複製代碼
當SubDAG比較多的場景下,把DAG文件保存在獨立的py文件中是一種更好的方法。文件目錄結構以下
主文件以下
PS:由於在airflow中調用其餘文件的過程當中會出現找不到model的錯誤,因此在主文件中增長了一句處理路徑的語句。若是有更好的辦法,能夠對這個進行替換。
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
複製代碼
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime
from sub.subdag import sub_dag
PARENT_DAG_NAME = 'atest_03'
CHILD_DAG_NAME = 'child_dag'
default_args = {
'owner': '測試',
'depends_on_past': False,
}
main_dag = DAG(
dag_id=PARENT_DAG_NAME,
default_args=default_args,
description='測試-獨立SubDAG',
start_date=datetime(2018,4,14,19,0,0),
schedule_interval='*/10 * * * *',
catchup=False
)
sub_dag = SubDagOperator(
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
main_dag.schedule_interval),
task_id=CHILD_DAG_NAME,
dag=main_dag,
)
複製代碼
子文件以下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
)
t1 = BashOperator(
task_id='print_1',
bash_command='echo sub 1 `date` >> /home/etl/airflow/test.log',
dag=dag)
return dag
複製代碼
在DAG上,任務的觸發由兩個主要參數定義,start_date 和 schedule_interval 。一個DAG第一次被觸發的時間點是 start_date + schedule_interval。舉例以下:
start_date 2018-04-20 14:00:00
schedule_interval */30 * * * *
複製代碼
那第一次觸發會在14:30發生,可是執行的是14:00的任務。根據激活DAG時間的不一樣,會發生不一樣的觸發。
綜上,若是但願每30分鐘觸發一次,而且第一次觸發發生在14:00,那麼設置的start_date就應該是 13:30:00,這樣在14:00的時候,就會觸發第一次任務。
在1.8版本中,不能直接經過cfg文件來配置LOGGING LEVEL,因此採用修改源碼的方式實現這個功能。
PS:在1.9以後的版本中,聽說能夠直接進行配置,我沒有測試。
vi /opt/anaconda3/lib/python3.5/site-packages/airflow/settings.py
# 修改此處代碼,把默認的INFO修改爲WARN便可
LOGGING_LEVEL = logging.WARN
複製代碼