airflow探索篇

airflow是一個 Airbnb 的 Workflow 開源項目,在Github 上已經有超過兩千星。data pipeline調度和監控工做流的平臺,用於用來建立、監控和調整data pipeline。相似的產品有:Azkaban、oozie

pip方式安裝

默認已經安裝python >= 2.7 以及 pip
安裝能夠參考這篇,比較詳細。airflow安裝以及celery方式啓動html

重要說明

使用mysql須要安裝

python 2 : pip install MySQL-python
python 3 : pip install PyMySQL

AIRFLOW_HOME配置說明

上篇在.bashrc中配置的export AIRFLOW_HOME=/home/airflow/airflow01。AIRFLOW_HOME設置目錄在airflow initdb的時候初始化,存放airflow的配置文件airflow.cfg及相關文件。python

DAG說明-管理建議

默認$AIRFLOW_HOME/dags存放定義的dag,能夠分目錄管理dag。經常使用管理dag作法,dag存放另外一個目錄經過git管理,並設置軟鏈接映射到$AIRFLOW_HOME/dag。好處方便dag編輯變動,同時dag變動不會出現編輯到一半的時候就加載到airflow中。mysql

plugins說明-算子定義

默認$AIRFLOW_HOME/plugins存放定義的plugins,自定義組件。能夠自定義operator,hook等等。咱們但願能夠直接使用這種模式定義機器學習的一個算子。下面定義了一個簡單的加法算子。git

# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Will show up under airflow.operators.plus_plugin.PluginOperator
class PlusOperator(BaseOperator):

    @apply_defaults
    def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs):
        super(PlusOperator, self).__init__(*args, **kwargs)
        self.params = params or {}
        self.set_context = set_context

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            self.op_kwargs = context

        puls = self.op_kwargs['a'] + self.op_kwargs['b']
        print "a =", self.op_kwargs['a'], ". b=", self.op_kwargs['a']
        return_value = self.main()
        context[self.task_id].xcom_push(key='return_value', value=return_value)
        return puls


# Defining the plugin class
class PlusPlugin(AirflowPlugin):
    name = "plus_plugin"
    operators = [PlusOperator]

在dag中使用案例以下github

from airflow.operators.plus_plugin import PlusOperator
plus_task = PlusOperator(task_id='plus_task', provide_context=True, params={'a': 1,'b':2},dag=dag)

一些命令說明

命令 說明
airflow webserver -p 8091 8091啓動webserver,經過頁面查詢不須要能夠不啓動
airflow scheduler 調度器,必須啓動,否則dag無法run起來(使用CeleryExecutor、LocalExecutor時)
airflow run dagid [time] run task instance
airflow backfill [dagid] -s[startTime] -e [endTime] run a backfill over 2 days
run的demo
# run your first task instance
airflow run example_bash_operator runme_0 2018-01-11

# run a backfill over 2 days
airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11

基於CeleryExecutor方式的系統架構

使用celery方式的系統架構圖(官方推薦使用這種方式,同時支持mesos方式部署)。turing爲外部系統,GDags服務幫助拼接成dag,能夠忽略。web

  • 1.master節點webui管理dags、日誌等信息。scheduler負責調度,只支持單節點,多節點啓動scheduler可能會掛掉
  • 2.worker負責執行具體dag中的task。這樣不一樣的task能夠在不一樣的環境中執行。

clipboard.png

基於LocalExecutor方式的系統架構圖

另外一種啓動方式的思考,一個dag分配到1臺機器上執行。若是task不復雜同時task環境相同,能夠採用這種方式,方便擴容、管理,同時沒有master單點問題。sql

clipboard.png

基於源碼的啓動以及二次開發

不少狀況airflow是不知足咱們需求,就須要本身二次開發,這時候就須要基於源碼方式啓動。好比日誌咱們指望經過http的方式提供出來,同其餘系統查看。airflow自動的webserver只提供頁面查詢的方式。apache

下載源碼

github源碼地址 : [https://github.com/apache/inc...]
git clone git@github.com:apache/incubator-airflow.gitjson

切換分支

master分支的表初始化有坑,mysql設置的sql校驗安全級別太高一直建表不成功。這個坑被整的有點慘。v1-8-stable或者v1-9-stable分支均可以。
git checkout v1-8-stableapi

安裝必要Python包

進入incubator-airflow,python setup.py install (沒啥文檔說明,又是一個坑。找了半天)

初始化

直接輸入airflow initdb(python setup.py install這個命令會將airflow安裝進去)

修改配置

進入$AIRFLOE_HOME (默認在~/airflow),修改airflow.cfg,修改mysql配置。能夠查看上面推薦的文章以及上面的[使用mysql須要安裝]

啓動

airflow webserver -p 8085
airflow scheduler

獲取日誌信息的改造

1.進入incubator-airflow/airflow/www/
2.修改views.py
在 class Airflow(BaseView)中添加下面代碼

@expose('/logs')
    @login_required
    @wwwutils.action_logging
    def logs(self):
        BASE_LOG_FOLDER = os.path.expanduser(
            conf.get('core', 'BASE_LOG_FOLDER'))
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dag = dagbag.get_dag(dag_id)
        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
            **locals())
        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
        loc = loc.format(**locals())
        log = ""
        TI = models.TaskInstance
        session = Session()
        dttm = dateutil.parser.parse(execution_date)
        ti = session.query(TI).filter(
            TI.dag_id == dag_id, TI.task_id == task_id,
            TI.execution_date == dttm).first()
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})

        if ti:
            host = ti.hostname
            log_loaded = False

            if os.path.exists(loc):
                try:
                    f = open(loc)
                    log += "".join(f.readlines())
                    f.close()
                    log_loaded = True
                except:
                    log = "*** Failed to load local log file: {0}.\n".format(loc)
            else:
                WORKER_LOG_SERVER_PORT = \
                    conf.get('celery', 'WORKER_LOG_SERVER_PORT')
                url = os.path.join(
                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
                ).format(**locals())
                log += "*** Log file isn't local.\n"
                log += "*** Fetching here: {url}\n".format(**locals())
                try:
                    import requests
                    timeout = None  # No timeout
                    try:
                        timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
                    except (AirflowConfigException, ValueError):
                        pass

                    response = requests.get(url, timeout=timeout)
                    response.raise_for_status()
                    log += '\n' + response.text
                    log_loaded = True
                except:
                    log += "*** Failed to fetch log file from worker.\n".format(
                        **locals())

            if not log_loaded:
                # load remote logs
                remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
                remote_log = os.path.join(remote_log_base, log_relative)
                log += '\n*** Reading remote logs...\n'

                # S3
                if remote_log.startswith('s3:/'):
                    log += log_utils.S3Log().read(remote_log, return_error=True)

                # GCS
                elif remote_log.startswith('gs:/'):
                    log += log_utils.GCSLog().read(remote_log, return_error=True)

                # unsupported
                elif remote_log:
                    log += '*** Unsupported remote log location.'

            session.commit()
            session.close()

        if PY2 and not isinstance(log, unicode):
            log = log.decode('utf-8')

        title = "Log"

        return wwwutils.json_response(log)

3.重啓服務,訪問url如:

http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11

就能夠拿到這個任務在execution_date=2018-01-11的日誌

異步任務思考

案例:task經過http請求大數據操做,拆分一些數據,存入一些臨時表。
方案:
1.新建一張task實例的狀態表如:task_instance_state。
2.擴展一個plugins,如:AsyncHttpOperator。AsyncHttpOperator實現邏輯:

  • 在task_instance_state插入一條running狀態記錄running。
  • 發送http請求給大數據平臺,操做數據。
  • 輪詢查詢task_instance_state狀態是成功、失敗、running。如是running則繼續輪詢,成功、失敗操做相應後續操做。

3.提供一個restful api update task_instance_state,供大數據平臺回調,修改任務實例狀態。

不錯的文章推薦

瓜子云的任務調度系統
Get started developing workflows with Apache Airflow
官網地址
生產環境使用可能遇到的坑
初探airflow
焦油坑
系統研究Airbnb開源項目airflow

相關文章
相關標籤/搜索