執行異步任務:python
多任務結構:redis
Celery的架構由三部分組成,消息中間件,任務執行單元和任務執行結果存儲(task result store )組成。django
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等架構
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。併發
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等app
異步任務:將耗時操做任務提交給Celery取異步執行 ,好比發送 短信、郵件、消息推送、音頻/視頻處理等等框架
定時任務:定時執行某件事情,好比天天數據統計異步
pip install celeryasync
app=Celery('任務名', backend='xxx',broker='xxx')分佈式
基本使用
建立項目:celerytest
建立py文件:task.py
`from celery import Celery import time # broker:消息中間人用redis broker = 'redis://127.0.0.1:6397/1' # 結果存儲在redis中 backend = 'redis://127.0.0.1:6379/2' # 第一個參數是別名,能夠隨便寫 app = Celery('test', broker=broker, backend=backend) @app.task def add(x, y): time.sleep(3) return x + y
建立py文件:add_task.py,添加任務
import task if __name__ == '__main__': # 以前這樣寫,直接就執行 函數 task.add() # 如今把函數添加到執行隊列中,參數寫在delay中 # result不是函數的執行結果,他是個對象 result = task.add.delay(2, 3) # 這個任務惟一的id print(result.id)
建立py文件:result.py,查看任務執行結果
from celery.result import AsyncResult from task import app async = AsyncResult(id='ac2a7e52-ef66-4caa-bffd-81414d869f85', app=app) if async.successful(): # 任務執行的結果,也就是返回值 result = async.get() print(result) # result.forget() #將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
執行add_task.py,添加任務,並獲取任務ID
執行命令:celery worker -A celery_app_task -l info -P eventlet
執行result.py檢查任務狀態並獲取結果
設定時間讓celery執行一個任務
add_task.py
from celery_app_task import add from datetime import datetime # 方式一 # v1 = datetime(2019, 2, 13, 18, 19, 56) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # result = add.apply_async(args=[1, 3], eta=v2) # print(result.id) # 方式二 ctime = datetime.now() # 默認用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=10) task_time = utc_ctime + time_delay # 使用apply_async並設定時間 result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)
啓動一個beat: celery beat -A celery_task -l info
啓動work執行:celery worker -A celery_task -l info -P eventlet
在項目目錄下建立celeryconfig.py
import djcelery djcelery.setup_loader() CELERY_IMPORTS=( 'app01.tasks', ) #有些狀況能夠防止死鎖 CELERYD_FORCE_EXECV=True # 設置併發worker數量 CELERYD_CONCURRENCY=4 #容許重試 CELERY_ACKS_LATE=True # 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超時時間 CELERYD_TASK_TIME_LIMIT=12*30
在app01目錄下建立tasks.py
from celery import task @task def add(a,b): with open('a.text', 'a',encoding='utf-8') as f: f.write('a') print(a+b)
視圖函數views.py
from django.shortcuts import render,HttpResponse from app01.tasks import add from datetime import datetime def test(request): # result=add.delay(2,3) ctime = datetime.now() # 默認用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id) return HttpResponse('ok')
settings.py
INSTALLED_APPS = [ ... 'djcelery', 'app01' ] ... from django_celery import celeryconfig BROKER_BACKEND='redis' BOOKER_URL='redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'