python manage.py celery -A TSDRM work -l info
python manage.py celery -A TSDRM flower -l info
Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統。專一於實時處理的異步任務隊列,同時也支持定時任務python
消息中間件(message broker):redis
任務執行單元(worker):數據庫
任務執行結果存儲(task result store):django
user是提交任務的人,user把任務提交到消息中間件broker中,好比用戶提交個3+5的任務,這個程序只負責提交,另外一個程序負責執行。windows
workers負責執行代碼,在消息中間件取一個一個的任務去執行,執行的結果須要存儲在task result store中,用戶user只須要去存儲結果的task result store中看一下就知道這個任務有沒有執行。架構
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等併發
任務執行單元:Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。app
任務結果存儲:Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等異步
Celery version 4.0 runs on Python ❨2.7, 3.4, 3.5❩ PyPy ❨5.4, 5.5❩ This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required. If you’re running an older version of Python, you need to be running an older version of Celery: Python 2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4 was Celery series 2.2 or earlier. Celery is a project with minimal funding, so we don’t support Microsoft Windows.
Please don’t open any issues related to that platform.
場景1:用戶提交一個任務,把視頻轉成MP4格式。傳上去以後轉格式須要耗費很長時間。因此就能夠把任務提交到消息中間件裏,只返回一個消息告訴用戶,視頻正在轉,你該幹嗎就幹嗎去。開啓workes執行傳視頻的任務,視頻轉完以後,把結果放在結果存儲store中。而後用戶再發請求去結果存儲中看一下視頻有沒有轉好。async
場景2:註冊發郵件。一個用戶一旦註冊了以後。把發郵件的任務提交到消息中間件裏。異步調用發郵件發郵件。因此當用戶註冊成功以後,後臺有郵件正在發。
場景3:秒殺商品,把秒殺的任務提交到任務隊列中,起個worker去執行。
一點擊搶購,顯示你正在排隊中。正常的邏輯是,點擊搶購,會到數據庫裏查看是否還有此商品,有的話,數據庫商品就減一,生成一個訂單返回給用戶,用戶進行支付。可是,你有沒有想過,秒殺的時候,一瞬間來了好多好多人,這時候就會考慮到程序運行效率問題,解決方法是會在秒殺商品的時候提早把數據放在redis中,不在直接操做數據庫而是redis。若是程序還不行的時候,就會對任務作緩衝,來我的把任務提交上去,來我的把任務提交上去,後面開啓多個workers來執行任務,這就至關於把整個任務作成異步的,任務提交完成立馬返回,你正在排隊。
場景4:利用Celery能夠作頁面靜態化。首頁常常被訪問,每次都要查數據庫很是耗時,能夠把首頁作成靜態頁面,數據是寫死的,誰過來訪問,就能看到首頁。
pip3 install celery
pip3 install redis
from celery import Celery # 指定broker(消息中間件),指定backend(結果儲存) backend = 'redis://127.0.0.1:6379/0' broker = 'redis://127.0.0.1:6379/1' # 0,1指定的是redis中的庫
# 實例化產生一個Celery對象,必定要指定一個名字 app = Celery('test', broker=broker, backend=backend) # 指定任務,就是一個函數 # 須要用一個裝飾器綁定任務,表示該任務是被celery管理的,而且能夠用celery執行的 @app.task def add(x, y): import time time.sleep(3) return x+y
# 用於提交任務的py文件 import celery_task_s1 # res = celery_task_s1.add(3, 7) # 正常同步執行任務 # print(res) # 打印結果爲 10 # 提交任務到消息中間件中 res = celery_task_s1.add.delay(3, 7) # 只是把任務提交到消息中間件中,並無執行 print(res) # 6e6464e8-3b0a-49f9-ad92-824725c2a573 任務的id號
注意:windows下啓動須要安裝依賴
pip3 install eventlet
from celery.result import AsyncResult from celery_task_s1 import app async = AsyncResult(id="6e6464e8-3b0a-49f9-ad92-824725c2a573", app=app) # id是worker執行任務後拿到的id if async.successful(): # 取出它return的值 result = async.get() print(result) # 執行成功,打印的結果爲10 # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
celery_mul_Demo ├── celery_task # celery相關文件夾 │ ├── celery.py # celery鏈接和配置相關文件,必須叫這個名字 │ └── order_task.py # 全部任務函數 │ └── user_task.py # 全部任務函數 ├── add_task.py # 提交任務 └── celery_result.py # 查看結果
新建py文件:celery.py(必須叫這個名字)
# 這個文件必須叫celery,生成celery對象 from celery import Celery broker = 'redis://127.0.0.1:6379/0' backend = 'redis://127.0.0.1:6379/1' app = Celery('test', broker=broker, backend=backend, # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類 include=['celery_task.order_task', 'celery_task.user_task', ] )
新建py文件:order_task.py
from celery_task.celery import app @app.task def order_add(x, y): import time time.sleep(3) return x+y
新建py文件:user_task.py
from celery_task.celery import app @app.task def user_add(x, y): import time time.sleep(3) return x+y
from celery_task.order_task import order_add from celery_task.user_task import user_add # res = order_add.delay(10, 20) # 打印結果爲 26a025a6-f910-4e71-8173-c119c665df84 res = user_add.delay(100, 200) # 打印結果爲 c67e5176-3972-49c2-9e13-13f7d6eea4af print(res)
等待提交任務
提交任務order_task
提交任務user_task
from celery.result import AsyncResult from celery_task.celery import app async = AsyncResult(id="c67e5176-3972-49c2-9e13-13f7d6eea4af", app=app) # id是worker執行user_task任務後拿到的id if async.successful(): # 取出它return的值 result = async.get() print(result) # 執行成功,打印的結果爲300 # result.forget() # 將結果刪除 elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中被執行') elif async.status == 'RETRY': print('任務異常後正在重試') elif async.status == 'STARTED': print('任務已經開始被執行')
from celery import Celery # 指定broker(消息中間件),指定backend(結果儲存) backend = 'redis://127.0.0.1:6379/0' broker = 'redis://127.0.0.1:6379/1' # 0,1指定的是redis中的庫 # 實例化產生一個Celery對象,必定要指定一個名字 app = Celery('test', broker=broker, backend=backend) # 指定任務,就是一個函數 # 須要用一個裝飾器綁定任務,表示該任務是被celery管理的,而且能夠用celery執行的 @app.task def add(x, y): import time time.sleep(3) return x+y
# 用於提交任務的py文件 import celery_task_s1 # 執行定時任務:指定時間執行add_task任務 from datetime import datetime v1 = datetime(2019, 7, 12, 20, 59, 56) # 指定的幾時幾點幾分執行任務 print(v1) # 2019-07-12 20:59:56 v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) # 2019-07-12 12:59:56 # 取出要執行任務的時間對象,調用apply_async方法,args是參數,eta是執行的時間 result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2) print(result.id) # e28be60b-1546-4b92-987f-dfbf357e623f
# 這個文件必須叫celery,生成celery對象 from celery import Celery broker = 'redis://127.0.0.1:6379/0' backend = 'redis://127.0.0.1:6379/1' app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', ] ) from datetime import timedelta app.conf.beat_schedule = { # 名字隨意命名 'add-every-10-seconds': { # 執行tasks1下的test_celery函數 'task': 'celery_task.order_task.order_add', # 每隔3秒執行一次 # 'schedule': 1.0, # 'schedule': crontab(minute="*/1"), 'schedule': timedelta(seconds=3), # 傳遞參數 'args': (5, 6) }, # 每一年什麼時間執行什麼 # 'add-every-12-seconds': { # 'task': 'celery_task.order_task.order_add', # # 每一年4月11號,8點42分執行 # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4), # 'args': (16, 16) # }, }
新建py文件:order_task.py
from celery_task.celery import app @app.task def order_add(x, y): import time time.sleep(3) return x+y
from celery_task.user_task import user_add def index(request): res = user_add.delay(3, 7) return HttpResponse(res.id) def check_result(request): res = request.GET.get('id') from celery.result import AsyncResult from celery_task.celery import app async = AsyncResult(id=res, app=app) # id是worker執行任務後拿到的id if async.successful(): # 取出它return的值 res = async.get() print(res) return HttpResponse('ok')
url(r'^index/', views.index), url(r'^check/', views.check_result),
import os if __name__ == '__main__': os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings") import django django.setup() from app01 import models books = models.Book.objects.all() print(books)