第1章 celery
python
Celery是一個簡單靈活,且可靠,處理大量消息的分佈式系統redis
用python寫的,用來執行定時任務和異步任務的框架docker
消息中間àbrokerdjango
任務執行單元àworkersession
任務執行結果存儲àstore併發
異步任務:將耗時操做任務交給celery去異步執行,好比發送短信/郵件,消息推送,音視頻處理等app
定時任務:定時執行某件事情,好比天天的數據統計框架
pip install celeryless
消息中間件:rabbitmq/redis,這裏使用redis異步
docker run -p 6379:6379 --name=redis -d redis:latest redis-server
定義任務:
from celery import Celery
import time
# 任務產生存儲
broker = 'redis://10.211.55.8:16379/1'
# 任務執行結果存儲
backend = 'redis://10.211.55.8:16379/2'
# 第一個參數是別名,隨便寫便可
app = Celery('first_celery', backend=backend, broker=broker)
@app.task
def test_celery(x, y):
time.sleep(2)
return x + y
添加任務:
import task
if __name__ == '__main__':
# result不是函數的執行結果,是一個對象
result = tasks.add.delay(2,3)
print(result.id)
使用celery命令開啓work進程
celery –A tasks worker –l info
當有多個任務須要執行時,會有多個任務函數,位了方便解藕,咱們能夠使用這種方式,方便管理
celery.py
from celeryimport Celery
broker = 'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'
# include包含兩個任務的函數,去相應文件中找對應的任務函數
cel = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])
# 時區,有時候咱們想在固定的時間來執行相關任務,就須要設置時區相關配置
cel.conf.timezone = 'Asia/Shanghai'
# 是否UTC
cel.conf.enable_utc = False
task.py
from .celeryimport app
@app.task
def sum(x, y):
returnx+y
task2.py
from .celeryimport app
@app.task
def less(x, y):
returnx-y
在celery_task目錄下建立添加任務的py文件
from celery_taskimport task
from celery_task import task2
task.sum.delay(1, 2)
task2.less.delay(3, 1)
from celery_taskimport task
from celery_task import task2
from datetime import datetime, timedelta
# # 方式一:
# #獲取時間對象
# time = datetime(2019, 8, 3, 15, 0, 0)
# print(time)
# # 將時間轉換成UTC時間
# utc_time = datetime.utcfromtimestamp(time.timestamp())
# print(utc_time)
# # 用apply_async,args是函數參數,eta是指定時間
# result = task.add.apply_async(args=[1, 2], eta=utc_time)
# 方式二:
ctime = datetime.now()
# 把當前時間轉換成UTC時間
utc_time = datetime.utcfromtimestamp(ctime.utcfromtimestamp())
# 當前時間延遲十秒
time_delta = timedelta(seconds=10)
task_time = utc_time + time_delta
result = task.add.apply_async(args=[2, 3], eta=task_time)
# task.sum.delay(1, 2)
# task2.less.delay(3, 1)
多任務結構中celery.py文件以下
from celeryimport Celery
from datetime import timedelta
from celery.schedules import crontab
broker = 'redis://10.211.55.8:16379/2'
backend = 'redis://10.211.55.8:16379/3'
# include包含兩個任務的函數,去相應文件中找對應的任務函數
app = Celery('test', broker=broker, backend=backend, include=['celery_task.task', 'celery_task.task2'])
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否UTC
app.conf.enable_utc = False
app.conf.beat_schedules = {
#別名,可隨意編寫
'add_crontab': {
#執行task下的add函數
'task': 'celery_task.task.add',
#每隔兩秒執行
'schedules': timedelta(seconds=2),
#固定時間執行,每一年4月11號,8點42分執行
#'schedules': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
#傳遞參數
'args': [1, 2],
}
}
在項目的目錄下建立celeryconfig.py
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30
在app目錄下建立tasks.py
from celeryimport task
@task
def add(x, y):
returnx+y
試圖函數views中使用
from django.shortcutsimport render,HttpResponse
from app01.tasks import add
from datetime import datetime, timedelta
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay= timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
returnHttpResponse('ok')
settings中註冊
INSTALLED_APPS= [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'djcelery',
'app01',
]
from djceleryimport celeryconfig
# 默認使用rabbitmq,redis的話須要指定下
BROKER_BACKEND='redis'
BROKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'