基於 Django 2.0.4 的 djcelery 配置

Django Celery 配置實踐

所需環境

python 3.5.2python

rabbitmqgit

安裝所需的包

pip install -r requirements.txtgithub

QuickStart

建立Django項目

建立一個名爲proj的Django項目redis

django-admin startproject projsql

建立Django App

建立一個用於演示的django app,這裏名爲demo數據庫

django-admin startapp demodjango

在建立的app中,增長tasks.py文件,用於編寫celery任務json

基礎配置項目

修改proj/settings.py配置文件,增長celery相關配置。瀏覽器

增長djcelery app

修改settings.py中INSTALLED_APPS,增長djcelery及app服務器

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery',
    'demo'
]

celery相關的參數配置

若是僅僅需求使用celery異步執行任務的話,如下最基礎的配置就能夠知足需求

# 導入tasks文件,由於咱們使用autodiscover_tasks
# 會自動導入每一個app下的tasks.py,因此這個配置不是很必要
# 若是須要導入其餘非tasks.py的模塊,則須要再此配置須要導入的模塊
# CELERY_IMPORTS = ('demo.tasks', )
# 配置 celery broker
CELERY_BROKER_URL = 'amqp://user:password@127.0.0.1:5672//'
# 配置 celery backend 用Redis會比較好
# 由於手上沒有redis服務器,因此演示時用RabbitMQ替代
CELERY_RESULT_BACKEND = 'amqp://user:password@127.0.0.1:5672//'

建立Celery實例

在proj目錄下,編輯celery.py文件,用於建立celery實例

from celery import Celery
from django.conf import settings

# 建立celery應用
celery_app = Celery('proj', broker=settings.CELERY_BROKER_URL)
# 從配置文件中加載除celery外的其餘配置
celery_app.config_from_object('django.conf:settings')
# 自動檢索每一個app下的tasks.py
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

編寫異步任務

在以前建立的demo/tasks.py中,編寫一個用於演示的異步任務。

注意每一個異步任務以前都須要使用@celery_app.task裝飾器。

celery_app實際是以前在proj/celery.py中建立的celery的實例,若是你的實例名稱不同,作對應的修改便可。

import logging
from proj.celery import celery_app

@celery_app.task
def async_task():
    logging.info('run async_task')

調用異步任務

在demo/views.py中定義一個頁面,只用來調用異步任務。

from django.http import HttpResponse
from demo.tasks import async_demo_task

# Create your views here.
def demo_task(request):
    # delay表示將任務交給celery執行
    async_demo_task.delay()
    return HttpResponse('任務已經運行')

在proj/urls.py中註冊對應的url。

from django.contrib import admin
from django.urls import path
from demo.views import demo_task

urlpatterns = [
    path('admin/', admin.site.urls),
    path('async_demo_task', demo_task),
]

啓動Celery Worker

使用命令啓動worker:

manage.py celery -A proj worker -l info

對參數作個簡單的說明:

-A proj是指項目目錄下的celery實例。演示項目名爲proj,因此-A的值是proj。若是項目名是其餘名字,將proj換成項目對應的名字。

-l info 是指日誌記錄的級別,這裏記錄的是info級別的日誌。

若是配置沒有問題,能成功鏈接broker,則會有相似如下的日誌:

 -------------- celery@Matrix.local v3.1.26.post2 (Cipater)
---- **** ----- 
--- * ***  * -- Darwin-17.5.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x108ab1eb8
- ** ---------- .> transport:   amqp://user:**@127.0.0.1:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . demo.tasks.async_demo_task

[2018-04-24 08:24:47,656: INFO/MainProcess] Connected to amqp://user:**@127.0.0.1:5672//

須要注意的是日誌中的tasks部分,能夠看到已經自動識別到了demo.tasks.async_demo_task這個用於演示的任務。

若是沒有識別到,檢查下celery實例是否調用autodiscover_tasks方法,或配置文件的CELERY_IMPORTS是否配置正確。

調用異步任務

在demo/views.py中定義一個頁面,只用來調用異步任務。

from django.http import HttpResponse
from demo.tasks import async_demo_task

# Create your views here.
def demo_task(request):
    # delay表示將任務交給celery執行
    async_demo_task.delay()
    return HttpResponse('任務已經運行')

在proj/urls.py中註冊對應的url。

from django.contrib import admin
from django.urls import path
from demo.views import demo_task

urlpatterns = [
    path('admin/', admin.site.urls),
    path('async_demo_task', demo_task),
]

最後,啓動django,訪問url http://127.0.0.1:8000/async_demo_task 調用異步任務。

在worker的日誌中,能夠看到相似的執行結果,即說明任務已經由celery異步執行。

若是出現"Using settings.DEBUG leads to a memory leak, never "的警告信息,則在生產環境中關閉掉django的debug模式便可。

[2018-04-24 09:25:52,677: INFO/MainProcess] Received task: demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391]
[2018-04-24 09:25:52,681: INFO/Worker-4] run async_task
[2018-04-24 09:25:52,899: INFO/MainProcess] Task demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391] succeeded in 0.21868160199665s: None

爲任務分配隊列

請參考這裏celery-demo

配置計劃任務

一樣請參這裏celery-demo

使用Django Admin管理Celery計劃任務

使用djcelery,而不直接使用celery的好處就在於能夠經過Django Admin對Celery的計劃任務進行管理。

啓動進程

使用計劃任務時,除了保證原先的worker正常運行外(worker的啓動方式見上),還須要啓動beats:

python manage.py celery beat

也能夠beat和worker一塊兒啓動

python manage.py celery -A project worker -l info --beat

建立數據庫

python manage.py migrate

建立Django Admin和djcelery對應的表,這裏的數據庫使用默認的sqlite。

建立管理員

python manage.py createsuperuser,依次輸入超級管理員賬號、郵箱、密碼。

演示項目中設置賬號:admin 密碼: superplayer123

修改配置文件

在settings.py中,增長兩項配置:

# 設定時區,配置計劃任務時須要
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

建立計劃任務

訪問 http://127.0.0.1:8000/admin/djcelery/periodictask/add/,用於建立定時任務。

簡單的解釋下建立定時任務的選項:

字段 說明
名稱 便於理解的計劃任務名稱
Task (registered) 選擇一個已註冊的任務
Task (custom)  
Enabled 任務是否啓用
Interval 按某個時間間隔執行
Crontab 定時任務, 和Interval二選一
Arguments 以list的形式傳入參數,json格式
Keyword arguments: 以dict的形式傳入參數,json格式
Expires 任務到期時間
Queue 指定隊列,隊列名須要在配置文件的 CELERY_QUEUES定義好
Exchange Exchange
Routing key Routing key

經過Model操做計劃任務

本質上來講,就是對PeriodicTask這個model的操做。

下面模擬一個簡單的增長計劃任務的接口:

def add_task(request):
    interval = IntervalSchedule.objects.filter(every=30, period='seconds').first()
    periodic_task = PeriodicTask(name='test', task='demo.tasks.async_demo_task', interval=interval)
    periodic_task.save()
    return HttpResponse('任務已經添加')

在proj/urls.py中增長url地址進行訪問: 

urlpatterns = [
    path('admin/', admin.site.urls),
    path('async_demo_task', demo_task),
    path('add_task', add_task),
    path('get_periodic_task_list', get_periodic_task_list),
]

經過瀏覽器訪問http://127.0.0.1:8000/add_task 就能夠直接添加一個間隔30秒的計劃任務了。

而後在beat中能夠看到相似日誌,檢測到了Schedule改變,而且自動運行剛剛添加的任務。

[2018-05-03 17:18:10,012: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2018-05-03 17:18:10,013: INFO/MainProcess] Writing entries (0)...
[2018-05-03 17:18:40,020: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task)
[2018-05-03 17:19:10,021: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task)

一樣的,經過獲取PeriodicTask的數據,也能夠獲得正在運行的任務。

def get_periodic_task_list(request):
    """
    獲取週期性任務列表
    :return:
    """
    periodic_task_list = PeriodicTask.objects.all()
    data = [model_to_dict(periodic_task) for periodic_task in periodic_task_list]
    resp = json.dumps(data, cls=CustomJSONEncoder, ensure_ascii=False)
    return HttpResponse(resp, content_type='application/json', status=200)

更多的功能均可以經過操做djcelery的model進行實現。

相關文章
相關標籤/搜索