原先在業務開發部門作後端開發;因爲業務須要,從每一個部門抽一名開發人員去作數據報表的開發,接到任務,心裏告訴本身好好作,數據平臺用於公司高層及運營人員方便查看數據的平臺,在新的項目組能學到新的python和任務調度工具airflow,對本身會有很大的提高。python
前期準備git
測試環境搭建airflow的UI界面github
airflow的最佳實戰web
airflow的官方文檔(全英文)apache
etl技術文檔編程
etl-example最佳實戰(代碼)後端
airflow中文文檔api
airflow是一個可編程、調度和監控的工做流平臺。bash
Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.服務器
基於有向無環圖(DAG),airflow能夠定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面一樣也能夠方便的管控調度任務,而且對任務運行狀態進行實時監控,方便了系統的運維和管理。
Airflow 是 Airbnb 開源的一個用 Python 編寫的工做流管理平臺,自帶 web UI 和調度,目前在Apache下作孵化。
在實際項目中,咱們常常遇到如下場景:
airflow經過DAG配置文件,能輕鬆定義各類任務及任務之間的依賴關係和調度執行,並一個可視化的操做web界面。
自帶web管理界面,易上手;
業務代碼和調度代碼徹底解耦;
經過python代碼定義子任務,並支持各類Operate操做器,靈活性大,能知足用戶的各類需求;
python開源項目,支持擴展operate等插件,便於二次開發;
相似的工具備akzban,quart等;
概要:DAG(Directed Acyclic Graph)是有向無環圖,也稱爲有向無循環圖。在Airflow中,一個DAG定義了一個完整的做業。同一個DAG中的全部Task擁有相同的調度時間。
參數: dag_id: 惟一識別DAG,方便往後管理
default_args: 默認參數,若是當前DAG實例的做業沒有配置相應參數,則採用DAG實例的default_args中的相應參數
schedule_interval: 配置DAG的執行週期,可採用crontab語法
概要:Task爲DAG中具體的做業任務,依賴於DAG,也就是必須存在於某個DAG中。Task在DAG中能夠配置依賴關係(固然也能夠配置跨DAG依賴,可是並不推薦。跨DAG依賴會致使DAG圖的直觀性下降,並給依賴管理帶來麻煩)。
參數:
dag: 傳遞一個DAG實例,以使當前做業屬於相應DAG
task_id: 給任務一個標識符(名字),方便往後管理
owner: 任務的擁有者,方便往後管理
start_date: 任務的開始時間,即任務將在這個時間點以後開始調度
start_date:在配置中,它是做業開始調度時間。而在談論執行情況時,它是調度開始時間。
schedule_interval:調度執行週期。
execution_date: 執行時間。在Airflow中稱爲執行時間,但其實它並非真實的執行時間。
因此,第一次調度時間:在做業中配置的start_date,且知足schedule_interval的時間點。記錄的execution_date爲做業中配置的start_date的第一個知足schedule_interval的時間。
[舉個例子]
假設咱們配置了一個做業的start_date爲2019年6月2日,配置的schedule_interval爲* 00 12 * * *,那麼第一次執行的時間將是2019年6月3日12點。所以execution_date並非如期字面說的表示執行時間,真正的執行時間是execution_date所顯示的時間的下一個知足schedule_interval的時間點。
# coding: utf-8
# DAG 對象; 咱們將須要它來實例化一個 DAG
from airflow import DAG
import pendulum
# Operators; 咱們須要利用這個對象去執行流程!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# 定義默認參數
default_args = {
'owner': 'airflow', # 擁有者名稱
'depends_on_past': False, # 是否依賴上一個本身的執行狀態
'start_date': datetime(2019,7,24,16,45), # 第一次開始執行的時間,爲格林威治時間,爲了方便測試,通常設置爲當前時間減去執行週期
'retries': 3, # 失敗重試次數
'retry_delay': timedelta(seconds=5) # 失敗重試間隔
}
# 定義DAG,實例化
dag = DAG(
dag_id='hello_world_dag', # dag_id
default_args=default_args, # 指定默認參數
# schedule_interval="00, *, *, *, *" # 執行週期,依次是分,時,天,月,年,此處表示每一個整點執行
schedule_interval=timedelta(minutes=1) # 執行週期,表示每分鐘執行一次
)
# 定義要執行的Python函數1
def hello_world_1():
current_time = str(datetime.today())
with open('/root/tmp/hello_world_1.txt', 'a') as f:
f.write('%s\n' % current_time)
assert 1 == 1 # 能夠在函數中使用assert斷言來判斷執行是否正常,也能夠直接拋出異常
# 定義要執行的Python函數2
def hello_world_2():
current_time = str(datetime.today())
with open('/root/tmp/hello_world_2.txt', 'a') as f:
f.write('%s\n' % current_time)
————————————————
# 定義要執行的task 1
t1 = PythonOperator(
task_id='hello_world_1', # task_id
python_callable=hello_world_1, # 指定要執行的函數
dag=dag, # 指定歸屬的dag
retries=2, # 重寫失敗重試次數,若是不寫,則默認使用dag類中指定的default_args中的設置
)
# 定義要執行的task 2
t2 = PythonOperator(
task_id='hello_world_2', # task_id
python_callable=hello_world_2, # 指定要執行的函數
dag=dag, # 指定歸屬的dag
)
t2.set_upstream(t1) # t2依賴於t1;等價於 t1.set_downstream(t2);同時等價於 dag.set_dependency('hello_world_1', 'hello_world_2')
# 表示t2這個任務只有在t1這個任務執行成功時才執行,
# 或者
#t1 >> t2
複製代碼
咱們能夠看到,整個 DAG 的配置就是一份完整的 Python 代碼,在代碼中實例化 DAG,實例化適合的 Operator,並經過 set_downstream 等方法配置上下游依賴關係。
The Airflow UI makes it easy to monitor and troubleshoot your data pipelines. Here’s a quick overview of some of the features and visualizations you can find in the Airflow UI.
DAGs View
List of the DAGs in your environment, and a set of shortcuts to useful pages. You can see exactly how many tasks succeeded, failed, or are currently running at a glance.
Tree View
A tree representation of the DAG that spans across time. If a pipeline is late, you can quickly see where the different steps are and identify the blocking ones.
Graph View
The graph view is perhaps the most comprehensive. Visualize your DAG’s dependencies and their current status for a specific run.
Variable View
The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in (‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’) by default, but can be configured to show in clear-text.
Gantt Chart
The Gantt chart lets you analyse task duration and overlap. You can quickly identify bottlenecks and where the bulk of the time is spent for specific DAG runs.
Task Duration
The duration of your different tasks over the past N runs. This view lets you find outliers and quickly understand where the time is spent in your DAG over many runs.
Code View
Transparency is everything. While the code for your pipeline is in source control, this is a quick way to get to the code that generates the DAG and provide yet more context.
完成數據從原始庫遷移到目標庫。
007不寫就出局的主旨:一個普通人經過持續寫做實現成長的平臺 平臺營造:陪伴+監督互助氛圍,「自律+他律踐行環境」,讓更多戰友得到新生。
全國各地的朋友能夠參加,每個月按期會組織線下活動,全國各地的職場人也都在《007不寫就出局》社羣中,歡迎各行各業的夥伴交流。
我爲何寫做
戰友爲何加入007寫做社羣
想加入《007不寫就出局》,掃小企鵝二維碼。
天天早上喜歡聽的商業思惟課程,分享給你們
敏娘娘: 由中國創業天后級導師 敏娘娘 親自帶隊的 《世界首富思惟學院》即將拉開帷幕。該系統爲中國首家研究-全球世界首富思惟的大學。
敏娘娘老師是微世管理諮詢創始人,是中國企業家教練,新商業生態架構師,爲世界500強服務:中國銀行,中國移動,比亞迪。每一年培訓️萬名企業家,一對一指導企業10年突破️️️家。
1.表達清晰、明確、有過程描述和結果描述。
2.不要下達模糊指令,下達指令以後要給出方法。
複製代碼