[譯] 理解 Apache Airflow 的關鍵概念

四部分系列的第三系列

Quizlet尋找最優工做流管理系統第一部分第二部分中,咱們促進了現代商業實踐中對工做流管理系統(WMS)的需求,並提供了一份但願得到的特性以及功能列表,這使得咱們最後選擇了 Apache Airflow 做爲咱們的 WMS 選擇。這篇文章旨在給好奇的讀者提供提供關於 Airflow 的組件和操做的詳細概述。咱們會經過實現本系列第一部分中介紹的示例工做流(查閱 圖 3.1)來介紹 Airflow 的關鍵概念。前端

圖 3.1:數據處理工做流的示例。android

Airflow 是一種 WMS,即:它將任務以及它們的依賴看做代碼,按照那些計劃規範任務執行,並在 worker 進程之間分發需執行的任務。Airflow 提供了一個用於顯示當前活動任務和過去任務狀態的優秀 UI,並容許用戶手動管理任務的執行和狀態。ios

工做流都是「有向無環圖」

Airflow 中的工做流是具備方向性依賴的任務集合。具體說明則是 Airflow 使用有向有向無環圖 —— 或簡稱的 DAG —— 來表現工做流。圖中的每一個節點都是一個任務,圖中的邊表示的是任務之間的依賴(該圖強制爲無循環的,所以不會出現循環依賴,從而致使無限執行循環)。git

圖 3.2 頂部演示了咱們的示例工做流是如何在 Airflow 中變現爲 DAG 的。注意在圖 1.1 中咱們的示例工做流任務的執行計劃結構與圖 3.2 中的 DAG 結構類似。github

圖 3.2 來自 Airflow UI 的屏幕截圖,表示示例工做流 DAG。面板頂部:1 月 25 號 DagRun 的圖表視圖。深綠色節點表示 TaskInstance 的「成功」狀態。淡綠色描繪了 TaskInstance 的「運行」狀態。底部子面板example_workflow DAG 的樹圖。Airflow 的主要組件在屏幕截圖中高亮顯示,包括 Sensor、Operator、任務、DagRunsTaskInstancesDagRuns 在圖視中表示爲列 —— DagRun 在 1 月 25 號用青色表示。圖示中的每一個方框表示一個 TaskInstance —— 1 月 25 號 爲 perform_currency_conversion 任務的 TaskInstance(「運行態」)用藍色表示。數據庫

在高級別中,能夠將 DAG 看做是一個包含任務極其依賴,什麼時候以及如何設置那些任務的上下文的容器。每一個 DAG 都有一組屬性,最重要的是它的 dag_id,在全部 DAG 中的惟一標識符,它的 start_date 用於說明 DAG 任務被執行的時間,schedule_interval 用於說明任務被執行的頻率。此外,dag_idstart_dateschedule_interval,每一個 DAG 均可以使用一組 default_arguments 進行初始化。這些默認參數由 DAG 中的全部任務繼承。apache

在下列代碼塊中,咱們在 Airflow 中定義了一個用於實現咱們遊戲公司示例工做流的 DAG。json

# 每一個工做流/DAG 都必需要有一個惟一的文本標識符
WORKFLOW_DAG_ID = 'example_workflow_dag'

# 開始/結束時間是 datetime 對象
# 這裏咱們在 2017 年 1 月 1 號開始執行
WORKFLOW_START_DATE = datetime(2017, 1, 1)

# 調度器/重試間隔是 timedelta 對象
# 這裏咱們天天都執行 DAG 任務
WORKFLOW_SCHEDULE_INTERVAL = timedelta(1)

# 默認參數默認應用於全部任務
# 在 DAG 中
WORKFLOW_DEFAULT_ARGS = {
    'owner': 'example',
    'depends_on_past': False,
    'start_date': WORKFLOW_START_DATE,
    'email': ['example@example_company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

# 初始化 DAG
dag = DAG(
    dag_id=WORKFLOW_DAG_ID,
    start_date=WORKFLOW_START_DATE,
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    default_args=WORKFLOW_DEFAULT_ARGS,
)
複製代碼

OperatorsSensors 和 Tasks

儘管 DAG 用於組織並設置執行上下文,但 DAG 不會執行任何實際計算。相反,任務其實是 Airflow 中咱們想要執行「所作工做」的元素。任務有兩種特色:它們能夠執行一些顯示操做,在這種狀況下,它們是 Operator,或者它們能夠暫停執行依賴任務,直到知足某些條件,在這種狀況下,它們是 Sensors。原則上來講,Operator 能夠執行在 Python 中被執行的任何函數。一樣,Sensors 能夠檢查任何進程或者數據結構的狀態。後端

下述代碼塊顯示瞭如何定義一些(假設的)Operator 和 Sensor 類來實現咱們的工做流示例。api

##################################################
# 自定義 Sensors 示例/ Operators (NoOps) #
##################################################

class ConversionRatesSensor(BaseSensorOperator):
    """ An example of a custom Sensor. Custom Sensors generally overload the `poke` method inherited from `BaseSensorOperator` """
    def __init__(self, *args, **kwargs):
        super(ConversionRatesSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        print 'poking {}'.__str__()
        
        # poke functions should return a boolean
        return check_conversion_rates_api_for_valid_data(context)

class ExtractAppStoreRevenueOperator(BaseOperator):
    """ An example of a custom Operator that takes non-default BaseOperator arguments. Extracts data for a particular app store identified by `app_store_name`. """
    def __init__(self, app_store_name, *args, **kwargs):
        self.app_store_name = app_store_name
        super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print 'executing {}'.__str__()
        
        # pull data from specific app store
        json_revenue_data = extract_app_store_data(self.app_store_name, context)
        
        # upload app store json data to filestore, can use context variable for 
        # date-specific storage metadata
        upload_appstore_json_data(json_revenue_data, self.app_store_name, context)

class TransformAppStoreJSONDataOperator(BaseOperator):
    """ An example of a custom Operator that takes non-default BaseOperator arguments. Extracts, transforms, and loads data for an array of app stores identified by `app_store_names`. """
    def __init__(self, app_store_names, *args, **kwargs):
        self.app_store_names = app_store_names
        super(TransformJSONDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print 'executing {}'.__str__()
        
        # load all app store data from filestores. context variable can be used to retrieve
        # particular date-specific data artifacts
        all_app_stores_extracted_data = []
        for app_store in self.app_store_names:
            all_app_stores_extracted_data.append(extract_app_store_data(app_store, context))
        
        # combine all app store data, transform to proper format, and upload to filestore 
        all_app_stores_json_data = combine_json_data(all_app_stores_extracted_data)
        app_stores_transformed_data = transform_json_data(all_app_stores_json_data)
        upload_data(app_stores_transformed_data, context)
複製代碼

代碼定義了 BaseSensorOperator 的子類,即 ConversionRatesSensor。這個類實現了全部 BaseSensorOperator 對象必需的 poke 方法。若是下游任務要繼續執行,poke 方法必須返回 True,不然返回 False。在咱們的示例中,這個 sensor 將用於決定什麼時候外部 API 的交換率什麼時候可用。

ExtractAppStoreRevenueOperatorTransformAppStoreJSONDataOperator 這兩個類都繼承自 Airflow 的BaseOperator 類,並實現了 execute 方法。在咱們的示例中,這兩個類的 execute 方法都從應用程序存儲 API 中獲取數據,並將它們轉換爲公司首選的存儲格式。注意 ExtractAppStoreRevenueOperator 也接受一個自定義參數 app_store_name,它告訴類應用程序存儲應該從哪裏獲取請求數據。

注意,Operator 和 Sensor 一般在單獨文件中定義,並導入到咱們定義 DAG 的同名命名空間中。但咱們也能夠將這些類定義添加到同一個 DAG 定義的文件中。

形式上,Airflow 定義任務爲 Sensor 或 Operator 類實例化。實例化任務須要提供一個惟一的 task_id 和 DAG 容器來添加任務(注意:在高於 1.8 的版本中,再也不須要 DAG 對象)。下面的代碼塊顯示瞭如何實例化執行示例工做流所需的全部任務。(注意:咱們假設示例中引用的全部 Operator 都是在命名空間中定義或導入的)。

########################
# 實例化任務 #
########################

# 實例化任務來提取廣告網絡收入
extract_ad_revenue = ExtractAdRevenueOperator(
    task_id='extract_ad_revenue',
    dag=dag)

# 動態實例化任務來提取應用程序存儲數據
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
    task = ExtractAppStoreRevenueOperator(
        task_id='extract_{}_revenue'.format(app_store),
        dag=dag,
        app_store_name=app_store,
        )
    app_store_tasks.append(task)

# 實例化任務來等待轉換率、數據均衡
wait_for_conversion_rates = ConversionRatesSensor(
    task_id='wait_for_conversion_rates',
    dag=dag)

# 實例化任務,從 API 中提取轉化率
extract_conversion_rates = ExtractConversionRatesOperator(
    task_id='get_conversion_rates',
    dag=dag)

# 實例化任務來轉換電子表格數據
transform_spreadsheet_data = TransformAdsSpreadsheetDataOperator(
    task_id='transform_spreadsheet_data',
    dag=dag) 

# 從全部應用程序存儲中實例化任務轉換 JSON 數據
transform_json_data = TransformAppStoreJSONDataOperator(
    task_id='transform_json_data',
    dag=dag,
    app_store_names=APP_STORES)

# 實例化任務來應用
perform_currency_conversions = CurrencyConversionsOperator(
    task_id='perform_currency_conversions',
    dag=dag)

# 實例化任務來組合全部數據源
combine_revenue_data = CombineDataRevenueDataOperator(
    task_id='combine_revenue_data',
    dag=dag)  

# 實例化任務來檢查歷史數據是否存在
check_historical_data = CheckHistoricalDataOperator(
    task_id='check_historical_data',
    dag=dag)

# 實例化任務來根據歷史數據進行預測
predict_revenue = RevenuePredictionOperator(
    task_id='predict_revenue',
    dag=dag)  
複製代碼

此任務實例化代碼在與 DAG 定義相同的文件/命名空間中執行。咱們能夠看到添加任務的代碼很是簡潔,並且容許經過註解進行內聯文檔。第 10–19 行展現了在代碼中定義工做流的優點之一。咱們可以動態地定義三個不一樣的任務,用於使用 for 循環從每一個應用程序存儲中提取數據。這種方法可能在這個小示例中不會給咱們帶來太大的好處,但隨着應用程序商店數量的增長,好處會日益顯著。

定義任務依賴關係

Airflow 的關鍵優點是定義任務之間依賴關係的簡潔性和直觀約定。下述代碼代表了咱們如何爲示例工做流定義任務依賴關係圖:

###############################
# 定義任務依賴關係 #
###############################

# 依賴設置使用 `.set_upstream` 和/或 
# `.set_downstream` 方法
# (in version >=1.8.1,也可使用
# `extract_ad_revenue << transform_spreadsheet_data` 語法)

transform_spreadsheet_data.set_upstream(extract_ad_revenue)

# 動態定義應用程序存儲依賴項
for task in app_store_tasks:
    transform_json_data.set_upstream(task)

extract_conversion_rates.set_upstream(wait_for_conversion_rates)

perform_currency_conversions.set_upstream(transform_json_data)
perform_currency_conversions.set_upstream(extract_conversion_rates)

combine_revenue_data.set_upstream(transform_spreadsheet_data)
combine_revenue_data.set_upstream(perform_currency_conversions)

check_historical_data.set_upstream(combine_revenue_data)

predict_revenue.set_upstream(check_historical_data) 
複製代碼

同時,此代碼在與 DAG 定義相同的文件/命名空間中運行。任務依賴使用 set_upstreamset_downstream operators 來設置(但在高於 1.8 的版本中,使用移位運算符 <<>> 來更簡潔地執行類似操做是可行的)。一個任務還能夠同時具備多個依賴(例如,combine_revenue_data),或一個也沒有(例如,全部的 extract_* 任務)。

圖 3.2 的頂部子面板顯示了由上述代碼所建立的 Airflow DAG,渲染爲 Airflow 的 UI(稍後咱們會詳細介紹 UI)。 DAG 的依賴結構與在圖 1.1 顯示的咱們爲咱們的示例工做流所提出的執行計劃很是類似。當 DAG 被執行時,Airflow 會使用這種依賴結構來自動肯定哪些任務能夠在任什麼時候間點同時運行(例如,全部的 extract_* 任務)。

DagRuns 和 TaskInstances

一旦咱們定義了 DAG —— 即,咱們已經實例化了任務並定義了它們的依賴項 —— 咱們就能夠基於 DAG 的參數來執行任務。Airflow 中的一個關鍵概念是 execution_time。當 Airflow 調度器正在運行時,它會定義一個用於執行 DAG 相關任務的按期間斷的日期計劃。執行時間從 DAG start_date 開始,並重復每個 schedule_interval。在咱們的示例中,調度時間是 (‘2017–01–01 00:00:00’, ‘2017–01–02 00:00:00’, ...)。對於每個 execution_time,都會建立 DagRun 並在執行時間上下文中進行操做。所以,DagRun 只是具備必定執行時間的 DAG(參見 圖 3.2 的底部子面板)。

全部與 DagRun 關聯的任務都稱爲 TaskInstance。換句話說,TaskInstance 是一個已經實例化並且擁有 execution_date 上下文的任務(參見 圖 3.2 的底部子面板)。DagRuns 和 TaskInstance 是 Airflow 的核心概念。每一個DagRun and TaskInstance 都與記錄其狀態的 Airflow 元數據庫中的一個條目相關聯(例如 「queued」、「running」、「failed」、「skipped」、「up for retry」)。讀取和更新這些狀態是 Airflow 調度和執行過程的關鍵。

Airflow 的架構

在其核心中,Airflow 是創建在元數據庫上的隊列系統。數據庫存儲隊列任務的狀態,調度器使用這些狀態來肯定如何將其它任務添加到隊列的優先級。此功能由四個主要組件編排。(請參閱圖 3.2 的左子面板):

  1. 元數據庫:這個數據庫存儲有關任務狀態的信息。數據庫使用在 SQLAlchemy 中實現的抽象層執行更新。該抽象層將 Airflow 剩餘組件功能從數據庫中乾淨地分離了出來。
  2. 調度器:調度器是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務須要被執行以及任務執行優先級的過程。調度器一般做爲服務運行。
  3. 執行器:Excutor 是一個消息隊列進程,它被綁定到調度器中,用於肯定實際執行每一個任務計劃的工做進程。有不一樣類型的執行器,每一個執行器都使用一個指定工做進程的類來執行任務。例如,LocalExecutor 使用與調度器進程在同一臺機器上運行的並行進程執行任務。其餘像 CeleryExecutor 的執行器使用存在於獨立的工做機器集羣中的工做進程執行任務。
  4. Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器肯定。

圖 3.2:Airflow 的通常架構。Airflow 的操做創建於存儲任務狀態和工做流的元數據庫之上(即 DAG)。調度器和執行器將任務發送至隊列,讓 Worker 進程執行。WebServer 運行(常常與調度器在同一臺機器上運行)並與數據庫通訊,在 Web UI 中呈現任務狀態和任務執行日誌。每一個有色框代表每一個組件均可以獨立於其餘組件存在,這取決於部署配置的類型。

調度器操做

首先,Airflow 調度器操做看起來更像是黑魔法而不是邏輯程序。也就是說,若是你發現本身正在調試它的執行,那麼瞭解調度器的工做原理久能夠節省大量的時間,爲了讓讀者免於深陷 Airflow 的源代碼(儘管咱們很是推薦它!)咱們用僞代碼概述了調度器的基本操做:

步驟 0. 從磁盤中加載可用的 DAG 定義(填充 DagBag)

當調度器運行時:
	步驟 1. 調度器使用 DAG 定義來標識而且/或者初始化在元數據的 db 中的任何 DagRuns。
	
	步驟 2. 調度器檢查與活動 DagRun 關聯的 TaskInstance 的狀態,解析 TaskInstance 之間的任何依賴,標識須要被執行的 TaskInstance,而後將它們添加至 worker 隊列,將新排列的 TaskInstance 狀態更新爲數據庫中的「排隊」狀態。
	
	步驟 3. 每一個可用的 worker 從隊列中取一個 TaskInstance,而後開始執行它,將此 TaskInstance 的數據庫記錄從「排隊」更新爲「運行」。
	
	步驟 4. 一旦一個 TaskInstance 完成運行,關聯的 worker 就會報告到隊列並更新數據庫中的 TaskInstance 的狀態(例如「完成」、「失敗」等)。
	
	步驟 5. 調度器根據全部已完成的相關 TaskInstance 的狀態更新全部活動 DagRuns 的狀態(「運行」、「失敗」、「完成」)。
	
	步驟 6. 重複步驟 1-5
複製代碼

Web UI

除了主要的調度和執行組件外,Airflow 還支持包括全功能的 Web UI 組件(參閱圖 3.2 的一些 UI 示例),包括:

  1. Webserver:此過程運行一個簡單的 Flask 應用程序,它從元數據庫中讀取全部任務狀態,並讓 Web UI 呈現這些狀態。
  2. Web UI:此組件容許客戶端用戶查看和編輯元數據庫中的任務狀態。因爲調度器和數據庫之間的耦合,Web UI 容許用戶操做調度器的行爲。
  3. 執行日誌:這些日誌由 worker 進程編寫,存儲在磁盤或遠程文件存儲區(例如 GCSS3)中。Webserver 訪問日誌並將其提供給 Web UI。

儘管對於 Airflow 的基本操做來講,這些附加組件都不是必要的,但從功能性角度來講,它們確實使 Airflow 有別於當前的其餘工做流管理。 具體來講,UI 和集成執行日誌容許用戶檢查和診斷任務執行,以及查看和操做任務狀態。

命令行接口

除了調度程序和 Web UI,Airflow 還經過命令行接口(CLI)提供了健壯性的特性。尤爲是,當咱們開發 Airflow 時,發現如下的這些命令很是有用:

  • airflow test DAG_ID TASK_ID EXECUTION_DATE。容許用戶在不影響元數據庫或關注任務依賴的狀況下獨立運行任務。這個命令很適合獨立測試自定義 Operator 類的基本行爲。
  • airflow backfill DAG_ID TASK_ID -s START_DATE -e END_DATE。在 START_DATEEND_DATE 之間執行歷史數據的回填,而不須要運行調度器。當你須要更改現有工做流的一些業務邏輯並須要更新歷史數據時,這是很好的。(請注意,回填不須要在數據庫中建立 DagRun 條目,由於它們不是由 [SchedulerJob](https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L471) 類運行的)。
  • airflow clear DAG_ID。移除 DAG_ID 元數據庫中的 TaskInstance 記錄。當你迭代工做流/DAG 功能時,這會頗有用。
  • airflow resetdb:雖然你一般不想常常運行這個命令,但若是你須要建立一個「乾淨的歷史記錄」,這是很是有幫助的,這種狀況載最初設置 Airflow 時可能會出現(注意:這個命令隻影響數據庫,不刪除日誌)。

綜上所述,咱們提供了一些更加抽象的概念,做爲 Airflow 的基礎。在此係列的最後部分 installment 中,咱們將討論在生產中部署 Airflow 時的一些更實際的注意事項。

感謝 Laura Oppenheimer

若是發現譯文存在錯誤或其餘須要改進的地方,歡迎到 掘金翻譯計劃 對譯文進行修改並 PR,也可得到相應獎勵積分。文章開頭的 本文永久連接 即爲本文在 GitHub 上的 MarkDown 連接。


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOS前端後端區塊鏈產品設計人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索