Using Celery and Celery Flower with Django

1. Choosing a broker for Celery

I use Redis as the Celery broker here; Django's own database, MongoDB, and others also work. The broker is the data backbone of a Celery deployment: it stores the task queue, execution records, and so on. To install Redis, see a Redis installation guide for CentOS or Windows.

Install Redis:

brew install redis

Then install celery-with-redis:

pip install celery-with-redis

This command automatically installs the related packages: redis (the Python client), celery, kombu, billiard, amqp, vine, and celery-with-redis. Note that the redis package installed by pip is the Python client library for Redis, not the Redis server; the server must be installed separately.

2. Adding Celery to Django

Open the folder containing settings.py and create a new file named celery.py with the following code:

from __future__ import absolute_import, unicode_literals

from celery import Celery
from django.conf import settings
from os import path, environ

project_name = path.split(path.dirname(__file__))[-1]  # okr_manage
project_settings = "{}.settings".format(project_name)
# Set the default Django settings module for Celery
environ.setdefault("DJANGO_SETTINGS_MODULE", project_settings)

# Instantiate the Celery app
app = Celery(project_name)

# Configure Celery from Django's settings file
app.config_from_object("django.conf:settings", namespace='CELERY')

# Load task modules from all registered Django apps
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))

Celery also needs to be configured in settings.py, above all the broker setting; without it, Celery fails with a cannot-connect-to-broker error. Add the following to settings.py:

# Celery settings
# Broker: redis://<redis host>:<port>/<database number>
# (with namespace='CELERY' in celery.py, the setting must carry the CELERY_
#  prefix; plain BROKER_URL is only read by Celery 3.x-style configuration)
CELERY_BROKER_URL = "redis://localhost:6379/0"
# Result backend, used to track task results
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Message format settings
CELERY_ACCEPT_CONTENT = ['application/json',]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Use the same timezone as TIME_ZONE in settings
CELERY_TIMEZONE = TIME_ZONE
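The `namespace='CELERY'` argument in celery.py means Celery only reads settings whose names start with `CELERY_`, stripping the prefix and lowercasing the rest. A rough sketch of that mapping (illustrative only, not Celery's actual implementation):

```python
def map_namespaced_settings(settings_dict, namespace="CELERY"):
    """Mimic how a CELERY_ namespace maps Django settings to Celery options."""
    prefix = namespace + "_"
    mapped = {}
    for name, value in settings_dict.items():
        if name.startswith(prefix):
            # CELERY_TASK_SERIALIZER -> task_serializer
            mapped[name[len(prefix):].lower()] = value
    return mapped

options = map_namespaced_settings({
    "CELERY_TASK_SERIALIZER": "json",
    "CELERY_RESULT_SERIALIZER": "json",
    "TIME_ZONE": "UTC",  # ignored: no CELERY_ prefix
})
print(options)  # {'task_serializer': 'json', 'result_serializer': 'json'}
```

This is why a setting without the prefix, such as `TIME_ZONE`, is invisible to Celery when the namespace is enabled.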

Add the following to the __init__.py of the project package (the one next to settings.py):

from __future__ import absolute_import, unicode_literals

# Make sure the Celery app is always imported when Django starts
from .celery import app as celery_app

__all__ = ('celery_app',)

3. Offloading slow tasks to Celery (create a tasks.py in the app folder)

from __future__ import absolute_import, unicode_literals

import logging
from time import sleep

from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task
def send(email):
    print("start send email to %s" % email)
    sleep(5)  # simulate a slow task (5 seconds)
    print("success")
    return True

Then call it from a view:

#coding:utf-8
from django.shortcuts import render
from django.http import HttpResponse

from .models import Blog
from .tasks import send  # import the task defined in tasks.py
import json

def home(request):
    # slow task: send an email (run it via .delay())
    send.delay("test@test.com")

    # other work continues immediately
    data = list(Blog.objects.values('caption'))
    return HttpResponse(json.dumps(data), content_type='application/json')
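Under the hood, `send.delay("test@test.com")` is shorthand for `send.apply_async(args=("test@test.com",))`: the call is serialized and put on the broker queue rather than executed in-process, which is why the view returns immediately. A toy model of that calling convention (not real Celery, just an illustration; the names here shadow Celery's only for the sketch):

```python
import json
from collections import deque

queue = deque()  # stands in for the Redis-backed broker queue

class Task:
    """Minimal stand-in for a Celery task: .delay() enqueues instead of calling."""
    def __init__(self, func):
        self.func = func

    def delay(self, *args, **kwargs):
        # serialize the call, as CELERY_TASK_SERIALIZER = 'json' would
        queue.append(json.dumps({"task": self.func.__name__,
                                 "args": args, "kwargs": kwargs}))

def shared_task(func):
    return Task(func)

@shared_task
def send(email):
    return "sent to %s" % email

send.delay("test@test.com")      # returns at once; nothing has executed yet
message = json.loads(queue.popleft())
print(message["args"])           # ['test@test.com'] -- what a worker would receive
```

A separate worker process would pop such messages off the queue and run the function body.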

4. Starting and testing Celery locally

Before starting Celery, make sure Redis is installed and the Redis server is running. Then start a Celery worker:

celery -A myproject worker -l info

Note:

You may run into the following puzzling problem; a workaround is noted below:

celery -A okr_manage worker -l info   # produces no output at all

The cause is that the async module was renamed (async became a reserved keyword in Python 3.7). The developers have already fixed this issue and merged it into master; the quick fix is to install Celery directly from GitHub:

pip install --upgrade https://github.com/celery/celery/tarball/master
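Released Celery versions from before the rename still shipped modules named async (the fix landed around Celery 4.3; the exact version is an assumption here). If you would rather pin a fixed release than install from master, a simple version-tuple comparison tells you whether an installed version predates the fix:

```python
def predates(installed, fixed=(4, 3, 0)):
    """Return True if `installed` (a version tuple) is older than `fixed`."""
    return tuple(installed) < tuple(fixed)

print(predates((4, 2, 1)))  # True  -> affected, upgrade needed
print(predates((4, 3, 0)))  # False -> the rename fix is included
```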

5. Capturing Celery logs in Django

Configure log handlers in settings (mk_log_path is this project's own helper that builds an absolute path inside the log directory):

LOGGING = {
    "version": 1,
    "disable_existing_loggers": True,
    "formatters": {
        "standard": {
            "format": "%(asctime)s [%(threadName)s:%(thread)d] [%(name)s:%(lineno)d] [%(module)s:%(funcName)s] "
                      "[%(levelname)s]- %(message)s"
        }
    },
    "filters": {},
    "handlers": {
        "mail_admins": {
            "level": "ERROR",
            "class": "django.utils.log.AdminEmailHandler",
            "include_html": True,
        },
        "default": {
            "level": "DEBUG",
            "class": "logging.handlers.RotatingFileHandler",
            "filename": mk_log_path("all.log"),
            "maxBytes": 1024 * 1024 * 5,
            "backupCount": 5,
            "formatter": "standard",
        },
        "error": {
            "level": "ERROR",
            "class": "logging.handlers.RotatingFileHandler",
            "filename": mk_log_path("error.log"),
            "maxBytes": 1024 * 1024 * 5,
            "backupCount": 5,
            "formatter": "standard",
        },
        "console": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            "formatter": "standard",
        },
        "request_handler": {
            "level": "DEBUG",
            "class": "logging.handlers.RotatingFileHandler",
            "filename": mk_log_path("script.log"),
            "maxBytes": 1024 * 1024 * 5,
            "backupCount": 5,
            "formatter": "standard",
        },
        "script_handler": {
            "level": "DEBUG",
            "class": "logging.handlers.RotatingFileHandler",
            "filename": mk_log_path("script.log"),
            "maxBytes": 1024 * 1024 * 5,
            "backupCount": 5,
            "formatter": "standard",
        },
        "celery_logger" {
            "level": "DEBUG",
            "filters": None,
            "class": "logging.handlers.RotatingFileHandler",
            "filename": mk_log_path("celery.log"),
            "maxBytes": 1024 * 1024 * 5,
            "backupCount": 2,
            "formatter": "standard"
        },
        "celery_task_logger": {
            "level": "DEBUG",
            "filters": None,
            "class": "logging.handlers.RotatingFileHandler",
            "filename": mk_log_path("celery_tasks.log"),
            "maxBytes": 1024 * 1024 * 5,
            "backupCount": 2,
            "formatter": "standard"
        },
    },
    "loggers": {
        "django": {
            "handlers": ["default", "console"],
            "level": "DEBUG",
            "propagate": False,
        },
        "django.request": {
            "handlers": ["request_handler"],
            "level": "DEBUG",
            "propagate": False,
        },
        "scripts": {
            "handlers": ["script_handler"],
            "level": "INFO",
            "propagate": False,
        },
        # Loggers for the project's own modules.
        "okr.views": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "tool_site.views": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "okr.models": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },

        "okr.okr_middleware": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "handler.personal": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "handler.tpo_tools": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "handler.auto_view": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "mock.views": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "tools_site.views": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "tools.utils": {
            "handlers": ["default", "error", "console", "request_handler", "script_handler"],
            "level": "INFO",
            "propagate": True,
        },
        "tool_site.tasks": {
            "handlers": ["celery_task_logger"],
            "level": "INFO",
            "propagate": True,
        },
        "celery": {
            "handlers": ["celery_logger"],
            "level": "INFO",
            "propagate": True,
        },
    },
}

And in tasks.py, use Celery's task logger:

from __future__ import absolute_import, unicode_literals
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task
def send(username, uri, query_data, count=100):
    try:
        logger.info("push success")
        return "push success"
    except Exception as e:
        logger.exception(e)
        return "push failed"

6. Using multiple processes inside Celery

Inside a Celery worker you cannot use `from multiprocessing import Process`, because Celery ships its own fork of multiprocessing. Use `from billiard.context import Process` instead:

from __future__ import absolute_import, unicode_literals


from time import sleep

import requests
from billiard.context import Process
from celery import shared_task
from celery.utils.log import get_task_logger

from tool_site.models import PushInfo

logger = get_task_logger(__name__)


def push(url, pk):
    resp = requests.post(url)
    PushInfo.objects.write_info(pk=pk, second=resp.elapsed.total_seconds(), code=resp.status_code)


@shared_task
def send(p_id, uri, query_data, count=100):
    try:
        PushInfo.objects.start_push(pk=p_id)

        for i, v in enumerate(query_data):
            url = "{}?json={}".format(uri, v)
            p = Process(target=push, args=(url, p_id))
            p.start()
            p.join()  # joining immediately makes the pushes run sequentially
            if not (i + 1) % int(count):  # pause after every `count` pushes
                logger.info("pushed {}, sleeping 1s".format(i + 1))
                sleep(1)

        PushInfo.objects.finish(pk=p_id)
        logger.info("push success")
        return "push success"
    except Exception as e:
        logger.exception(e)
        return "push failed"

7. Using Flower

Flower is a web-based tool for monitoring and administering Celery.

  • Install Flower

    pip install flower

The install pulls in other dependencies, including tornado. At the time of writing, the latest tornado release was incompatible, so tornado needs to be reinstalled at a pinned version.

  • Reinstall a stable tornado release

    pip install "tornado<6.0.0"
  • Start Flower

    flower --broker=redis://172.17.102.29:6379/ --address=127.0.0.1 --port=5555

The dashboard is then available in a browser at http://127.0.0.1:5555.