在Celery中使用Flask的上下文

其實我只是想把郵件發送這個動做移到Celery中執行。
既然用到了Celery,那麼每次發郵件都單獨開一個線程彷佛有點多餘,異步任務仍是交給Celery吧。html

在Flask應用中集成Celery

Celery和Flask一塊兒使用並無什麼不和諧的地方,均可以不用定製的Flask擴展,按照網上隨處可見的示例也很簡單:python

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def send_email():
    ....

然而,稍微上點規模的Flask應用都會使用Factory模式(中文叫工廠函數,我聽着特別扭),即只有在建立Flask實例時,纔會初始化各類擴展,這樣能夠動態的修改擴展程序的配置。好比你有一套線上部署的配置和一套本地開發測試的配置,但願經過不一樣的啓動入口,就使用不一樣的配置。
使用Factory模式的話,上面的代碼大概要修改爲這個樣:git

from flask import Flask
from celery import Celery

app = Flask(__name__)
celery = Celery()

def create_app(config_name):
    app.config.from_object(config[config_name])
    celery.conf.update(app.config)

經過config_name,來動態調整celery的配置。然而,這樣子是不行的!
Celery的__init__()函數會調用celery._state._register_app()直接就經過傳入的配置生成了Celery實例,上面的代碼中,celery = Celery()直接使用默認的amqp做爲了broker,隨後經過celery.conf.update(app.config)是更改不了broker的。這也就是爲何網上的示例代碼中,在定義Celery實例時,就傳入了broker=app.config['CELERY_BROKER_URL'],而不是以後經過celery.conf.update(app.config)傳入。當你的多套配置文件中,broker設置的不一樣時,就悲劇了。github

固然不用本身造輪子,Flask-Celery-Helper就是解決以上問題的FLask擴展。
看看它的__init__()函數:redis

def __init__(self, app=None):
        """If app argument provided then initialize celery using application config values.
        If no app argument provided you should do initialization later with init_app method.
        :param app: Flask application instance.
        """
        self.original_register_app = _state._register_app  # Backup Celery app registration function.
        _state._register_app = lambda _: None  # Upon Celery app registration attempt, do nothing.
        super(Celery, self).__init__()
        if app is not None:
            self.init_app(app)

_state._register_app函數備份,再置爲空。這樣__init__()就不會建立Celery實例了。但若是指定了app,那麼進入init_app,嗯,大多數Flask擴展都有這個函數,用來動態生成擴展實例。flask

def init_app(self, app):
        """Actual method to read celery settings from app configuration and initialize the celery instance.
        :param app: Flask application instance.
        """
        _state._register_app = self.original_register_app  # Restore Celery app registration function.
        if not hasattr(app, 'extensions'):
            app.extensions = dict()
        if 'celery' in app.extensions:
            raise ValueError('Already registered extension CELERY.')
        app.extensions['celery'] = _CeleryState(self, app)

        # Instantiate celery and read config.
        super(Celery, self).__init__(app.import_name, broker=app.config['CELERY_BROKER_URL'])
        ...

_state._register_app函數還原,再執行Celery本來的__init__。這樣就達到動態生成實例的目的了。接着往下看:app

task_base = self.Task

# Add Flask app context to celery instance.
class ContextTask(task_base):
    """Celery instance wrapped within the Flask app context."""
    def __call__(self, *_args, **_kwargs):
        with app.app_context():
            return task_base.__call__(self, *_args, **_kwargs)
setattr(ContextTask, 'abstract', True)
setattr(self, 'Task', ContextTask)

這裏重載了celery.Task類,經過with app.app_context():,在app.app_context()的上下文環境下執行Task。對於一個已生成的Flask實例,應用上下文不會隨便改變。因此這就現實了在Celery中使用Flask的應用上下文。
下面是官方的示例代碼:異步

# extensions.py
from flask_celery import Celery
celery = Celery()

# application.py
from flask import Flask
from extensions import celery

def create_app():
    app = Flask(__name__)
    app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
    app.config['CELERY_BROKER_URL'] = 'redis://localhost'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
    celery.init_app(app)
    return app
    
# tasks.py
from extensions import celery

@celery.task()
def add_together(a, b):
    return a + b

# manage.py
from application import create_app
app = create_app()
app.run()

跟普通的Flask擴展同樣了。async

Celery中使用Flask上下文

在Flask的view函數中調用task.delay()時,這個task至關於一個離線的異步任務,它對Flask的應用上下文和請求上下文一無所知。可是這均可能是異步任務須要用到的。好比發送郵件要用到的render_templateurl_for就分別要用到應用上下文和請求上下文。不在celery中引入它們的話,就是Running code outside of a request
引入應用上下文的工做Flask-Celery-Helper已經幫咱們作好了,在Flask的文檔中也有相關介紹。實現方法和上面Flask-Celery-Helper的同樣。然而,無論是Flask-Celery-Helper仍是Flask文檔,都沒有說起如何在Celery中使用請求上下文。ide

要引入請求上下文,須要考慮這兩個問題:

  1. 如何在Celery中產生請求上下文。Flask中有request_contexttest_request_context能夠產生請求上下文。區別是request_context須要WSGI環境變量environ,而test_request_context根據傳入的參數生成請求上下文。我沒有找到如何在Celery中獲取到WSGI環境變量的方法,因此只能本身傳入相關參數生成請求上下文了。

  2. 請求上下文是隨HTTP請求產生的,要獲取請求上下文,就必須在view函數中處理,view函數經過task.delay()發送Celery任務。因此須要重載task.delay(),以獲取請求上下文。

具體的思路仍是在init_app中重載celery.Task類,經過with app.test_request_context():,在app.test_request_context()的上下文環境下執行Task。
首先獲取request,從中整理出test_request_context()須要的參數。根據test_request_context的函數註釋,它須要的參數和werkzeug.test.EnvironBuilder類的參數同樣。

CONTEXT_ARG_NAME = '_flask_request_context'
def _include_request_context(self, kwargs):
    """Includes all the information about current Flask request context
    as an additional argument to the task.
    """
    if not has_request_context():
        return

    # keys correspond to arguments of :meth:`Flask.test_request_context`
    context = {
        'path': request.path,
        'base_url': request.url_root,
        'method': request.method,
        'headers': dict(request.headers),
        'data': request.form
    }
    if '?' in request.url:
        context['query_string'] = request.url[(request.url.find('?') + 1):]

    kwargs[self.CONTEXT_ARG_NAME] = context

_include_request_context函數從request中提取path,base_url,method,headers,data,query_string。將他們傳入test_request_context,生成僞造的請求上下文能夠覆蓋大多數的使用狀況。
Celery經過apply_async,apply,retry調用異步任務(delayapply_async的簡化方法)。這裏須要重載它們,讓這些函數獲取request:

def apply_async(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).apply_async(args, kwargs, **rest)

def apply(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).apply(args, kwargs, **rest)

def retry(self, args=None, kwargs=None, **rest):
    self._include_request_context(kwargs)
    return super(ContextTask, self).retry(args, kwargs, **rest)

最後重載celery.Task__call__方法:

def __call__(self, *args, **kwargs):
    """Execute task code with given arguments."""
    call = lambda: super(ContextTask, self).__call__(*args, **kwargs)

    context = kwargs.pop(self.CONTEXT_ARG_NAME, None)
    if context is None or has_request_context():
        return call()

    with app.test_request_context(**context):
        result = call()

        # process a fake "Response" so that
        # ``@after_request`` hooks are executed
        app.process_response(make_response(result or ''))

    return result

context是咱們從request中獲取的參數,將它傳給test_request_context,僞造請求上下文,並在這個上下文環境中執行task。既然僞造了請求,那也得爲這個假請求生成響應,萬一你定義了after_request這個在響應後執行的鉤子呢?經過process_response就能夠激活after_request
注意這裏並無傳入應用上下文,由於Flask在建立請求上下文時,會判斷應用上下文是否爲空,爲空就先建立應用上下文,再建立請求上下文。

完整代碼在這裏
celery = CeleryWithContext()建立的Celery實例就能夠給各類task使用了。
另外建立一個celery_worker.py文件,生成一個Flask實例,供Celery的worker使用。

# celery_worker.py

#!/usr/bin/env python
from app import create_app
from app.extensions import celery

app = create_app()

啓動worker:celery -A celery_worker.celery worker -l info
這下就可使用Celery發郵件了。唉,還真是麻煩。

reference

http://xion.io/post/code/celery-include-flask-request-context.html

博客地址

相關文章
相關標籤/搜索