Celery是異步消息隊列, 能夠在不少場景下進行靈活的應用.消息中包含了執行任務所需的的參數,用於啓動任務執行, suoy因此消息隊列也能夠稱做html
在web應用開發中, 用戶觸發的某些事件須要較長事件才能完成. 能夠將任務交給celery去執行, 待任務完成後再將結果返回給用戶. 用戶同步請求觸發的其它任務, 如發送郵件,請求雲服務等也能夠交由celery來完成.python
celery的另外一個重要應用場景則是各類計劃任務.git
celery由5個主要組件組成:github
producer: 任務發佈者, 經過調用API向celery發佈任務的程序, 如web後端的控制器.web
celery beat: 任務調度, 根據配置文件發佈定時任務redis
worker: 實際執行任務的程序數據庫
broker: 消息代理, 接受任務消息,存入隊列再按順序分發給worker執行.django
result backend: 存儲結果的服務器, 通常爲各類數據庫服務json
總體結構如圖所示:後端
broker是celery的關鍵組件, 目前的可靠選擇有RabbitMQ和Redis, 出於穩定性等緣由咱們選擇官方推薦的RabbitMQ做爲broker.順便安裝librabbitmq做爲RabbitMQ的python客戶端.
消息的發送接受過程須要對序列進行序列化和反序列化, 從celery3.2開始官方出於安全性緣由拒絕使用python內置的pickle做爲序列化方案, 目前celery支持的序列化方案包括:
json: 跨語言的序列化方案
yaml: 擁有更多數據類型, 但python客戶端性能不如json
msgpack: 二進制序列化方案, 比json更小更快
若對可讀性有要求能夠採用json方案, 若追求更高的性能則能夠選用msgpack.
result backend用於存儲異步任務的結果和狀態, 目前可用的有Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等.
可使用boundless方式安裝依賴:
pip install "celery[librabbitmq,redis,msgpack]"
建立tasks.py
文件, 並寫入:
from celery import Celery app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1') @app.task def add(x, y): return x + y
這樣咱們建立了celery實例, Celery()的第一個參數爲當前module的名稱(py文件名或包名).
在終端執行命令以啓動服務器:
celery -A tasks worker -l info
-A tasks
參數指定app爲模塊tasks,-l info
參數指定log級別爲info.
當看到這條log時說明celery已就緒:
[2016-09-11 18:04:43,758: WARNING/MainProcess] celery@finley-pc ready.
在python中導入任務並執行
>>> from tasks import add >>> result = add.delay(1,2) >>> result.result 3 >>> result.status 'SUCCESS' >>> result.successful() True
使用一個py文件做爲module很是不便, 在更復雜的任務中能夠採用python包做爲module.
創建python包demo,創建下列文件:
app.py:
from celery import Celery app = Celery('demo', include=['demo.tasks']) app.config_from_object('demo.config') app.start()
config.py
BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_TASK_SERIALIZER = 'msgpack' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
tasks.py
from demo.app import app @app.task def add(x, y): return x + y
在終端中啓動:
celery -A demo.app worker -l info
若將-A
參數設爲demo
則會默認嘗試啓動demo.celery
.由於該module與celery重名可能在導入時出現錯誤, 因此咱們沒有采用這種作法.
celery還支持綁定,日誌,重試等特性:
from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def div(self, x, y): logger.info('doing div') try: result = x / y except ZeroDivisionError as e: raise self.retry(exc=e, countdown=5, max_retries=3) return result
bind=true
將app對象做爲self參數傳給task函數.
前文的示例須要producer主動檢查任務的狀態,存在諸多不便. 咱們能夠在task函數中主動通知producer:
from celery import Celery from demo.app import app from urllib.request import urlopen @app.task def add(x, y): result = x + y url = 'http://producerhost/callback/add?result=%d' % result urlopen(url) return result
上述示例中咱們使用GET請求將結果發送給了producer的回調API, 固然不少狀況下能夠直接調用回調函數.
Celery易於與Web框架集成, 做者常採用的交互邏輯是:
提供提交任務, 查詢任務結果兩個API, 由客戶端決定什麼時候查詢結果
採用websocket等技術, 服務器主動向客戶端發送結果
固然也能夠採用異步IO模式, 這須要一些擴展包的協助:
安裝tornado-celery: pip install torando-celery
編寫handler:
import tcelery tcelery.setup_nonblocking_producer() from demo.tasks import add calss Users(RequestHandler): @asynchronous def get(): add.apply_async(args=[1,2], callback=self.on_success) def on_success(self, response): users = response.result self.write(users) self.finish()
其它的Web框架也有本身的擴展包:
Django: django-celery
Tornado: tornado-celery
web2py: web2py-celery
Pylons: celery-pylons
Pyramid: pyramid_celery
celery的計劃任務有schedule和crontab兩種方式.
在config.py中添加配置:
CELERYBEAT_SCHEDULE = { 'add': { 'task': 'demo.tasks.add', 'schedule': timedelta(seconds=10), 'args': (16, 16) } }
啓動beat:
celery beat -A demo.app
而後啓動worker:
celery -A demo.app worker -l info
或者與celery app一同啓動:
celery -B -A demo.app worker -l info
'schedule'能夠接受datetime, timedelta或crontab對象:
from celery.schedules import crontab { 'schedule': crontab(hour=7, minute=30, day_of_week=1), pass }
上文中咱們使用本地python函數做爲worker, webhook機制容許使用遠程的Web服務做爲worker.
在使用webhook做爲worker時, broker將消息封裝爲http請求發送給worker, 並按照協議解析返回值.
使用webhook須要在CELERY_IMPORTS
參數中包含celery.task.http
, 或者在啓動參數中指定-I celery.task.http
.
broker使用GET或POST方法發送請求, 參數由調用時的關鍵字參數指定. worker返回json格式的響應:
{'status': 'success', 'retval': ...}
在失敗時返回響應:
{'status': 'failure', 'reason': ...}
咱們用django做爲worker:
from django.http import HttpResponse import json def add(request): x = int(request.GET['x']) y = int(request.GET['y']) result = x + y response = {'status': 'success', 'retval': result} return HttpResponse(json.dumps(response), mimetype='application/json')
配置django爲http://cloudservice/webhook/add
提供Web服務.
從本地添加任務:
>>>from celery.task.http import URL >>>result = URL('http://cloudservice/webhook/add').get_async(x=10, y=10) >>>result.get() 20
URL是HttpDispatchTask的快捷方法(shortcut):
>>> from celery.task.http import HttpDispatchTask >>> res = HttpDispatchTask.delay( ... url='http://cloudservice/webhook/add', ... method='GET', x=10, y=10) >>> res.get() 20
更多關於celery的內容請參閱: