Python 並行分佈式框架:Celery

Celery (芹菜)是基於Python開發的分佈式任務隊列。它支持使用任務隊列的方式在分佈的機器/進程/線程上執行任務調度。html

1、架構設計

 

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。python

  • 消息中間件redis

    Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQsql

  • 任務執行單元mongodb

    Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。apache

  • 任務結果存儲json

    Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache架構

另外, Celery還支持不一樣的併發和序列化的手段併發

  • 併發app

    PreforkEventletgevent, threads/single threaded

  • 序列化

    picklejsonyamlmsgpackzlibbzip2 compression, Cryptographic message signing 等等

 

2、安裝和運行

    Celery的安裝過程略爲複雜,下面的安裝過程是基於個人AWS EC2的Linux版本的安裝過程,不一樣的系統安裝過程可能會有差別。你們能夠參考官方文檔。

     首先我選擇RabbitMQ做爲消息中間件,因此要先安裝RabbitMQ。做爲安裝準備,先更新YUM。   

sudo apt-get update
sudo apt-get upgrade

     RabbitMQ是基於erlang的,因此先安裝erlang.

# Add and enable relevant application repositories:
# Note: We are also enabling third party remi package repositories.
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm
 
# Finally, download and install Erlang:
yum install -y erlang

而後安裝RabbitMQ

# Download the latest RabbitMQ package using wget:
wget  
# Add the necessary keys for verification:
rpm --import  
# Install the .RPM package using YUM:
yum install rabbitmq-server-3.2.2-1.noarch.rpm

啓動RabbitMQ服務。

rabbitmq-server start

RabbitMQ服務已經準備好了,而後安裝Celery, 假定你使用pip來管理你的python安裝包

pip install Celery

可能須要下面的庫,以下安裝便可:

sudo apt-get install sqlite
sudo pip install sqlalchemy

 

3、快速入門例程

爲了測試Celery是否工做,咱們運行一個最簡單的任務,編寫tasks.py

from celery import Celery
 
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
 
@app .task
def add(x, y):
    return x + y

在當前目錄運行一個worker,用來執行這個加法的task

celery -A tasks worker --loglevel=info

其中-A參數表示的是Celery App的名字。注意這裏我使用的是SQLAlchemy做爲結果存儲。對應的python包要事先安裝好。

worker日誌中咱們會看到這樣的信息:

- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1e68d50
- ** ---------- .> transport:   amqp://guest:**@localhost :5672//
- ** ---------- .> results:     db+sqlite:///results.sqlite
- *** --- * --- .> concurrency: 8 (prefork)

其中,咱們能夠看到worker缺省使用prefork來執行併發,並設置併發數爲8

下面的任務執行的客戶端代碼:

from celery import Celery
from tasks2 import add
import time
result = add.delay(4,4)

print "Waiting result..." 
while not result.ready():
  time.sleep(2)
  
print "Result:",result.get()

用python執行這段客戶端代碼,在客戶端,結果以下

Waiting result...
Result: 8

Work日誌顯示:

[2015-03-12 02:54:07,973: INFO/MainProcess] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f]
[2015-03-12 02:54:08,006: INFO/MainProcess] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] 
succeeded in 0.0309705100954s: 8

    這裏咱們能夠發現,每個task有一個惟一的ID,task異步執行在worker上。

     這裏要注意的是,若是你運行官方文檔中的例子,你是沒法在客戶端獲得結果的,這也是我爲何要使用SQLAlchemy 來存儲任務執行結果的緣由。官方的例子使用AMPQ,有可能Worker在打印日誌的時候取出了task的運行結果顯示在worker日誌中,然而 AMPQ做爲一個消息隊列,當消息被取走後,隊列中就沒有了,因而客戶端老是沒法獲得任務的執行結果。不知道爲何官方文檔對這樣的錯誤視而不見。

    若是你們想要對Celery作更進一步的瞭解,請參考官方文檔

相關文章
相關標籤/搜索