django-celery配置

一、項目啓動順序:

  啓動項目:

python manage.py runserver

  啓動celery beat

python manage.py celery beat

  啓動celery worker

python manage.py celeryd -l info

  啓動celery flower監控任務運行狀況

celery flower --broker=redis://auth:root@localhost:6379

二、Django 結合 celery動態配置任務

  一、項目使用的版本 Django==1.11.7   celery==3.1.18 django-celery==3.2.2

  安裝django-celery 和 celery

pip install celery==3.1.18
pip install django-celery==3.2.2

  二、Django結合celery

  (1)、在項目的初始文件夾下添加celery.py 文件

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

from django.conf import settings  # noqa

# Set the default Django settings module for the 'celery' program, so the
# worker can be started outside of manage.py.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DSPProject.settings')

app = Celery('DSPProject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# NOTE(review): no `namespace` argument is passed, so celery reads the
# plain (un-prefixed) setting names straight from Django settings —
# this matches django-celery 3.x, not the celery 4+ `CELERY_` namespace style.
app.config_from_object('django.conf:settings')

# Auto-register tasks.py modules from every installed Django app.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    """Print the wrapping task request — a quick smoke test for the worker."""
    message = 'Request: {0!r}'.format(self.request)
    print(message)

  (2)  添加 celery配置到項目的settings.py 文件中

  settings.py

import djcelery
djcelery.setup_loader()  # hook django-celery's loader into Django

# Broker connection string.
# NOTE(review): credentials ('auth:root') are hardcoded here — consider
# moving them into environment variables before deploying.
BROKER_URL = 'redis://auth:root@localhost:6379'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # database-backed periodic tasks
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 1  # recycle each worker after one task to guard against memory leaks

LOGIN_REDIRECT_URL = '/index/'
UPLOAD_FILE_DIR = os.path.join(BASE_DIR, "people/upload/")
CHANGE_UPLOAD_DIR = os.path.join(BASE_DIR, "change/upload/")
LOG_FILE_DIR = os.path.join(BASE_DIR, "log/")
BACKUP_USER_INFO_DIR = '/home/jiuyang/django/backup/user_info/'

# Logging: one rotating file handler per functional area, plus email to
# admins for request errors.
# NOTE(review): RotatingFileHandler is configured without maxBytes /
# backupCount, so the files will never actually rotate — confirm intended.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'standard': {
            'format': '%(levelname)s %(asctime)s %(message)s'
        },
    },
    'filters': {
    },
    'handlers': {
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'formatter': 'standard',
        },
        'people_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'people.log'),
            'formatter': 'standard',
        },
        'report_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'report.log'),
            'formatter': 'standard',
        },
        'change_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'change.log'),
            'formatter': 'standard',
        },
        'dtmt_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'dtmt.log'),
            'formatter': 'standard',
        },
        'scheduled_tasks_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'scheduled_tasks.log'),
            'formatter': 'standard',
        },
        'business_query_handler': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '%s%s' % (LOG_FILE_DIR, 'business_query.log'),
            'formatter': 'standard',
        },
    },
    'loggers': {
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': True,
        },
        'people_log': {
            'handlers': ['people_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'report': {
            'handlers': ['report_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'change': {
            'handlers': ['change_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'dtmt': {
            # Bug fix: previously pointed at 'change_handler', so dtmt logs
            # went to change.log and 'dtmt_handler' was never used.
            'handlers': ['dtmt_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'scheduled_tasks': {
            'handlers': ['scheduled_tasks_handler'],
            'level': 'INFO',
            'propagate': False
        },
        'business_query': {
            'handlers': ['business_query_handler'],
            'level': 'INFO',
            'propagate': False
        },
    }
}

 

  (3)、寫celery task實現具體的任務模板

 task.py

from celery import shared_task

@shared_task(name='run_py')
def run_worke(*args):
    """Print every positional argument, then report success."""
    for item in args:
        print('run,runrurnrurnhr', item)
    return 'run python file ok'

@shared_task(name='run_add')
def add():
    """Add two fixed numbers, printing the result as a visible trace."""
    first = 1
    second = 3
    total = first + second
    print(total, 'ppppppppppppppppppppppppppppppppppppppppppppppppppppppppp')
    return total

  (4)實現定時任務添加配置

#主要代碼

def add_periodic_task_spiders(request):
    """Create or update a periodic spider task from the posted JSON payload."""
    response = HttpResponse()
    helper = Currency(request)
    rq_post = getattr(helper, 'rq_post')
    payload = json.loads(rq_post('data'))

    spiders = payload['task_spiders']
    crontab_pk = payload['crontab']
    enabled = payload['is_enable']
    is_encrypt = payload['is_encrypt']
    # is_sensitive = payload['is_sensitive']
    task_name = payload['task_name']
    task_template = payload['task_template']

    # Push the task into celery's database-backed schedule.
    schedule = CrontabSchedule.objects.get(pk=crontab_pk).schedule
    DatabaseScheduler.create_or_update_task(
        task_name,
        schedule=schedule,
        task=task_template,
        args=[spiders],
        enabled=enabled,
    )
    # mail_excel(mail_header, task_name, sql_list, **mailpara)
    response.write(json.dumps({'status': 0, 'msg': ['操做成功']}))
    return response

三、celery 實現分佈式配置

在成功運行Django-celery以後,copy   celery task源碼到須要分佈式的機器,完成任務配置

相關文章
相關標籤/搜索