本節將celery集成到Django項目中,實現異步任務處理和定時任務處理html
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。前端
消息中間件python
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQmysql
任務執行單元web
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。redis
任務結果存儲sql
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache數據庫
在虛擬環境中安裝:django
pip install django-celery==3.2.2json
pip install django-redis
pip install flower # celery 的web管理平臺(異步任務可視化)
查看集成到Django中的celery版本, pip freeze
celery==3.1.26.post2 django-celery==3.2.2 flower==0.9.2
啓動redis服務, 端口假設爲6379
發現pip安裝比較慢的狀況
pip install pillow -i https://pypi.douban.com/simple
(1)在主工程的配置文件settings.py 中應用註冊表INSTALLED_APPS中加入 djcelery
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'art', 'xadmin', 'crispy_forms', 'DjangoUeditor', 'djcelery', #加入djcelery ]
(2) 在settings.py 中加入celery配置信息
############################# # celery 配置信息 start ############################# import djcelery djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379/1' CELERY_IMPORTS = ('art.tasks') CELERY_TIMEZONE = 'Asia/Shanghai' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' from celery.schedules import crontab from celery.schedules import timedelta CELERYBEAT_SCHEDULE = { #定時器策略 #定時任務一: 每隔30s運行一次 u'測試定時器1': { "task": "art.tasks.tsend_email", #"schedule": crontab(minute='*/2'), # or 'schedule': timedelta(seconds=3), "schedule":timedelta(seconds=30), "args": (), }, } ############################# # celery 配置信息 end #############################
當djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的全部app目錄中的tasks.py文件,找到標記爲task的方法,將它們註冊爲celery task
BROKER_URL:broker是代理人,它負責分發任務給worker去執行。我使用的是Redis做爲broker
沒有設置 CELERY_RESULT_BACKEND,默認沒有配置,此時Django會使用默認的數據庫(也是你指定的orm數據庫)。
CELERY_IMPORTS:是導入目標任務文件
CELERYBEAT_SCHEDULER:使用了django-celery默認的數據庫調度模型,任務執行週期都被存在默認指定的orm數據庫中.
CELERYBEAT_SCHEDULE:設置定時的時間配置, 能夠精確到秒,分鐘,小時,天,周等。
(3)建立應用實例
在主工程目錄添加celery.py, 添加自動檢索django工程tasks任務
vim artproject/celery.py
#!/usr/bin/env python # encoding: utf-8 #目的是拒絕隱式引入,celery.py和celery衝突。 from __future__ import absolute_import,unicode_literals import os from celery import Celery from django.conf import settings # 設置環境變量 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "artproject.settings") #建立celery應用 app = Celery('art_project') app.config_from_object('django.conf:settings') #若是在工程的應用中建立了tasks.py模塊,那麼Celery應用就會自動去檢索建立的任務。好比你添加了一個任#務,在django中會實時地檢索出來。 app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
(4) 建立任務 tasks
每一個任務本質上就是一個函數,在tasks.py中,寫入你想要執行的函數便可。
在應用art中添加咱們須要提供的異步服務和定時服務
vim art/tasks.py
#!/usr/bin/env python # encoding: utf-8 from __future__ import absolute_import import time from django.core.mail import send_mail from celery.utils.log import get_task_logger from artproject.celery import app from art.utils.send_mail import pack_html, send_email @app.task def tsend_email(): url = "http://1000phone.com" receiver = 'diyuhuan@1000phone.com' content = pack_html(receiver, url) # content = 'this is email content.' send_email(receiver, content) print('send email ok!') @app.task def add(x, y): return x+y
上述咱們把異步處理任務add和定時器任務tsend_email都放在了tasks.py 中
(5)遷移生成celery須要的數據表
python manage.py migrate
此時數據庫表結構多出了幾個
celery_taskmeta | | celery_tasksetmeta | | djcelery_crontabschedule | | djcelery_intervalschedule | | djcelery_periodictask | | djcelery_periodictasks | | djcelery_taskstate | | djcelery_workerstate
咱們能夠採用 python manage.py help 發現多出了 celery 相關選項。
(1)啓動django celery 服務
啓動服務:
python manage.py celery worker --loglevel=info
此時異步處理和定時處理服務都已經啓動了
(2)web端接口觸發異步任務處理
咱們在web端加入一個入口,觸發異步任務處理add函數
在應用art的urls.py 中加入以下對應關係
from art.views import add_handler url(r'^add', add_handler),
art/views.py 中加入處理邏輯
def add_handler(request): x = request.GET.get('x', '1') y = request.GET.get('y', '1') from .tasks import add add.delay(int(x), int(y)) res = {'code':200, 'message':'ok', 'data':[{'x':x, 'y':y}]} return HttpResponse(json.dumps(res))
啓動web服務,經過url傳入的參數,經過handler的add.delay(x, y)計算並存入mysql
http://127.0.0.1:8000/art/add?x=188&y=22
(4) 測試定時器,發送郵件
在終端輸入 python manage.py celerybeat -l info
會自動觸發每隔30s執行一次tsend_email定時器函數,發送郵件:
CELERYBEAT_SCHEDULE = { #定時器策略 #定時任務一: 每隔30s運行一次 u'測試定時器1': { "task": "art.tasks.tsend_email", #"schedule": crontab(minute='*/2'), # or 'schedule': timedelta(seconds=3), "schedule":timedelta(seconds=30), "args": (), }, }
具體發送郵件服務程序見下面的第4節
項目中常常會有定時發送郵件的情形,好比發送數據報告,發送異常服務報告等。
能夠編輯文件 art/utils/send_mail.py, 內容編輯以下:
#!/usr/bin/env python #-*- coding:utf-8 -*- #written by diyuhuan #發送郵件(wd_email_check123帳號用於內部測試使用,不要用於其餘用途) import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.header import Header import time sender = 'wd_email_check123@163.com' subject = u'api開放平臺郵箱驗證' smtpserver = 'smtp.163.com' username = 'wd_email_check123' password = 'wandacheck1234' mail_postfix="163.com" def send_email(receiver, content): try: me = username+"<"+username+"@"+mail_postfix+">" msg = MIMEText(content, 'html', 'utf-8') msg['Subject'] = subject msg['From'] = sender msg['To'] = receiver smtp = smtplib.SMTP() smtp.connect(smtpserver) smtp.login(username, password) smtp.sendmail(sender, receiver, msg.as_string()) smtp.quit() return True except Exception as e: print('send_email has error with : ' + str(e)) return False def pack_html(receiver, url): html_content = u"<html><div>尊敬的用戶<font color='#0066FF'>%s</font> 您好!</div><br>" \ "<div>感謝您關注咱們的平臺 ,咱們將爲您提供最貼心的服務,祝您購物愉快。</div><br>" \ "<div>點擊如下連接,便可完成郵箱安全驗證:</div><br>" \ "<div><a href='%s'>%s</a></div><br>" \ "<div>爲保障您的賬號安全,請在24小時內點擊該連接; </div><br>" \ "<div>若您沒有申請過驗證郵箱 ,請您忽略此郵件,由此給您帶來的不便請諒解。</div>" \ "</html>" % (receiver, url, url) html_content = html_content return html_content if __name__ == "__main__": url = "http://1000phone.com" receiver = 'diyuhuan@1000phone.com' #content = pack_html(receiver, url) content = 'this is email content. at %s.'%int(time.time()) send_email(receiver, content)
至此,在celery ui界面能夠看到兩類,定時器處理和異步處理。
python manager celery flower
讀書網站實現搶讀功能
qd(request, id) :搶讀視圖函數
quereyQD(request,id) :查詢搶讀的視圖函數
settings.py
INSTALLED_APPS = [ 'djcelery', ] ... import djcelery # 裝載djcelery對象 djcelery.setup_loader() # 配置消息中間件的位置 BROKER_URL = 'redis://127.0.0.1:6379/12' CELERY_TIMEZONE = 'Asia/Shanghai' # 配置批量調試器 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
在主工程目錄添加celery.py, 添加自動檢索django工程tasks任務
celery.py
from __future__ import absolute_import import os from celery import Celery # 設置環境變量 from HArtPro import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HArtPro.settings') # 建立Celery對象 app = Celery('hart') # 加載配置 app.config_from_object('django.conf:settings') # 自動發現task的異步任務 app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
當前app目錄下創建tasks.py
from MArtPro.celery import app from utils import redis_cache @app.task def advanceArt(artId, userId): # 搶讀文章(artId 文章id, userId 當前用戶登陸的Id) print('用戶', userId, '正在搶讀', artId) # 判斷當前搶讀的hash對象AdvanceArt長度是否達到5個 if redis_cache.hlen('AdvanceArt') >= 5: return artId + '搶讀失敗' redis_cache.hset('AdvanceArt', userId, artId) return artId + '搶讀成功!'
views.py
from redis_ import rd # rd 對象 from art import tasks def (request, artId): # 搶讀 login_user = request.session.get('login_user') if not login_user: return JsonResponse({'status': 101, 'msg': '親,請先登陸,再搶讀,謝謝!'}) # 判斷當前用戶是否已搶過 user_id =json.loads(login_user).get('id') if redis_cache.hexists('AdvanceArt', user_id): return JsonResponse({'status': 205, 'msg': '親,你只能搶一本'}) # 任務延遲執行 tasks.advanceArt.delay(artId, user_id) return JsonResponse({'status': 201, 'msg': '正在搶讀...'}) def queryAdvance(request, artId): # 查詢搶讀是否成功 login_user = request.session.get('login_user') if not login_user: return JsonResponse({'status': 101, 'msg': '親,請先登陸,再查看搶讀,謝謝!'}) user_id = json.loads(login_user).get('id') artId = redis_cache.hget('AdvanceArt', user_id) if artId: art = Art.objects.get(id=artId.decode()) return JsonResponse({'status': 200, 'msg': '恭喜您,搶讀%s 成功'%art.title}) else: if redis_cache.hlen('AdvanceArt')< 5: return JsonResponse({'status': 202, 'msg': '正在搶讀...'}) else: return JsonResponse({'status': 203, 'msg': '搶讀失敗, 請下次碰碰運氣!'})
前端經過定時器,每秒執行查詢函數