本文以Celery 實現分佈式任務隊列爲基礎,簡述了一個郵件推送系統的模型。html
Celery 是 Distributed Task Queue,分佈式任務隊列,分佈式決定了能夠有多個 worker 的存在,隊列表示其是異步操做,即存在一個產生任務提出需求的工頭,和一羣等着被分配工做的碼農。前端
需求:ajax
1.在郵件推送系統中,咱們須要對成千上萬的用戶發送郵件,發送郵件具備時效性,即不能說今天開始發郵件,要等到明天才能發送完畢。redis
2.發送郵件過程當中,可能會遇到過於頻繁,郵件服務器上信件堆積沒法及時接受新信件而產生的拒信,或者郵件服務器將咱們的郵件判決爲垃圾郵件。數據庫
3.郵件發送的 I/O 時間較長,不能讓程序在等待郵件服務器返回消息上浪費時間。django
因此咱們的推送系統要有如下特性:1.分佈式處理做業;2.閉環監控;3.異步式分發做業服務器
前端經過 ajax 調用 views 中的 callpush 接口,該接口將被推送用戶的篩選條件傳入 service,而後 service 請求數據庫,將返回數據做爲參數調用 celery 接口中 addtask 函數。celery 接口中 addtask 根據 action 參數來判斷所要添加的任務類型,根據不一樣的類型分別進行處理,放入隊列。app
系統的另一頭,worker 從隊列中取出任務,用 mail 函數推送郵件,若是發送失敗就調用 error_handler 進行異常處理,此處咱們將全部 task 的執行狀況放入 redis 中,給每一個任務進行標記,若是成功則標記爲 1,失敗則 0.異步
前端能夠經過 ajax 調用 pushstatus 來向 redis 中讀取任務執行狀況,此處咱們返回了成功和失敗任務的個數。分佈式
# Controller from redis import StrictRedis red = StrictRedis(host='localhost', port=6379, db=0) def callpush(request): area = request.POST.get('area') return HttpResponse(str(mailpush(area))) def pushstatus(request): failure = red.scard('status:0:task') success = red.scard('status:1:task') return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success)) # Service def mailpush(**kargs): targets = MtUser.objects.filter(kargs).values('username', 'address') addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings') return len(targets) # Celery Interface (Dispatcher) from celery import Celery app = Celery() app.config_from_object('celeryconfig') def addtask(action, data, **kargs): if action == 'mailpush': for (address, username) in data: app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler')) elif action == 'messagepush': pass else: pass # Celery Backend (Worker) from celery import Celery from celery import Task from redis import StrictRedis app = Celery() app.config_from_object('celeryconfig') red = StrictRedis(host='localhost', port=6379, db=0) @app.task(bind=True) def mail(self, subject, content, address): from django.core.mail import EmailMessage msg = EmailMessage(subject, content, 'admin@admin.com', address) msg.content_subtype = 'html' msg.send() red.sadd('status:1:task', self.request.id) # Overwrite the on_failure function in trace.py @app.task def error_handler(uuid, args): print uuid print args red.set(uuid, args) red.sadd('status:0:task', uuid) red.srem('status:1:task', uuid)