celery 學習筆記 01-介紹

celery 學習筆記 01-介紹

celery 是 python 中的經常使用的任務隊列框架,常常用於異步調用、後臺任務等工做。celery 自己以 python 寫,但協議可在不一樣的語言中實現,其它語言也能夠用 celery 執行相應的任務。在 web 應用,爲提升系統響應速度,發送郵件、數據整理等須要長時間執行的任務,一般以異步任務的方式執行,這時就須要用到像 celery 類的框架。另外一種常見的場景是大型系統的分佈式處理,爲了提高系統性能,各個組件一般以多個實例運行不一樣主機上,而組件之間的調用就須要用到 celery 這樣的框架。使用 celery (或消息隊列),有助於下降系統組件之間的耦合,有助於實現灰度發佈、實現服務的分佈式、實現水平擴展,最終提高系統健壯性和處理性能。python

celery (和相似框架)的核心是任務隊列。用戶發起任務,celery 負責把任務排隊和整理,而後交到任務執行器 worker 中。 worker 監視任務隊列,獲取新任務並執行。在 celery 內部,以消息機制協調各個組件工做,消息須要藉助一箇中間人 broker 進行,以下 ::web

client → celery task → broker → celery worker
    ↑                                   ↓
    ←       ←        ←        ← result backend

client 發起任務時,通常是以異步方式(除非必要的同步 rpc ),得到一個任務的 id 並保存下來,後續可經過 id 到 result backend 中查詢任務執行結果。broker 是第三方組件,可以使用消息隊列( rabbitmq 等)、redis、數據庫等,只要能實現消息的存儲和分發理論上都能使用。 worker 以線程或進程的形式運行,從 broker 中取任務執行,而後把結果保存到 result backend 。redis

目前 rabbitmq 的 broker 實現的功能最完備,在開發環境中也可使用 sqlite 等比較方便的方式,但性能會不好,不能用在生產環境上。sql

另外須要注意的是,因爲不一樣操做系統的進程模型的差別,celery 會在 windows 上產生一些配置方面的怪異問題。docker

celery 可直接經過 pip 安裝,在 virtualenv 下,直接運行 ::數據庫

pip install celery

再安裝 broker 所須要的驅動,例如使用 rabbitmq ,則安裝 ::json

pip install amqp

同時安裝好 rabbitmq (建議經過 docker 安裝,使用 rabbitmq:management 鏡像,可在 15672 端口查看管理控制檯)。canvas

而後使用下面的代碼示例(摘錄來自: Ask Solem. 「Celery Manual, Version 3.1「) ::windows

# hello.py
from celery import Celery

app = Celery('hello', broker='amqp://guest:guest@localhost//')

@app.task
def hello():
    return 'hello world'

if __name__ == '__main__':
    r = hello.delay()

而後,啓動 worker ::網絡

celery -A hello worker --loglevel=info

client 執行任務 ::

python hello.py

app.task 裝飾器標記一個函數爲 celery 任務,client 用 delay 方法執行時。 delay 調用 apply_async() 進行異步執行, apply_async 還可配置如隊列、countdown 等執行選項。 celery 返回一個 AsyncResult 對象,若是 result backend 配置正確,client 可暫時把對象中的任務 id 保存到數據庫,後面再經過這個 id 獲取異步執行的結果。

上面的簡單例子是沒有參數的,若是增長參數,以下 ::

# add.py
from celery import Celery

app = Celery('add', broker='amqp://guest:guest@localhost//',
             backend='db+sqlite:///celery_result.db')

@app.task
def add(x, y):
    return x+y

if __name__ == '__main__':
    r = add.delay(1, 2)
    print(r.wait())

啓動 worker ::

celery -A add worker --l info

調用 ::

python add.py

當任務結果用 amqp 保存時,結果只能取一次, 所以沒法在後續調用中查詢任務結果。這個例子用 sqlite 保存了任務執行結果,所以 client 可在 r.wait() 查詢任務的結果、任務的狀態等等不少信息,可把 r.id 保存到數據庫,而後將來查詢任務的 AsyncResult ::

r2 = app.AsyncResult(r.id)
print(r2.wait())
print(r2.successful())

add.py 中使用了兩個參數 x y ,而 celery 須要經過 broker 傳遞這兩個參數,這時須要對數據進行序列化,將 x y 對象轉換爲無結構的數據,而後 worker 接收到後再把數據還原爲 x y 對象。 celery 內置的序列化方法包括 pickle 、 json 等等,若是對象比較複雜,須要本身定義序列化方法。

若是不想當即執行任務,而是把任務傳遞到其它地方,經過 celery 的 subtask 支持。 subtask 是對 task 的調用參數和執行選項的一個封裝,如 ::

add.subtask((2,2), countdown=10)
add.s(2,2)

subtask 或 s 返回的是一個 task 的簽名(celery.canvas.Signature),它可實現工做流、偏函數等效果。subtask 支持和 task 一樣的調用方法,如 ::

s = add.s(2)    # subtask ,partial
s.delay(2)      # 發送消息開始異步執行

在 celery 工做流中組織 subtask 的方式有 group / chain / chord 等等, group 中任務併發執行,chain 中任務順序執行,chord 中進行回調。而這些組織方式自己也是 subtask ,可嵌套使用 ::

# workflow.py
from celery import Celery, group, chain

app = Celery('add', broker='amqp://guest:guest@localhost//',
             backend='db+sqlite:///celery_result.db')

@app.task
def add(x, y):
    return x+y

if __name__ == '__main__':
    g = group((add.s(i, i) for i in range(10)))
    r = g.delay()
    print(r.get())

    c = chain(add.s(1, 2) | add.s(3))
    r2 = c.delay()
    print(r2.get())

celery 的任務調用經過網絡發送任務的名字和參數,不發送任務代碼, worker 收到任務後根據任務名和參數執行相應的代碼。所以不一樣 worker 中的代碼版本不同時,會有不一樣的處理結果。若是 worker 中不能處理相應的任務名,就會報錯。

相關文章
相關標籤/搜索