Airflow是一個可編程,調度和監控的工做流平臺,基於有向無環圖(DAG),airflow能夠定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面一樣也能夠方便的管控調度任務,而且對任務運行狀態進行實時監控,方便了系統的運維和管理。html
Airflow自己是一個綜合平臺,它兼容多種組件,因此在使用的時候有多種方案能夠選擇。好比最關鍵的執行器就有四種選擇:python
SequentialExecutor:單進程順序執行任務,默認執行器,一般只用於測試web
LocalExecutor:多進程本地執行任務redis
CeleryExecutor:分佈式調度,生產經常使用sql
DaskExecutor :動態任務調度,主要用於數據分析編程
在當前項目使用CeleryExecutor
做爲執行器。segmentfault
celery是一個分佈式調度框架,其自己無隊列功能,須要使用第三方組件,好比redis或者rabbitmq,當前項目使用的是rabbitmq,系統總體結構以下所示:bash
其中:併發
turing爲外部系統框架
GDags服務幫助拼接成dag
master節點webui管理dags、日誌等信息
scheduler負責調度,只支持單節點
worker負責執行具體dag中的task, worker支持多節點
在整個調度系統中,節點之間的傳遞介質是消息,而消息的本質內容是執行腳本的命令,也就是說,工做節點的dag文件必須和master節點的dag文件保持一致,否則任務的執行會出問題。
airflow內置了豐富的任務處理器,用於實現不一樣類型的任務:
BashOperator : 執行bash命令
PythonOperator : 調用python代碼
EmailOperator : 發送郵件
HTTPOperator : 發送 HTTP 請求
SqlOperator : 執行 SQL 命令
除了這些基本的構建塊以外,還有更多的特定處理器:DockerOperator
,HiveOperator
,S3FileTransferOperator
,PrestoToMysqlOperator
,SlackOperator
...
在當前項目使用了HTTPOperator
做爲執行器,用於調用JAVA服務,總體結構圖以下:
關於airflow的環境搭建能夠參考另一篇博客: http://www.javashuo.com/article/p-yjpfdieu-ck.html
$ airflow webserver -D 守護進程運行webserver
$ airflow scheduler -D 守護進程運行調度器
$ airflow worker -D 守護進程運行調度器
$ airflow worker -c 1 -D 守護進程運行celery worker並指定任務併發數爲1
$ airflow pause dag_id 暫停任務
$ airflow unpause dag_id 取消暫停,等同於在管理界面打開off按鈕
$ airflow list_tasks dag_id 查看task列表
$ airflow clear dag_id 清空任務實例
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 運行整個dag文件
$ airflow run dag_id task_id execution_date 運行task
啓動web管控界面須要執行airflow webserver -D
命令,默認訪問端口是8080
http://110.55.63.51:8080/admin/
(1) 任務啓動暫停開關
(2) 任務運行狀態
(3) 待執行,未分發的任務
(4) 手動觸發執行任務
(5) 任務管控界面
選擇對應dag欄目,點擊(5) Graph View便可進入任務管控界面
點擊對應的任務,會彈出一個任務管控臺,主要幾個功能以下:
View Log : 查看任務日誌
Run : 運行選中任務
Clear:清空任務隊列
Mark Success : 標記任務爲成功狀態
from datetime import timedelta, datetime import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator default_args = { #默認參數 'owner': 'jifeng.si', #dag擁有者,用於權限管控 'depends_on_past': False, #是否依賴上游任務 'start_date': datetime(2018, 5, 2), #任務開始時間,默認utc時間 'email': ['123456789@qq.com'], #告警通知郵箱地址 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'example_hello_world_dag', #dag的id default_args=default_args, description='my first DAG', #描述 schedule_interval='*/25 * * * *', # crontab start_date=datetime(2018, 5, 28) #開始時間,覆蓋默認參數 ) def print_hello(): return 'Hello world!' dummy_operator = DummyOperator(task_id='dummy_task', dag=dag) hello_operator = BashOperator( #經過BashOperator定義執行bash命令的任務 task_id='sleep_task', depends_on_past=False, bash_command='echo `date` >> /home/py/test.txt', dag=dag ) dummy_operator >> hello_operator #設置任務依賴關係 #dummy_operator.set_downstream(hello_operator)
import os from datetime import timedelta, datetime import pytz from airflow.operators.http_operator import SimpleHttpOperator from airflow.models import DAG default_args = { 'owner': 'cord', # 'depends_on_past': False, 'depends_on_past': True, 'wait_for_downstream': True, 'execution_timeout': timedelta(minutes=3), 'email': ['123456789@qq.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } #將本地時間轉換爲utc時間,再設置爲start_date tz = pytz.timezone('Asia/Shanghai') dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz) utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None) os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090' dag = DAG( 'bm01', default_args=default_args, description='my DAG', schedule_interval='*/2 * * * *', start_date=utc_dt ) #經過SimpleHttpOperator定義http任務 task1 = SimpleHttpOperator( task_id='get_op1', http_conn_id='http_test', method='GET', endpoint='test1', data={}, headers={}, dag=dag) task2 = SimpleHttpOperator( task_id='get_op2', http_conn_id='http_test', method='GET', endpoint='test2', data={}, headers={}, dag=dag) task1 >> task2
crontab
格式以下所示:
# ┌───────────── minute (0 - 59) # │ ┌───────────── hour (0 - 23) # │ │ ┌───────────── day of month (1 - 31) # │ │ │ ┌───────────── month (1 - 12) # │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday; # │ │ │ │ │ 7 is also Sunday on some systems) # │ │ │ │ │ # │ │ │ │ │ # * * * * * command to execute
域 | 是否必須 | 取值範圍 | 可用特殊符號 | 備註 |
---|---|---|---|---|
Minutes | Yes | 0–59 | * , - |
|
Hours | Yes | 0–23 | * , - |
|
Day of month | Yes | 1–31 | * , - ? L W |
? L W 部分實現可用 |
Month | Yes | 1–12 or JAN–DEC | * , - |
|
Day of week | Yes | 0–6 or SUN–SAT | * , - ? L # |
? L W 部分實現可用 |
Year | No | 1970–2099 | * , - |
標準實現裏無這一項 |
特殊符號功能說明:
逗號(,
)
逗號用於分隔一個列表裏的元素,好比 "MON,WED,FRI" 在第五域(day of week)表示Mondays, Wednesdays and Fridays。
連字符(-
)
連字符用於表示範圍,好比2000–2010表示2000到2010之間的每一年,包括這兩年(閉區間)。
百分號(%
)
用於命令(command)中的格式化
L
表示last
,最後一個,好比第五域,5L
表示當月最後一個星期五
W
W
表示weekday(Monday-Friday),指離指定日期附近的工做日,好比第三域設置爲15L
,這表示臨近當月15附近的工做日,假如15號是星期六,那麼定時器會在14號執行,若是15號是星期天,那麼定時器會在16號執行,也就是說只會在離指定日期最近的那天執行。
井號#
#
用於第五域(day of week),#後面跟着一個1~5之間的數字,這個用於表示第幾個星期,好比5#3
表示第三個星期五
?
在有些實現裏面,?
與*
的功能相同,還有一些實現裏面?
表示cron的啓動時間,好比 當cron服務在8:25am啓動,則? ? * * * *
會更新爲25 8 * * * *
, 直到下一次cron服務從新啓動,定時器會再次更新。
/
/
通常與*
組合使用,後面跟着一個數字,表示頻率,好比在第一域(Minutes)中*/5
表示每5分鐘,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的縮寫
參考連接:
https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral