今天咱們來介紹一下使用Airflow來調度 Data Lake Analytics(後面簡稱DLA)的任務執行。DLA做爲一個數據湖的解決方案,
客戶有天天週期性的調度一些任務從DLA查詢數據迴流到業務系統的需求。由於DLA兼容
MySQL的協議,所以全部支持MySQL的協議的調度框架都自然支持DLA,今天就來介紹一下使用業界著名的
Apache Airflow 來調度DLA的做業。python
大體步驟以下:mysql
購買ECS的詳細流程這裏就不一一羅列了,很是的簡單,按照官方的購買流程能夠分分鐘完成,須要注意的幾點這裏說一下:web
同時記錄下這個ECS的外網地址:sql
Airflow是一個Python寫的軟件,所以咱們是經過Python的Package Manager:pip來安裝的,由於咱們要使用MySQL(而不是默認的SQLite) 來做爲Airflow的元數據庫, 所以咱們還要安裝MySQL相關的包:數據庫
# 安裝Airflow自己 sudo pip install apache-airflow[mysql] # 安裝MySQL相關的依賴 sudo apt-get install mysql-sever sudo apt-get install libmysqlclient-dev sudo pip install mysql-python
默認安裝的MySQL有一個配置須要調整:apache
# /etc/mysql/mysql.conf.d/mysqld.cnf [mysqld] explicit_defaults_for_timestamp = 1
修改完成以後重啓MySQL:瀏覽器
root@hello:~/airflow/dags# /etc/init.d/mysql restart [ ok ] Restarting mysql (via systemctl): mysql.service.
Airflow 安裝完成以後會在你的本地用戶目錄下產生 ~/airflow
目錄, 它裏面的內容大體以下:安全
root@hello:~/airflow# ll total 4168 drwxr-xr-x 4 root root 4096 Oct 19 10:40 ./ drwx------ 10 root root 4096 Oct 19 10:40 ../ -rw-r--r-- 1 root root 11765 Oct 19 10:40 airflow.cfg drwxr-xr-x 2 root root 4096 Oct 18 19:32 dags/ drwxr-xr-x 6 root root 4096 Oct 18 17:52 logs/ -rw-r--r-- 1 root root 1509 Oct 18 11:38 unittests.cfg
其中 airflow.cfg
是 Airflow集羣的配置文件,各類配置都是在這裏改的,dags
目錄保存咱們寫的任務,後面咱們要寫的任務都是放在這個文件夾裏面。bash
前面咱們已經安裝了 MySQL 數據庫,如今咱們來建立一個數據庫給Airflow來保存元數據:框架
$ mysql \ -uroot \ -proot \ -e "CREATE DATABASE airflow DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci; GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost' IDENTIFIED BY 'airflow'; FLUSH PRIVILEGES;" $ airflow initdb
到之類爲止,元數據庫就初始化好了。
Airflow自己是一個調度工具,任務的具體執行是交給一個叫作Executor的概念來作的,默認配置的executor是 SequentialExecutor
, 不適合生產環境使用,分佈式的Executor有 Celery
和 Dask
, 可是筆者嘗試過 Celery
以後發現坑有點多,這裏推薦使用 Dask:
安裝Dask:
pip install dask
運行 dask scheduler:
# default settings for a local cluster DASK_HOST=127.0.0.1 DASK_PORT=8786 dask-scheduler --host $DASK_HOST --port $DASK_PORT
運行 dask worker:
dask-worker $DASK_HOST:$DASK_PORT
由於使用的不是默認的配置:咱們選擇了使用MySQL來做爲元數據庫,使用Dask來執行任務,所以須要對配置文件: ~/airflow/airflow.cfg
進行修改:
[core] # 使用Dask來運行任務 executor = DaskExecutor # 元數據庫的鏈接方式 sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow [dask] # Dask的調度地址 cluster_address = 127.0.0.1:8786
到這裏位置全部準備工做作完了,咱們能夠啓動Airflow了,咱們須要啓動 Airflow 的三個模塊:
webserver: 用來承載Airflow的管理控制頁面:
airflow webserver -p 80 -D
scheduler: 任務調度器, 它會監控 ~/airflow/dags
下面咱們定義的任務文件的變化,這樣咱們才能經過管理控制檯及時看到咱們新開發的任務:
airflow scheduler -D
worker: 跟Dask進行交互真正執行任務的:
airflow worker -D
若是一切順利的話,一個Airflow的集羣就已經Ready了,能夠在上面執行任務了。默認安裝裏面已經一些示例的任務, 瀏覽器裏面輸入 http://<你ECS的外網IP>
就能夠看到Airflow的控制頁面了:
咱們的目的是要用Airflow來調度DLA的任務,首先咱們要添加一個鏈接串, Airflow裏面經過Connection來保存鏈接串的具體信息, 打開頁面: http://<你ECS的外網IP>/admin/connection/
你會看到以下的頁面:
咱們添加一下DLA的鏈接信息:
這裏比較重要的兩個點:
from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta from airflow.hooks.mysql_hook import MySqlHook default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'dlademo', default_args=default_args, schedule_interval=timedelta(1)) t1 = BashOperator( task_id='print_date', bash_command='echo hello-airflow', dag=dag) def step2(ds, **kargs): mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3') for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"): print items t2 = PythonOperator( task_id='execute_dla_sql', provide_context=True, python_callable=step2, dag=dag) t2.set_upstream(t1)
這個任務裏面定義了一個DAG, 一個DAG表示一個任務流程,一個流程裏面會執行有依賴關係的多個任務,DAG的第一個參數是DAG的名字, 這裏咱們叫 dlademo
,它的第三個參數是調度的週期,這裏是天天調度一次: timedelta(1)
。
第一個任務是執行一個bash命令: echo hello-airflow
, 第二個任務則是咱們的SQL任務,這裏寫的比較簡單,經過SQL把DLA數據庫裏面的一張表查詢並打印出來,最後 t2.set_upstream(t1)
設置兩個任務之間的依賴關係。
如今咱們打開 http://<你的ECS公網IP>/admin/airflow/tree?dag_id=dlademo
就能夠看到這個任務的詳情了:
在這個圖中咱們能夠看到咱們定義的兩個任務,以及它們之間的依賴關係。Airflow的功能很是的豐富,更多的功能就留給你們本身去體驗了。
Airflow是Apache的頂級項目,從項目的成熟度和功能的豐富度來講都很不錯,入門也很簡單,很容易就能夠搭建本身的集羣,而且它有本身的Connection機制,使得咱們不須要把數據庫的用戶名密碼暴露在任務腳本里面,使用DLA的同窗們能夠試試Airflow來調度本身的任務。
本文爲雲棲社區原創內容,未經容許不得轉載。