Celery是一個異步任務的調度工具。 能夠提供消息隊列的後臺執行方式,也能夠提供跟進時間的計劃任務。 官方文檔 Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程。html
是一個消息傳輸的中間件,異步程序調用celery任務的時候,會向broker傳遞消息,存儲消息隊列,以後worker進行消息的消費。python
用於存儲這些消息以及celery的一下直接結果和執行信息。redis
1.定義任務函數
2.運行celery服務
3.客戶端程序調用
如下在Windows下測試經過app
建立文件 tasks.py
異步
from celery import Celery broker = 'redis://127.0.0.1:6379/5' backend = 'redis://127.0.0.1:6379/6' # 指定任務名 tasks跟文件名一致 app = Celery('tasks', broker=broker, backend=backend) # 建立任務函數 @app.task def add(x, y): return x + y
運行命令:函數
celery -A tasks worker --loglevel=info
程序調用:工具
In [0]:from tasks import add In [1]: r = add.delay(2, 2) In [2]: add.delay(2, 2) Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d> In [3]: r = add.delay(3, 3) In [4]: r.ready() Out[4]: True
以上是celery的簡單配置,重在理解原理,如下加入配置文件方式。 建立python包,命名爲proj:測試
☁ proj tree . ├── __init__.py ├── celery.py # 建立 celery 實例 ├── config.py # 配置文件 └── tasks.py # 任務函數
celery.pycode
# -*- coding:utf-8 -*- from __future__ import absolute_import from celery import Celery app = Celery('proj', include=['proj.tasks']) app.config_from_object('proj.config') if __name__ == '__main__': app.start()
config.pyhtm
# -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6'
tasks.py
# -*- coding:utf-8 -*- from __future__ import absolute_import from proj.celery import app @app.task def add(x, y): return x + y
運行命令:
celery -A proj worker -l info
程序調用:
from proj.tasks import add ... (同上)
每隔一段時間執行一次任務 config.py
# -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' CELERY_TIMEZONE = 'Asia/Shanghai' from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'proj.tasks.add', #這裏注意路徑 'schedule': timedelta(seconds=30), 'args': (16, 16) }, }
一旦使用了 scheduler, 啓動 celery須要加上-B 參數在Windows下須要另開一個終端分別運行。
celery -A proj worker -l info celery -A proj beat -s celerybeat-schedule
config.py (未測試)
# -*- coding:utf-8 -*- from __future__ import absolute_import CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5' BROKER_URL = 'redis://127.0.0.1:6379/6' CELERY_TIMEZONE = 'Asia/Shanghai' from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # Executes every Monday morning at 7:30 A.M 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }