Celery (芹菜)是基於Python開發的分佈式任務隊列。它支持使用任務隊列的方式在分佈的機器/進程/線程上執行任務調度。html
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。python
消息中間件redis
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQsql
任務執行單元mongodb
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。apache
任務結果存儲json
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache架構
另外, Celery還支持不一樣的併發和序列化的手段併發
併發app
序列化
pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
Celery的安裝過程略爲複雜,下面的安裝過程是基於個人AWS EC2的Linux版本的安裝過程,不一樣的系統安裝過程可能會有差別。你們能夠參考官方文檔。
首先我選擇RabbitMQ做爲消息中間件,因此要先安裝RabbitMQ。做爲安裝準備,先更新YUM。
sudo apt-get update sudo apt-get upgrade
RabbitMQ是基於erlang的,因此先安裝erlang.
# Add and enable relevant application repositories: # Note: We are also enabling third party remi package repositories. wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm # Finally, download and install Erlang: yum install -y erlang
而後安裝RabbitMQ
# Download the latest RabbitMQ package using wget: wget # Add the necessary keys for verification: rpm --import # Install the .RPM package using YUM: yum install rabbitmq-server-3.2.2-1.noarch.rpm
啓動RabbitMQ服務。
rabbitmq-server start
RabbitMQ服務已經準備好了,而後安裝Celery, 假定你使用pip來管理你的python安裝包
pip install Celery
可能須要下面的庫,以下安裝便可:
sudo apt-get install sqlite sudo pip install sqlalchemy
爲了測試Celery是否工做,咱們運行一個最簡單的任務,編寫tasks.py
from celery import Celery app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//') app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' @app .task def add(x, y): return x + y
在當前目錄運行一個worker,用來執行這個加法的task
celery -A tasks worker --loglevel=info
其中-A參數表示的是Celery App的名字。注意這裏我使用的是SQLAlchemy做爲結果存儲。對應的python包要事先安裝好。
worker日誌中咱們會看到這樣的信息:
- ** ---------- [config] - ** ---------- .> app: tasks:0x1e68d50 - ** ---------- .> transport: amqp://guest:**@localhost :5672// - ** ---------- .> results: db+sqlite:///results.sqlite - *** --- * --- .> concurrency: 8 (prefork)
其中,咱們能夠看到worker缺省使用prefork來執行併發,並設置併發數爲8
下面的任務執行的客戶端代碼:
from celery import Celery from tasks2 import add import time result = add.delay(4,4) print "Waiting result..." while not result.ready(): time.sleep(2) print "Result:",result.get()
用python執行這段客戶端代碼,在客戶端,結果以下
Waiting result... Result: 8
Work日誌顯示:
[2015-03-12 02:54:07,973: INFO/MainProcess] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] [2015-03-12 02:54:08,006: INFO/MainProcess] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded in 0.0309705100954s: 8
這裏咱們能夠發現,每個task有一個惟一的ID,task異步執行在worker上。
這裏要注意的是,若是你運行官方文檔中的例子,你是沒法在客戶端獲得結果的,這也是我爲何要使用SQLAlchemy 來存儲任務執行結果的緣由。官方的例子使用AMPQ,有可能Worker在打印日誌的時候取出了task的運行結果顯示在worker日誌中,然而 AMPQ做爲一個消息隊列,當消息被取走後,隊列中就沒有了,因而客戶端老是沒法獲得任務的執行結果。不知道爲何官方文檔對這樣的錯誤視而不見。
若是你們想要對Celery作更進一步的瞭解,請參考官方文檔