Celery是Python開發的分佈式任務調度模塊。分爲任務分發,任務隊列,worker3個部分。celery的出現,解決了python運行後臺任務的需求。html
這篇文章介紹的celery版本是3.1.18python
celery架構git
+------------------+ +------> | celery worker.1 | +-----------------+ +-----------------------+ | +------------------+ | web service +-----> | job queue(redis or ..)+----+ +-----------------+ +-----------------------+ | +------------------+ +------> | celery worker.2 | | +------------------+ | | +------------------+ +------> | celery worker.[n]| +------------------+
任務隊列,支持如redis,RabbitMQ甚至數據庫。一般redis是最好的選擇,不過數據庫在本地使用的時候,也是不錯的。github
使用douban的pypi鏡像,安裝會快一點。web
pip install -i http://pypi.douban.com/simple celery
使用Redis做爲Broker時,再安裝一個celery-with-redis
redis
開始編寫tasks.py:sql
import time from celery import Celery celery = Celery('tasks', broker='redis://localhost:6379/0') @celery.task def sendmail(mail): print('sending mail to %s...' % mail['to']) time.sleep(2.0) print('mail sent.') if __name__ == '__main__': sendmail.delay(dict(to='myemail@gogs.io'))
啓動worker數據庫
celery -A tasks worker
運行任務json
python tasks.py
這裏普通的調用方式是sendmail(...)
若是改爲後臺運行就變成了sendmail.delay(...)
flask
Celery默認設置就能知足基本要求。Worker以Pool模式啓動,默認大小爲CPU核心數量,缺省序列化機制是pickle,但能夠指定爲json。因爲Python調用UNIX/Linux程序實在太容易,因此,用Celery做爲異步任務框架很是合適。
web中使用celery是常有的事情。heroku官方就推薦python程序使用celery。 代碼,我放到github上一份https://github.com/codeskyblue/celery-examples
首先建立一個web.py文件
from flask import Flask from celery import Celery def make_celery(app): celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask return celery app = Flask(__name__) app.config.update( CELERY_BROKER_URL='redis://localhost:6379', CELERY_RESULT_BACKEND='redis://localhost:6379' ) celery = make_celery(app)
編寫任務函數
@celery.task() def add_together(a, b): res = a + b print res return res
而後是一個主頁, 請求主頁的時候,就會自動調用add_together這個函數,交給celery運行
@app.route('/') def homepage(): a = random.randint(0, 10) b = random.randint(0, 10) add_together.delay(a, b) return 'Create new task {} + {}'.format(a, b)
而後還須要編寫一個run.py
文件(這裏須要分開兩個文件)
from web import app if __name__ == '__main__': app.run()
啓動web應用python web.py
,啓動worker的方法celery -A web.celery worker
若是須要用sqlite做爲後端的話,也是能夠的,首先須要安裝pip install sqlalchemy
flask的app配置稍微改下
app.config.update( CELERY_BROKER_URL='sqla+sqlite:///queue.db', CELERY_RESULT_BACKEND='db+sqlite:///queue.db', CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] )
而後,celery就會用本地的queue.db文件,做爲隊列了。