Celery的經常使用知識

什麼是Clelery

  Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統。專一於實時處理的異步任務隊列。同時也支持任務調度。
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。redis

** 消息中間件 **
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等架構

** 任務執行單元 **
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。併發

** 任務結果存儲 **
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等app

使用場景

異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等異步

定時任務:定時執行某件事情,好比天天數據統計async

Celery的安裝配置

pip install celery分佈式

消息中間件:RabbitMQ/Redis加密

app=Celery('任務名',backend='xxx',broker='xxx').net

實例分析

基本使用
建立項目celerytestcode

建立py文件:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

建立py文件:add_task.py,添加任務

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

建立py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info
run.py

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

建立py文件:result.py,查看任務執行結果

```
from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
result = async.get()
print(result)
# result.forget() # 將結果刪除
elif async.failed():
print('執行失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常後正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
```執行 add_task.py,添加任務,並獲取任務ID

執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info

執行 result.py,檢查任務狀態並獲取結果

更進一步的請參考下面參考文檔

參考文檔

相關文章
相關標籤/搜索