1、Celery使用指南
Celery 是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。python
它是一個專一於實時處理的任務隊列,同時也支持任務調度。redis
任務隊列是一種在線程或機器間分發任務的機制。數據庫
消息隊列的輸入是工做的一個單元,稱爲任務,獨立的職程(Worker)進程持續監視隊列中是否有須要處理的新任務。django
Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程。json
Celery 系統可包含多個職程和中間人,以此得到高可用性和橫向擴展能力。服務器
使用場景併發
將耗時的操做任務提交給 Celery 去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等app
相似於 crontab ,好比每日數據統計異步
Celery 扮演生產者和消費者的角色,分佈式
Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 週期性的將配置中到期須要執行的任務發送給任務隊列.
Celery Worker : 執行任務的消費者, 一般會在多臺服務器運行多個消費者, 提升運行效率.
Broker : 消息代理, 隊列自己. 也稱爲消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(一般是消息隊列或者數據庫).
Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者.
Result Backend : 任務處理完成以後保存狀態信息和結果, 以供查詢.
結構圖:
生產環境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.
安裝celery相關庫:
pip install celery==3.1.5 pip install django-celery==3.2.2 pip install redis==2.10.6
安裝實時監控和管理Web界面工具:
pip install flower
若是安裝最新django-celery3.2版本,則須要celery4版本小於4.0大於3。0的版本,且redis的版本必須小於3.0,所以各依賴庫的版本號如上定義。
項目結構圖以下:
INSTALLED_APPS = [ ... 'djcelery', #django-celery必須添加 'app', ] # 以下配置celery等信息 import djcelery # 當settings.py中的djcelery.setup_loader()運行時, # Celery便會查看全部INSTALLED_APPS中app目錄中的tasks.py文件, 找到標記爲task的function, # 並將它們註冊爲celery task. djcelery.setup_loader() #加載djcelery #並無北京時區,與下面TIME_ZONE應該一致 CELERY_TIMEZONE = 'Asia/Shanghai' # 消息隊列 BROKER_URL = 'redis://127.0.0.1:6379' # 配置backend # CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend' # 設置worker的併發數量爲2 CELERY_CONCURRENCY = 2 # 結果存儲位置 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' # 任務序列化和反序列化爲json CELERY_TASK_SERIALIZER = 'json' # 存儲結果序列化爲json CELERY_RESULT_SERIALIZER = 'json'
在celery.py文件中定義:
from __future__ import absolute_import import os from celery import Celery from django.conf import settings # 設置環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dj_celery_demo.settings') # 實例化Celery app = Celery('task') # 使用django的settings文件配置celery app.config_from_object('django.conf.settings') # Celery加載全部註冊的應用 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在tasks.py文件中定義:
import time from dj_celery_demo.celery import app #調度任務 @app.task def add(x, y): # 模擬長時間耗時操做 print('============耗時操做=============') time.sleep(10) print('============耗時操做結束============') return x + y
在views.py文件中定義調用異步任務task,定義以下:
from django.http import HttpResponse from app.tasks import add def task_add(request): # delay() 方法調用任務 # delay 返回的是一個 AsyncResult 對象,裏面存的就是一個異步的結果, # 當任務完成時result.ready() 爲 true,而後用 result.get() 取結果便可 # 發出request後異步執行該task, 立刻返回response, 從而不阻塞該request, # 使用戶有一個流暢的訪問過程.那麼, 咱們可使用.delay, add.delay(3, 5) # Celery會將task加入到queue中, 並立刻返回.而在一旁待命的worker看到該task後, 便會按照設定執行它, 並將他從queue中移除 return HttpResponse("僞裝操做很耗時,不等了")
啓動celery命令:
python manage.py celery worker -l info 命令中 -l info 表示將程序中的調試信息打印在控制檯中。命令也可簡化爲python manage.py celery worker
啓動flower命令:
python manage.py celery flower --address=127.0.0.1 --port=5555 命令中adderss和port參數能夠不用填寫,默認啓動的IP地址爲127.0.0.1,默認端口爲5555
啓動命令,控制檯中信息的展現,以下圖所示:
項目啓動命令啓動後,redis中執行任務狀況以下圖所示:
注意: 若任務被修改,Celery 須要從新啓動,不然沒法隨之更改
flower任務窗口界面截圖:
flower信息中間件界面截圖:
flower信息消費狀況界面截圖: