Celery 用於異步任務與週期任務的處理(Python)
Celery 由三部分組成:(1)任務應用 app;(2)記錄任務的緩存(一般用 Redis 或 RabbitMQ):任務記錄存放在 broker,任務返回記錄存放在 backend;(3)Worker(員工):主動執行任務,並主動反饋結果。
s1.py
import time

from celery import Celery

# Redis connection URL format: redis://:password@hostname:port/db_number
# The first argument ("my_task") is the name this app is known by internally.
my_task = Celery(
    "my_task",
    backend="redis://127.0.0.1:6379",
    broker="redis://127.0.0.1:6379",
)


@my_task.task
def my_func1(a, b):
    """Wait ten seconds, then return the sum of ``a`` and ``b``."""
    time.sleep(10)
    total = a + b
    return total


@my_task.task
def my_func2():
    """Wait ten seconds, then return the constant 2."""
    time.sleep(10)
    return 2


@my_task.task
def my_func3():
    """Wait ten seconds, then return the constant 3."""
    time.sleep(10)
    return 3
命令行運行
Linux:celery worker -A s1 -l INFO
Windows:celery worker -A s1 -l INFO -P eventlet
# Windows 下需要安裝 eventlet 模塊,否則 celery 4 的版本不支持 Windows
# -l:日誌輸出級別
# -c:併發數量
s2.py
from s1 import my_func1, my_func2, my_func3

# Queue every demo task with its positional arguments. ``delay`` returns
# immediately; the printed value is the id used later to look up the result.
for task, task_args in (
    (my_func1, (10, 20)),
    (my_func2, ()),
    (my_func3, ()),
):
    pid = task.delay(*task_args)
    print(pid)
s3.py
from celery.result import AsyncResult

from s1 import my_task

# Task id printed by a previous run of s2.py.
res = AsyncResult(id='2b36d20f-da07-42fe-b203-1e56fbaafd5e', app=my_task)

if res.successful():
    # The task finished without raising: fetch and show its return value.
    print(res.get())
elif res.failed():
    # ``successful()`` is also False for a task that raised. Without this
    # branch a failed task would be reported as "still running" forever.
    # For a failed task ``res.result`` holds the raised exception.
    print("任務執行失敗:", res.result)
else:
    print("任務正在進行中")
在caiji.py中
import json
import os
import re
import time
import urllib
import uuid
from urllib import request

import pymongo
import requests
from celery import Celery
from flask import Flask, request as requ, jsonify, render_template, send_file

# Redis connection URL format: redis://:password@hostname:port/db_number
# The first argument ("my_task") is the name this app is known by internally.
my_task = Celery(
    "my_task",
    broker="redis://127.0.0.1:6379",
    backend="redis://127.0.0.1:6379",
)


@my_task.task
def getcontent():
    """Scrape the album listing page and return one dict per album.

    Returns:
        A list of ``{"title": ..., "id": ..., "author": ...}`` dicts built
        from the two regex searches over the page HTML. Titles/ids and
        authors are paired positionally; if the page yields a different
        number of each, the surplus entries are dropped instead of raising
        ``IndexError``.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36"}
    url = "https://www.ximalaya.com/ertong/ertongwenxue"
    page_request = urllib.request.Request(url, headers=headers)
    # Close the HTTP response deterministically once the body has been read.
    with urllib.request.urlopen(page_request) as response:
        response_text = response.read().decode("utf-8")

    # Raw string: "\d" in a normal string literal is an invalid escape.
    title_id_list = re.findall(
        r'"album-title line-2 lg.+?" title="(.+?)" href="/ertong/(\d+?)/">',
        response_text,
    )
    author_list = re.findall('"album-author.+?" title="(.+?)" href', response_text)

    return [
        {"title": title, "id": album_id, "author": author}
        for (title, album_id), author in zip(title_id_list, author_list)
    ]


@my_task.task
def getmusic(id):
    """Download the audio file of one album page and save it under ./music.

    Args:
        id: Album id as a string (it is concatenated into the page URL).

    Returns:
        The path of the saved ``.mp3`` file. The path is returned rather
        than ``send_file(...)`` because this runs in a Celery worker: there
        is no Flask request context, and a Flask response object cannot be
        serialized into the result backend.

    Raises:
        IndexError: If the page does not contain an audio source URL.
    """
    print(id)
    url = "http://m.ximalaya.com/ertong/" + id + "/"
    response = requests.get(url)
    response.encoding = "utf-8"
    # First match is the audio source URL embedded in the page's JSON data.
    path = re.findall('"isCopyright":.+?"src":"(.+?)","albumId"', response.text)[0]
    print("res", path)
    d_data = requests.get(path)

    # A random file name avoids collisions between downloads.
    get_str = str(uuid.uuid4())
    print(get_str)
    # Make sure the target directory exists before writing into it.
    os.makedirs("./music", exist_ok=True)
    name = "./music/" + get_str + ".mp3"
    with open(name, "wb") as f:
        f.write(d_data.content)
    return name
在results.py中
from caiji import getcontent, getmusic

# Queue the listing task and show its task id.
listing_job = getcontent.delay()
print(listing_job)

# ``get()`` waits for the listing result; one download task is then queued
# per album, and each download's task id is printed.
for album in listing_job.get():
    download_job = getmusic.delay(album["id"])
    print(download_job)
在s4.py中
import time

from celery import Celery

# The first argument ("task") is the name this app is known by internally.
my_task = Celery(
    "task",
    backend="redis://127.0.0.1:6379",
    broker="redis://127.0.0.1:6379",
)


@my_task.task
def my_func1(a, b):
    """Demo task: accepts two arguments and always returns 1."""
    return 1
在s5.py中
import datetime
import time

from s4 import my_func1

# Run the task ten seconds from now. The eta is a timezone-aware UTC
# datetime: ``datetime.utcfromtimestamp`` is deprecated since Python 3.12
# and produces a naive value whose timezone has to be guessed.
add_time = datetime.timedelta(seconds=10)
utc_time = datetime.datetime.now(datetime.timezone.utc) + add_time

res = my_func1.apply_async(args=(2, 3), eta=utc_time)
print(res)
task_one.py
import time

from celery import Celery

my_task = Celery(
    "my_task",
    backend="redis://127.0.0.1:6379",
    broker="redis://127.0.0.1:6379",
)


@my_task.task
def my_func1():
    """Wait ten seconds, then return a fixed marker string."""
    time.sleep(10)
    message = "十秒鐘執行的"
    return message
task_two.py
import time

from task_one import my_task


@my_task.task
def my_func2():
    """Wait five seconds, then return a fixed marker string."""
    time.sleep(5)
    message = "五秒鐘執行的"
    return message
s6.py
from celery import Celery
from celery.schedules import crontab

celery_task = Celery(
    "task",
    include=["task_one", "task_two"],
    broker="redis://127.0.0.1:6379",
    backend="redis://127.0.0.1:6379",
)

# Configuration for the tasks that beat produces periodically. Each entry
# names a task and its interval in seconds; neither entry passes arguments
# (add an "args" tuple to an entry to do so).
_every_ten_seconds = {
    "task": "task_one.my_func1",
    "schedule": 10,
}
_every_five_seconds = {
    "task": "task_two.my_func2",
    "schedule": 5,
}
celery_task.conf.beat_schedule = {
    "each10s_task": _every_ten_seconds,
    "each5s_task": _every_five_seconds,
}

# Start the scheduler:  celery beat -A Celery_task
# Start the worker:     celery worker -A Celery_task -l INFO -P eventlet
celery beat -A Celery_task
在celery.py中
from celery import Celery

# Modules listed in ``include`` hold the task definitions for this app.
_task_modules = ["Celery_task.task_one", "Celery_task.task_two"]

my_task = Celery(
    "task",
    include=_task_modules,
    backend="redis://127.0.0.1:6379",
    broker="redis://127.0.0.1:6379",
)
在task_one.py中
from Celery_task.celery import my_task


@my_task.task
def func1():
    """Demo task that returns the constant 1."""
    result = 1
    return result


@my_task.task
def func3():
    """Demo task that returns the constant 3."""
    result = 3
    return result
在task_two.py中
from Celery_task.celery import my_task


@my_task.task
def func2():
    """Demo task that returns the constant 2."""
    result = 2
    return result
celery worker -A Celery_task -l INFO -P eventlet