Celery使用指南


目錄

1、Celery使用指南

Celery使用指南

前言

Celery 是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。python

它是一個專一於實時處理的任務隊列,同時也支持任務調度。redis

1. 何爲消息隊列?

任務隊列是一種在線程或機器間分發任務的機制。數據庫

消息隊列的輸入是工做的一個單元,稱爲任務,獨立的職程(Worker)進程持續監視隊列中是否有須要處理的新任務。django

Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程。json

Celery 系統可包含多個職程和中間人,以此得到高可用性和橫向擴展能力。服務器

使用場景併發

  1. 異步任務

將耗時的操做任務提交給 Celery 去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等app

  1. 定時任務

相似於 crontab ,好比每日數據統計異步

2. celery組件

Celery 扮演生產者和消費者的角色,分佈式

Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 週期性的將配置中到期須要執行的任務發送給任務隊列.

Celery Worker : 執行任務的消費者, 一般會在多臺服務器運行多個消費者, 提升運行效率.

Broker : 消息代理, 隊列自己. 也稱爲消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(一般是消息隊列或者數據庫).

Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者.

Result Backend : 任務處理完成以後保存狀態信息和結果, 以供查詢.

結構圖:

3. 消息代理

生產環境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.

4. 安裝

安裝celery相關庫:

pip install celery==3.1.5
pip install django-celery==3.2.2
pip install redis==2.10.6

安裝實時監控和管理Web界面工具:

pip install flower

若是安裝最新django-celery3.2版本,則須要celery4版本小於4.0大於3。0的版本,且redis的版本必須小於3.0,所以各依賴庫的版本號如上定義。

5. 配置

項目結構圖以下:

INSTALLED_APPS = [
	...
    'djcelery', #django-celery必須添加
    'app',
]

# 以下配置celery等信息

import djcelery
# 當settings.py中的djcelery.setup_loader()運行時,
# Celery便會查看全部INSTALLED_APPS中app目錄中的tasks.py文件, 找到標記爲task的function,
# 並將它們註冊爲celery task.
djcelery.setup_loader()     #加載djcelery

#並無北京時區,與下面TIME_ZONE應該一致
CELERY_TIMEZONE = 'Asia/Shanghai'
# 消息隊列
BROKER_URL = 'redis://127.0.0.1:6379'
# 配置backend
# CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend'

# 設置worker的併發數量爲2
CELERY_CONCURRENCY = 2
# 結果存儲位置
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
# 任務序列化和反序列化爲json
CELERY_TASK_SERIALIZER = 'json'
# 存儲結果序列化爲json
CELERY_RESULT_SERIALIZER = 'json'

在celery.py文件中定義:

from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings
# 設置環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dj_celery_demo.settings')
# 實例化Celery
app = Celery('task')
# 使用django的settings文件配置celery
app.config_from_object('django.conf.settings')
# Celery加載全部註冊的應用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在tasks.py文件中定義:

import time

from dj_celery_demo.celery import app

#調度任務
@app.task
def add(x, y):
    # 模擬長時間耗時操做
    print('============耗時操做=============')
    time.sleep(10)
    print('============耗時操做結束============')
    return x + y

在views.py文件中定義調用異步任務task,定義以下:

from django.http import HttpResponse

from app.tasks import add


def task_add(request):
    # delay() 方法調用任務
    # delay 返回的是一個 AsyncResult 對象,裏面存的就是一個異步的結果,
    # 當任務完成時result.ready() 爲 true,而後用 result.get() 取結果便可

    # 發出request後異步執行該task, 立刻返回response, 從而不阻塞該request,
    # 使用戶有一個流暢的訪問過程.那麼, 咱們可使用.delay,
    add.delay(3, 5)
    # Celery會將task加入到queue中, 並立刻返回.而在一旁待命的worker看到該task後, 便會按照設定執行它, 並將他從queue中移除
    return HttpResponse("僞裝操做很耗時,不等了")

6. 啓動項目

啓動celery命令:

python manage.py celery worker -l info
命令中 -l info 表示將程序中的調試信息打印在控制檯中。命令也可簡化爲python manage.py celery worker

啓動flower命令:

python manage.py celery flower --address=127.0.0.1 --port=5555 
命令中adderss和port參數能夠不用填寫,默認啓動的IP地址爲127.0.0.1,默認端口爲5555

啓動命令,控制檯中信息的展現,以下圖所示:

項目啓動命令啓動後,redis中執行任務狀況以下圖所示:

注意: 若任務被修改,Celery 須要從新啓動,不然沒法隨之更改

7. 可視化監控隊列工具

flower任務窗口界面截圖:

flower信息中間件界面截圖:

flower信息消費狀況界面截圖:

相關文章
相關標籤/搜索