異步消息隊列Celery

Celery是異步消息隊列, 能夠在不少場景下進行靈活的應用.消息中包含了執行任務所需的的參數,用於啓動任務執行, suoy因此消息隊列也能夠稱做html

在web應用開發中, 用戶觸發的某些事件須要較長事件才能完成. 能夠將任務交給celery去執行, 待任務完成後再將結果返回給用戶. 用戶同步請求觸發的其它任務, 如發送郵件,請求雲服務等也能夠交由celery來完成.python

celery的另外一個重要應用場景則是各類計劃任務.git

celery由5個主要組件組成:github

  • producer: 任務發佈者, 經過調用API向celery發佈任務的程序, 如web後端的控制器.web

  • celery beat: 任務調度, 根據配置文件發佈定時任務redis

  • worker: 實際執行任務的程序數據庫

  • broker: 消息代理, 接受任務消息,存入隊列再按順序分發給worker執行.django

  • result backend: 存儲結果的服務器, 通常爲各類數據庫服務json

總體結構如圖所示:後端

broker是celery的關鍵組件, 目前的可靠選擇有RabbitMQ和Redis, 出於穩定性等緣由咱們選擇官方推薦的RabbitMQ做爲broker.順便安裝librabbitmq做爲RabbitMQ的python客戶端.

消息的發送接受過程須要對序列進行序列化和反序列化, 從celery3.2開始官方出於安全性緣由拒絕使用python內置的pickle做爲序列化方案, 目前celery支持的序列化方案包括:

  • json: 跨語言的序列化方案

  • yaml: 擁有更多數據類型, 但python客戶端性能不如json

  • msgpack: 二進制序列化方案, 比json更小更快

若對可讀性有要求能夠採用json方案, 若追求更高的性能則能夠選用msgpack.

result backend用於存儲異步任務的結果和狀態, 目前可用的有Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等.

可使用boundless方式安裝依賴:

pip install "celery[librabbitmq,redis,msgpack]"

第一個異步任務

建立tasks.py文件, 並寫入:

from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1')


@app.task
def add(x, y):
    return x + y

這樣咱們建立了celery實例, Celery()的第一個參數爲當前module的名稱(py文件名或包名).

在終端執行命令以啓動服務器:

celery -A tasks worker -l info

-A tasks 參數指定app爲模塊tasks,-l info參數指定log級別爲info.

當看到這條log時說明celery已就緒:

[2016-09-11 18:04:43,758: WARNING/MainProcess] celery@finley-pc ready.

在python中導入任務並執行

>>> from tasks import add
>>> result = add.delay(1,2)
>>> result.result
3
>>> result.status
'SUCCESS'
>>> result.successful()
True

使用一個py文件做爲module很是不便, 在更復雜的任務中能夠採用python包做爲module.

創建python包demo,創建下列文件:

app.py:

from celery import Celery

app = Celery('demo', include=['demo.tasks'])

app.config_from_object('demo.config')

app.start()

config.py

BROKER_URL = 'redis://127.0.0.1:6379/0'

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

CELERY_TASK_SERIALIZER = 'msgpack'

CELERY_RESULT_SERIALIZER = 'json'

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

tasks.py

from demo.app import app


@app.task
def add(x, y):
    return x + y

在終端中啓動:

celery -A demo.app worker -l info

若將-A參數設爲demo則會默認嘗試啓動demo.celery.由於該module與celery重名可能在導入時出現錯誤, 因此咱們沒有采用這種作法.

celery還支持綁定,日誌,重試等特性:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True)
def div(self, x, y):
    logger.info('doing div')
    try:
        result = x / y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)
    return result

bind=true將app對象做爲self參數傳給task函數.

前文的示例須要producer主動檢查任務的狀態,存在諸多不便. 咱們能夠在task函數中主動通知producer:

from celery import Celery
from demo.app import app
from urllib.request import urlopen

@app.task
def add(x, y):
    result = x + y
    url = 'http://producerhost/callback/add?result=%d' % result
    urlopen(url)
    return result

上述示例中咱們使用GET請求將結果發送給了producer的回調API, 固然不少狀況下能夠直接調用回調函數.

Celery易於與Web框架集成, 做者常採用的交互邏輯是:

  • 提供提交任務, 查詢任務結果兩個API, 由客戶端決定什麼時候查詢結果

  • 採用websocket等技術, 服務器主動向客戶端發送結果

固然也能夠採用異步IO模式, 這須要一些擴展包的協助:

安裝tornado-celery: pip install torando-celery

編寫handler:

import tcelery
tcelery.setup_nonblocking_producer()

from demo.tasks import add

calss Users(RequestHandler):
    @asynchronous
    def get():
        add.apply_async(args=[1,2], callback=self.on_success)

    def on_success(self, response):
        users = response.result
        self.write(users)
        self.finish()

其它的Web框架也有本身的擴展包:

計劃任務

celery的計劃任務有schedule和crontab兩種方式.

在config.py中添加配置:

CELERYBEAT_SCHEDULE = {

    'add': {

        'task': 'demo.tasks.add',

        'schedule': timedelta(seconds=10),

        'args': (16, 16)

    }
}

啓動beat:

celery beat -A demo.app

而後啓動worker:

celery -A demo.app worker -l info

或者與celery app一同啓動:

celery -B -A demo.app worker -l info

'schedule'能夠接受datetime, timedelta或crontab對象:

from celery.schedules import crontab

{
    'schedule': crontab(hour=7, minute=30, day_of_week=1),
    pass
}

webhook

上文中咱們使用本地python函數做爲worker, webhook機制容許使用遠程的Web服務做爲worker.

在使用webhook做爲worker時, broker將消息封裝爲http請求發送給worker, 並按照協議解析返回值.

使用webhook須要在CELERY_IMPORTS參數中包含celery.task.http, 或者在啓動參數中指定-I celery.task.http.

broker使用GET或POST方法發送請求, 參數由調用時的關鍵字參數指定. worker返回json格式的響應:

{'status': 'success', 'retval': ...}

在失敗時返回響應:

{'status': 'failure', 'reason': ...}

咱們用django做爲worker:

from django.http import HttpResponse
import json


def add(request):
    x = int(request.GET['x'])
    y = int(request.GET['y'])
    result = x + y
    response = {'status': 'success', 'retval': result}
    return HttpResponse(json.dumps(response), mimetype='application/json')

配置django爲http://cloudservice/webhook/add提供Web服務.

從本地添加任務:

>>>from celery.task.http import URL
>>>result = URL('http://cloudservice/webhook/add').get_async(x=10, y=10)
>>>result.get()
20

URL是HttpDispatchTask的快捷方法(shortcut):

>>> from celery.task.http import HttpDispatchTask
>>> res = HttpDispatchTask.delay(
...     url='http://cloudservice/webhook/add',
...     method='GET', x=10, y=10)
>>> res.get()
20

更多關於celery的內容請參閱:

Celery latest documentation

相關文章
相關標籤/搜索