參考:http://blog.csdn.net/Ricky110/article/details/77205291
環境:centos7 + python3.6.1 + django2.0.1 + celery4.1.0 + redis3.2.10
yum install -y redis
pip3 install redis celery django
建立django工程my_report
建立app celery_test,以下所示:
在INSTALLED_APPS中註冊app celery_test
settings中的celery配置:
# Celery settings.
# Every name carries the ``CELERY_`` prefix; the Celery app in this tutorial
# loads them with ``config_from_object(..., namespace='CELERY')``.

# Redis instance used as the message broker.
CELERY_BROKER_URL = 'redis://localhost:6379'

#: Only add pickle to this list if your broker is secured
CELERY_ACCEPT_CONTENT = ['json']

# Task results are stored in the same Redis instance as the broker.
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'Asia/Shanghai'
# Outgoing e-mail settings: mail is delivered through Django's SMTP backend.
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = "smtp.mail.haoyisheng.com"
# NOTE(review): '******' looks like a masked placeholder -- supply the real
# password from outside version control before using this configuration.
EMAIL_HOST_PASSWORD = '******'
# The default sender address is also used as the SMTP login name.
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER = "lijianwei@mail.haoyisheng.com"
EMAIL_PORT = 25
EMAIL_USE_TLS = True
# Task module of the ``celery_test`` app (the view in this tutorial imports
# it as ``celery_test.tasks``).
from __future__ import absolute_import, unicode_literals

import logging

from celery import shared_task
from django.core.mail import send_mail

logger = logging.getLogger(__name__)


@shared_task
def celery_send_email(subject, message, from_email, recipient_list, **kwargs):
    """Send an e-mail through Django's ``send_mail`` as a Celery task.

    Args:
        subject: Subject line of the message.
        message: Plain-text body of the message.
        from_email: Sender address.
        recipient_list: List of recipient addresses.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``send_mail``.

    Returns:
        ``'success!'`` when ``send_mail`` returns without raising; ``None``
        when it raises (the error is logged and not re-raised).
    """
    try:
        # Hand the mail delivery to a Celery worker so the caller is not
        # blocked while the message is being sent.
        logger.info("\n開始發送郵件")
        send_mail(subject, message, from_email, recipient_list, **kwargs)
        logger.info("郵件發送成功")
        return 'success!'
    except Exception as e:
        # Logged at ERROR level with the traceback attached; the exception
        # is deliberately not propagated, so the task itself does not fail.
        logger.exception("郵件發送失敗: %s", e)
from __future__ import absolute_import, unicode_literals import os from celery import Celery # 爲celery程序設置DJANGO_SETTINGS_MODULE環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_report.settings') app = Celery('celery_test') # 從Django的設置文件中導入CELERY設置 app.config_from_object('django.conf:settings', namespace='CELERY') # 從全部已註冊的app中加載任務模塊 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
from __future__ import absolute_import, unicode_literals

# This makes sure the celery app is always loaded when the Django
# application starts.
from .celery import app as celery_app

__all__ = ['celery_app']
編寫url映射和視圖:
from django.contrib import admin
from django.urls import path

# NOTE(review): the views module must be the one that defines
# ``add_task_to_celery`` -- confirm that ``month_report`` is the right app.
from month_report import views

# URL routes: the Django admin plus the endpoint that queues the e-mail task.
urlpatterns = [
    path('admin/', admin.site.urls),
    path('send_email/', views.add_task_to_celery, name='send_email'),
]
編寫views:
from django.http import HttpResponse

from celery_test.tasks import celery_send_email


def add_task_to_celery(request):
    """Queue the e-mail task and answer immediately.

    Args:
        request: The incoming request object; it is not inspected.

    Returns:
        An ``HttpResponse`` with the body ``'hello world'``.
    """
    # ``delay`` only enqueues the task; the view does not wait for its result.
    celery_send_email.delay(
        u'郵件主題',
        'test_mail_message',
        'lijianwei@mail.haoyisheng.com',
        ['lijianwei@mail.haoyisheng.com'],
    )
    return HttpResponse('hello world')
celery -A my_report worker --loglevel=info
執行成功:
而後驗證郵箱有收到郵件,成功