分佈式任務隊列--Celery的學習筆記

1、Celery簡介

  Celery是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時爲操做提供維護此類系統所需的工具。它是一個任務隊列,專一於實時處理,同時還支持任務調度。html

  所謂任務隊列,是一個邏輯上的概念,能夠將抽象中的任務發送到指定的執行任務的組件,任務隊列能夠跨線程或機器運行。redis

  Celery是基於Python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery。json

 

2、Celery使用場景

  1.高併發的請求任務,好比須要發送大量請求的網絡爬蟲,就可使用Celery來加速爬取。後端

  2.異步任務,將耗時的操做交給Celery來完成,好比發送/接收郵件、消息推送等等。服務器

  3.定時任務,須要定時運行的程序,好比天天定時執行爬蟲爬取數據。網絡

 

3、Celery架構

  下圖是我找到的一張表示Celery架構的圖:架構

  

  任務生產者:產生任務而且把任務提交到任務隊列的就是任務生產者。併發

  任務調度Beat:Celery會根據配置文件對任務進行調配,能夠按必定時間間隔週期性地執行某些任務。app

  中間人Broker:Celery使用消息進行通訊,須要中間人在客戶端和Worker之間進行傳遞,接收客戶端發送過來的任務,並將任務分配給Worker。異步

  在Celery的文檔中,能夠找到官方給出的實現Broker的工具備:

名稱 狀態 監控 遠程控制
RabbitMQ 穩定
Redis 穩定
Amazon SQS 穩定
Zookeeper 實驗性

  消費者Worker:Worker是執行任務的單元,在Celery任務隊列中屬於消費者。Worker會不斷地監聽隊列,一旦有任務添加進來,就會將任務取出來進行執行。Worker還能夠運行在多臺機器上,只要它們都指向同一個Broker就能夠。

  結果存儲Backend:結果存儲Backend,顧名思義就是將Worker執行後獲得的結果存儲起來。Celery中有幾個內置的結果存儲可供選擇,包括SQLAlchemy / Django ORM、Redis、RabbitMQ、Mamcached等。

 

4、Celery安裝

     Celery4.0版本是支持Python2.7的最後一個版本,因此若是你還在用py2的話,可能要選擇安裝Celery3或者更早的版本。我本人用的Python版本是Python3.7,而後安裝的Celery版本是4.3。安裝的話使用pip安裝就好:

pip install celery

  若是pip安裝出錯的話,能夠去這個網址進行下載。在使用pip安裝的時候會自動安裝一些相關依賴,若是這些依賴安裝出錯的話,搜一下相應版本的Wheel文件下載安裝便可。

  中間件Broker我選擇使用的是Redis,這裏就不說Redis怎麼安裝了,上一篇博客中有Ubuntu下安裝Redis的介紹。

 

5、Celery使用示例

 1.應用

   在使用Celery的時候,第一件事是要建立一個Celery實例,通常稱之爲應用,簡稱爲app。建立一個test.py,其中代碼以下:

1 from celery import Celery
2 
3 
4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")
5 
6 
7 @app.task
8 def add(x, y):
9     return x + y

2.運行Celery服務器

  在建立好應用以後,就可使用Celery命令執行程序運行Worker了:

celery -A test worker -l info

  運行後能夠看到以下圖:  

  

  有關可用命令行選項的完整列表,執行以下命令:

celery worker --help

3.調用任務

  要調用任務,可使用delay()方法。

  

  該任務會返回一個AsyncResult實例,可用於查詢任務狀態、獲取任務返回值等。此時查看前面運行的服務器,會看到有以下信息:

Received task: test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9]

Task test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9] succeeded in 0.006505205000166825s: 5

4.查看結果

  在前面定義的時候,已經選擇使用Redis做爲結果後端了,因此任務執行後的結果會保存到Redis中。並且,在調用任務的時候,還能夠進行以下操做:

  

  其中ready()方法會返回該任務是否已經執行,get()方法則會獲取任務返回的結果。

 5.配置文件

  因爲Celery的配置信息比較多,所以通常會建立一個配置文件來保存這些配置信息,一般會命名爲celeryconfig.py。在test.py所在文件夾下新建配置文件celeryconfig.py,其中的代碼以下:

 1 # broker(消息中間件來接收和發送任務消息)
 2 BROKER_URL = 'redis://127.0.0.1:6379'
 3 # backend(存儲worker執行的結果)
 4 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
 5 
 6 # 設置時間參照,不設置默認使用的UTC時間
 7 CELERY_TIMEZONE = 'Asia/Shanghai'
 8 # 指定任務的序列化
 9 CELERY_TASK_SERIALIZER = 'json'
10 # 指定執行結果的序列化
11 CELERY_RESULT_SERIALIZER = 'json'

  而後修改下test.py中的代碼:

 1 from celery import Celery
 2 
 3 
 4 app = Celery("test")
 5 app.config_from_object("celerystudy.celeryconfig")
 6 
 7 
 8 @app.task
 9 def add(x, y):
10     return x + y
相關文章
相關標籤/搜索