celery

前言:在正式學習以前,先簡單的對celery作一個介紹!redis

1.什麼是celery

  Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,專一於實時處理的異步任務隊列,同時也支持任務調度。django

2.celery架構

  Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。windows

 

3.消息中間件

  celery自己不提供消息服務,可是能夠方便你的和第三方提供的消息中間件集成,包括,RabbitMQ, Redis等等架構

4.任務執行單元

  Worker是celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。併發

5.任務結果存儲

  Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等app

1、版本支持狀況

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

2、使用場景

異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等異步

定時任務:定時執行某件事情,好比天天數據統計async

頁面靜態化:耗時操做均可以用cerely分佈式

3、Celery的安裝配置

pip install celery

消息中間件:RabbitMQ/Redis函數

使用

使用步驟以下

  寫一個py文件:celery_task

    1.指定broker(消息中間件),指定backend(用於結果存儲)

    2.實例化產生一個celery對象 app=Celery('任務名',backend='xxx',broker='xxx')

    3.加裝飾器綁定任務,在函數(add)上加裝飾器app.task

    4.其餘程序提交任務,先導入add,add.delay(參數1,參數2),會將該函數提交到消息中間件,可是並不會執行,有個返回值,直接print,會獲得你執行的一個id

   5 啓動worker去執行任務

    6.查看任務執行的結果,根據前面的id去檢查,由於任務是異步,因此要從新寫一個py文件

啓動worker的命令

Linux:celery worker -A celery_task_s1 -l info   
windows下:celery worker -A celery_task_s1 -l info -P eventlet

查看任務的執行結果

async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=app)
    if async.successful():
        #取出它return的值
        result = async.get()
        print(result)                    

4、Celery執行異步任務

基本使用

建立項目celerytest

建立py文件:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

建立py文件:add_task.py,添加任務

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

建立py文件:run.py,執行任務

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

或者使用命令執行

Linux:celery worker -A celery_task_s1 -l info   
windows下:celery worker -A celery_task_s1 -l info -P eventlet

建立py文件:result.py,查看任務執行結果

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

執行 add_task.py,添加任務,並獲取任務ID

執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info

執行 result.py,檢查任務狀態並獲取結果

多任務結構

  新建一個項目,項目結構以下

          pro_cel
                    ├── celery_task# celery相關文件夾
                    │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
                    │   └── order_task.py    #  全部任務函數
                    │   └── user_task.py    #  全部任務函數
                    ├── check_result.py # 檢查結果
                    └── send_task.py    # 觸發任務

注意:因爲後面執行worker命令的緣由,因此文件夾裏面的名字必須叫作celery

celery.py內容

#必須叫celery,生成celery對象

from celery import  Celery
from datetime import timedelta
from celery.schedules import crontab
broker='redis://127.0.0.1:6379/0'
backend='redis://127.0.0.1:6379/1'
app=Celery('test',broker=broker,backend=backend,
           # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
           include=['celery_task.order_task',
                    'celery_task.user_task',
                    ]
           )

主要就是作的生成celery對象,有多個任務就去相應的任務下面執行具體的代碼

order_task.py內容

from celery_task.celery import app
@app.task
def order_add(x,y):
    import time
    time.sleep(1)
    return x+y

user_task.py

from celery_task.celery import app
@app.task
def user_add(x,y):
    import time
    time.sleep(1)
    return x+y

添加任務send_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add

# 當即告知celery去執行test_celery任務,並傳入一個參數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除,執行完成,結果不會自動刪除
    # async.revoke(terminate=True)  # 不管如今是何時,都要終止
    # async.revoke(terminate=False) # 若是任務尚未開始執行呢,那麼就能夠終止。
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

添加任務(執行send_task.py),開啓work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)

celer執行定時任務

  設定時間讓celery去執行一個任務,好比幾點幾分幾秒去執行某個任務,添加任務的方式以下兩種,其實主要就是時間獲取的方式有兩種

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

天天的何時執行任務

app.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.order_task.order_add',
        # 每隔2秒執行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數
        'args': (5,6)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.order_task.order_add',
    #     # 每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

啓動一個beat:celery beat -A celery_task -l info

啓動work執行:celery worker -A celery_task -l info -P eventlet

django中使用celery

對於多任務結構,直接拷貝過來直接使用,注意,因爲celery的任務函數不能直接調用django的環境,因此咱們須要手動添加以下幾行代碼,這樣就能夠正常使用了

import osif __name__ == "__main__":
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
    import django
    django.setup()

django-celery

安裝包

celery==3.1.25
django-celery==3.1.20

在項目目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30

在app01目錄下建立tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

視圖函數views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

可是,這個要求比較高,有一個不符合就不會成功,因此咱們推薦使用第一個,就是直接將整個celery的項目文件拷貝過來使用,只須要本身作一個django環境的配置便可!

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息