實現一個異步的任務隊列,最簡單來看須要如下幾個部分:任務消息的隊列,任務執行者,任務結果存儲python
消息中間件web
消息中間件即用來提供消息隊列功能的組件,celery自己不提供,可是支持不少的類型的中間人:RabbitMQ, Redis, Mongodb, Django ORM等引擎。redis
任務執行者sql
任務執行者即worker,由celery提供的任務執行單元,這裏就能夠分佈式的放在系統的各個節點中。實現併發的特性。mongodb
任務結果存儲shell
用來存儲任務運行的結果的組件,默認是沒有的,能夠自定義引擎,支持SQLAlchemy,cache, mongodb, redis等數據庫
使用的環境爲python2.7, redis3.0.7,這裏準備使用redis做爲消息中間件和結果存儲。
django
1. 先定義tasks.py 服務器
from celery import Celery app = Celery('tasks', backend="redis://127.0.0.1:6379/1", broker='redis://127.0.0.1:6379/0') #app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite' #app.conf.CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' @app.task def add(x, y): return x + y
Celery第一參數爲Celery App的名字,backend爲任務結果存儲,broker爲消息中間件,這裏使用的redis引擎。
session
下面定義一個task,用來計算兩個參數的和,並返回。
2. 啓動worker
在tasks.py目錄的命令行執行:
celery -A tasks worker --loglevel=info
能夠看到以下信息:
能夠看到worker的基本信息,app, transport, results ,能夠看到默認經過prefork起了4個進程
接下來咱們經過命令行調用tasks裏面的add任務:
咱們使用add.delay執行,傳入參數5,6,經過返回值result的ready()方法能夠獲得是否執行成功,經過get獲得任務執行的結果。
在worker那裏,能夠看到任務的添加和執行的過程:
基本python 的使用就這裏了。
celery易於web框架集成,與不少web框架都有支持的第三方庫:
這裏使用django-celery, 經過pip安裝好後,建立一個django項目後,在settings.py中增長以下代碼:
import djcelery djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379/1' #BORKER_URL = 'django://' INSTALLED_APPS = ( 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'djcelery', #"kombu.transport.django", ... )
settings.py中設置了broker的url,固然測試環境可使用django做爲broker,若是食用django做爲broker,則在installed_apps中增長kombu.transport.django。
接下來,咱們在本身的django的app裏面建立一個tasks.py文件:
from celery import task @task def add(x, y): return x + y
能夠直接使用celery的task裝飾器裝飾定義的task,這樣就告訴Celery這是一個task。
在settings.py中的djcelery.setup_loader()運行時,Celery便會查看全部INSTALLED_APPS中的app目錄中的tasks.py文件,將全部標記task的function註冊成celery task。
接下來啓動workder,由於咱們使用了django-celery,那麼直接使用命令:
python manager.py celery worker --loglevel=info
便可啓動worker,終端中的輸出跟以前celery直接開啓worker基本一致。
而後咱們能夠經過python manager.py shell 去調用add這個task:
經過worker的終端也能看到task的處理。
若是在settings.py中設置了CELERY_ALWAYS_EAGER=True,則執行task時,便不需加delay了。
雖然使用crnotab能夠在服務器上定時執行寫好的腳本,celery提供了定時任務的功能,可以經過scheduler很好的跟django結合起來。
要是celery執行定時任務的方式不少種,咱們採用將定時任務存儲在django數據庫中,咱們須要在settings.py中配置:
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
而後就能夠在django的admin管理頁面看到以下:
能夠經過配置Periodic tasks增長定時任務。
經過python manager.py celery beat開啓定時任務。
參考連接:http://www.weiguda.com/blog/73/
http://my.oschina.net/zhangxu0512/blog/212447?fromerr=r098W69A