在web開發中咱們常常會遇到一些耗時的操做,好比發送郵件/短信,執行各類任務等等,這時咱們會採起異步的方式去執行這些任務,而celery就是這樣的一個異步的分佈式任務處理框架,官方文檔
今天,咱們的主題是celery如何與flask一塊兒工做,咱們都知道,flask是一個很是小巧的web框架,有許許多多的擴展,celery也不例外,咱們先看下目前經常使用的幾個flask-celery的擴展:python
除這些擴展以外,其實flask的官方文檔中已經給出了在flask中使用celery的方式,不過,那是一個單文件中運行flask的demo,在實際項目中使用,仍是有許多須要注意的地方,接下來,咱們就一塊兒探究下如何在flask項目中使用celery。 mysql
├── celery_task # celery任務相關 │ ├── __init__.py │ ├── tasks.py │ └── test.py ├── manage.py # celery worker實例 ├── requirements.txt # 依賴包 └── test_api # flask 項目 ├── api # 藍本相關 │ ├── __init__.py │ └── v1 │ ├── __init__.py │ └── views.py ├── extensions.py # 擴展初始化 ├── __init__.py # flask app ├── models.py # 模型文件 └── settings.py # 配置文件
本項目中沒有使用擴展,只是基於官方文檔中的示例作進一步的應用。git
from celery import Celery def make_celery(app): celery = Celery( app.import_name, backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery
這是一個celery的工廠函數,使用flask app中的配置設置celery相關的屬性,而且更改了celery對象的Task,使其可以使用flask的應用上下文,這一點很是重要。咱們將這段代碼放置到flask項目初始化文件中去也就是testapi/__init_\.pygithub
celerytask/__init_\.pyweb
rom test_api import create_app, make_celery app = create_app() celery = make_celery(app) class MyTask(celery.Task): # celery 基類 def on_success(self, retval, task_id, args, kwargs): # 執行成功的操做 print('MyTasks 基類回調,任務執行成功') return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): # 執行失敗的操做 # 任務執行失敗,能夠調用接口進行失敗報警等操做 print('MyTasks 基類回調,任務執行失敗') return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
這裏我對Task作了進一步的定製,用於添加一些任務信息。redis
import datetime import time import os import random from flask import current_app from test_api.models import User from test_api.extensions import db from celery_task import celery, MyTask @celery.task(bind=True, base=MyTask) def apptask(self): print(current_app.config) print("==============%s " % current_app.config["SQLALCHEMY_DATABASE_URI"]) print("++++++++++++++%s " % os.getenv("DATABASE_URL")) time.sleep(5) user = User(username="user%s" % random.randint(1,100)) db.session.add(user) db.session.commit() return 'success'
這個任務很簡單,使用User模型類異步向數據庫中添加數據,爲了體現耗時操做,使用sleep函數模擬。sql
test_api/api/v1/views.py數據庫
from flask import jsonify from celery_task.tasks import apptask from test_api.api.v1 import api_v1 from test_api.extensions import db from flask import current_app @api_v1.route("/", methods=["GET"]) def index(): r = apptask.apply_async() return jsonify({"status": "success"})
視圖函數很是的簡單,只作了提交任務的操做。json
爲了不循環導入問題,咱們在項目根目錄下新建manage.pyflask
from test_api import create_app, make_celery app = create_app() celery = make_celery(app) if __name__ == '__main__': app.run()
這個文件只用來啓動celery,啓動命令以下:
# celery worker -A manage:celery -l debug
看到以下輸出,代表啓動成功:
-------------- celery@test-3 v4.4.0 (cliffs) --- ***** ----- -- ******* ---- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2020-03-03 21:14:13 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: test_api:0x7f87c31a4e48 - ** ---------- .> transport: redis://127.0.0.1:6379/3 - ** ---------- .> results: redis://127.0.0.1:6379/4 - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery.accumulate . celery.backend_cleanup . celery.chain . celery.chord . celery.chord_unlock . celery.chunks . celery.group . celery.map . celery.starmap . celery_task.tasks.apptask [2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Hub [2020-03-03 21:14:13,632: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:13,632: DEBUG/MainProcess] | Worker: Starting Pool [2020-03-03 21:14:13,690: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:13,691: DEBUG/MainProcess] | Worker: Starting Consumer [2020-03-03 21:14:13,691: DEBUG/MainProcess] | Consumer: Starting Connection [2020-03-03 21:14:13,708: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3 [2020-03-03 21:14:13,708: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:13,708: DEBUG/MainProcess] | Consumer: Starting Events [2020-03-03 21:14:13,718: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:13,718: DEBUG/MainProcess] | Consumer: Starting Mingle [2020-03-03 21:14:13,718: INFO/MainProcess] mingle: searching for neighbors [2020-03-03 21:14:14,743: INFO/MainProcess] mingle: all alone [2020-03-03 21:14:14,743: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:14,744: DEBUG/MainProcess] | Consumer: Starting Gossip [2020-03-03 21:14:14,748: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:14,748: DEBUG/MainProcess] | Consumer: Starting Heart [2020-03-03 21:14:14,750: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:14,750: DEBUG/MainProcess] | Consumer: Starting Tasks [2020-03-03 21:14:14,756: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:14,756: DEBUG/MainProcess] | Consumer: Starting Control [2020-03-03 21:14:14,759: DEBUG/MainProcess] ^-- substep ok [2020-03-03 21:14:14,759: DEBUG/MainProcess] | Consumer: Starting event loop [2020-03-03 21:14:14,759: DEBUG/MainProcess] | Worker: Hub.register Pool... [2020-03-03 21:14:14,760: INFO/MainProcess] celery@test-3 ready. [2020-03-03 21:14:14,760: DEBUG/MainProcess] basic.qos: prefetch_count->8
啓動flask:
# flask run * Serving Flask app "test_api" (lazy loading) * Environment: development * Debug mode: on * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit) * Restarting with stat * Debugger is active! * Debugger PIN: 237-492-852
調試接口:
# curl http://127.0.0.1:5000/api/v1/ { "status": "success" }
查看celery日誌:
[2020-03-03 21:17:31,330: WARNING/ForkPoolWorker-2] [2020-03-03 21:17:31,330: DEBUG/MainProcess] Task accepted: celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] pid:2341 [2020-03-03 21:17:36,391: WARNING/ForkPoolWorker-2] MyTasks 基類回調,任務執行成功 [2020-03-03 21:17:36,392: INFO/ForkPoolWorker-2] Task celery_task.tasks.apptask[5f27a148-161f-4485-931f-17d94637168e] succeeded in 5.0624741315841675s: 'success'
任務執行成功,查看數據庫數據:
mysql> select * from user order by id; +----+----------+ | id | username | +----+----------+ | 1 | user26 | | 2 | user69 | | 3 | user71 | | 4 | user35 | | 5 | user13 | | 6 | user54 | | 7 | user88 | | 8 | user63 | | 9 | user87 | | 10 | user90 | | 11 | user3 | | 12 | user18 | | 13 | user65 | +----+----------+
數據已被插入,實驗成功!
有幾個坑但願你們注意下
出錯文件: testapi/__init_\.py
import os import click from flask import Flask, jsonify from test_api.api.v1 import api_v1 # 藍圖在上方導入,循環報錯產生 from test_api.settings import config from test_api.models import User from celery import Celery def make_celery(app): ... def create_app(config_name=None): if config_name is None: config_name = os.getenv('FLASK_ENV', 'development') app = Flask('test_api') app.config.from_object(config[config_name]) register_extensions(app) register_blueprints(app) register_commands(app) register_errors(app) return app # 註冊藍圖函數 def register_blueprints(app): app.register_blueprint(api_v1, url_prefix='/api/v1')
啓動celery和請求接口時均會報錯,錯誤堆棧以下:
from test_api import create_app, make_celery File "/tmp/test/test_api/__init__.py", line 5, in <module> from test_api.api.v1 import api_v1 File "/tmp/test/test_api/api/v1/__init__.py", line 9, in <module> from test_api.api.v1 import views File "/tmp/test/test_api/api/v1/views.py", line 2, in <module> from celery_task.tasks import apptask File "/tmp/test/celery_task/__init__.py", line 1, in <module> from test_api import create_app, make_celery ImportError: cannot import name 'create_app'
將藍圖的導入下放置藍圖註冊函數中testapi/__init_\.py:
... def register_blueprints(app): from test_api.api.v1 import api_v1 app.register_blueprint(api_v1, url_prefix='/api/v1') ...
提交任務,celery報錯以下:
... options = self.get_options(sa_url, echo) File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 575, in get_options self._sa.apply_driver_hacks(self._app, sa_url, options) File "/tmp/py3/lib/python3.6/site-packages/flask_sqlalchemy/__init__.py", line 877, in apply_driver_hacks if sa_url.drivername.startswith('mysql'): AttributeError: 'NoneType' object has no attribute 'drivername'
經過調試我發現,flask的app的配置是能夠拿到的,由於咱們在工廠函數中推送了應用上下文,個人數據庫配置信息是以鍵值的形式寫在了.env文件中,這也是目前flask推薦的方式。那爲何celery取不到數據庫鏈接配置呢?其實,啓動celery的app和咱們web服務所用app是兩個獨立的app,celery沒法經過.env中的環境變量取到相應的值,這裏有三種解決辦法:
不使用環境變量的方式,直接將相關信息寫在配置文件中例如: SQLALCHEMY_DATABASE_URI = "mysql+pymysql://xxx:xxx@127.0.0.1:3306/test?charset=utf8"
相比之下,方案三是採納比較多的,因而咱們在test_api/settings.py文件中加入以下代碼:
from dotenv import find_dotenv, load_dotenv load_dotenv(find_dotenv())
find_dotenv函數會在當前以及父目錄中搜尋.env文件,load_dotenv函數則負責加載環境變量。如此,大功告成。咱們能夠繼續愉快擼代碼啦。
附:項目源碼