用 Flask 來寫個輕博客 (26) — 使用 Flask-Celery-Helper 實現異步任務

Blog 項目源碼:https://github.com/JmilkFan/JmilkFan-s-Bloghtml

目錄

前文列表

用 Flask 來寫個輕博客 (1) — 建立項目
用 Flask 來寫個輕博客 (2) — Hello World!
用 Flask 來寫個輕博客 (3) — (M)VC_鏈接 MySQL 和 SQLAlchemy
用 Flask 來寫個輕博客 (4) — (M)VC_建立數據模型和表
用 Flask 來寫個輕博客 (5) — (M)VC_SQLAlchemy 的 CRUD 詳解
用 Flask 來寫個輕博客 (6) — (M)VC_models 的關係(one to many)
用 Flask 來寫個輕博客 (7) — (M)VC_models 的關係(many to many)
用 Flask 來寫個輕博客 (8) — (M)VC_Alembic 管理數據庫結構的升級和降級
用 Flask 來寫個輕博客 (9) — M(V)C_Jinja 語法基礎快速概覽
用 Flask 來寫個輕博客 (10) — M(V)C_Jinja 經常使用過濾器與 Flask 特殊變量及方法
用 Flask 來寫個輕博客 (11) — M(V)C_建立視圖函數
用 Flask 來寫個輕博客 (12) — M(V)C_編寫和繼承 Jinja 模板
用 Flask 來寫個輕博客 (13) — M(V)C_WTForms 服務端表單檢驗
用 Flask 來寫個輕博客 (14) — M(V)C_實現項目首頁的模板
用 Flask 來寫個輕博客 (15) — M(V)C_實現博文頁面評論表單
用 Flask 來寫個輕博客 (16) — MV(C)_Flask Blueprint 藍圖
用 Flask 來寫個輕博客 (17) — MV(C)_應用藍圖來重構項目
用 Flask 來寫個輕博客 (18) — 使用工廠模式來生成應用對象
用 Flask 來寫個輕博客 (19) — 以 Bcrypt 密文存儲帳戶信息與實現用戶登錄表單
用 Flask 來寫個輕博客 (20) — 實現註冊表單與應用 reCAPTCHA 來實現驗證碼
用 Flask 來寫個輕博客 (21) — 結合 reCAPTCHA 驗證碼實現用戶註冊與登陸
用 Flask 來寫個輕博客 (22) — 實現博客文章的添加和編輯頁面
用 Flask 來寫個輕博客 (23) — 應用 OAuth 來實現 Facebook 第三方登陸
用 Flask 來寫個輕博客 (24) — 使用 Flask-Login 來保護應用安全
用 Flask 來寫個輕博客 (25) — 使用 Flask-Principal 實現角色權限功能python

擴展閱讀

Celery-分佈式任務隊列
基於後臺做業的 Celery
Flask-Celery-Helper 1.1.0git

Celery

Celery 是使用 Python 多任務庫來編寫的任務隊列工具, 能夠 並行 的執行任務. 咱們會將執行時間較長但又不那麼追求實時的功能以異步任務的形式完成, EG. 上傳文件, 發送郵件…, Python 和 Celery 之間須要一箇中間人(消息隊列)來進行任務隊列的管理, Celery 官方推薦使用 RabbirMQ 或 Redis 來充當這個角色. 固然也能夠同時使用二者, 其中 MQ 做爲中間人, Redis 傳遞 Celery 執行的結果給應用端. 這樣作的優點在於, 返回給應用的結果是持久化保存在數據庫中的.github

消息隊列: 是一種專門設計的系統, 用於在生產者(往隊列發送消息的程序)和消費者(從隊列中取出消息的隊列)之間傳遞消息.web

這裏寫圖片描述

  • 安裝 Celery
pip install Celery
  • 安裝 Flask-Celery-Helper
    Flask-Celery-Helper 是一個 Flask 擴展, 用於輔助使用 app 來初始化 Celery 對象, 使其得以註冊到 app 對象中.
pip install Flask-Celery-Helper
pip freeze > requirments.txt
/etc/init.d/rabbitmq-server start

將 Celery 加入到應用中

  • 配置 Celery 鏈接 RabbitMQ 的 URL
    vim jmilkfansblog/config.py
class DevConfig(Config):
    """Development config class."""
...
    # Celery <--> RabbitMQ connection
    CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672//"
    CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"

NOTE: RabbitMQ 使用默認的 guest 用戶, 端口爲 5672sql

  • 建立 celery 對象
    vim jmilkfansblog/extensions.py
from flask.ext.celery import Celery

...

# Create the Flask-Celery-Helper's instance
flask_celery = Celery()
  • 將 celery 對象註冊到 app 對象
    vim jmilkfansblog/__init__.py
from jmilkfansblog.extensions import flask_celery

def create_app(object_name):
    """Create the app instance via `Factory Method`"""
...
    # Init the Flask-Celery-Helper via app object
    # Register the celery object into app object
    flask_celery.init_app(app)

NOTE 1: Celery 的進程必須在 Flask app 的上下文中運行, 這樣 Celery 纔可以跟其餘的 Flask 擴展協同工做。因此必須將 flask_celery 對象註冊到 app 對象中, 而且每建立一個 Celery 進程都須要建立一個新的 Flask app 對象, 這裏咱們使用工廠模式來建立 celery 對象。這一點是很是重要的,實際上 Flask application 和 Celery application 是兩個不一樣的進程,在 Celery 沒有加入 Flask 上下文的狀況下,Celery 的程序邏輯就不能輕易的訪問 Flask 相關資源,好比不能加載 Flask 的環境配置信息,沒法經過 Flask 來訪問數據庫,不能使用 Flask 的擴展功能等。若是想作到這些,Celery 都須要本身再實現一套相同的邏輯,這樣作顯然是沒有必要的。因此 Flask application 原生支持將本身的 Context 嵌入到別的 application 中,固然有些狀況也須要相應擴展的輔助,例如 Flask-Celery-Helper 在這裏就充當着這個角色。數據庫

NOTE 2: flask_celery 對象是 Flask-Celery-Helper 擴展的對象, 用於輔助處理 Celery 的初始化, 因此實際上咱們是能夠不使用這個擴展, 而直接使用 Celery 的. celery 對象纔是真正的 Celery 的對象.flask

  • 使用工廠模式來建立 celery 對象
    ./celery_runner.py
mport os

from celery import Celery

from jmilkfansblog import create_app


def make_celery(app):
    """Create the celery process."""

    # Init the celery object via app's configuration.
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'])

    # Flask-Celery-Helpwe to auto-setup the config.
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):

        abstract = True

        def __call__(self, *args, **kwargs):
            """Will be execute when create the instance object of ContextTesk."""

            # Will context(Flask's Extends) of app object(Producer Sit) 
            # be included in celery object(Consumer Site).
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    # Include the app_context into celery.Task.
    # Let other Flask extensions can be normal calls.
    celery.Task = ContextTask
    return celery

env = os.environ.get('BLOG_ENV', 'dev')
flask_app = create_app('jmilkfansblog.config.%sConfig' % env.capitalize())
# 1. Each celery process needs to create an instance of the Flask application.
# 2. Register the celery object into the app object.
celery = make_celery(flask_app)

NOTE 1: 咱們之後會以 CLI 的形式來管理和控制 Celery 的 worker, 因此咱們將 celery 對象的實現模塊放置在 ./celery_runner.py 中, 而不是 jmilkfansblog/celery_runner.py. Flask app 內部的 tasks 任何的定義和實現就交由 Flask-Celery-Helper 來支持就行了, 這也是 Flask-Celery-Helper 存在的意義.vim

NOTE 2: make_celery()最重要的做用就是讓每一個 Celery 的進程中(celery對象)都包含有 app 對象的上下文, 至於爲何這麼作呢? 上述已經給出了答案.api

NOTE 3: 這裏經過 create_app() 建立的對象不可以命名爲 app, 而是命名爲 flask_app, 這是由於 Celery 會默認將命名爲 app 或 celery 的對象都做爲一個 Celery 對象來處理.

NOTE 4: celery.conf.update(app.config) 會將 app 對象的 config 更新到 celery 對象中, 固然也包括了剛剛定義的 RabbitMQ 鏈接 URL 配置.

  • 啓動 Celery 服務
(env) jmilkfan@JmilkFan-Devstack:/opt/JmilkFan-s-Blog$ celery worker -A celery_runner --loglevel=info
...

 -------------- celery@JmilkFan-Devstack v4.0.1 (latentcall) ---- **** ----- --- * *** * -- Linux-4.4.0-53-generic-x86_64-with-Ubuntu-16.04-xenial 2016-12-15 19:12:33 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: jmilkfansblog:0x7f5b24345990 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** -----  -------------- [queues]                 .> celery           exchange=celery(direct) key=celery


[tasks]
  . jmilkfansblog.tasks.digest
  . jmilkfansblog.tasks.log
  . jmilkfansblog.tasks.multiply
  . jmilkfansblog.tasks.remind

NOTE: 在啓動 Celery 服務的時候, 可以用 Output 看見其自身的配置信息和如今所處理的 tasks.

實現向新用戶發送歡迎郵件

下面使用 Celery 實如今用戶建立帳戶以後, 在指定的時間內異步的向新用戶發送歡迎郵件.

  • 添加 Reminder Model 用戶存放用戶的 email 地址和歡迎內容.
    vim jmilkfansblog/models.py
class Reminder(db.Model):
    """Represents Proected reminders."""

    __tablename__ = 'reminders'
    id = db.Column(db.String(45), primary_key=True)
    date = db.Column(db.DateTime())
    email = db.Column(db.String(255))
    text = db.Column(db.Text())

    def __init__(self, id, text):
        self.id = id
        self.email = text

    def __repr__(self):
        return '<Model Reminder `{}`>'.format(self.text[:20])
  • 建立一個 task
    vim jmilkfansblog/tasks.py
import smtplib
import datetime
from email.mime.text import MIMEText

from flask_mail import Message

from jmilkfansblog.extensions import flask_celery, mail

@flask_celery.task(
    bind=True,
    igonre_result=True,
    default_retry_delay=300,
    max_retries=5)
def remind(self, primary_key):
    """Send the remind email to user when registered. Using Flask-Mail. """

    reminder = Reminder.query.get(primary_key)

    msg = MIMEText(reminder.text)
    msg['Subject'] = 'Welcome!'
    msg['FROM'] = <your_email>
    msg['To'] = reminder.email

    try:
        smtp_server = smtplib.SMTP('localhost')
        smtp_server.starttls()
        smtp_server.login(<user>, <password>)
        smtp_server.sendmail(<your_email>,
                             [reminder.email],
                             msg.as_string())
        smtp_server.close()
        return

    except Exception as err:
        self.retry(exc=err)

def on_reminder_save(mapper, connect, self):
    """Callbask for task remind."""
    remind.apply_async(args=(self.id), eta=self.date)

NOTE 1: Celery Task 本質上就是一個被 celery.task()裝飾過的函數,

NOTE 2: 使用主鍵 primary_key 來獲取 reminder 對象是爲了不數據競態的發生, 由於從生成 reminder 對象到 task 被執行的過程並不能保證數據是最新的, 這也是處理異步調用時, 須要時刻注意的地方.

NOTE 3: on_reminder_save() 是一個回調函數, 當咱們在一個特定的情景下調用這個函數的時候就觸發了一個 Celery task.

  • 應用 SQLAlchemy 的 event 特性來觸發 Celery task
    vim jmilkfansblog/__init__.py
from sqlalchemy import event


def create_app(object_name):
    """Create the app instance via `Factory Method`"""
...
    # Will be callback on_reminder_save when insert recond into table `reminder`.
    event.listen(Reminder, 'after_insert', on_reminder_save)
  • NOTE 1: SQLAlchemy 容許在 Model 上註冊回調函數, 當 Model 對象發生特定的情景時, 就會執行這個回調函數, 這就是所謂的 event, 這裏咱們使用 after_insert 來指定當建立一個新的 Reminder 對象(插入一條記錄)時就觸發這個回調函數. 而是回調函數中的形參, 會由 event 來負責傳入.
相關文章
相關標籤/搜索