html
redis
數據庫
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.htmldjango
windows
api
異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等緩存
定時任務:定時執行某件事情,好比天天數據統計app
路飛案例:框架
pip install celery
project # 通常爲項目名如:大路飛
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery鏈接和配置相關文件,且名字必須交celery.py
│ └── tasks.py # 全部任務函數
├── add_task.py # 添加任務腳本文件
└── get_result.py # 獲取結果腳本文件異步
#1.celery配置 :
# 導入模塊
from celery import Celery broker = 'redis://127.0.0.1:6379/14' # 任務庫:實際狀況選擇
backend = 'redis://127.0.0.1:6379/15' # 結果庫:實際狀況選擇
include=['celery_task.tasks'] # 任務們,tasks爲任務py文件注意路徑 # 實例產生app對象
app = Celery(broker=broker,backend=backend,include=include)
#2.任務:
# 導入app實例對象 from .celery import app # 任務1 @app.task def add(n, m): print(n) print(m) print('n+m的結果:%s' % (n + m)) return n + m # 任務2 @app.task def low(n, m): print(n) print(m) print('n-m的結果:%s' % (n - m)) return n - m
#3.啓動celey服務:啓動服務後就會去任務庫裏那任務,拿到就執行,沒有就阻塞
1.cd到celery包文件夾所在的目錄文件夾
2.啓動celery(app)服務:celery_task爲包文件名
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:celery_task爲包文件名
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
#4.添加任務
4.1添加任務方式1
4.2添加任務方式2:腳本
# 導入任務文件tasks.py from celery_task import tasks # 添加當即執行任務:任務函數.delay(參數們) t1 = tasks.add.delay(10, 20) t2 = tasks.low.delay(100, 50) print(t1.id) # 添加延遲任務:任務函數.apply_async(args=(參數), eta=延遲時間)
from datetime import datetime, timedelta
def eta_second(second):
# 獲取當前utc時間 ctime = datetime.now() utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) # 延遲多長時間執行這裏單位爲秒,實際狀況更改 time_delay = timedelta(seconds=second) return utc_ctime + time_delay #tasks.任務函數.apply_async(args=(參數們))
tasks.low.apply_async(args=(200, 50), eta=eta_second(10))
# 5.獲取結果:
# 導入實例對象 from celery_task.celery import app from celery.result import AsyncResult # 任務id,實際狀況更改 id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任務失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
# 注意cd 到project文件夾下載執行命令
# 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet # 3)添加任務:手動添加,要自定義添加任務的腳本,右鍵執行腳本 # 4)獲取結果:手動獲取,要自定義獲取任務的腳本,右鍵執行腳本
pip install celery
project # 通常爲項目名如:大路飛
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery鏈接和配置相關文件,且名字必須交celery.py
│ └── tasks.py # 全部任務函數
└── get_result.py # 獲取結果
#1.celery配置 :
# 導入模塊 from celery import Celery broker = 'redis://127.0.0.1:6379/14' # 任務庫:實際狀況選擇 backend = 'redis://127.0.0.1:6379/15' # 結果庫:實際狀況選擇 include=['celery_task.tasks'] # 任務們,注意路徑 # 實例產生app對象 app = Celery(broker=broker,backend=backend,include=include) # 時區,實際狀況選擇 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC,實際狀況選擇 app.conf.enable_utc = False # 任務的定時配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { # 任意任務名 'low-task': { 'task': 'celery_task.tasks.low', # 任務源,注意路徑 'schedule': timedelta(seconds=3), # 3秒加載執行一次 # 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點 'args': (300, 150), # 任務的參數 } }
# 2.獲取結果:
from celery_task.celery import app from celery.result import AsyncResult id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任務失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
#3.啓動服務和添加任務
# 注意cd 到project文件夾下載執行命令 1.worker cmd窗口 # 非windows # 命令:celery worker -A celery_task -l info # windows: # pip3 install eventlet # celery worker -A celery_task -l info -P eventlet 2. 添加任務cmd窗口 # 3)添加任務:自動添加任務,因此要啓動一個添加任務的服務 # 命令:celery beat -A celery_task -l info 3.獲取結果:手動獲取,要自定義獲取任務的腳本,右鍵執行腳本
#2.celery配置:celery.py
# django環境配置 # 開啓django支持 import os, django # 若是包文件不在根目錄下根據實際狀況將大路飛添加環境便令 #import sys # sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev') django.setup() # 導入模塊 from celery import Celery broker = 'redis://127.0.0.1:6379/14' # 任務庫 backend = 'redis://127.0.0.1:6379/15' # 結果庫 include = ['celery_task.tasks'] # 實例產生app對象 app = Celery(broker=broker,backend=backend,include=include) # 時區 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任務的定時配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'update_banner_list_task': { 'task': 'celery_task.tasks.update_banner_list', 'schedule': timedelta(seconds=10), # 'schedule': crontab(hour=8, day_of_week=1), # 每週一早八點 'args': (), } }
#3.celery任務:tasks.py
# 導入app對象 from .celery import app # 導入home應用的模型表 from home.models import Banner # 導入設置中的輪播數量 from settings.const import BANNER_COUNT # 導入home應用的序列化 from home.serializers import BannerModelSerializer from django.core.cache import cache @app.task def update_banner_list(): banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:BANNER_COUNT] # 序列化 banner_data = BannerModelSerializer(banner_query,many=True).data for banner in banner_data: banner['image']= 'http://127.0.0.1:8000'+ banner['image'] # 存入緩存數據庫 cache.set('banner_list',banner_data) return True
#4.啓動服務和添加任務
# 注意cd 到project文件夾下載執行命令如大路飛
1.worker cmd窗口
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
2. 添加任務cmd窗口 # 3)添加任務:自動添加任務,因此要啓動一個添加任務的服務 # 命令:celery beat -A celery_task -l info