Celery

Celeryhtml

  Celery 官網:http://www.celeryproject.org/
  Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
  Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/redis

Celery架構django

  Celery的架構由三部分組成,消息中間件(message broker)、任務執行單元(worker)和 任務執行結果存儲(backend  -  task result store)組成。windows

消息中間件: Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等api

任務執行單元: Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。緩存

任務結果存儲: Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等架構

使用場景併發

  異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等app

  定時任務:定時執行某件事情,好比天天數據統計框架

Celery的安裝配置:  pip install celery

 

Celery執行異步任務

project
    ├── celery_task      # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery鏈接和配置相關文件,且名字必須交celery.py
    │   └── tasks.py    # 全部任務函數
    ├── add_task.py      # 添加任務
    └── get_result.py   # 獲取結果

 

不涉及django

簡單使用

celery_task/celery.py

from celery import Celery

# 經過Celery功能產生一個celery應用
broker = 'redis://127.0.0.1:6379/14'  # 任務倉庫
backend = 'redis://127.0.0.1:6379/15'  # 結果倉庫
include = ['celery_task.task1', 'celery_task.task2']  # 任務們,完成需求的函數所在的文件
app = Celery(broker=broker, backend=backend, include=include)
# 在(luffy) C:\Users\Administrator.SC-201906081554\Desktop\luffy\luffyapi\scripts\celery框架\高級使用下啓動worker: pip3 install eventlet  celery worker -A celery_task -l info -P eventlet
# 非windows  命令:celery worker -A celery_task -l info
# 手動添加任務

celery_task/task1.py

from .celery import app
@app.task
def add(n1, n2):
    print('運算數', n1, n2)
    print('運算結果: %s' % (n1 + n2))
    return n1 + n2

celery_task/task2.py

from .celery import app
@app.task
def low(n1, n2):
    print(('減法:%s') % (n1 - n2))
    return n1 - n2

add_task.py   添加任務

from celery_task import task1,task2

# 添加當即執行任務 # 使用模塊中的函數, 和celery沒有任何關係 # res
= task1.add(10,5) # print(res) # res1 = task2.low(10,15) # print(res1) # 調用celery框架方法,完成任務添加 # res = task1.add.delay(100, 150) # print(res) # res1 = task2.low.delay(88,22) # print(res1)
# 添加延遲任務
from datetime import datetime, timedelta def eta_second(second): ctime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) time_delay = timedelta(seconds=second) return utc_ctime + time_delay res = task2.low.apply_async(args=(200, 50), eta=eta_second(10)) print(res)

get_result.py  得到結果

from celery_task.celery import app
from celery.result import AsyncResult
# id = '00c4f3c8-cdb8-48bb-b73e-4cde9f530151'  # 失敗任務
id = 'feb1c004-e40f-45a0-9796-32e9ddde07c9'  # 成功任務
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任務失敗')
    elif async.status == 'PENDING':
        print('任務等待中被執行')
    elif async.status == 'RETRY':
        print('任務異常後正在重試')
    elif async.status == 'STARTED':
        print('任務已經開始被執行')

高級使用

celery_task/celery.py

from celery import Celery
broker = 'redis://127.0.0.1:6379/14'
backend = 'redis://127.0.0.1:6379/15'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'jump_task': {
        'task': 'celery_task.tasks.jump',  # task:任務源
        'schedule': timedelta(seconds=3),  # schedule:添加任務的時間配置,每3秒添加一次任務
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
        'args': (25, 3),  # args:執行任務所需參數
    }
}

celery_task/tasks.py

from .celery import app
@app.task
def jump(n1, n2):
    print("積: %s" % (n1 * n2))
    return n1 *n2

@app.task
def full(n1, n2):
    print("商: %s" % (n1 // n2))
    return n1 // n2

django中使用

celery_task/celery.py

非windows:  命令:celery worker -A celery_task -l info

windows:(luffy) C:\Users\Administrator.SC-201906081554\Desktop\luffy\luffyapi>

啓動worker:pip3 install eventlet     celery worker -A celery_task -l info -P eventlet

啓動beat:celery beat -A celery_task -l info     添加任務

# 若是對環境變量操做不是很透徹,將celery_task包放在項目根目錄下
# 配置django環境
import os, django
# import sys
# sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup()

from celery import Celery
broker = 'redis://127.0.0.1:6379/14'  # 任務倉庫
backend = 'redis://127.0.0.1:6379/15'  # 結果倉庫
include = ['celery_task.tasks']  # 任務們,完成需求的函數所在的文件
app = Celery(broker=broker, backend=backend, include=include)

# 在高級使用文件夾下啓動worker:celery worker -A celery_task -l info -P eventlet
# 在高級使用文件夾下在另一個終端啓動beat:celery beat -A celery_task -l info
# beat也是一個socket,啓動後會根據配置文件,自動添加任務(定時任務)
# beat添加任務,worker執行任務 # 時區 app.conf.timezone
= 'Asia/Shanghai' app.conf.enable_utc = False from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'update_banner_list_task': { 'task': 'celery_task.tasks.update_banner_list', # task:任務源 'schedule': timedelta(seconds=10), # schedule:添加任務的時間配置 # 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點 'args': (), # args:執行任務所需參數 } }

celery_task/tasks.py

from .celery import app
from home.models import Banner
from settings.const import BANNER_COUNT
from home.serializers import BannerModelSerializer
from django.core.cache import cache
@app.task
def update_banner_list():
    # 獲取最新內容
    banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:BANNER_COUNT]
    # 序列化
    banner_data = BannerModelSerializer(banner_query, many=True).data
    for banner in banner_data:
        banner['image'] = 'http://127.0.0.1:8000' + banner['image']
    # 更新緩存
    cache.set('banner_list', banner_data)
    return True
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息