①redis
②在windows系統運行時安裝 eventlet 模塊windows
運行時輸入代碼:app
③ 在其餘系統運行時異步
直接輸入代碼:
celery worker -A 消費者文件名 -l info
from celery import Celery import time c=Celery("task",broker="redis://192.168.226.133:6379/2",backend="redis://192.168.226.133:6379/1") @c.task def myfun1(a,b): return f"myfun1{a}{b}" @c.task def myfun2(): return "myfun2" @c.task def myfun3(): return "myfun3"
from s1 import myfun1,myfun2,myfun3,c
from celery.result import AsyncResult
#多個生產者
# for i in range(10):
# s=myfun1.delay()
# print(s)async
s=myfun1.delay(10,20)
print(s.id)
r=AsyncResult(id=s.id,app=c)
#獲取狀態
# print(r.status)
# print(r.successful())
#獲取值
# print(r.get())
#只獲取報錯信息
print(r.get(propagate=False))
#獲取具體出錯的位置
# print(r.traceback)spa
from celery import Celery import time c=Celery("task",broker="redis://192.168.226.133:6379/2",backend="redis://192.168.226.133:6379/1") @c.task def myfun1(a,b): return f"myfun1{a}{b}" @c.task def myfun2(): return "myfun2" @c.task def myfun3(): return "myfun3"
from s1 import myfun1,myfun2,myfun3,c from celery.result import AsyncResult from datetime import timedelta #指定多長時間之後執行 # s=myfun1.apply_async((10,20),countdown=5) #第二種方式,使用utc時間 s=myfun1.apply_async((10,20),eta="utc") print(s.id) # 延時 # 重試
retry_policy : 重試策略. max_retries : 最大重試次數, 默認爲 3 次. interval_start : 重試等待的時間間隔秒數, 默認爲 0 , 表示直接重試不等待. interval_step : 每次重試讓重試間隔增長的秒數, 能夠是數字或浮點數, 默認爲 0.2 interval_max : 重試間隔最大的秒數, 即 經過 interval_step 增大到多少秒以後, 就不在增長了, 能夠是數字或者浮點數, 默認爲 0.2 .
from celery import Celery import time c=Celery("task",broker="redis://192.168.226.133:6379/2",backend="redis://192.168.226.133:6379/1") @c.task def myfun1(a,b): return f"myfun1{a}{b}" @c.task def myfun2(): return "myfun2" @c.task def myfun3(): return "myfun3"
from s1 import c from celery.beat import crontab c.conf.beat_schedule = { "name": { "task": "s1.myfun1", "schedule": 3, #每三秒執行一次 "args": (10, 20) }, "crontab": { "task": "s1.myfun1", "schedule": crontab(minute=44), #每小時的第44分鐘執行 "args": (10, 20) } }
①運行s13d
②運行s2code