使用django + celery + redis 異步發送郵件

參考:http://blog.csdn.net/Ricky110/article/details/77205291 html

 

 

環境:

centos7  +  python3.6.1 + django2.0.1  +  celery4.1.0  +  redis3.2.10python

 

yum  install -y redisredis

pip3 install redis,celery,djangodjango

開始:

建立django工程my_reportjson

建立app celery_test, 以下所示 :centos

  • INSTALLED_APPS中註冊app_celery併發

  • setting中celery配置app

  • # Celery settings
    CELERY_BROKER_URL = 'redis://localhost:6379'
    #: Only add pickle to this list if your broker is secured
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_ENABLE_UTC = True
    CELERY_TIMEZONE = 'Asia/Shanghai'
    

      

  • setting中mail配置
  • EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
    EMAIL_HOST = "smtp.mail.haoyisheng.com"
    EMAIL_HOST_PASSWORD = '******'
    DEFAULT_FROM_EMAIL = EMAIL_HOST_USER = "lijianwei@mail.haoyisheng.com"
    EMAIL_PORT = 25
    EMAIL_USE_TLS = True
    

      

  • app所在目錄添加tasks.py文件(必須是該文件名), 用於處理任務
  • from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    from django.core.mail import send_mail
    import logging
    
    logger = logging.getLogger(__name__)
    
    
    @shared_task
    def celery_send_email(subject, message, from_email, recipient_list, **kwrags):
        try:
            # 使用celery併發處理郵件發送的任務
            logger.info("\n開始發送郵件")
            send_mail(subject, message, from_email, recipient_list, **kwrags)
            logger.info("郵件發送成功")
            return 'success!'
        except Exception as e:
            logger.error("郵件發送失敗: {}".format(e))
    

      

  • 配置目錄my_report中添加celery.py文件
  • from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    
    # 爲celery程序設置DJANGO_SETTINGS_MODULE環境變量
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_report.settings')
    
    app = Celery('celery_test')
    
    # 從Django的設置文件中導入CELERY設置
    app.config_from_object('django.conf:settings', namespace='CELERY')
    # 從全部已註冊的app中加載任務模塊
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

      

  • 配置目錄my_report的__init__.py文件中添加以下
  • from __future__ import absolute_import, unicode_literals
    
    # 這將保證celery app總能在django應用啓動時啓動
    from .celery import app as celery_app
    __all__ = ['celery_app']
    

      

編寫url映射和視圖this

from django.contrib import admin
from django.urls import path
from month_report import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('send_email/', views.add_task_to_celery, name='send_email'),
]

  

編寫views:url

from django.http import HttpResponse
from celery_test.tasks import celery_send_email

def add_task_to_celery(request):
celery_send_email.delay(u'郵件主題', 'test_mail_message', 'lijianwei@mail.haoyisheng.com',
['lijianwei@mail.haoyisheng.com'])
return HttpResponse('hello world')

  

  • 在manage.py同級目錄執行以下命令, 啓動celery的worker進程(主要用於消費或執行任務)

celery -A my_report  worker --loglevel=info

執行成功:

 

 

  • 從客戶端請求
  • http://ip:port/send_email/
  • 而後會收到頁面返回hello world, 而且終端顯示時間處理結果爲成功

而後驗證郵箱有收到郵件,成功

舒適提示

  • 當前使用方法,如須要在tasks.py中新添加任務,新增後,則須要重啓django, 而且須要從起celery worker進程, worker進程默認不能動態加載事件。
相關文章
相關標籤/搜索