在開發中,咱們經常會遇到一些耗時任務,舉個例子:html
上傳並解析一個 1w 條數據的 Excel 文件,最後持久化至數據庫。python
在個人程序中,這個任務耗時大約 6s,對於用戶來講,6s 的等待已是個災難了。docker
比較好的處理方式是:數據庫
咱們按照這個思路,藉助 Celery 進行實現。django
本文所使用的環境以下:後端
- Python 3.6.7
- RabbitMQ 3.8
- Celery 4.3
Celery 依賴一個消息後端,可選方案有 RabbitMQ, Redis 等,本文選用 RabbitMQ 。bash
同時爲了安裝方便,RabbitMQ 我直接使用 Docker 安裝:app
docker run -d --name anno-rabbit -p 5672:5672 rabbitmq:3
複製代碼
啓動成功後,便可經過 amqp://localhost
訪問該消息隊列。異步
Celery 是 Python 實現的工具,安裝能夠直接經過 Pip 完成:工具
pip install celery
複製代碼
同時假設當前個人項目文件夾爲
proj
,項目名爲myproj
,應用名爲myapp
安裝完成後,在 proj/myproj/
路徑下建立一個 celery.py
文件,用來初始化 Celery 實例:
proj/myproj/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')
app = Celery('myproj',
broker='amqp://localhost//',
backend='amqp://localhost//')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.s
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
複製代碼
而後在 proj/myproj/__init__.py
中添加對 Celery 對象的引用,確保 Django 啓動後可以初始化 Celery:
proj/myproj/__init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
複製代碼
無其餘特殊配置的話,Celery 的基本配置就是這些。
爲了模擬一個耗時任務,咱們直接建立一個方法,使其「睡」10s ,並將其設置爲 Celery 的任務:
proj/myapp/tasks.py
import time
from myproj.celery import app as celery_app
@celery_app.task
def waste_time():
time.sleep(10)
return "Run function 'waste_time' finished."
複製代碼
Celery 配置完成,而且任務建立成功後,咱們以異步任務的模式啓動 Celery :
celery -A myproj worker -l info
複製代碼
注意到我強調了異步模式,是由於 Celery 除了支持異步任務,還支持定時任務,所以啓動時候要指明。
同時要注意,Celery 一旦啓動,對 Task(此處爲 waste_time
) 的修改必須重啓 Celery 纔會生效。
在請求處理的邏輯代碼中,調用上面建立好的任務:
proj/myapp/views.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from .tasks import waste_time
@require_http_methods(["POST"])
def upload_files(request):
waste_time.delay()
# Status code 202: Accepted, 表示異步任務已接受,可能還在處理中
return JsonResponse({"results": "操做成功,正在上傳,請稍候..."}, status=202)
複製代碼
調用 waste_time.delay()
方法後, waste_time
會被加入到任務隊列中,等待空閒的 Celery Worker 調用。
當咱們發送請求時,這個接口會直接返回 {"results": "操做成功,正在上傳,請稍候..."}
的響應內容而非卡住十秒,用戶體驗要好許多。
用 Celery 處理這種異步任務是 Python 經常使用的方法,雖然實際執行成功耗時不變甚至有所增長(如 Worker 繁忙致使處理滯後),可是對於用戶體驗來講更容易接受,點擊上傳大文件後能夠繼續處理其餘事務,而不須要在頁面等待。
Celery 還有更多用法本文未介紹到,其文檔已經很是詳盡,有須要可直接參考。