3-django進階之celery

Django集成Celery到項目

​ 本節將celery集成到Django項目中,實現異步任務處理和定時任務處理html

Celery工做流程

celery流程圖

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。前端

消息中間件python

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQmysql

任務執行單元web

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。redis

任務結果存儲sql

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache數據庫

1 Celery安裝與配置

在虛擬環境中安裝:django

pip install django-celery==3.2.2json

pip install django-redis

pip install flower # celery 的web管理平臺(異步任務可視化)

查看集成到Django中的celery版本, pip freeze

celery==3.1.26.post2 django-celery==3.2.2 flower==0.9.2

啓動redis服務, 端口假設爲6379

發現pip安裝比較慢的狀況

pip install pillow -i https://pypi.douban.com/simple

2 Django中配置

(1)在主工程的配置文件settings.py 中應用註冊表INSTALLED_APPS中加入 djcelery

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'art',
    'xadmin',
    'crispy_forms',
    'DjangoUeditor',
    'djcelery',       #加入djcelery
]

(2) 在settings.py 中加入celery配置信息

#############################
# celery 配置信息 start
#############################
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_IMPORTS = ('art.tasks')
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' 
from celery.schedules import crontab
from celery.schedules import timedelta

CELERYBEAT_SCHEDULE = {    #定時器策略
    #定時任務一: 每隔30s運行一次
    u'測試定時器1': {
        "task": "art.tasks.tsend_email",
        #"schedule": crontab(minute='*/2'),  # or 'schedule':   timedelta(seconds=3),
        "schedule":timedelta(seconds=30),
        "args": (),
    },
}
#############################
# celery 配置信息 end
#############################

​ 當djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的全部app目錄中的tasks.py文件,找到標記爲task的方法,將它們註冊爲celery task

​ BROKER_URL:broker是代理人,它負責分發任務給worker去執行。我使用的是Redis做爲broker

​ 沒有設置 CELERY_RESULT_BACKEND,默認沒有配置,此時Django會使用默認的數據庫(也是你指定的orm數據庫)。

CELERY_IMPORTS:是導入目標任務文件

CELERYBEAT_SCHEDULER:使用了django-celery默認的數據庫調度模型,任務執行週期都被存在默認指定的orm數據庫中.

CELERYBEAT_SCHEDULE:設置定時的時間配置, 能夠精確到秒,分鐘,小時,天,周等。

(3)建立應用實例

​ 在主工程目錄添加celery.py, 添加自動檢索django工程tasks任務

​ vim artproject/celery.py

#!/usr/bin/env python  
# encoding: utf-8  
#目的是拒絕隱式引入,celery.py和celery衝突。
from __future__ import absolute_import,unicode_literals 
import os
from celery import Celery
from django.conf import settings

# 設置環境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "artproject.settings")

#建立celery應用
app = Celery('art_project')
app.config_from_object('django.conf:settings')

#若是在工程的應用中建立了tasks.py模塊,那麼Celery應用就會自動去檢索建立的任務。好比你添加了一個任#務,在django中會實時地檢索出來。
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

(4) 建立任務 tasks

每一個任務本質上就是一個函數,在tasks.py中,寫入你想要執行的函數便可。

在應用art中添加咱們須要提供的異步服務和定時服務

vim art/tasks.py

#!/usr/bin/env python  
# encoding: utf-8  
from __future__ import absolute_import
import time
from django.core.mail import send_mail
from celery.utils.log import get_task_logger
from artproject.celery import app

from art.utils.send_mail import pack_html, send_email

@app.task
def tsend_email():
   url = "http://1000phone.com"
   receiver = 'diyuhuan@1000phone.com'
   content = pack_html(receiver, url)
   # content = 'this is email content.'
   send_email(receiver, content)
   print('send email ok!')


@app.task
def add(x, y):
   return x+y

上述咱們把異步處理任務add和定時器任務tsend_email都放在了tasks.py 中

(5)遷移生成celery須要的數據表

python manage.py migrate

此時數據庫表結構多出了幾個

celery_taskmeta            |
| celery_tasksetmeta         |
| djcelery_crontabschedule   |
| djcelery_intervalschedule  |
| djcelery_periodictask      |
| djcelery_periodictasks     |
| djcelery_taskstate         |
| djcelery_workerstate

3 啓動服務,測試

咱們能夠採用 python manage.py help 發現多出了 celery 相關選項。

(1)啓動django celery 服務

啓動服務:

python manage.py celery worker --loglevel=info

此時異步處理和定時處理服務都已經啓動了

(2)web端接口觸發異步任務處理

咱們在web端加入一個入口,觸發異步任務處理add函數

在應用art的urls.py 中加入以下對應關係

from art.views import add_handler


url(r'^add', add_handler),

art/views.py 中加入處理邏輯

def add_handler(request):
   x = request.GET.get('x', '1')
   y = request.GET.get('y', '1')
   from .tasks import add
   add.delay(int(x), int(y))
   res = {'code':200, 'message':'ok', 'data':[{'x':x, 'y':y}]}
   return HttpResponse(json.dumps(res))

啓動web服務,經過url傳入的參數,經過handler的add.delay(x, y)計算並存入mysql

http://127.0.0.1:8000/art/add?x=188&y=22

(4) 測試定時器,發送郵件

在終端輸入 python manage.py celerybeat -l info

會自動觸發每隔30s執行一次tsend_email定時器函數,發送郵件:

CELERYBEAT_SCHEDULE = {    #定時器策略
    #定時任務一: 每隔30s運行一次
    u'測試定時器1': {
        "task": "art.tasks.tsend_email",
        #"schedule": crontab(minute='*/2'),  # or 'schedule': timedelta(seconds=3),
        "schedule":timedelta(seconds=30),
        "args": (),
    },
}

具體發送郵件服務程序見下面的第4節

4 郵件發送服務

項目中常常會有定時發送郵件的情形,好比發送數據報告,發送異常服務報告等。

能夠編輯文件 art/utils/send_mail.py, 內容編輯以下:

#!/usr/bin/env python
#-*- coding:utf-8 -*-
#written by diyuhuan
#發送郵件(wd_email_check123帳號用於內部測試使用,不要用於其餘用途)

import smtplib  
from email.mime.multipart import MIMEMultipart  
from email.mime.text import MIMEText  
from email.mime.image import MIMEImage 
from email.header import Header
import time

sender = 'wd_email_check123@163.com'  
subject = u'api開放平臺郵箱驗證'
smtpserver = 'smtp.163.com'
username = 'wd_email_check123'
password = 'wandacheck1234'
mail_postfix="163.com"

def send_email(receiver, content):
    try:
        me = username+"<"+username+"@"+mail_postfix+">"
        msg = MIMEText(content, 'html', 'utf-8')
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = receiver
        smtp = smtplib.SMTP()  
        smtp.connect(smtpserver)  
        smtp.login(username, password)
        smtp.sendmail(sender, receiver, msg.as_string())  
        smtp.quit()
        return True
    except Exception as e:
        print('send_email has error with : ' + str(e))
        return False


def pack_html(receiver, url):
    html_content = u"<html><div>尊敬的用戶<font color='#0066FF'>%s</font> 您好!</div><br>" \
                   "<div>感謝您關注咱們的平臺 ,咱們將爲您提供最貼心的服務,祝您購物愉快。</div><br>" \
                   "<div>點擊如下連接,便可完成郵箱安全驗證:</div><br>"  \
                   "<div><a href='%s'>%s</a></div><br>"  \
                   "<div>爲保障您的賬號安全,請在24小時內點擊該連接; </div><br>" \
                   "<div>若您沒有申請過驗證郵箱 ,請您忽略此郵件,由此給您帶來的不便請諒解。</div>" \
                   "</html>" % (receiver, url, url)
    html_content = html_content
    return html_content


if __name__ == "__main__":
    url = "http://1000phone.com"
    receiver = 'diyuhuan@1000phone.com'
    #content = pack_html(receiver, url)
    content = 'this is email content. at %s.'%int(time.time())
    send_email(receiver,  content)

至此,在celery ui界面能夠看到兩類,定時器處理和異步處理。

5 啓動flower服務

​ python manager celery flower

案例

clipboard.png

讀書網站實現搶讀功能

qd(request, id) :搶讀視圖函數

quereyQD(request,id) :查詢搶讀的視圖函數

settings.py

INSTALLED_APPS = [
   'djcelery',
]

...
import djcelery

# 裝載djcelery對象
djcelery.setup_loader()
# 配置消息中間件的位置
BROKER_URL = 'redis://127.0.0.1:6379/12'

CELERY_TIMEZONE = 'Asia/Shanghai'
# 配置批量調試器
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

在主工程目錄添加celery.py, 添加自動檢索django工程tasks任務

celery.py

from __future__ import absolute_import
import os
from celery import Celery

# 設置環境變量
from HArtPro import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HArtPro.settings')

# 建立Celery對象
app = Celery('hart')

# 加載配置
app.config_from_object('django.conf:settings')

# 自動發現task的異步任務
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

當前app目錄下創建tasks.py

from MArtPro.celery import app
from utils import redis_cache


@app.task
def advanceArt(artId, userId):
    # 搶讀文章(artId 文章id, userId 當前用戶登陸的Id)
    print('用戶', userId, '正在搶讀', artId)

    # 判斷當前搶讀的hash對象AdvanceArt長度是否達到5個
    if redis_cache.hlen('AdvanceArt') >= 5:
        return artId + '搶讀失敗'

    redis_cache.hset('AdvanceArt', userId, artId)

    return artId + '搶讀成功!'

views.py

from redis_ import rd  # rd 對象
from art import tasks

def (request, artId):
    # 搶讀
    login_user = request.session.get('login_user')

    if not login_user:
        return JsonResponse({'status': 101,
                             'msg': '親,請先登陸,再搶讀,謝謝!'})

    # 判斷當前用戶是否已搶過
    user_id =json.loads(login_user).get('id')
    if redis_cache.hexists('AdvanceArt', user_id):
        return JsonResponse({'status': 205,
                             'msg': '親,你只能搶一本'})
    # 任務延遲執行
    tasks.advanceArt.delay(artId, user_id)
    return JsonResponse({'status': 201,
                         'msg': '正在搶讀...'})


def queryAdvance(request, artId):
    # 查詢搶讀是否成功
    login_user = request.session.get('login_user')

    if not login_user:
        return JsonResponse({'status': 101,
                             'msg': '親,請先登陸,再查看搶讀,謝謝!'})

    user_id = json.loads(login_user).get('id')

    artId = redis_cache.hget('AdvanceArt', user_id)
    if artId:
        art = Art.objects.get(id=artId.decode())
        return JsonResponse({'status': 200,
                             'msg': '恭喜您,搶讀%s 成功'%art.title})
    else:
        if redis_cache.hlen('AdvanceArt')< 5:
            return JsonResponse({'status': 202,
                                 'msg': '正在搶讀...'})
        else:
            return JsonResponse({'status': 203,
                                 'msg': '搶讀失敗, 請下次碰碰運氣!'})

前端經過定時器,每秒執行查詢函數

相關文章
相關標籤/搜索