分佈式任務隊列Celery快速上手

Celery介紹

celery(芹菜)是一個異步任務隊列/基於分佈式消息傳遞的做業隊列。 php

  • 它側重於實時操做,但對調度支持也很好。
  • celery用於生產系統天天處理數以百萬計的任務。
  • celery是用Python編寫的,但該協議能夠在任何語言實現。它也能夠用其餘語言經過webhooks實現。
  • 目前已知有php/ruby/nodejs的實現

爲何用Celery?

  • 異步
  • 耗時久的事兒能夠扔給 Worker 處理,處理完能夠觸發子任務提醒
  • 自然的併發能力(多進程/協程)!
  • 很是方便添加 Worker 來加強處理能力
  • Celery提供了Web方式的監控/報警,這樣,咱們就能夠監控每一個任務的狀況了
  • 出現錯誤能夠自動處理/重試

角色介紹

Brokers: 提供隊列服務,Celery支持的Brokers有: node

  • RabbitMQ(推薦)
  • Redis
  • MongoDB
  • Beanstalk
  • CouchDB
  • SQLAlchemy(MySQL/PostgreSQL/Sqlite/Oracle)
  • Amazon SQS等

Worker: 真正幹活的,實際運行任務的節點。 web

開始 Celery 的第一步

選擇你的 Broker

在你正式開始使用 Celery 以前,你須要選擇、安裝並運行一個 broker。 redis

Broker 是一種負責接收、發送任務消息(task messages)的服務 sql

你能夠從如下幾種 broker 中選擇一個: 數據庫

  • RabbitMQ : [[http://www.rabbitmq.com/]]

功能完整、安全、飽經實踐檢驗的 broker。若是對你而言,不丟失消息很是重要,RabbitMQ 將是你最好的選擇。 django

請查看 [[/技術分享/Celery/broker-installation]] 以便得到更多關於 RabbitMQ 安裝和配置相關的信息。 後端

  • Redis : [[http://redis.io/]]

也是一個功能完整的 broker,但電源故障或異常將致使數據丟失。 安全

請查看 [[/技術分享/Celery/otherqueues-redis]] 以便配置相關的信息。 ruby

  • 數據庫

不推薦使用數據庫做爲消息隊列,但在很是小的應用中可使用。Celery 可使用 SQLAlchemy 和 Django ORMS。請查看 [[/技術分享/Celery/otherqueues-sqlalchemy]] 或 [[/技術分享/Celery/otherqueues-django]]。

  • 更多其餘選擇。

除了上面列出的之外,還有其餘能夠選擇的傳輸實現,例如 CouchDB, Beanstalk, MongoDB, and SQS。請查閱 Kombu 的文檔以便得到更多信息。

安裝Celery

#安裝celery $ pip install celery #安裝時區的模塊,否則會有時間慢8小時的問題 $ pip install pytz

建立一個簡單「任務」(Task)

在這個教程裏,咱們將建立一個簡單的「任務」(Task) —— 把兩個數加起來。一般,咱們在 Python 的模塊中定義「任務」。

按照慣例,咱們將調用模塊 file:tasks.py,看起來會像這個樣子:

file:tasks.py

from celery.task import task @task def add(x, y): return x + y

此時, @task 裝飾器實際上建立了一個繼承自 :class:~celery.task.base.Task 的「類」(class)。除非須要修改「任務類」的缺省行爲,不然咱們推薦只經過裝飾器定義「任務」(這是咱們推崇的最佳實踐)。

seealso: 關於建立任務和任務類的完整文檔能夠在 ../userguide/tasks 中找到。

配置

Celery 使用一個配置模塊來進行配置。這個模塊缺省北命名爲 :file:celeryconfig.py。

爲了能被 import,這個配置模塊要麼存在於當前目錄,要麼包含在 Python 路徑中。

同時,你能夠經過使用環境變量 CELERY_CONFIG_MODULE 來隨意修改這個配置文件的名字。

如今來讓咱們建立配置文件 celeryconfig.py.

  1. 配置如何鏈接 broker(例子中咱們使用 RabbitMQ): BROKER_URL = "amqp:''guest:guest@localhost :5672''"
  2. 定義用於存儲元數據(metadata)和返回值(return values)的後端: CELERY_RESULT_BACKEND = "amqp"

AMQP 後端缺省是非持久化的,你只能取一次結果(一條消息)。

能夠閱讀 :ref:conf-result-backend 瞭解可使用的後端清單和相關參數。

  1. 最後,咱們列出 worker 須要 import 的模塊,包括你的任務。

咱們只有一個剛開始添加的任務模塊 :file:tasks.py::

CELERY_IMPORTS = ("tasks", )

這就好了。

你還有更多的選項可使用,例如:你指望使用多少個進程來並行處理(:setting:CELERY_CONCURRENCY 設置),或者使用持久化的結果保存後端。能夠閱讀 :ref:configuration 查看更多的選項。

note:

你能夠也使用 $ celery -A tasks worker --loglevel=info

運行 worker 服務器

爲了方便測試,咱們將在前臺運行 worker 服務器,這樣咱們就能在終端上看到 celery 上發生的事情:

$ celeryd --loglevel=INFO

在生產環境中,也許你但願將 worker 在後臺以守護進程的方式運行。若是你但願這麼作,你能夠利用平臺或者相似於 supervisord_ (查閱 :ref:daemonizing 以得到更多信息) 的工具來實現。

能夠經過下列命令行得到完整的命令參數清單:

$ celeryd --help

supervisord: [[http://supervisord.org]]

執行任務(task)

咱們經過調用 class 類的 ~celery.task.base.Task.delay 方法執行任務。

~celery.task.base.Task.apply_async 方法一個很是方便的方法,經過這個方法咱們能夠充分控制控制任務執行的參數(參見 :ref:guide-executing)。

>>> from tasks import add >>> add.delay(4, 4) <AsyncResult: 889143a6-39a2-4e52-837b-d80d33efb22d>

此時,任務已經被髮送到了消息 broker。直到有 worker 服務器取走並執行了這個任務,不然 Broker 將一直保存這個消息。

如今咱們可使用任務返回類 ~celery.result.AsyncResult 來查看 worker 的日誌,看看到底發生了什麼。若是你配置了一個結果存儲類 ~celery.result.AsyncResult 來保存任務狀態,任務執行完畢可得到返回值;任務執行失敗則可得到異常/回調等信息。

Shell裏執行任務

#add 爲任務名 $ celery call add

保留結果

若是你想跟蹤任務狀態,Celery 就須要把這些狀態信息發送並存儲到某處。你可使用多種後端來達到這個目的: SQLAlchemy/Django ORM, Memcached, Redis,AMQP, MongoDB, Tokyo Tyrant 和 Redis —— 甚至你能夠定義你本身的。

在這裏,咱們將使用 amqp 做爲保存結果的後端來存儲狀態消息。能夠經過 {{{CELERY_RESULT_BACKEND}}} 來修改後端選項,你能夠用於保存結果後端的選項有:

CELERY_RESULT_BACKEND = "amqp" #: 咱們但願結果最多保存 5 分鐘。  #: 注意,這個特性須要 2.1.1 以上版本 RabbitMQ 才能支持。 #: 若是你使用的是早期版本的 RabbitMQ,請註釋下面這行。 CELERY_TASK_RESULT_EXPIRES = 300

請經過閱讀 Celery/任務結果後端 得到更多相關配置的信息。

如今,咱們配置好了保存結果的後端,讓咱們從新來執行任務。咱們再次使用類 ~celery.result.AsyncResult:

>>> result = add.delay(4, 4)

這裏是一些你能夠如何處理結果的方法:

>>> result.ready() # 任務是否已經執行,若是已經執行完畢,返回值爲 True。 False >>> result.result # 任務還沒完成,尚無返回值。 None >>> result.get() # 等待,直到任務執行完畢並返回值。 8 >>> result.result # 直接返回結果,不產生任何錯誤。 8 >>> result.successful() # 任務是否成功執行,若是成功則返回值 True。 True

若是任務執行過程當中發生了異常,result.successful() 的返回值將會變成 False,同時 result.result 將包含一個由 task 產生的異常實例。

相關文章
相關標籤/搜索