Celery簡介

一. 什麼是Clelery

1. 簡介

Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統
專一於實時處理的異步任務隊列
同時也支持任務調度

2. Celery架構

1551270322137

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。前端

  • 消息中間件

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等python

  • 任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。redis

  • 任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等django

3. 版本支持狀況

# 1.新版本支持到3.5
Celery version 4.0 runs on
    Python ❨2.7, 3.4, 3.5❩
    PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

# 2.以後開發的版本不在支持2.7 
    If you’re running an older version of Python, you need to be running an older version of Celery:
    Python 2.6: Celery series 3.1 or earlier.
    Python 2.5: Celery series 3.0 or earlier.
    Python 2.4 was Celery series 2.2 or earlier.

# 3.對於window支持的不夠好
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

4. 消息隊列

簡介

博客地址: https://www.jianshu.com/p/689ce4205021windows

消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合、異步消息、流量削鋒等問題。實現高性能、高可用、可伸縮和最終一致性架構。是大型分佈式系統不可缺乏的中間件。
目前在生產環境,使用較多的消息隊列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

做用

  • 異步的提交任務
  • 對某些業務進行了解耦合(例如訂單系統和庫存系統)
  • 流量削峯:秒殺活動,通常會由於流量過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。

二. 使用場景

異步任務: 將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等架構

__定時任務:__定時執行某件事情,好比天天數據統計併發

三. Celery的安裝配置

pip install celery
消息中間件:RabbitMQ/Redis
app=Celery('任務名',backend='xxx',broker='xxx')

四. Celery執行異步任務

1. 基本使用

1)任務的建立

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

2)任務的提交

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

3)任務的執行

celery worker -A celery_app_task -l info
注:windows下:celery worker -A celery_app_task -l info -P eventlet
from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

4)結果的查看

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

2. 多任務結構

1)架構圖

pro_cel
    ├── celery_task# celery相關文件夾
    │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
    │   └── tasks1.py    #  全部任務函數
    │   └── tasks2.py    #  全部任務函數
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務

2)celery.py文件的編寫

from celery import Celery
cel = Celery('celery_task',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=['celery_task.tasks1',
                      'celery_task.tasks2'])
cel.conf.timezone = 'Asia/Shanghai' # 時區
cel.conf.enable_utc = False # 是否使用UTC

3)任務的編寫

# task1的文件
import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務結果:%s"%res


# task2文件
import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務結果:%s"%res

3)check_result.py結果檢測

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

# result.forget() # 將結果刪除,執行完成,結果不會自動刪除
# async.revoke(terminate=True)  # 不管如今是何時,都要終止
# async.revoke(terminate=False) # 若是任務尚未開始執行呢,那麼就能夠終止。

4) send_task.py任務發送到隊列

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 當即告知celery去執行test_celery任務,並傳入一個參數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)

5)啓動異步任務框架

添加任務(執行send_task.py),
開啓work:celery worker -A celery_task -l info  -P  eventlet
檢查任務執行結果(執行check_result.py)

五. Celery執行定時任務

1. 延遲任務

1)方式一

from celery_app_task import add
from datetime import datetime

v1 = datetime(2019, 2, 13, 18, 19, 56)   # 建立一個時間對象
v2 = datetime.utcfromtimestamp(v1.timestamp())
result = add.apply_async(args=[1, 3], eta=v1)  # 若是args沒有參數,就寫成空列表,不能不寫

2)方式二

from datetime import timedelta, datetime

ctime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 默認用utc時間
task_time = utc_ctime + timedelta(seconds=10)

# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)

2. 定時任務

1)編寫

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('celery_task', 
             broker='redis://127.0.0.1:6379/1', 
             backend='redis://127.0.0.1:6379/2', 
             include=['celery_task.tasks1',
                      'celery_task.tasks2',])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名,每10秒發送一個任務到隊列中
    'add-every-10-seconds': {
        # 必需要註冊到include中
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒執行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數
        'args': ['test']
    },
    
    # 固定某一天去執行任務
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

2)啓動

啓動一個beat:
    celery beat -A celery_task -l info
啓動work執行:
    celery worker -A celery_task -l info -P  eventlet

六. Django中使用Celery

1. 安裝包

# 在windows下面固定的版本
celery==3.1.25
django-celery==3.1.20

2. 在項目目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30

3. 在app01目錄下建立tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

4. 視圖函數views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

5. settings.py下的配置

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
相關文章
相關標籤/搜索