本文對Celery進行了研究,因爲其實現相對比較複雜沒有足夠的時間和精力對各方各面的源碼進行分析,所以本文根據Celery的使用方法以及實際行爲分析其運行原理,並根據查閱相關代碼進行了必定程度的驗證。
但願本文能有助於讀者理解celery是如何工做的,從而可以更好地使用這個任務框架,而不只僅是複製官網上的例子來配置。python
Celery是Python中任務隊列的事實標準。其特色在於:redis
下面咱們結合Celery的基本使用來分析一下Celery是怎麼工做的。本文以Python2爲例。django
首先,咱們須要定義咱們的Celery進程訪問哪一個Redis進程(假設咱們使用Redis做爲message backend,在celery的術語中叫作broker)。
Celery提供的方式是建立一個celery instance。咱們假設文件目錄以下:服務器
lab - play - __init__.py - celery.py - tasks.py
而後建立lab/play/celery.py
文件:app
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('play', broker='redis://127.0.0.1:6379', include=['play.tasks']) if __name__ == '__main__': app.start()
因爲可能會有多個celery進程訪問同一個redis,爲了讓它們之間隔離開就須要給每一個celery實例一個名字,咱們這裏就叫play
。
除了name和broker參數之外,還使用了include
參數來告訴全部的works到哪裏去import tasks的代碼,由於workers纔是真正執行全部這些任務的單位。框架
好了,接下來就能夠啓動celery了。在lab
目錄下執行:函數
celery -A play.celery worker -l info
便可啓動celery進程。Python的路徑和模塊系統仍是比較複雜的,所以在指定包名的時候要注意。code
除了使用celery命令之外,因爲咱們再celery.py中已經加了
if __name__ == '__main__':
部分代碼,所以也能夠在lab
下直接執行:
python -m play.celery -A play.celery worker -l info
對象
在啓動了celery之後,celery進程監聽redis消息,並fork出多個worker進程準備將監聽到的消息分發給它們執行。隊列
如今執行的部分有了,咱們開始定義真正須要執行的部分。
咱們能夠專門寫一個文件來存聽任務代碼(也能夠直接寫在celery.py裏面):
# lab/play/tasks.py from __future__ import absolute_import, unicode_literals import time from celery import Celery app = Celery('play', broker='redis://127.0.0.1:6379') @app.task def say_hi(): print 'hi!'
使用另外一個Python進程(也可使用交互式python或者ipython),在lab
下執行:
>>> from play.tasks import say_hi >>> say_hi.delay() >>> <AsyncResult: db6737ba-ecee-4fd2-8227-a76c594ba338> >>>
結果就是say_hi
函數向消息隊列中發出了一個調用請求由某個worker執行。Celery進程會輸出:
[2017-09-03 13:49:57,340: INFO/MainProcess] Received task: play.tasks.say_hi[85ff01ca-d7c9-4401-bfa3-0a9ad96c7192] [2017-09-03 13:49:57,343: WARNING/ForkPoolWorker-1] hi! [2017-09-03 13:49:57,344: INFO/ForkPoolWorker-1] Task play.tasks.say_hi[85ff01ca-d7c9-4401-bfa3-0a9ad96c7192] succeeded in 0.0016004400095s: None
如今咱們來分析一下tasks.py這個文件。很奇怪的一點是,一上來咱們又建立了一個app實例。當咱們import了task文件後會不會又建立了一個celery進程呢?答案是不會的,由於只有調用了app.start()纔會啓動。這隻有手動調用或者藉助celery命令執行後纔會發生。若是隻是new了一個instance出來,至關於建立了一個配置文件,不會發生任何重要的實質性的操做。
可是這個app對象也不是什麼都不幹的。接下來咱們定義了兩個task函數,並將這個兩個函數使用@app.task
包裝了起來。這樣的效果是把這兩個普通函數包裝成了celery的task對象,這樣他們就有了delay
方法。當咱們執行delay
方法時,這些task會找本身所屬的那個celery instance,從中獲取配置信息(主要是broker的地址)後將調用請求發往消息隊列。
不過,這樣定義task的方法並非很好,由於須要在代碼中就顯式將task函數和一個具體的celery instance綁定了起來。這就使得咱們沒法複用這些tasks。所以咱們可使用celery的另外一種定義tasks的方式來重寫咱們現有的代碼(這也是推薦給django使用的方案):
from __future__ import absolute_import, unicode_literals import time from celery import shared_task @shared_task def say_hi(): print 'hi!'
這裏咱們再也不建立app實例,而是直接使用@shared_task
來包裝。這樣就沒有綁定哪一個app的問題了。可是正如咱們以前所說,在調用tasks的時候,task仍是會去尋找本身屬於哪一個celery instance從而獲取配置信息。若是你都不綁定app instance,配置信息哪裏來呢?
答案是,tasks和celery instance之間仍然具備綁定或關聯的關係,只不過再也不是顯式的了。簡單來講,每一個celery instance被建立之後,它就會被自動的註冊到某個全局的位置。當一個shared task被執行時,這個task就會本身去這個全局的位置找有哪些celery instances能夠從中獲取配置信息。若是有多個celery instance都註冊了,那麼可能它們的消息隊列都會被這個task發消息(沒有確認過,只是猜想。但這可能就是shared_task的來源)。這就意味着,只要在咱們Python進程的任何一個地方(對Django服務器進程也是如此),只要隨便哪一個地方建立一個celery instance就能夠,而後只要import tasks而後使用delay執行便可。這樣就解決了celery tasks複用的問題。代碼之間的耦合也更小。
更進一步,在咱們的python進程中,甚至都不用再手寫一遍celery instance的建立調用。直接import play.celery 就能夠了,這個文件雖然被celery進程用做了配置文件,但這不妨礙咱們在本身的進程中也用這個文件。不如說這是更好的一種解決方案。