Celery is a simple, flexible, distributed system written in Python for handling asynchronous tasks, scheduled tasks, and large volumes of messages.
It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.
A Celery deployment consists of three parts: a message broker (AMQP broker), task execution units (celery workers), and a task result store.
Celery does not provide a message service itself, but it integrates easily with third-party message brokers, including RabbitMQ and Redis.
A worker is the task execution unit provided by Celery; workers run concurrently on task nodes.
The task result store holds the results of tasks executed by workers; Celery supports storing results in different backends, including AMQP and Redis.
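The three components above (broker, worker, result store) can be modeled in-process with a plain queue, purely to illustrate the message flow. This is a toy sketch, not how Celery is implemented:

```python
import queue

broker = queue.Queue()   # message broker: holds pending task messages
result_store = {}        # result backend: task_id -> result

def enqueue(task_id, func, *args):
    # producer side: send a task message to the broker
    broker.put((task_id, func, args))

def worker_step():
    # worker side: pull one message, execute it, store the result
    task_id, func, args = broker.get()
    result_store[task_id] = func(*args)

enqueue('t1', lambda x, y: x + y, 4, 5)
worker_step()
print(result_store['t1'])  # 9
```

In real Celery the broker is an external service (RabbitMQ/Redis), the worker is a separate process, and the result store is another backend; the producer only ever sees the task id.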
Celery version 4.0 runs on Python (2.7, 3.4, 3.5) and PyPy (5.4, 5.5). This is the last version to support Python 2.7; from the next version (Celery 5.x) Python 3.5 or newer is required. If you're running an older version of Python, you need to be running an older version of Celery: Python 2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4: Celery series 2.2 or earlier. Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.
2. Use cases
Asynchronous tasks: hand time-consuming operations to Celery to run asynchronously, such as sending SMS/email, push notifications, audio/video processing, and so on.
Scheduled tasks: run something on a schedule, such as daily data statistics.
Installation: pip install celery. Message broker: RabbitMQ/Redis. Create an app: app = Celery('task_name', backend='xxx', broker='xxx')
Create a project: celerytest
Create a py file: tasks.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y
Create a py file: add_task.py, to enqueue a task
from tasks import add

result = add.delay(4, 5)
print(result.id)
Create a py file, run.py, to run the worker, or use the command: celery worker -A tasks -l info
Note: on Windows: celery worker -A tasks -l info -P eventlet
from tasks import app

if __name__ == '__main__':
    app.worker_main()
    # app.worker_main(argv=['--loglevel=info'])
Create a py file: result.py, to check the task result
from celery.result import AsyncResult
from tasks import app

# `async` is a reserved word in Python 3.7+, so use another name
res = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)
if res.successful():
    result = res.get()
    print(result)
    # res.forget()  # delete the result
elif res.failed():
    print('task failed')
elif res.status == 'PENDING':
    print('task is waiting to be executed')
elif res.status == 'RETRY':
    print('task is being retried after an error')
elif res.status == 'STARTED':
    print('task has started executing')

Run add_task.py to enqueue the task and get the task ID.
Run run.py, or run the command: celery worker -A tasks -l info -P eventlet
Run result.py to check the task status and fetch the result.
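The if/elif chain above is simply a dispatch on the task's state string. The same mapping can be expressed as a small lookup helper (pure Python, no broker needed; the helper name is made up for illustration):

```python
# Map Celery task states to the messages printed above.
STATUS_MESSAGES = {
    'SUCCESS': 'task finished, result available via get()',
    'FAILURE': 'task failed',
    'PENDING': 'task is waiting to be executed',
    'RETRY':   'task is being retried after an error',
    'STARTED': 'task has started executing',
}

def describe(status):
    # fall back to a generic message for any other state
    return STATUS_MESSAGES.get(status, 'unknown state')

print(describe('PENDING'))  # task is waiting to be executed
```

Note that PENDING is also what Celery reports for a task id it has never seen, so "waiting" can also mean "unknown id".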
multi_celery
├── celery_task        # celery-related package
│   ├── celery.py      # celery connection and configuration; must use this exact name
│   ├── task1.py       # task functions
│   └── task2.py       # task functions
├── result.py          # check results
└── add_task.py        # trigger tasks
celery.py
from celery import Celery

# broker: use redis as the message broker
broker = 'redis://127.0.0.1:6379/1'
# store results in redis
backend = 'redis://127.0.0.1:6379/2'

# the first argument is an alias and can be anything;
# include lists the modules that contain task functions
app = Celery('test', broker=broker, backend=backend,
             include=['celery_task.task1', 'celery_task.task2'])

# timezone
app.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
app.conf.enable_utc = False
task1.py
from .celery import app

@app.task
def add(x, y):
    return x + y
task2.py
from .celery import app

@app.task
def write_file(s):
    with open('a.txt', 'a', encoding='utf-8') as f:
        f.write(s)
    return 'write succeeded'
result.py
from celery.result import AsyncResult
# import the celery app object
from celery_task.celery import app

res = AsyncResult(id="ac2a7e52-ef66-4caa-bffd-81414d869f85", app=app)
if res.successful():
    # the task's result, i.e. its return value
    result = res.get()
    print(result)
    # res.forget()  # delete the result
elif res.failed():
    print('task failed')
elif res.status == 'PENDING':
    print('task is waiting to be executed')
elif res.status == 'RETRY':
    print('task is being retried after an error')
elif res.status == 'STARTED':
    print('task has started executing')
add_task.py
from celery_task import task1
from celery_task import task2

# enqueue a 2+3 task
result = task1.add.delay(2, 3)
print(result.id)

# enqueue a write-file task
result = task2.write_file.delay('lqz')
print(result.id)
Enqueue the tasks (run add_task.py), start a worker: celery worker -A celery_task -l info -P eventlet, then check the results (run result.py).
add_task.py
from celery_app_task import add
from datetime import datetime, timedelta

# Option 1: schedule for a fixed point in time
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# Option 2: schedule for a delay relative to now
ctime = datetime.now()
# eta is interpreted as UTC by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# use apply_async with an explicit run time
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
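Both options above reduce to plain datetime arithmetic: take a local time, convert it to UTC, and add an offset. A minimal sketch without Celery (the helper name `utc_eta` is made up for illustration):

```python
from datetime import datetime, timedelta

def utc_eta(seconds_from_now):
    """Compute a naive UTC datetime suitable for apply_async(eta=...)."""
    now = datetime.now()
    # convert local time to naive UTC, as the tutorial does
    utc_now = datetime.utcfromtimestamp(now.timestamp())
    return utc_now + timedelta(seconds=seconds_from_now)

eta = utc_eta(10)
print(eta - datetime.utcnow())  # roughly 0:00:10
```

Alternatively, `apply_async(args=[4, 3], countdown=10)` lets Celery do this arithmetic for you when all you need is a relative delay.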
In the multi-task structure, modify celery.py as follows
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=[
                 'celery_task.task1',
                 'celery_task.task2',
             ])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # the entry name is arbitrary
    'add-every-10-seconds': {
        # run the test_celery function in task1
        'task': 'celery_task.task1.test_celery',
        # run every 2 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # arguments passed to the task
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.task1.test_celery',
    #     # run every year on April 11 at 08:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}
Start a beat: celery beat -A celery_task -l info
Start a worker: celery worker -A celery_task -l info -P eventlet
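With a timedelta schedule, beat simply fires the task once per interval. The run times it produces can be sketched in pure Python (for illustration only; `next_runs` is a made-up helper, not a Celery API):

```python
from datetime import datetime, timedelta

def next_runs(start, interval, count):
    """Yield `count` run times spaced `interval` apart, starting after `start`."""
    t = start
    for _ in range(count):
        t = t + interval
        yield t

start = datetime(2019, 2, 13, 8, 0, 0)
runs = list(next_runs(start, timedelta(seconds=2), 3))
print(runs[-1])  # 2019-02-13 08:00:06
```

A crontab schedule works the same way conceptually, except the next run time is derived from calendar fields (minute, hour, day) instead of a fixed interval.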
Create celeryconfig.py in the Django project
import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = (
    'app01.tasks',
)
# can prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True
# number of concurrent worker processes
CELERYD_CONCURRENCY = 4
# acknowledge tasks only after they finish, allowing retries
CELERY_ACKS_LATE = True
# recycle each worker after 100 tasks to prevent memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# task time limit (seconds)
CELERYD_TASK_TIME_LIMIT = 12 * 30
Create tasks.py under the app01 directory
from celery import task
import time

@task
def add(x, y):
    time.sleep(3)
    return x + y
The view function, views.py
from django.shortcuts import render, HttpResponse
from app01 import tasks

def test(request):
    result = tasks.add.delay(2, 4)
    print(result.id)
    return HttpResponse('ok')
settings.py
INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]
...
from djagocele import celeryconfig
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
Start the worker:
python3 manage.py celery worker