使用Airflow來調度Data Lake Analytics的任務

今天咱們來介紹一下使用Airflow來調度 Data Lake Analytics(後面簡稱DLA)的任務執行。DLA做爲一個數據湖的解決方案,
客戶有天天週期性的調度一些任務從DLA查詢數據迴流到業務系統的需求。由於DLA兼容
MySQL的協議,所以全部支持MySQL的協議的調度框架都自然支持DLA,今天就來介紹一下使用業界著名的
Apache Airflow 來調度DLA的做業。python

大體步驟以下:mysql

  1. 購買一個ECS用來運行Airflow
  2. 安裝Airflow
  3. 添加DLA的DB Connection
  4. 開發任務腳本

購買ECS並進行配置

購買ECS的詳細流程這裏就不一一羅列了,很是的簡單,按照官方的購買流程能夠分分鐘完成,須要注意的幾點這裏說一下:web

  • 購買的ECS的Region要和你的數據所在Region(其實也就是你開通DLA的 Region 保持一致)。
  • 購買的ECS須要開通外網訪問權限,由於Airflow的一些網頁控制檯須要經過外網來訪問。
  • ECS購買好以後記得在安全組裏面放行入方向的80端口,由於下面要安裝的Airflow有web頁面,咱們須要經過80端口進行訪問,以下圖:

 

 

同時記錄下這個ECS的外網地址:sql

 

 

安裝Airflow

Airflow是一個Python寫的軟件,所以咱們是經過Python的Package Manager:pip來安裝的,由於咱們要使用MySQL(而不是默認的SQLite) 來做爲Airflow的元數據庫, 所以咱們還要安裝MySQL相關的包:數據庫

# 安裝Airflow自己
sudo pip install apache-airflow[mysql]

# 安裝MySQL相關的依賴
sudo apt-get install mysql-sever
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python

默認安裝的MySQL有一個配置須要調整:apache

# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1

修改完成以後重啓MySQL:瀏覽器

root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.

Airflow 安裝完成以後會在你的本地用戶目錄下產生 ~/airflow 目錄, 它裏面的內容大體以下:安全

root@hello:~/airflow# ll
total 4168
drwxr-xr-x  4 root root    4096 Oct 19 10:40 ./
drwx------ 10 root root    4096 Oct 19 10:40 ../
-rw-r--r--  1 root root   11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x  2 root root    4096 Oct 18 19:32 dags/
drwxr-xr-x  6 root root    4096 Oct 18 17:52 logs/
-rw-r--r--  1 root root    1509 Oct 18 11:38 unittests.cfg

其中 airflow.cfg 是 Airflow集羣的配置文件,各類配置都是在這裏改的,dags 目錄保存咱們寫的任務,後面咱們要寫的任務都是放在這個文件夾裏面。bash

初始化Airflow元數據庫

前面咱們已經安裝了 MySQL 數據庫,如今咱們來建立一個數據庫給Airflow來保存元數據:框架

$ mysql \
    -uroot \
    -proot \
    -e "CREATE DATABASE airflow
        DEFAULT CHARACTER SET utf8
        DEFAULT COLLATE utf8_general_ci;

        GRANT ALL PRIVILEGES
        ON airflow.*
        TO 'airflow'@'localhost'
        IDENTIFIED BY 'airflow';

        FLUSH PRIVILEGES;"
        
$ airflow initdb

到之類爲止,元數據庫就初始化好了。

安裝 Dask

Airflow自己是一個調度工具,任務的具體執行是交給一個叫作Executor的概念來作的,默認配置的executor是 SequentialExecutor, 不適合生產環境使用,分佈式的Executor有 Celery 和 Dask, 可是筆者嘗試過 Celery 以後發現坑有點多,這裏推薦使用 Dask:

安裝Dask:

pip install dask

運行 dask scheduler:

# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786

dask-scheduler --host $DASK_HOST --port $DASK_PORT

運行 dask worker:

dask-worker $DASK_HOST:$DASK_PORT

配置 airflow.cfg

由於使用的不是默認的配置:咱們選擇了使用MySQL來做爲元數據庫,使用Dask來執行任務,所以須要對配置文件: ~/airflow/airflow.cfg 進行修改:

[core]
# 使用Dask來運行任務
executor = DaskExecutor
# 元數據庫的鏈接方式
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

[dask]
# Dask的調度地址
cluster_address = 127.0.0.1:8786

啓動

到這裏位置全部準備工做作完了,咱們能夠啓動Airflow了,咱們須要啓動 Airflow 的三個模塊:

webserver: 用來承載Airflow的管理控制頁面:

airflow webserver -p 80 -D

scheduler: 任務調度器, 它會監控 ~/airflow/dags 下面咱們定義的任務文件的變化,這樣咱們才能經過管理控制檯及時看到咱們新開發的任務:

airflow scheduler -D

worker: 跟Dask進行交互真正執行任務的:

airflow worker -D

若是一切順利的話,一個Airflow的集羣就已經Ready了,能夠在上面執行任務了。默認安裝裏面已經一些示例的任務, 瀏覽器裏面輸入 http://<你ECS的外網IP> 就能夠看到Airflow的控制頁面了:

 

 

開發咱們本身的任務

咱們的目的是要用Airflow來調度DLA的任務,首先咱們要添加一個鏈接串, Airflow裏面經過Connection來保存鏈接串的具體信息, 打開頁面: http://<你ECS的外網IP>/admin/connection/ 你會看到以下的頁面:

 

 

咱們添加一下DLA的鏈接信息:

 

 

這裏比較重要的兩個點:

  1. 鏈接類型選擇: MySQL (DLA兼容MySQL的協議)
  2. Conn Id很關鍵,後面咱們任務裏面是經過這個Conn Id來訪問數據源的。

開發咱們的任務代碼

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dlademo', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='echo hello-airflow',
    dag=dag)

def step2(ds, **kargs):
    mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
    for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
        print items

t2 = PythonOperator(
    task_id='execute_dla_sql',
    provide_context=True,
    python_callable=step2,
    dag=dag)

t2.set_upstream(t1)

這個任務裏面定義了一個DAG, 一個DAG表示一個任務流程,一個流程裏面會執行有依賴關係的多個任務,DAG的第一個參數是DAG的名字, 這裏咱們叫 dlademo ,它的第三個參數是調度的週期,這裏是天天調度一次: timedelta(1) 。

第一個任務是執行一個bash命令: echo hello-airflow, 第二個任務則是咱們的SQL任務,這裏寫的比較簡單,經過SQL把DLA數據庫裏面的一張表查詢並打印出來,最後 t2.set_upstream(t1)設置兩個任務之間的依賴關係。

如今咱們打開 http://<你的ECS公網IP>/admin/airflow/tree?dag_id=dlademo 就能夠看到這個任務的詳情了:

 

 

在這個圖中咱們能夠看到咱們定義的兩個任務,以及它們之間的依賴關係。Airflow的功能很是的豐富,更多的功能就留給你們本身去體驗了。

總結

Airflow是Apache的頂級項目,從項目的成熟度和功能的豐富度來講都很不錯,入門也很簡單,很容易就能夠搭建本身的集羣,而且它有本身的Connection機制,使得咱們不須要把數據庫的用戶名密碼暴露在任務腳本里面,使用DLA的同窗們能夠試試Airflow來調度本身的任務。

 

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索