Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統redis
專一於實時處理的異步任務隊列數據庫
django
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。windows
架構
併發
app
異步
async
分佈式
消息中間件:RabbitMQ/Redis
app=Celery('任務名',backend='xxx',broker='xxx')
from celery import Celery # broker='redis://127.0.0.1:6379/2' 不加密碼 backend='redis://:123456@127.0.0.1:6379/0' #最後的0是指定的庫 broker='redis://:123456@127.0.0.1:6379/1' app = Celery('test',broker=broker,backend=backend) #必定要指定一個名字,每次實例化都要起一個名字 #任務實際上是一個函數 #須要用到一個裝飾器,表示該任務是被celery管理的,而且能夠用celery執行的 @app.task() def add(x,y): import time time.sleep(2) return x+y
如今須要把任務提交到消息隊列裏,是另外一個程序須要新建一個add_task.py
#提交任務到消息隊列中 #只是把任務提交到消息隊列中,並無執行 ret = celery_task_s1.add.delay(4,5) print(ret) #c94054bb-9aac-425f-a4e2-9b315b2a2fe1返回了字符串,這就是任務的標識號,那這個去數據庫查
注:windows下:celery worker -A celery_app_task -l info -P eventlet
在命令行輸入命令
而後數據庫就有結果了
而後查看任務執行結果建立celery_result.py
from celery.result import AsyncResult from celery_task_s1 import app async = AsyncResult(id="c94054bb-9aac-425f-a4e2-9b315b2a2fe1", app=app) if async.successful(): #取出return值 result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
這就是分佈式
-1 指定broker(消息中間件),指定backend(結果存儲) -2 實例化產生一個Celery對象 app=Celery('名字',broker,backend) -3 加裝飾器綁定任務,在函數(add)上加裝飾器app.task -4 其餘程序提交任務,先導入add,add.delay(參,參數),會將該函數提交到消息中間件,可是並不會執行,有個返回值,直接print會打印出任務的id,之後用id去查詢任務是否執行完成 -5 啓動worker去執行任務:celery worker -A celery_task_s1 -l info windows下:celery worker -A celery_task_s1 -l info -P eventlet -6 查看結果:根據id去查詢
pro_cel ├── celery_task# celery相關文件夾 │ ├── celery.py # celery鏈接和配置相關文件,必須叫這個名字 │ └── tasks1.py # 全部任務函數 │ └── tasks2.py # 全部任務函數 ├── check_result.py # 檢查結果 └── send_task.py # 觸發任務
celery.py
from celery import Celery backend='redis://:123456@127.0.0.1:6379/1' #最後的0是指定的庫 broker='redis://:123456@127.0.0.1:6379/0' app = Celery('test',broker=broker,backend=backend, include=['celery_task.order_task', 'celery_task.user_task' ]) #必定要指定一個名字,每次實例化都要起一個名字 # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類 #時區 # app.conf.timezone = 'Asia/Shanghai' #是否使用UTC # app.conf.enable_utf = False
order_task.py
from celery_task.celery import app import time @app.task def order_add(x,y): time.sleep(1) return x+y
user_task.py
from celery_task.celery import app import time @app.task def user_add(x,y): time.sleep(1) return x+y
add_task.py
from celery_task.order_task import order_add from celery_task.user_task import user_add ret = order_add.delay(5,6) print(ret) ret2 =user_add.delay(10,6) print(ret2)
先執行add_task拿到惟一id存到redis裏的backend結果存儲而後再到命令行執行
-啓動worker,celery_task是包的名字
celery worker -A celery_task -l info -P eventle
查詢結果celery_result.py
from celery.result import AsyncResult from celery_task.celery import app async = AsyncResult(id="48da55ab-6109-4913-8f43-0d6a6ad4a174", app=app) if async.successful(): #取出return值 result = async.get() print(result) # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
v1 = datetime(2019, 7, 13, 9, 48, 56) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) #取出我要執行任務的時間對象,調用apply_async方法,args是參數,eta是執行的時間 result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2) print(result.id)
方式二:第二種取時間的方法,獲取當前之間,定時執行
ctime = datetime.now() # 默認用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta #取10秒以後的時間對象 time_delay = timedelta(seconds=3) task_time = utc_ctime + time_delay # 使用apply_async並設定時間 result = celery_task_s1.add.apply_async(args=[4, 3], eta=task_time) print(result.id)
from celery import Celery from datetime import timedelta from celery.schedules import crontab backend='redis://:123456@127.0.0.1:6379/1' #最後的0是指定的庫 broker='redis://:123456@127.0.0.1:6379/0' app = Celery('test',broker=broker,backend=backend, include=['celery_task.order_task', 'celery_task.user_task' ]) #必定要指定一個名字,每次實例化都要起一個名字 # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類 #時區 app.conf.timezone = 'Asia/Shanghai' #是否使用UTC app.conf.enable_utf = False -----******----- app.conf.beat_schedule = { # 名字隨意命名 'add-every-10-seconds': { # 執行tasks1下的test_celery函數 'task': 'celery_task.order_task.order_add', # 每隔2秒執行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), 'schedule': timedelta(seconds=2), # 傳遞參數 'args': (5,6) }, # 'add-every-12-seconds': { # 'task': 'celery_task.order_task.order_add', # # 每一年4月11號,8點42分執行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': (16, 16) # }, }
啓動一個beat:celery beat -A celery_task -l info
安裝包
celery==3.1.25
django-celery==3.1.20
因爲django-celery很差用因此直接把多任務結構中的celery_task文件複製到django目錄下
-******在celery的任務函數中不能直接調用django的環境,須要手動添加
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
import django
django.setup()
不多有人使用django的celery