轉載:異步處理 celery

1.什麼是Celery?

Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統前端

專一於實時處理的異步任務隊列,同時也支持任務調度linux

2.Celery架構

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。redis

 

消息中間件

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等數據庫

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。windows

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等cookie

執行流程:架構

user至關於提交任務的人,提交給broker也就是消息中間件,worker至關於工人,broker裏面有了用戶也就是程序提交的任務,worker就去取出來執行相似於生產者消費者模型,store說簡單點就是worker執行結束後的返回結果併發

 

版本支持狀況

複製代碼
Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
複製代碼

 

 注:Celery不支持windows但並非不可使用能夠用第三方模塊來完成,可是windows上使用出了問題官方不會提供幫助app

3.Celery的安裝配置

pip install celery異步

消息中間件:RabbitMQ/Redis

app=Celery('任務名',backend='xxx',broker='xxx')

4.測試案例

 

複製代碼
# 先導入
from celery import Celery

# 下面是配置信息,這裏使用redis爲列
# broker='redis://127.0.0.1:6379/2' 不加密碼
# 消息中間件
backend = 'redis://:lmdxxx@139.196.**.**:6380/7'
# 處理結果
broker = 'redis://:lmdxxx@139.196.**.**:6380/8'
# 注:redis有密碼的狀況下前面加@輸入密碼便可,最後面的是指定儲存在redis的那個庫中
# 實列話產生一個對celery象,一個項目中可能會用到多個Celery # 第一個參數是當前任務的名字,必定要寫 APP = Celery('test', broker=broker, backend=backend)
複製代碼

 

1.首先沒得說了確定要先導入celery,上文說了selery有消息中間件,處理者,結果存儲,這裏使用redis來做爲測試

2.配置消息中間件,配置結果儲存位置

APP = Celery('test', broker=broker, backend=backend)
# 實列話產生一個對celery象,一個項目中可能會用到多個Celery,因此在第一個參數中傳入指定的名稱,不能夠重複
# 對象名稱無所謂

3.建立一個selery任務

複製代碼
# 任務其實就是一個函數
# 須要用一個裝飾器去裝飾才能說明這是一個被celery管理的任務,且能夠用celery執行
# 裝飾器實際就是實列化出來的那個對象,裏面的一個固定方法
@APP.task
def add(x, y):
    import time
    time.sleep(2)
    return x + y
複製代碼

 4.建立一個用於提交任務的

正常同步提交任務

提交任務不執行,有worker纔會去執行

注:獲得的這個ID其實咱們工做中能夠把它set到cookie中,用戶就能夠經過輪詢去查詢redis數據庫中去尋找結果

也能夠直接return返回給前端,給前端處理

 

返回的對應的是存放redis中間件裏面的提交任務屬性的ID用於查詢返回結果,看不懂不要緊

5.任務提交任務後須要建立工人執行任務

建立py文件:run.py,執行任務,或者使用命令執行(win不可使用):celery worker -A celery_task_cs -l info (celery_test_cs是建立任務的那個名字 -l info是打印的日誌級別)

windows下:celery worker -A celery_test_cs -l info -P eventlet

win安裝:pip install eventlet

代碼執行一般不用:

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

提交任務的時候注意導入方式,有時候導入方式問題會產生報錯

啓動後以下:

收到任務後分配任務切返回執行信息 7 就是咱們的執行結果 前面的就是執行的時間 這個是info級別的日誌打印的

 

redis 存放結果數據以下,執行狀態 結果 之類都在裏面

6.查看結果

 

流程梳理:

celery的使用
1.先安裝 pip install celery
2.寫一個py文件:celery_task
3.指定broker(消息中間件),指定backend(結果存儲)
4.實例化產生一個Celery對象 app=Celery('名字',broker,backend)
5.加裝飾器綁定任務,在函數(add)上加裝飾器app.task
6.其餘程序提交任務,先導入add,add.delay(參數,參數),會將該函數提交到消息中間件,可是並不會執行,有個返回值,直接print會打印出任務的id,之後用id去查詢任務是否執行完成
7.啓動worker去執行任務:
linux:celery worker -A 建立的任務的那個py文件 -l info
windows下:celery worker 建立任務的那個py文件 -A -l info -P eventlet
8.查看結果:根據id去查詢

 

應用場景

異步任務:將耗時操做任務提交給Celery去異步執行,好比生成圖表,發送短信/郵件、消息推送、音視頻處理等等

定時任務:定時執行某件事情,好比天天數據統計

相關文章
相關標籤/搜索