Airflow教程-使用Airflow實現ETL調度

1、Airflow是什麼

airflow 是一個編排、調度和監控workflow的平臺,由Airbnb開源,如今在Apache Software Foundation 孵化。airflow 將workflow編排爲由tasks組成的DAGs(有向無環圖),調度器在一組workers上按照指定的依賴關係執行tasks。同時,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操做,而且airflow提供了監控和報警系統。python

2、Airflow的核心概念

  1. DAGs:即有向無環圖(Directed Acyclic Graph),將全部須要運行的tasks按照依賴關係組織起來,描述的是全部tasks執行的順序。
  2. Operators:airflow內置了不少operators,如BashOperator 執行一個bash 命令,PythonOperator 調用任意的Python 函數,EmailOperator 用於發送郵件,HTTPOperator 用於發送HTTP請求, SqlOperator 用於執行SQL命令...同時,用戶能夠自定義Operator,這給用戶提供了極大的便利性。能夠理解爲用戶須要的一個操做,是Airflow提供的類
  3. Tasks:Task 是 Operator的一個實例
  4. Task Instance:因爲Task會被重複調度,每次task的運行就是不一樣的task instance了。Task instance 有本身的狀態,包括"running", "success", "failed", "skipped", "up for retry"等。
  5. Task Relationships:DAGs中的不一樣Tasks之間能夠有依賴關係

3、使用AirFlow完整天級的任務調度

說了這麼多抽象的概念,估計看官仍是雲裏霧裏,下面就直接舉個例子來講明吧。mysql

##1. 安裝airflow Airflow能夠約等於只支持linux和mac,Windows上極其難裝,筆者放棄了. 安裝也很簡單,如下代碼來自官方文檔,使用了Python的pip管理:linux

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

安裝好了之後訪問localhost:8080便可訪問ui界面web

2. 基本配置

  1. 須要建立~/airflow/dags目錄,這個目錄是默認的存放DAG的地方,想修改的話能夠修改~/airflow/airflow.cfg文件
  2. 修改airflow的數據庫 airflow會使用sqlite做爲默認的數據庫,此狀況下airflow進行調度的任務都只能單個的執行.在調度任務量不大的狀況下,可使用sqlite做爲backend.若是想scale out的話,須要修改配置文件,官方推薦使用mysql或者postgresql做爲backend數據庫.

3. 使用PostgresOperator執行SQL完成ETL任務

經過蒐集信息,瞭解到PostgresOperator能執行SQL,而且還支持傳參數.能解決大多數ETL任務中的傳參問題.傳參使用的是Python的Jinjia模塊.sql

  1. 建立DAG 首先建立一個test_param_sql.py文件.內容以下:
from datetime import datetime, timedelta
import airflow
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 26), #start_date會決定這個DAG從哪天開始生效
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
# Variable是Airflow提供的用戶自定義變量的功能,在UI界面的Admin -> Variable下能夠進行增刪改查,此處筆者定義了sql_path做爲存放sql文件的地方
tmpl_search_path = Variable.get("sql_path")  

dag = airflow.DAG(
    'test_param_sql',
    schedule_interval=timedelta(days=1), # schedule_interval是調度的頻率
    template_searchpath=tmpl_search_path, 
    default_args=args,
    max_active_runs=1)

test_param_sql = PostgresOperator(
    task_id='test_param_sql',
    postgres_conn_id='postgres_default',
    sql='param_sql.sql',
    dag=dag,
    params={'period': '201905'},
    pool='pricing_pool')

match_finish = DummyOperator(
    task_id='match_finish',
    dag=dag
)

test_param_sql >> match_finish
  1. 準備要執行的Sql文件 建立test_sql.sql文件. SQL文件會被Jinjia解析,可使用一些宏來實現時間的替換 例

{{ ds }} 會被轉換爲當天的 YYYY-MM-DD 格式的日期數據庫

{{ ds_nodash }} 會被轉換爲當天的 YYYYMMDD的格式的日期apache

在本例裏則是經過{{params.period}} 取到了 DAG上傳入的參數,bash

insert into test.param_sql_test
select * from test.dm_input_loan_info_d
where period = {{params.period}};
  1. 總體的目錄結構以下 dags/ test_param_sql.py sql/ test_sql.sql函數

  2. 測試dag是否正確 可使用 airflow test dag_id task_id date 進行測試,測試會執行Operator,Operator指定的行爲會進行調度. 可是不會將執行的行爲記錄到Airflow的數據庫裏工具

  3. 發佈 把文件放到~/airflow/dags目錄下,sql文件不要放在dags目錄下,能夠找其餘地方(好比同級目錄),配置好上文說到的Variable,能找到便可.筆者的理解是,airflow會掃描dags目錄下的內容,並嘗試解析成dag,若是有不能成功解析的內容,ui界面上會有錯誤提示,致使dag顯示不出來等問題.

其餘有用的信息

  1. 如何在dag.py裏引入其餘的本地python模塊 須要把本地的python模塊放到一個zip文件裏,例如: my_dag1.py my_dag2.py package1/init.py package1/functions.py 而後把這個zip文件放到dags目錄下,才能被正確解析

  2. pooling能夠控制任務的並行度,若是給DAG指定了一個不存在的pooling,任務會一直處於scheduled的狀態,不繼續進行

相關文章
相關標籤/搜索