settings.py
# Celery / django-celery section of the Django settings module.
# NOTE(review): `os` and `BASE_DIR` are not defined in this excerpt; presumably
# they come from the top of the real settings.py — confirm before pasting.

# NOTE(review): only 'djcelery' is listed here; presumably the project's other
# apps belong in the same tuple — confirm when merging into the real settings.
INSTALLED_APPS = (
    'djcelery',
)

import djcelery

djcelery.setup_loader()

# Message broker: a Redis server on localhost, port 6379.
BROKER_URL = 'redis://localhost:6379'

# Beat scheduler and result backend implementations provided by djcelery.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'

# Accept and emit JSON only, for both task messages and results.
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERY_TIMEZONE = 'Asia/Shanghai'

# Worker and beat log files both live under <BASE_DIR>/logs/celery/.
# os.path.join accepts any number of components, so one call builds each path.
CELERY_LOG_FILE = os.path.join(BASE_DIR, 'logs', 'celery', 'celery.log')
CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, 'logs', 'celery', 'beat.log')

# NOTE(review): Celery's configuration reference describes a plain number here
# as seconds, which would make results expire after 10 s — confirm intended.
CELERY_TASK_RESULT_EXPIRES = 10
項目同名目錄下的 __init__.py
# -*- coding:utf-8 -*-
# Package initialiser for the project package: exposes the Celery app as
# `celery_app` and installs PyMySQL under the MySQLdb name.
from __future__ import absolute_import, unicode_literals

import pymysql

from .celery import app as celery_app

__all__ = ['celery_app']

# Let code that imports `MySQLdb` receive PyMySQL instead.
pymysql.install_as_MySQLdb()
項目同名目錄下新建 celery.py
# -*- coding:utf-8 -*- from __future__ import absolute_import,unicode_literals import os from celery import Celery, platforms from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'WebSite.settings') # WebSite主應用名 app = Celery('WebSite') platforms.C_FORCE_ROOT = True app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) app.conf.timezone = 'UTC' # from datetime import timedelta # app.conf.update( # CELERYBEAT_SCHEDULE = { # 'task_add': { # 'task': 'apps.tasks.start_running', # 'schedule': timedelta(seconds=10) # }, # }, # )
執行定時任務的app下的 adminx.py
# -*- coding:utf-8 -*-
# Registers the django-celery (djcelery) models with the xadmin site.
from __future__ import absolute_import, unicode_literals

from djcelery.models import (
    TaskState,
    WorkerState,
    PeriodicTask,
    IntervalSchedule,
    CrontabSchedule,
)
import xadmin

# Models are registered in the order listed; the note beside each one is the
# original author's description of what the model stores.
_DJCELERY_MODELS = (
    IntervalSchedule,  # the interval configured for repeating tasks
    CrontabSchedule,   # the time configured for scheduled (crontab) tasks
    PeriodicTask,      # the tasks themselves
    TaskState,         # task execution state
    WorkerState,       # the workers that execute tasks
)

for _model in _DJCELERY_MODELS:
    xadmin.site.register(_model)
執行定時任務的app下新建 tasks.py
# -*- coding:utf-8 -*-
# Example Celery tasks: one bound to the project app, one shared task.
from __future__ import absolute_import

from WebSite.celery import app
from celery import task, shared_task


@app.task
def start_running(info):
    """Print ``info``, then three fixed status lines.

    Args:
        info: Value printed first; passed to ``print`` unchanged.
    """
    print(info)
    status_lines = (
        '--->>開始執行任務<<---',
        '好比發送短信或郵件',
        '>---任務結束---<',
    )
    for status_line in status_lines:
        print(status_line)


@shared_task
def mul(x, y):
    """Print and return the product of ``x`` and ``y``.

    Args:
        x: Left operand of the multiplication.
        y: Right operand of the multiplication.

    Returns:
        The value of ``x * y``.
    """
    product = x * y
    print('乘法', product)
    return product