python-celery使用教程

Celery

Celery是Python開發的分佈式任務調度模塊。分爲任務分發,任務隊列,worker3個部分。celery的出現,解決了python運行後臺任務的需求。html

這篇文章介紹的celery版本是3.1.18python

celery架構git

+------------------+
                                                       +------> | celery worker.1  |
+-----------------+       +-----------------------+    |        +------------------+
|   web service   +-----> | job queue(redis or ..)+----+                            
+-----------------+       +-----------------------+    |        +------------------+
                                                       +------> | celery worker.2  |
                                                       |        +------------------+
                                                       |                            
                                                       |        +------------------+
                                                       +------> | celery worker.[n]|
                                                                +------------------+

任務隊列,支持如redis,RabbitMQ甚至數據庫。一般redis是最好的選擇,不過數據庫在本地使用的時候,也是不錯的。github

安裝celery

使用douban的pypi鏡像,安裝會快一點。web

pip install -i http://pypi.douban.com/simple celery

普通使用

使用Redis做爲Broker時,再安裝一個celery-with-redisredis

開始編寫tasks.py:sql

import time
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def sendmail(mail):
    print('sending mail to %s...' % mail['to'])
    time.sleep(2.0)
    print('mail sent.')

if __name__ == '__main__':
    sendmail.delay(dict(to='myemail@gogs.io'))

啓動worker數據庫

celery -A tasks worker

運行任務json

python tasks.py

這裏普通的調用方式是sendmail(...) 若是改爲後臺運行就變成了sendmail.delay(...)flask

Celery默認設置就能知足基本要求。Worker以Pool模式啓動,默認大小爲CPU核心數量,缺省序列化機制是pickle,但能夠指定爲json。因爲Python調用UNIX/Linux程序實在太容易,因此,用Celery做爲異步任務框架很是合適。

flask中使用celery

web中使用celery是常有的事情。heroku官方就推薦python程序使用celery。 代碼,我放到github上一份https://github.com/codeskyblue/celery-examples

首先建立一個web.py文件

from flask import Flask
from celery import Celery


def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)

編寫任務函數

@celery.task()
def add_together(a, b):
    res = a + b
    print res
    return res

而後是一個主頁, 請求主頁的時候,就會自動調用add_together這個函數,交給celery運行

@app.route('/')
def homepage():
    a = random.randint(0, 10)
    b = random.randint(0, 10)
    add_together.delay(a, b)
    return 'Create new task {} + {}'.format(a, b)

而後還須要編寫一個run.py文件(這裏須要分開兩個文件)

from web import app

if __name__ == '__main__':
    app.run()

啓動web應用python web.py,啓動worker的方法celery -A web.celery worker

若是須要用sqlite做爲後端的話,也是能夠的,首先須要安裝pip install sqlalchemy

flask的app配置稍微改下

app.config.update(
    CELERY_BROKER_URL='sqla+sqlite:///queue.db',
    CELERY_RESULT_BACKEND='db+sqlite:///queue.db',
    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
)

而後,celery就會用本地的queue.db文件,做爲隊列了。

Refs

相關文章
相關標籤/搜索