Celery使用

一:什麼是Celery

一:Celery介紹python

Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統redis

專一於實時處理的異步任務隊列django

同時也支持任務調度windows

 

架構詳解:架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。併發

消息中間件

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等app

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。異步

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等async

 

二:使用場景分佈式

異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等

定時任務:定時執行某件事情,好比天天數據統計

 

三:Celery的安裝與配置

pip install celery

消息中間件:RabbitMQ/Redis

app=Celery('任務名',backend='xxx',broker='xxx')

 

二:Celery執行異步任務

Demo介紹:

建立項目celerytest

建立py文件:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

建立py文件:add_task.py,添加任務

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

建立py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info  (一般使用命令開啓worker)

注:windows下:celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

建立py文件:result.py,查看任務執行結果

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

執行 add_task.py,添加任務,並獲取任務ID

執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info

執行 result.py,檢查任務狀態並獲取結果

 

多任務結構

項目結構

pro_cel
    ├── celery_task# celery相關文件夾
    │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
    │   └── tasks1.py    #  全部任務函數
    │    └── tasks2.py    #  全部任務函數
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務

celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# 時區
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務結果:%s"%res

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務結果:%s"%res

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除,執行完成,結果不會自動刪除
    # async.revoke(terminate=True)  # 不管如今是何時,都要終止
    # async.revoke(terminate=False) # 若是任務尚未開始執行呢,那麼就能夠終止。
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 當即告知celery去執行test_celery任務,並傳入一個參數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)

添加任務(執行send_task.py),開啓work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)

 

三:Celery執行定時任務

設定時間讓celery執行一個任務

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

 

相似於contab的定時任務

多任務結構中celery.py修改以下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.tasks1.test_celery',       
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        # 每隔2秒執行一次
        'schedule': timedelta(seconds=2),
        # 傳遞參數
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('test2',)
    # },
}

啓動一個beat:celery beat -A celery_task -l info

啓動work執行:celery worker -A celery_task -l info -P eventlet

 

四:Django中使用Celery

安裝django-celery

在項目目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30

在app01目錄下建立tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

視圖函數views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

urls.py

from django.conf.urls import url
from django.contrib import admin
from app01 import views
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^test/', views.test),
]

 

開啓worker: python3 manage.py celery worker

相關文章
相關標籤/搜索