AirFlow簡介

1, 簡介

​ Airflow是一個可編程,調度和監控的工做流平臺,基於有向無環圖(DAG),airflow能夠定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面一樣也能夠方便的管控調度任務,而且對任務運行狀態進行實時監控,方便了系統的運維和管理。html

2,執行器(Executor)

​ Airflow自己是一個綜合平臺,它兼容多種組件,因此在使用的時候有多種方案能夠選擇。好比最關鍵的執行器就有四種選擇:python

SequentialExecutor:單進程順序執行任務,默認執行器,一般只用於測試web

LocalExecutor:多進程本地執行任務redis

CeleryExecutor:分佈式調度,生產經常使用sql

DaskExecutor :動態任務調度,主要用於數據分析編程

在當前項目使用CeleryExecutor做爲執行器。segmentfault

celery是一個分佈式調度框架,其自己無隊列功能,須要使用第三方組件,好比redis或者rabbitmq,當前項目使用的是rabbitmq,系統總體結構以下所示:bash

其中:併發

turing爲外部系統框架

GDags服務幫助拼接成dag

master節點webui管理dags、日誌等信息

scheduler負責調度,只支持單節點

worker負責執行具體dag中的task, worker支持多節點

在整個調度系統中,節點之間的傳遞介質是消息,而消息的本質內容是執行腳本的命令,也就是說,工做節點的dag文件必須和master節點的dag文件保持一致,否則任務的執行會出問題。

3,任務處理器

airflow內置了豐富的任務處理器,用於實現不一樣類型的任務:

BashOperator : 執行bash命令

PythonOperator : 調用python代碼

EmailOperator : 發送郵件

HTTPOperator : 發送 HTTP 請求

SqlOperator : 執行 SQL 命令

除了這些基本的構建塊以外,還有更多的特定處理器:DockerOperatorHiveOperatorS3FileTransferOperatorPrestoToMysqlOperatorSlackOperator ...

在當前項目使用了HTTPOperator做爲執行器,用於調用JAVA服務,總體結構圖以下:

關於airflow的環境搭建能夠參考另一篇博客: http://www.javashuo.com/article/p-yjpfdieu-ck.html

4,基本使用

4.1,經常使用命令
$ airflow webserver -D       守護進程運行webserver
$ airflow scheduler -D       守護進程運行調度器
$ airflow worker -D          守護進程運行調度器
$ airflow worker -c 1 -D     守護進程運行celery worker並指定任務併發數爲1
$ airflow pause dag_id      暫停任務
$ airflow unpause dag_id     取消暫停,等同於在管理界面打開off按鈕
$ airflow list_tasks dag_id  查看task列表
$ airflow clear dag_id       清空任務實例
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE  運行整個dag文件
$ airflow run dag_id task_id execution_date       運行task
4.2,web管控界面的使用

啓動web管控界面須要執行airflow webserver -D命令,默認訪問端口是8080

http://110.55.63.51:8080/admin/

(1) 任務啓動暫停開關

(2) 任務運行狀態

(3) 待執行,未分發的任務

(4) 手動觸發執行任務

(5) 任務管控界面

選擇對應dag欄目,點擊(5) Graph View便可進入任務管控界面

點擊對應的任務,會彈出一個任務管控臺,主要幾個功能以下:

View Log : 查看任務日誌

Run : 運行選中任務

Clear:清空任務隊列

Mark Success : 標記任務爲成功狀態

4.3 經過定義DAG文件實現建立定時任務
1) 普通任務
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = { #默認參數
    'owner': 'jifeng.si', #dag擁有者,用於權限管控
    'depends_on_past': False,  #是否依賴上游任務
    'start_date': datetime(2018, 5, 2), #任務開始時間,默認utc時間
    'email': ['123456789@qq.com'], #告警通知郵箱地址
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_hello_world_dag',  #dag的id
    default_args=default_args,
    description='my first DAG', #描述
    schedule_interval='*/25 * * * *', # crontab
    start_date=datetime(2018, 5, 28) #開始時間,覆蓋默認參數
)

def print_hello():
    return 'Hello world!'

dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)

hello_operator = BashOperator(   #經過BashOperator定義執行bash命令的任務
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='echo `date` >> /home/py/test.txt',
    dag=dag
)

dummy_operator >> hello_operator #設置任務依賴關係
#dummy_operator.set_downstream(hello_operator)
2) 定義http任務並使用本地時間
import os
from datetime import timedelta, datetime
import pytz
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG

default_args = {
    'owner': 'cord',
    # 'depends_on_past': False,
    'depends_on_past': True,
    'wait_for_downstream': True,
    'execution_timeout': timedelta(minutes=3),
    'email': ['123456789@qq.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

#將本地時間轉換爲utc時間,再設置爲start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)

os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090'

dag = DAG(
    'bm01',
    default_args=default_args,
    description='my DAG',
    schedule_interval='*/2 * * * *',
    start_date=utc_dt
)

#經過SimpleHttpOperator定義http任務
task1 = SimpleHttpOperator(
    task_id='get_op1',
    http_conn_id='http_test',
    method='GET',
    endpoint='test1',
    data={},
    headers={},
    dag=dag)

task2 = SimpleHttpOperator(
    task_id='get_op2',
    http_conn_id='http_test',
    method='GET',
    endpoint='test2',
    data={},
    headers={},
    dag=dag)

task1 >> task2
4.4 crontab語法

crontab格式以下所示:

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                       7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * *  command to execute
是否必須 取值範圍 可用特殊符號 備註
Minutes Yes 0–59 * , -
Hours Yes 0–23 * , -
Day of month Yes 1–31 * , - ? L W ? L W部分實現可用
Month Yes 1–12 or JAN–DEC * , -
Day of week Yes 0–6 or SUN–SAT * , - ? L # ? L W 部分實現可用
Year No 1970–2099 * , - 標準實現裏無這一項

特殊符號功能說明:

逗號(,)
​ 逗號用於分隔一個列表裏的元素,好比 "MON,WED,FRI" 在第五域(day of week)表示Mondays, Wednesdays and Fridays。

連字符(-)
​ 連字符用於表示範圍,好比2000–2010表示2000到2010之間的每一年,包括這兩年(閉區間)。

百分號(%)
​ 用於命令(command)中的格式化

L
​ 表示last,最後一個,好比第五域,5L表示當月最後一個星期五

W
W表示weekday(Monday-Friday),指離指定日期附近的工做日,好比第三域設置爲15L ,這表示臨近當月15附近的工做日,假如15號是星期六,那麼定時器會在14號執行,若是15號是星期天,那麼定時器會在16號執行,也就是說只會在離指定日期最近的那天執行。

井號#
#用於第五域(day of week),#後面跟着一個1~5之間的數字,這個用於表示第幾個星期,好比5#3表示第三個星期五

?
​ 在有些實現裏面,*的功能相同,還有一些實現裏面?表示cron的啓動時間,好比 當cron服務在8:25am啓動,則? ? * * * *會更新爲25 8 * * * *, 直到下一次cron服務從新啓動,定時器會再次更新。

/
/通常與*組合使用,後面跟着一個數字,表示頻率,好比在第一域(Minutes)中*/5表示每5分鐘,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的縮寫

參考連接:
https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral

https://en.wikipedia.org/wiki/Cron

相關文章
相關標籤/搜索