Celery是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時爲操做提供維護此類系統所需的工具。它是一個任務隊列,專一於實時處理,同時還支持任務調度。html
所謂任務隊列,是一個邏輯上的概念,能夠將抽象中的任務發送到指定的執行任務的組件,任務隊列能夠跨線程或機器運行。redis
Celery是基於Python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery。json
1.高併發的請求任務,好比須要發送大量請求的網絡爬蟲,就可使用Celery來加速爬取。後端
2.異步任務,將耗時的操做交給Celery來完成,好比發送/接收郵件、消息推送等等。服務器
3.定時任務,須要定時運行的程序,好比天天定時執行爬蟲爬取數據。網絡
下圖是我找到的一張表示Celery架構的圖:架構
任務生產者:產生任務而且把任務提交到任務隊列的就是任務生產者。併發
任務調度Beat:Celery會根據配置文件對任務進行調配,能夠按必定時間間隔週期性地執行某些任務。app
中間人Broker:Celery使用消息進行通訊,須要中間人在客戶端和Worker之間進行傳遞,接收客戶端發送過來的任務,並將任務分配給Worker。異步
在Celery的文檔中,能夠找到官方給出的實現Broker的工具備:
名稱 | 狀態 | 監控 | 遠程控制 |
RabbitMQ | 穩定 | 是 | 是 |
Redis | 穩定 | 是 | 是 |
Amazon SQS | 穩定 | 否 | 否 |
Zookeeper | 實驗性 | 否 | 否 |
消費者Worker:Worker是執行任務的單元,在Celery任務隊列中屬於消費者。Worker會不斷地監聽隊列,一旦有任務添加進來,就會將任務取出來進行執行。Worker還能夠運行在多臺機器上,只要它們都指向同一個Broker就能夠。
結果存儲Backend:結果存儲Backend,顧名思義就是將Worker執行後獲得的結果存儲起來。Celery中有幾個內置的結果存儲可供選擇,包括SQLAlchemy / Django ORM、Redis、RabbitMQ、Mamcached等。
Celery4.0版本是支持Python2.7的最後一個版本,因此若是你還在用py2的話,可能要選擇安裝Celery3或者更早的版本。我本人用的Python版本是Python3.7,而後安裝的Celery版本是4.3。安裝的話使用pip安裝就好:
pip install celery
若是pip安裝出錯的話,能夠去這個網址進行下載。在使用pip安裝的時候會自動安裝一些相關依賴,若是這些依賴安裝出錯的話,搜一下相應版本的Wheel文件下載安裝便可。
中間件Broker我選擇使用的是Redis,這裏就不說Redis怎麼安裝了,上一篇博客中有Ubuntu下安裝Redis的介紹。
在使用Celery的時候,第一件事是要建立一個Celery實例,通常稱之爲應用,簡稱爲app。建立一個test.py,其中代碼以下:
1 from celery import Celery 2 3 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379") 5 6 7 @app.task 8 def add(x, y): 9 return x + y
在建立好應用以後,就可使用Celery命令執行程序運行Worker了:
celery -A test worker -l info
運行後能夠看到以下圖:
有關可用命令行選項的完整列表,執行以下命令:
celery worker --help
要調用任務,可使用delay()方法。
該任務會返回一個AsyncResult實例,可用於查詢任務狀態、獲取任務返回值等。此時查看前面運行的服務器,會看到有以下信息:
Received task: test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9]
Task test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9] succeeded in 0.006505205000166825s: 5
在前面定義的時候,已經選擇使用Redis做爲結果後端了,因此任務執行後的結果會保存到Redis中。並且,在調用任務的時候,還能夠進行以下操做:
其中ready()方法會返回該任務是否已經執行,get()方法則會獲取任務返回的結果。
因爲Celery的配置信息比較多,所以通常會建立一個配置文件來保存這些配置信息,一般會命名爲celeryconfig.py。在test.py所在文件夾下新建配置文件celeryconfig.py,其中的代碼以下:
1 # broker(消息中間件來接收和發送任務消息) 2 BROKER_URL = 'redis://127.0.0.1:6379' 3 # backend(存儲worker執行的結果) 4 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379' 5 6 # 設置時間參照,不設置默認使用的UTC時間 7 CELERY_TIMEZONE = 'Asia/Shanghai' 8 # 指定任務的序列化 9 CELERY_TASK_SERIALIZER = 'json' 10 # 指定執行結果的序列化 11 CELERY_RESULT_SERIALIZER = 'json'
而後修改下test.py中的代碼:
1 from celery import Celery 2 3 4 app = Celery("test") 5 app.config_from_object("celerystudy.celeryconfig") 6 7 8 @app.task 9 def add(x, y): 10 return x + y