用 Celery 實現郵件推送系統

系統需求

本文以Celery 實現分佈式任務隊列爲基礎,簡述了一個郵件推送系統的模型。html

Celery 是 Distributed Task Queue,分佈式任務隊列,分佈式決定了能夠有多個 worker 的存在,隊列表示其是異步操做,即存在一個產生任務提出需求的工頭,和一羣等着被分配工做的碼農。前端

需求:ajax

1.在郵件推送系統中,咱們須要對成千上萬的用戶發送郵件,發送郵件具備時效性,即不能說今天開始發郵件,要等到明天才能發送完畢。redis

2.發送郵件過程當中,可能會遇到過於頻繁,郵件服務器上信件堆積沒法及時接受新信件而產生的拒信,或者郵件服務器將咱們的郵件判決爲垃圾郵件。數據庫

3.郵件發送的 I/O 時間較長,不能讓程序在等待郵件服務器返回消息上浪費時間。django

因此咱們的推送系統要有如下特性:1.分佈式處理做業;2.閉環監控;3.異步式分發做業服務器

系統框圖

系統框圖

前端經過 ajax 調用 views 中的 callpush 接口,該接口將被推送用戶的篩選條件傳入 service,而後 service 請求數據庫,將返回數據做爲參數調用 celery 接口中 addtask 函數。celery 接口中 addtask 根據 action 參數來判斷所要添加的任務類型,根據不一樣的類型分別進行處理,放入隊列。app

系統的另一頭,worker 從隊列中取出任務,用 mail 函數推送郵件,若是發送失敗就調用 error_handler 進行異常處理,此處咱們將全部 task 的執行狀況放入 redis 中,給每一個任務進行標記,若是成功則標記爲 1,失敗則 0.異步

前端能夠經過 ajax 調用 pushstatus 來向 redis 中讀取任務執行狀況,此處咱們返回了成功和失敗任務的個數。分佈式

僞代碼實現

# Controller
from redis import StrictRedis
red = StrictRedis(host='localhost', port=6379, db=0)

def callpush(request):
  area = request.POST.get('area')
  return HttpResponse(str(mailpush(area)))

def pushstatus(request):
  failure = red.scard('status:0:task')
  success = red.scard('status:1:task')
  return HttpResponse('Failures: ' + str(failure) + '\nSuccess: ' + str(success))

# Service
def mailpush(**kargs):
  targets = MtUser.objects.filter(kargs).values('username', 'address')
  addtask(action='mailpush', data=targets, content='Hello %s!', subject='Greetings')
  return len(targets)

# Celery Interface (Dispatcher)
from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

def addtask(action, data, **kargs):
  if action == 'mailpush':
    for (address, username) in data:
      app.send_task('worker.mail', args=[kargs['subject'], kargs['content'] % username, address], link_error=app.signature('worker.error_handler'))
  elif action == 'messagepush':
    pass
  else:
    pass

# Celery Backend (Worker)
from celery import Celery
from celery import Task
from redis import StrictRedis

app = Celery()
app.config_from_object('celeryconfig')
red = StrictRedis(host='localhost', port=6379, db=0)

@app.task(bind=True)
def mail(self, subject, content, address):
    from django.core.mail import EmailMessage
    msg = EmailMessage(subject, content, 'admin@admin.com', address)
    msg.content_subtype = 'html'
    msg.send()
    red.sadd('status:1:task', self.request.id)

# Overwrite the on_failure function in trace.py
@app.task
def error_handler(uuid, args):
    print uuid
    print args
    red.set(uuid, args)
    red.sadd('status:0:task', uuid)
    red.srem('status:1:task', uuid)
相關文章
相關標籤/搜索