學習airflow編排、調度和監控workflow平臺的反思

爲何學airflow呢?

原先在業務開發部門作後端開發;因爲業務須要,從每一個部門抽一名開發人員去作數據報表的開發,接到任務,心裏告訴本身好好作,數據平臺用於公司高層及運營人員方便查看數據的平臺,在新的項目組能學到新的python和任務調度工具airflow,對本身會有很大的提高。python

開啓學習airflow之旅

  • 前期準備git

    測試環境搭建airflow的UI界面github

    airflow的最佳實戰web

    airflow的官方文檔(全英文)apache

    etl技術文檔編程

    etl-example最佳實戰(代碼)後端

    airflow中文文檔api

什麼是airflow

  • 官方解釋: Airflow is a platform to programmatically author, schedule and monitor workflows.

airflow是一個可編程、調度和監控的工做流平臺。bash

Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.服務器

基於有向無環圖(DAG),airflow能夠定義一組有依賴的任務,按照依賴依次執行。airflow提供了豐富的命令行工具用於系統管控,而其web管理界面一樣也能夠方便的管控調度任務,而且對任務運行狀態進行實時監控,方便了系統的運維和管理。

  • airflow簡介:

Airflow 是 Airbnb 開源的一個用 Python 編寫的工做流管理平臺,自帶 web UI 和調度,目前在Apache下作孵化。

airflow能用來幹什麼?

在實際項目中,咱們常常遇到如下場景:

  • 運維人員,定時對服務器執行腳本某些腳本,最簡單的方式是添加一些crond任務,但若是想追溯各個任務的執行結果時?
  • 在大數據場景下,每隔一段時間需導出線上數據、導入到大數據平臺、觸發數據處理等多個子操做,且各個子操做含有依賴關係時?
  • 在管理大量主機時,想要一個統一的做業管理平臺,能在上面定義各類任務來管理下面的設備?

airflow經過DAG配置文件,能輕鬆定義各類任務及任務之間的依賴關係和調度執行,並一個可視化的操做web界面。

airflow有什麼優點?

  • 自帶web管理界面,易上手;

  • 業務代碼和調度代碼徹底解耦;

  • 經過python代碼定義子任務,並支持各類Operate操做器,靈活性大,能知足用戶的各類需求;

  • python開源項目,支持擴展operate等插件,便於二次開發;

  • 相似的工具備akzban,quart等;

Airflow中的做業和任務

  • DAG

概要:DAG(Directed Acyclic Graph)是有向無環圖,也稱爲有向無循環圖。在Airflow中,一個DAG定義了一個完整的做業。同一個DAG中的全部Task擁有相同的調度時間。

參數: dag_id: 惟一識別DAG,方便往後管理

default_args: 默認參數,若是當前DAG實例的做業沒有配置相應參數,則採用DAG實例的default_args中的相應參數

schedule_interval: 配置DAG的執行週期,可採用crontab語法

  • Task

概要:Task爲DAG中具體的做業任務,依賴於DAG,也就是必須存在於某個DAG中。Task在DAG中能夠配置依賴關係(固然也能夠配置跨DAG依賴,可是並不推薦。跨DAG依賴會致使DAG圖的直觀性下降,並給依賴管理帶來麻煩)。

參數:

dag: 傳遞一個DAG實例,以使當前做業屬於相應DAG

task_id: 給任務一個標識符(名字),方便往後管理

owner: 任務的擁有者,方便往後管理

start_date: 任務的開始時間,即任務將在這個時間點以後開始調度

Airflow的調度時間

start_date:在配置中,它是做業開始調度時間。而在談論執行情況時,它是調度開始時間。

schedule_interval:調度執行週期。

execution_date: 執行時間。在Airflow中稱爲執行時間,但其實它並非真實的執行時間。

因此,第一次調度時間:在做業中配置的start_date,且知足schedule_interval的時間點。記錄的execution_date爲做業中配置的start_date的第一個知足schedule_interval的時間。

[舉個例子]

假設咱們配置了一個做業的start_date爲2019年6月2日,配置的schedule_interval爲* 00 12 * * *,那麼第一次執行的時間將是2019年6月3日12點。所以execution_date並非如期字面說的表示執行時間,真正的執行時間是execution_date所顯示的時間的下一個知足schedule_interval的時間點。

代碼示例

# coding: utf-8
# DAG 對象; 咱們將須要它來實例化一個 DAG
from airflow import DAG
import pendulum
# Operators; 咱們須要利用這個對象去執行流程!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# 定義默認參數
default_args = {
    'owner': 'airflow',  # 擁有者名稱
    'depends_on_past': False,   # 是否依賴上一個本身的執行狀態
    'start_date': datetime(2019,7,24,16,45),  # 第一次開始執行的時間,爲格林威治時間,爲了方便測試,通常設置爲當前時間減去執行週期
    'retries': 3,  # 失敗重試次數
    'retry_delay': timedelta(seconds=5)  # 失敗重試間隔
}
 
# 定義DAG,實例化
dag = DAG(
    dag_id='hello_world_dag',  # dag_id
    default_args=default_args,  # 指定默認參數
    # schedule_interval="00, *, *, *, *" # 執行週期,依次是分,時,天,月,年,此處表示每一個整點執行
    schedule_interval=timedelta(minutes=1)  # 執行週期,表示每分鐘執行一次
)


# 定義要執行的Python函數1
def hello_world_1():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_1.txt', 'a') as f:
        f.write('%s\n' % current_time)
    assert 1 == 1  # 能夠在函數中使用assert斷言來判斷執行是否正常,也能夠直接拋出異常
# 定義要執行的Python函數2
def hello_world_2():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_2.txt', 'a') as f:
        f.write('%s\n' % current_time)

————————————————

# 定義要執行的task 1
t1 = PythonOperator(
    task_id='hello_world_1',  # task_id
    python_callable=hello_world_1,  # 指定要執行的函數
    dag=dag,  # 指定歸屬的dag
    retries=2,  # 重寫失敗重試次數,若是不寫,則默認使用dag類中指定的default_args中的設置
)
# 定義要執行的task 2
t2 = PythonOperator(
    task_id='hello_world_2',  # task_id
    python_callable=hello_world_2,  # 指定要執行的函數
    dag=dag,  # 指定歸屬的dag
)
 
t2.set_upstream(t1)  # t2依賴於t1;等價於 t1.set_downstream(t2);同時等價於 dag.set_dependency('hello_world_1', 'hello_world_2')
# 表示t2這個任務只有在t1這個任務執行成功時才執行,
# 或者
#t1 >> t2

複製代碼

咱們能夠看到,整個 DAG 的配置就是一份完整的 Python 代碼,在代碼中實例化 DAG,實例化適合的 Operator,並經過 set_downstream 等方法配置上下游依賴關係。

Web UI

The Airflow UI makes it easy to monitor and troubleshoot your data pipelines. Here’s a quick overview of some of the features and visualizations you can find in the Airflow UI.

  • DAGs View

    List of the DAGs in your environment, and a set of shortcuts to useful pages. You can see exactly how many tasks succeeded, failed, or are currently running at a glance.

  • Tree View

    A tree representation of the DAG that spans across time. If a pipeline is late, you can quickly see where the different steps are and identify the blocking ones.

  • Graph View

    The graph view is perhaps the most comprehensive. Visualize your DAG’s dependencies and their current status for a specific run.

  • Variable View

    The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in (‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’) by default, but can be configured to show in clear-text.

  • Gantt Chart

    The Gantt chart lets you analyse task duration and overlap. You can quickly identify bottlenecks and where the bulk of the time is spent for specific DAG runs.

  • Task Duration

    The duration of your different tasks over the past N runs. This view lets you find outliers and quickly understand where the time is spent in your DAG over many runs.

  • Code View

    Transparency is everything. While the code for your pipeline is in source control, this is a quick way to get to the code that generates the DAG and provide yet more context.

學習回顧:

  • 學習airflow官方文檔花費時間太多,其實一天均可以看完,實際看2天;
  • 學習etl技術文檔應該按小時學習計算。
  • 學習資料的搜索上是否準確
  • 學完實踐

向同事學習

  • 項目溝通能力
  • 向項目經理學習(以前在SAP工做)
  • 熟悉業務
  • 學習醫療系統方面的知識

結果

完成數據從原始庫遷移到目標庫。

學習資料

airflow官網

airflow中文

github項目示例

社羣分享

007不寫就出局的主旨:一個普通人經過持續寫做實現成長的平臺 平臺營造:陪伴+監督互助氛圍,「自律+他律踐行環境」,讓更多戰友得到新生。

全國各地的朋友能夠參加,每個月按期會組織線下活動,全國各地的職場人也都在《007不寫就出局》社羣中,歡迎各行各業的夥伴交流。

  • 我爲何寫做

  • 戰友爲何加入007寫做社羣

  • 想加入《007不寫就出局》,掃小企鵝二維碼。

  • 天天早上喜歡聽的商業思惟課程,分享給你們

導師介紹:

敏娘娘: 由中國創業天后級導師 敏娘娘 親自帶隊的 《世界首富思惟學院》即將拉開帷幕。該系統爲中國首家研究-全球世界首富思惟的大學。

敏娘娘老師是微世管理諮詢創始人,是中國企業家教練,新商業生態架構師,爲世界500強服務:中國銀行,中國移動,比亞迪。每一年培訓️萬名企業家,一對一指導企業10年突破️️️家。

  • 服務項目涵蓋:社交新零售、門店服裝、美業、金融業、製造業
  • 擅長:商業模式設計、戰略諮詢、銷售引爆、商學院系統、新商業設計
  • 被譽爲:行走中的印鈔機、女愛因斯坦、財神娘娘

團隊管理篇|執行力《上級如何下達執行指令-提高團隊執行力》

1.表達清晰、明確、有過程描述和結果描述。

2.不要下達模糊指令,下達指令以後要給出方法。
複製代碼

相關文章
相關標籤/搜索