celery(芹菜)是一個異步任務隊列/基於分佈式消息傳遞的做業隊列。 php
Brokers: 提供隊列服務,Celery支持的Brokers有: node
Worker: 真正幹活的,實際運行任務的節點。 web
在你正式開始使用 Celery 以前,你須要選擇、安裝並運行一個 broker。 redis
Broker 是一種負責接收、發送任務消息(task messages)的服務 sql
你能夠從如下幾種 broker 中選擇一個: 數據庫
功能完整、安全、飽經實踐檢驗的 broker。若是對你而言,不丟失消息很是重要,RabbitMQ 將是你最好的選擇。 django
請查看 [[/技術分享/Celery/broker-installation]] 以便得到更多關於 RabbitMQ 安裝和配置相關的信息。 後端
也是一個功能完整的 broker,但電源故障或異常將致使數據丟失。 安全
請查看 [[/技術分享/Celery/otherqueues-redis]] 以便配置相關的信息。 ruby
不推薦使用數據庫做爲消息隊列,但在很是小的應用中可使用。Celery 可使用 SQLAlchemy 和 Django ORMS。請查看 [[/技術分享/Celery/otherqueues-sqlalchemy]] 或 [[/技術分享/Celery/otherqueues-django]]。
除了上面列出的之外,還有其餘能夠選擇的傳輸實現,例如 CouchDB, Beanstalk, MongoDB, and SQS。請查閱 Kombu 的文檔以便得到更多信息。
#安裝celery $ pip install celery #安裝時區的模塊,否則會有時間慢8小時的問題 $ pip install pytz
在這個教程裏,咱們將建立一個簡單的「任務」(Task) —— 把兩個數加起來。一般,咱們在 Python 的模塊中定義「任務」。
按照慣例,咱們將調用模塊 file:tasks.py,看起來會像這個樣子:
file:tasks.py
from celery.task import task @task def add(x, y): return x + y
此時, @task 裝飾器實際上建立了一個繼承自 :class:~celery.task.base.Task 的「類」(class)。除非須要修改「任務類」的缺省行爲,不然咱們推薦只經過裝飾器定義「任務」(這是咱們推崇的最佳實踐)。
seealso: 關於建立任務和任務類的完整文檔能夠在 ../userguide/tasks 中找到。
Celery 使用一個配置模塊來進行配置。這個模塊缺省北命名爲 :file:celeryconfig.py。
爲了能被 import,這個配置模塊要麼存在於當前目錄,要麼包含在 Python 路徑中。
同時,你能夠經過使用環境變量 CELERY_CONFIG_MODULE 來隨意修改這個配置文件的名字。
如今來讓咱們建立配置文件 celeryconfig.py.
AMQP 後端缺省是非持久化的,你只能取一次結果(一條消息)。
能夠閱讀 :ref:conf-result-backend 瞭解可使用的後端清單和相關參數。
咱們只有一個剛開始添加的任務模塊 :file:tasks.py::
CELERY_IMPORTS = ("tasks", )
這就好了。
你還有更多的選項可使用,例如:你指望使用多少個進程來並行處理(:setting:CELERY_CONCURRENCY 設置),或者使用持久化的結果保存後端。能夠閱讀 :ref:configuration 查看更多的選項。
note:
你能夠也使用 $ celery -A tasks worker --loglevel=info
爲了方便測試,咱們將在前臺運行 worker 服務器,這樣咱們就能在終端上看到 celery 上發生的事情:
$ celeryd --loglevel=INFO
在生產環境中,也許你但願將 worker 在後臺以守護進程的方式運行。若是你但願這麼作,你能夠利用平臺或者相似於 supervisord_ (查閱 :ref:daemonizing 以得到更多信息) 的工具來實現。
能夠經過下列命令行得到完整的命令參數清單:
$ celeryd --help
supervisord: [[http://supervisord.org]]
咱們經過調用 class 類的 ~celery.task.base.Task.delay 方法執行任務。
~celery.task.base.Task.apply_async 方法一個很是方便的方法,經過這個方法咱們能夠充分控制控制任務執行的參數(參見 :ref:guide-executing)。
>>> from tasks import add >>> add.delay(4, 4) <AsyncResult: 889143a6-39a2-4e52-837b-d80d33efb22d>
此時,任務已經被髮送到了消息 broker。直到有 worker 服務器取走並執行了這個任務,不然 Broker 將一直保存這個消息。
如今咱們可使用任務返回類 ~celery.result.AsyncResult 來查看 worker 的日誌,看看到底發生了什麼。若是你配置了一個結果存儲類 ~celery.result.AsyncResult 來保存任務狀態,任務執行完畢可得到返回值;任務執行失敗則可得到異常/回調等信息。
#add 爲任務名 $ celery call add
若是你想跟蹤任務狀態,Celery 就須要把這些狀態信息發送並存儲到某處。你可使用多種後端來達到這個目的: SQLAlchemy/Django ORM, Memcached, Redis,AMQP, MongoDB, Tokyo Tyrant 和 Redis —— 甚至你能夠定義你本身的。
在這裏,咱們將使用 amqp 做爲保存結果的後端來存儲狀態消息。能夠經過 {{{CELERY_RESULT_BACKEND}}} 來修改後端選項,你能夠用於保存結果後端的選項有:
CELERY_RESULT_BACKEND = "amqp" #: 咱們但願結果最多保存 5 分鐘。 #: 注意,這個特性須要 2.1.1 以上版本 RabbitMQ 才能支持。 #: 若是你使用的是早期版本的 RabbitMQ,請註釋下面這行。 CELERY_TASK_RESULT_EXPIRES = 300
請經過閱讀 Celery/任務結果後端 得到更多相關配置的信息。
如今,咱們配置好了保存結果的後端,讓咱們從新來執行任務。咱們再次使用類 ~celery.result.AsyncResult:
>>> result = add.delay(4, 4)
這裏是一些你能夠如何處理結果的方法:
>>> result.ready() # 任務是否已經執行,若是已經執行完畢,返回值爲 True。 False >>> result.result # 任務還沒完成,尚無返回值。 None >>> result.get() # 等待,直到任務執行完畢並返回值。 8 >>> result.result # 直接返回結果,不產生任何錯誤。 8 >>> result.successful() # 任務是否成功執行,若是成功則返回值 True。 True
若是任務執行過程當中發生了異常,result.successful() 的返回值將會變成 False,同時 result.result 將包含一個由 task 產生的異常實例。