整個項目工程如下:
__init__.py
""" 注意點:python3.7 須要執行 pip install --upgrade https://github.com/celery/celery/tarball/master 不然會報 from . import async, base SyntaxError: invalid syntax celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis celery -A __init__ worker -l info -Q save_mongo cd /Users/admin/PycharmProjects/function_test/adminSyS/mq&&celery -A __init__ worker --concurrency=5 celery 啓動 --autoscale=10,3 當worker不足時自動加3-10個 celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3 supervisor 配置 [program:celery] directory = /data/app/adminSyS/mq command = /data/app/adminSyS/venv/bin/celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis autostart = true autorestart = true startsecs = 5 startretries = 3 監控: 文檔 https://flower-docs-cn.readthedocs.io/zh/latest/install.html pip install flower celery flower --broker=redis://:z1234567@47.93.235.228:6379/5 --port=5555 測試 http://47.93.235.228:9114 """ import os import sys sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..") from celery import Celery app = Celery(include=["mq.tasks"]) app.config_from_object("celeryconfig") # 指定配置文件
celeryconfig.py
"""Celery settings: broker, result backend, serialization, queues and routes."""
import os
import sys

# Put the parent directory of this file on sys.path.
sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")

from kombu import Queue, Exchange

BROKER_URL = "redis://:redis@127.0.0.1:6379/5"  # message broker
CELERY_RESULT_BACKEND = "redis://:redis@127.0.0.1:6379/6"  # where task results are stored
CELERY_TASK_SERIALIZER = "json"  # serialization format for task messages
CELERY_RESULT_SERIALIZER = "json"  # serialization format for task results
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # result expiry: 24 hours, in seconds
CELERY_ACCEPT_CONTENT = ["json", "msgpack"]  # accepted content types
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}  # task redelivery time (24 hours)
# Number of tasks a worker process executes before it is replaced by a new
# one, to guard against memory leaks.
CELERYD_MAX_TASKS_PER_CHILD = 400

# Task queues: save_redis and save_redis2 (plus "default").
# NOTE(review): no default-queue setting is defined in this file, and the
# worker commands shown with this project consume "celery", not "default" —
# confirm whether the "default" queue is actually used.
CELERY_QUEUES = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("save_redis", Exchange("save_redis"), routing_key="save_redis"),
    Queue("save_redis2", Exchange("save_redis2"), routing_key="save_redis2"),
)

# Route each task, keyed by its full dotted name (mq.tasks.taskA,
# mq.tasks.taskB), to its own queue.
CELERY_ROUTES = {
    'mq.tasks.taskA': {"queue": "save_redis", "routing_key": "save_redis"},
    'mq.tasks.taskB': {"queue": "save_redis2", "routing_key": "save_redis2"},
}
tasks.py
"""Celery tasks that add two numbers and cache the sum in Redis."""
from mq import app
import redis
import time

# Redis client shared by the tasks below (database 7 on localhost).
rds = redis.Redis(host="localhost", port=6379, db=7, password="redis")


# Client side
@app.task
def taskA(x, y):
    """Add ``x`` and ``y``, store the sum under the key ``taskA``, return it.

    Args:
        x: First operand.
        y: Second operand.

    Returns:
        ``x + y``.
    """
    time.sleep(10)  # presumably simulates a long-running job
    total = x + y
    # Keyword arguments keep the call independent of setex's positional order.
    rds.setex(name="taskA", value=total, time=3600)  # expires after 3600 s
    return total


@app.task
def taskB(x, y, c):
    """Add ``x`` and ``y``, store the sum under the key ``taskB<c>``, return it.

    Args:
        x: First operand.
        y: Second operand.
        c: Suffix appended to ``taskB`` to form the Redis key.

    Returns:
        ``x + y``.
    """
    time.sleep(10)  # presumably simulates a long-running job
    total = x + y
    rds.setex(name="taskB{}".format(c), value=total, time=3600)  # expires after 3600 s
    return total
test.py
"""Producer script: enqueue ten ``taskB`` jobs and print each result handle."""
import sys
import os

# Put the parent directory on sys.path so that the ``mq`` package imported
# below can be resolved when this script is run from inside it.
sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")

from mq.tasks import taskA, taskB

# Example: dispatch taskA once and inspect the returned result.
# re1 = taskA.delay(100, 200)
# print(re1)
# print(re1.status)

# Server side: dispatch taskB ten times.
for i in range(10):
    re2 = taskB.delay(100, i, i)
    print(re2)  # the object returned by .delay()
    print(re2.status)  # its status right after dispatch
celery 客戶端(worker)啓動:-Q 後面的 save_redis2,save_redis 代表隊列名(對應 celeryconfig.py 中 CELERY_QUEUES 定義的隊列);--autoscale=10,3 表示當 worker 不夠用時自動擴展,進程數保持在 3-10 個之間
celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3
# 啓動並將 log 寫入文件的方式(--logfile)
command=/home/op/saiyan_game_center/venv/bin/celery -A __init__ worker --concurrency=8 -l INFO -Q upload_box_task --autoscale=5,3 --logfile=/home/op/saiyan_game_center/logs/log.log
celery web 監控
文檔:https://flower-docs-cn.readthedocs.io/zh/latest/install.html
安裝:pip install flower
啓動:celery flower --broker=代理url --port=5555
celery flower --broker=redis://:redis@127.0.0.1:6379/5 --port=5555