其實我只是想把郵件發送這個動做移到Celery中執行。
既然用到了Celery,那麼每次發郵件都單獨開一個線程彷佛有點多餘,異步任務仍是交給Celery吧。html
Celery和Flask一塊兒使用並無什麼不和諧的地方,均可以不用定製的Flask擴展,按照網上隨處可見的示例也很簡單:python
from flask import Flask from celery import Celery app = Flask(__name__) app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) @celery.task def send_email(): ....
然而,稍微上點規模的Flask應用都會使用Factory模式(中文叫工廠函數,我聽着特別扭),即只有在建立Flask實例時,纔會初始化各類擴展,這樣能夠動態的修改擴展程序的配置。好比你有一套線上部署的配置和一套本地開發測試的配置,但願經過不一樣的啓動入口,就使用不一樣的配置。
使用Factory模式的話,上面的代碼大概要修改爲這個樣:git
from flask import Flask from celery import Celery app = Flask(__name__) celery = Celery() def create_app(config_name): app.config.from_object(config[config_name]) celery.conf.update(app.config)
經過config_name
,來動態調整celery的配置。然而,這樣子是不行的!
Celery的__init__()
函數會調用celery._state._register_app()
直接就經過傳入的配置生成了Celery實例,上面的代碼中,celery = Celery()
直接使用默認的amqp做爲了broker,隨後經過celery.conf.update(app.config)
是更改不了broker的。這也就是爲何網上的示例代碼中,在定義Celery實例時,就傳入了broker=app.config['CELERY_BROKER_URL']
,而不是以後經過celery.conf.update(app.config)
傳入。當你的多套配置文件中,broker設置的不一樣時,就悲劇了。github
固然不用本身造輪子,Flask-Celery-Helper就是解決以上問題的FLask擴展。
看看它的__init__()
函數:redis
def __init__(self, app=None): """If app argument provided then initialize celery using application config values. If no app argument provided you should do initialization later with init_app method. :param app: Flask application instance. """ self.original_register_app = _state._register_app # Backup Celery app registration function. _state._register_app = lambda _: None # Upon Celery app registration attempt, do nothing. super(Celery, self).__init__() if app is not None: self.init_app(app)
將_state._register_app
函數備份,再置爲空。這樣__init__()
就不會建立Celery實例了。但若是指定了app
,那麼進入init_app
,嗯,大多數Flask擴展都有這個函數,用來動態生成擴展實例。flask
def init_app(self, app): """Actual method to read celery settings from app configuration and initialize the celery instance. :param app: Flask application instance. """ _state._register_app = self.original_register_app # Restore Celery app registration function. if not hasattr(app, 'extensions'): app.extensions = dict() if 'celery' in app.extensions: raise ValueError('Already registered extension CELERY.') app.extensions['celery'] = _CeleryState(self, app) # Instantiate celery and read config. super(Celery, self).__init__(app.import_name, broker=app.config['CELERY_BROKER_URL']) ...
將_state._register_app
函數還原,再執行Celery本來的__init__
。這樣就達到動態生成實例的目的了。接着往下看:app
task_base = self.Task # Add Flask app context to celery instance. class ContextTask(task_base): """Celery instance wrapped within the Flask app context.""" def __call__(self, *_args, **_kwargs): with app.app_context(): return task_base.__call__(self, *_args, **_kwargs) setattr(ContextTask, 'abstract', True) setattr(self, 'Task', ContextTask)
這裏重載了celery.Task
類,經過with app.app_context():
,在app.app_context()
的上下文環境下執行Task。對於一個已生成的Flask實例,應用上下文不會隨便改變。因此這就現實了在Celery中使用Flask的應用上下文。
下面是官方的示例代碼:異步
# extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config['CELERY_IMPORTS'] = ('tasks.add_together', ) app.config['CELERY_BROKER_URL'] = 'redis://localhost' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost' celery.init_app(app) return app # tasks.py from extensions import celery @celery.task() def add_together(a, b): return a + b # manage.py from application import create_app app = create_app() app.run()
跟普通的Flask擴展同樣了。async
在Flask的view函數中調用task.delay()
時,這個task至關於一個離線的異步任務,它對Flask的應用上下文和請求上下文一無所知。可是這均可能是異步任務須要用到的。好比發送郵件要用到的render_template
和url_for
就分別要用到應用上下文和請求上下文。不在celery中引入它們的話,就是Running code outside of a request
。
引入應用上下文的工做Flask-Celery-Helper已經幫咱們作好了,在Flask的文檔中也有相關介紹。實現方法和上面Flask-Celery-Helper的同樣。然而,無論是Flask-Celery-Helper仍是Flask文檔,都沒有說起如何在Celery中使用請求上下文。ide
要引入請求上下文,須要考慮這兩個問題:
如何在Celery中產生請求上下文。Flask中有request_context
和test_request_context
能夠產生請求上下文。區別是request_context
須要WSGI環境變量environ
,而test_request_context
根據傳入的參數生成請求上下文。我沒有找到如何在Celery中獲取到WSGI環境變量的方法,因此只能本身傳入相關參數生成請求上下文了。
請求上下文是隨HTTP請求產生的,要獲取請求上下文,就必須在view函數中處理,view函數經過task.delay()
發送Celery任務。因此須要重載task.delay()
,以獲取請求上下文。
具體的思路仍是在init_app
中重載celery.Task
類,經過with app.test_request_context():
,在app.test_request_context()
的上下文環境下執行Task。
首先獲取request,從中整理出test_request_context()須要的參數。根據test_request_context
的函數註釋,它須要的參數和werkzeug.test.EnvironBuilder類的參數同樣。
CONTEXT_ARG_NAME = '_flask_request_context' def _include_request_context(self, kwargs): """Includes all the information about current Flask request context as an additional argument to the task. """ if not has_request_context(): return # keys correspond to arguments of :meth:`Flask.test_request_context` context = { 'path': request.path, 'base_url': request.url_root, 'method': request.method, 'headers': dict(request.headers), 'data': request.form } if '?' in request.url: context['query_string'] = request.url[(request.url.find('?') + 1):] kwargs[self.CONTEXT_ARG_NAME] = context
_include_request_context
函數從request中提取path
,base_url
,method
,headers
,data
,query_string
。將他們傳入test_request_context
,生成僞造的請求上下文能夠覆蓋大多數的使用狀況。
Celery經過apply_async
,apply
,retry
調用異步任務(delay
是apply_async
的簡化方法)。這裏須要重載它們,讓這些函數獲取request:
def apply_async(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply_async(args, kwargs, **rest) def apply(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply(args, kwargs, **rest) def retry(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).retry(args, kwargs, **rest)
最後重載celery.Task
的__call__
方法:
def __call__(self, *args, **kwargs): """Execute task code with given arguments.""" call = lambda: super(ContextTask, self).__call__(*args, **kwargs) context = kwargs.pop(self.CONTEXT_ARG_NAME, None) if context is None or has_request_context(): return call() with app.test_request_context(**context): result = call() # process a fake "Response" so that # ``@after_request`` hooks are executed app.process_response(make_response(result or '')) return result
context是咱們從request中獲取的參數,將它傳給test_request_context,僞造請求上下文,並在這個上下文環境中執行task。既然僞造了請求,那也得爲這個假請求生成響應,萬一你定義了after_request
這個在響應後執行的鉤子呢?經過process_response
就能夠激活after_request
。
注意這裏並無傳入應用上下文,由於Flask在建立請求上下文時,會判斷應用上下文是否爲空,爲空就先建立應用上下文,再建立請求上下文。
完整代碼在這裏。celery = CeleryWithContext()
建立的Celery實例就能夠給各類task使用了。
另外建立一個celery_worker.py文件,生成一個Flask實例,供Celery的worker使用。
# celery_worker.py #!/usr/bin/env python from app import create_app from app.extensions import celery app = create_app()
啓動worker:celery -A celery_worker.celery worker -l info
這下就可使用Celery發郵件了。唉,還真是麻煩。
http://xion.io/post/code/celery-include-flask-request-context.html