python 3.5.2python
rabbitmqgit
pip install -r requirements.txt
github
建立一個名爲proj的Django項目redis
django-admin startproject proj
sql
建立一個用於演示的django app,這裏名爲demo數據庫
django-admin startapp demo
django
在建立的app中,增長tasks.py文件,用於編寫celery任務json
修改proj/settings.py配置文件,增長celery相關配置。瀏覽器
修改settings.py中INSTALLED_APPS,增長djcelery及app服務器
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'djcelery', 'demo' ]
若是僅僅需求使用celery異步執行任務的話,如下最基礎的配置就能夠知足需求
# 導入tasks文件,由於咱們使用autodiscover_tasks # 會自動導入每一個app下的tasks.py,因此這個配置不是很必要 # 若是須要導入其餘非tasks.py的模塊,則須要再此配置須要導入的模塊 # CELERY_IMPORTS = ('demo.tasks', ) # 配置 celery broker CELERY_BROKER_URL = 'amqp://user:password@127.0.0.1:5672//' # 配置 celery backend 用Redis會比較好 # 由於手上沒有redis服務器,因此演示時用RabbitMQ替代 CELERY_RESULT_BACKEND = 'amqp://user:password@127.0.0.1:5672//'
在proj目錄下,編輯celery.py文件,用於建立celery實例
from celery import Celery from django.conf import settings # 建立celery應用 celery_app = Celery('proj', broker=settings.CELERY_BROKER_URL) # 從配置文件中加載除celery外的其餘配置 celery_app.config_from_object('django.conf:settings') # 自動檢索每一個app下的tasks.py celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
在以前建立的demo/tasks.py中,編寫一個用於演示的異步任務。
注意每一個異步任務以前都須要使用@celery_app.task裝飾器。
celery_app實際是以前在proj/celery.py中建立的celery的實例,若是你的實例名稱不同,作對應的修改便可。
import logging from proj.celery import celery_app @celery_app.task def async_task(): logging.info('run async_task')
在demo/views.py中定義一個頁面,只用來調用異步任務。
from django.http import HttpResponse from demo.tasks import async_demo_task # Create your views here. def demo_task(request): # delay表示將任務交給celery執行 async_demo_task.delay() return HttpResponse('任務已經運行')
在proj/urls.py中註冊對應的url。
from django.contrib import admin from django.urls import path from demo.views import demo_task urlpatterns = [ path('admin/', admin.site.urls), path('async_demo_task', demo_task), ]
使用命令啓動worker:
manage.py celery -A proj worker -l info
對參數作個簡單的說明:
-A proj是指項目目錄下的celery實例。演示項目名爲proj,因此-A的值是proj。若是項目名是其餘名字,將proj換成項目對應的名字。
-l info 是指日誌記錄的級別,這裏記錄的是info級別的日誌。
若是配置沒有問題,能成功鏈接broker,則會有相似如下的日誌:
-------------- celery@Matrix.local v3.1.26.post2 (Cipater) ---- **** ----- --- * *** * -- Darwin-17.5.0-x86_64-i386-64bit -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x108ab1eb8 - ** ---------- .> transport: amqp://user:**@127.0.0.1:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [tasks] . demo.tasks.async_demo_task [2018-04-24 08:24:47,656: INFO/MainProcess] Connected to amqp://user:**@127.0.0.1:5672//
須要注意的是日誌中的tasks部分,能夠看到已經自動識別到了demo.tasks.async_demo_task這個用於演示的任務。
若是沒有識別到,檢查下celery實例是否調用autodiscover_tasks方法,或配置文件的CELERY_IMPORTS是否配置正確。
在demo/views.py中定義一個頁面,只用來調用異步任務。
from django.http import HttpResponse from demo.tasks import async_demo_task # Create your views here. def demo_task(request): # delay表示將任務交給celery執行 async_demo_task.delay() return HttpResponse('任務已經運行')
在proj/urls.py中註冊對應的url。
from django.contrib import admin from django.urls import path from demo.views import demo_task urlpatterns = [ path('admin/', admin.site.urls), path('async_demo_task', demo_task), ]
最後,啓動django,訪問url http://127.0.0.1:8000/async_demo_task 調用異步任務。
在worker的日誌中,能夠看到相似的執行結果,即說明任務已經由celery異步執行。
若是出現"Using settings.DEBUG leads to a memory leak, never "的警告信息,則在生產環境中關閉掉django的debug模式便可。
[2018-04-24 09:25:52,677: INFO/MainProcess] Received task: demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391] [2018-04-24 09:25:52,681: INFO/Worker-4] run async_task [2018-04-24 09:25:52,899: INFO/MainProcess] Task demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391] succeeded in 0.21868160199665s: None
請參考這裏celery-demo
一樣請參這裏celery-demo
使用djcelery,而不直接使用celery的好處就在於能夠經過Django Admin對Celery的計劃任務進行管理。
使用計劃任務時,除了保證原先的worker正常運行外(worker的啓動方式見上),還須要啓動beats:
python manage.py celery beat
也能夠beat和worker一塊兒啓動
python manage.py celery -A project worker -l info --beat
python manage.py migrate
建立Django Admin和djcelery對應的表,這裏的數據庫使用默認的sqlite。
python manage.py createsuperuser
,依次輸入超級管理員賬號、郵箱、密碼。
演示項目中設置賬號:admin 密碼: superplayer123
在settings.py中,增長兩項配置:
# 設定時區,配置計劃任務時須要 CELERY_TIMEZONE = 'Asia/Shanghai' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
訪問 http://127.0.0.1:8000/admin/djcelery/periodictask/add/,用於建立定時任務。
簡單的解釋下建立定時任務的選項:
字段 | 說明 |
---|---|
名稱 | 便於理解的計劃任務名稱 |
Task (registered) | 選擇一個已註冊的任務 |
Task (custom) | |
Enabled | 任務是否啓用 |
Interval | 按某個時間間隔執行 |
Crontab | 定時任務, 和Interval二選一 |
Arguments | 以list的形式傳入參數,json格式 |
Keyword arguments: | 以dict的形式傳入參數,json格式 |
Expires | 任務到期時間 |
Queue | 指定隊列,隊列名須要在配置文件的 CELERY_QUEUES定義好 |
Exchange | Exchange |
Routing key | Routing key |
本質上來講,就是對PeriodicTask這個model的操做。
下面模擬一個簡單的增長計劃任務的接口:
def add_task(request): interval = IntervalSchedule.objects.filter(every=30, period='seconds').first() periodic_task = PeriodicTask(name='test', task='demo.tasks.async_demo_task', interval=interval) periodic_task.save() return HttpResponse('任務已經添加')
在proj/urls.py中增長url地址進行訪問:
urlpatterns = [ path('admin/', admin.site.urls), path('async_demo_task', demo_task), path('add_task', add_task), path('get_periodic_task_list', get_periodic_task_list), ]
經過瀏覽器訪問http://127.0.0.1:8000/add_task 就能夠直接添加一個間隔30秒的計劃任務了。
而後在beat中能夠看到相似日誌,檢測到了Schedule改變,而且自動運行剛剛添加的任務。
[2018-05-03 17:18:10,012: INFO/MainProcess] DatabaseScheduler: Schedule changed. [2018-05-03 17:18:10,013: INFO/MainProcess] Writing entries (0)... [2018-05-03 17:18:40,020: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task) [2018-05-03 17:19:10,021: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task)
一樣的,經過獲取PeriodicTask的數據,也能夠獲得正在運行的任務。
def get_periodic_task_list(request): """ 獲取週期性任務列表 :return: """ periodic_task_list = PeriodicTask.objects.all() data = [model_to_dict(periodic_task) for periodic_task in periodic_task_list] resp = json.dumps(data, cls=CustomJSONEncoder, ensure_ascii=False) return HttpResponse(resp, content_type='application/json', status=200)
更多的功能均可以經過操做djcelery的model進行實現。