這裏Celery的中間人,我採用Redis。也能夠用Django自身和mongodb等。Celery的中間人你能夠理解爲在Celery執行過程當中的數據支持。保存列隊記錄、執行記錄等等。安裝Redis,可參考Redis在CentOS和Windows安裝過程。html
brew install redis
安裝celery-with-redis,執行命令:python
pip install celery-with-redis
該命令會自動安裝redis(python庫)、celery、kombu、billiard、amqp、vine和celery-with-redis相關庫。注意,這裏pip安裝的redis是python操做redis的庫,非redis數據庫。redis數據庫須要獨立安裝。git
打開settings.py所在的文件夾,新建celery.py文件。加入以下代碼github
from __future__ import absolute_import, unicode_literals from celery import Celery from django.conf import settings from os import path, environ project_name = path.split(path.dirname(__file__))[-1] # okr_manage project_settings = "{}.settings".format(project_name) # 設置環境變量 environ.setdefault("DJANGO_SETTINGS_MODULE", project_settings) # 實例化Celery app = Celery(project_name) # 使用django的settings文件配置celery app.config_from_object("django.conf:settings", namespace='CELERY') # Celery加載全部註冊的應用 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print("Request: {0!r}".format(self.request))
還需在settings.py中設置celery,尤爲是中間人的設置。若不設置中間人,會提示沒法鏈接中間人的錯誤。在settings.py文件中添加以下設置:web
#celery settings #celery中間人 redis://redis服務所在的ip地址:端口/數據庫號 BROKER_URL = "redis://localhost:6379/" #celery結果返回,可用於跟蹤結果 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' #celery內容等消息的格式設置 CELERY_ACCEPT_CONTENT = ['application/json',] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' #celery時區設置,使用settings中TIME_ZONE一樣的時區 CELERY_TIMEZONE = TIME_ZONE
在settings目錄的__init__.py中增長如下代碼:redis
from __future__ import absolute_import, unicode_literals # 引入celery實例對象 from .celery import app as celery_app
from __future__ import absolute_import, unicode_literals import logging from time import sleep import requests from celery import shared_task logger = logging.getLogger(__name__) @shared_task def send(email): print("start send email to %s" % email) time.sleep(5) #休息5秒 print("success") return True
view中調用便可:mongodb
#coding:utf-8 from django.shortcuts import render from django.http import HttpResponse from .models import Blog from .tasks import sendmail #引用tasks.py文件的中sendmail方法 import json def home(request): #耗時任務,發送郵件(用delay執行方法) sendmail.delay("test@test.com") #其餘行爲 data = list(Blog.objects.values('caption')) return HttpResponse(json.dumps(data), content_type = 'application/json')
啓動celery以前,確保已經安裝redis和啓動redis服務.而後啓動celery worker數據庫
Celery -A myproject worker -l info
可能會出現如下很麻煩的問題.備註如下解決方案:django
celery -A okr_manage worker -l info 無任何反應
問題是原來是async名稱更換了.開發人員已經處理了這個issue,合併了master,快速的解決方案是經過github安裝celery,命令以下:json
pip install --upgrade https://github.com/celery/celery/tarball/master
settings中設置日誌handler:
LOGGING = { "version": 1, "disable_existing_loggers": True, "formatters": { "standard": { "format": "%(asctime)s [%(threadName)s:%(thread)d] [%(name)s:%(lineno)d] [%(module)s:%(funcName)s] " "[%(levelname)s]- %(message)s" } }, "filters": {}, "handlers": { "mail_admins": { "level": "ERROR", "class": "django.utils.log.AdminEmailHandler", "include_html": True, }, "default": { "level": "DEBUG", "class": "logging.handlers.RotatingFileHandler", "filename": mk_log_path("all.log"), "maxBytes": 1024 * 1024 * 5, "backupCount": 5, "formatter": "standard", }, "error": { "level": "ERROR", "class": "logging.handlers.RotatingFileHandler", "filename": mk_log_path("error.log"), "maxBytes": 1024 * 1024 * 5, "backupCount": 5, "formatter": "standard", }, "console": { "level": "INFO", "class": "logging.StreamHandler", "formatter": "standard", }, "request_handler": { "level": "DEBUG", "class": "logging.handlers.RotatingFileHandler", "filename": mk_log_path("script.log"), "maxBytes": 1024 * 1024 * 5, "backupCount": 5, "formatter": "standard", }, "script_handler": { "level": "DEBUG", "class": "logging.handlers.RotatingFileHandler", "filename": mk_log_path("script.log"), "maxBytes": 1024 * 1024 * 5, "backupCount": 5, "formatter": "standard", }, "celery_logger" { "level": "DEBUG", "filters": None, "class": "logging.handlers.RotatingFileHandler", "filename": mk_log_path("celery.log"), "maxBytes": 1024 * 1024 * 5, "backupCount": 2, "formatter": "standard" }, "celery_task_logger": { "level": "DEBUG", "filters": None, "class": "logging.handlers.RotatingFileHandler", "filename": mk_log_path("celery_tasks.log"), "maxBytes": 1024 * 1024 * 5, "backupCount": 2, "formatter": "standard" }, }, "loggers": { "django": { "handlers": ["default", "console"], "level": "DEBUG", "propagate": False, }, "django.request": { "handlers": ["request_handler"], "level": "DEBUG", "propagate": False, }, "scripts": { "handlers": ["script_handler"], "level": "INFO", "propagate": False, }, # 下面是要用到的py文件. "okr.views": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "tool_site.views": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "okr.models": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "okr.okr_middleware": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "handler.personal": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "handler.tpo_tools": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "handler.auto_view": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "mock.views": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "tools_site.views": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "tools.utils": { "handlers": ["default", "error", "console", "request_handler", "script_handler"], "level": "INFO", "propagate": True, }, "tool_site.tasks": { "handlers": ["celery_task_logger"], "level": "INFO", "propagate": True, }, "celery": { "handlers": ["celery_logger"], "level": "INFO", "propagate": True, }, }, }
tasks.py
from __future__ import absolute_import, unicode_literals from celery import shared_task from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @shared_task def send(username, uri, query_data, count=100): try: logger.info("push success") return "push success" except Exception as e: logger.exception(e) return "push failed"
在celery中不可使用 from multiprocessing import Process. 由於celery本身已經封裝了一個.使用 from billiard.context import Process便可
from __future__ import absolute_import, unicode_literals from time import sleep import requests from billiard.context import Process from celery import shared_task from celery.utils.log import get_task_logger from tool_site.models import PushInfo logger = get_task_logger(__name__) def push(url, pk): resp = requests.post(url) PushInfo.objects.write_info(pk=pk, second=resp.elapsed.total_seconds(), code=resp.status_code) @shared_task def send(p_id, uri, query_data, count=100): try: PushInfo.objects.start_push(pk=p_id) for i, v in enumerate(query_data): url = "{}?json={}".format(uri, v) p = Process(target=push, args=(url, p_id)) p.start() p.join() if not (i + 1) % int(count): logger.info("sleep:1S".format(i + 1)) sleep(1) PushInfo.objects.finish(pk=p_id) logger.info("push success") return "push success" except Exception as e: logger.exception(e) return "push failed"
Flower是基於web的監控和管理Celery的工具
安裝Flower
pip install flower
安裝時會附帶安裝其餘依賴包,包括tornado.但tornado目前最新版本有問題.須要從新再次安裝tornado
從新安裝tornado穩定版
pip install "tornado<6.0.0"
啓動flower
flower --broker=redis://172.17.102.29:6379/ --address=127.0.0.1 --port=5555