開啓服務 #日誌 級別
celery -A write_file worker -l info
file name :write_file.py
from celery import Celery celery_app = Celery("write_file",broker="redis://127.0.0.1:6379") import time import random @celery_app.task def test(): #發送短信 或者 一些容易阻塞的功能 with open(random.randint(0,1000).__str__()+'.txt','wt',encoding="utf-8") as f: time.sleep(int(random.randint(0,2))) f.write(random.randint(0,2).__str__())
python3 manage.py runserver
file name : manage.py
from flask import Flask from flask_script import Manager from write_file import * app = Flask(__name__) manager = Manager(app) @app.route('/') def index():
t=test.delay()
#返回的是任務編號 print(t) print(type(t)) return 'test' manager.run()