Celery的基本使用

Celery

一、什麼是Celery

  • Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,專一於實時處理的異步任務隊列,同時也支持任務調度。
  • 用Python寫的執行 定時任務和異步任務的框架

執行異步任務:python

  • 建立任務:tasks.py
  • 把任務添加到隊列中:add_task.py
  • 開啓work,執行任務
    • 用命令:celery -A tasks worker -l info
    • 在 Windows下:celery -A tasks worker -l info -P eventlet
  • 查看任務結果:task_resut.py

多任務結構:redis

  • 重點:執行work的時候:celery -A tasks worker -l info -P eventlet

二、Celery架構

Celery的架構由三部分組成,消息中間件,任務執行單元和任務執行結果存儲(task result store )組成。django

消息中間件

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等架構

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。併發

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等app

三、使用場景

異步任務:將耗時操做任務提交給Celery取異步執行 ,好比發送 短信、郵件、消息推送、音頻/視頻處理等等框架

定時任務:定時執行某件事情,好比天天數據統計異步

四、Celery的安裝配置

pip install celeryasync

app=Celery('任務名', backend='xxx',broker='xxx')分佈式

五、Celery執行異步任務

基本使用

建立項目:celerytest

建立py文件:task.py

`from celery import Celery
import time

# broker:消息中間人用redis
broker = 'redis://127.0.0.1:6397/1'
# 結果存儲在redis中
backend = 'redis://127.0.0.1:6379/2'
# 第一個參數是別名,能夠隨便寫
app = Celery('test', broker=broker, backend=backend)


@app.task
def add(x, y):
    time.sleep(3)
    return x + y

建立py文件:add_task.py,添加任務

import task

if __name__ == '__main__':
    # 以前這樣寫,直接就執行 函數
    task.add()
    # 如今把函數添加到執行隊列中,參數寫在delay中
    # result不是函數的執行結果,他是個對象
    result = task.add.delay(2, 3)
    # 這個任務惟一的id
    print(result.id)

建立py文件:result.py,查看任務執行結果

from celery.result import AsyncResult

from task import app

async = AsyncResult(id='ac2a7e52-ef66-4caa-bffd-81414d869f85', app=app)

if async.successful():
    # 任務執行的結果,也就是返回值
    result = async.get()
    print(result)
    # result.forget() #將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

執行add_task.py,添加任務,並獲取任務ID

執行命令:celery worker -A celery_app_task -l info -P eventlet

執行result.py檢查任務狀態並獲取結果

六、Celery執行定時任務

設定時間讓celery執行一個任務

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

啓動一個beat: celery beat -A celery_task -l info

啓動work執行:celery worker -A celery_task -l info -P eventlet

七、Django中使用Celery

在項目目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30

在app01目錄下建立tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a',encoding='utf-8') as f:
        f.write('a')
    print(a+b)

視圖函數views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from django_celery import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
相關文章
相關標籤/搜索