Celery是分佈式任務隊列,能實時處理任務, 同時支持task scheduling. 官方文檔
Celery工做原理以下:html
celery client發送message給brokerredis
worker 從broker中消費消息,並將結果存儲在result_end中json
本文中使用的broker是Rabbit MQ,result_end使用的是Redis.瀏覽器
如今有兩個task,分別是加法運算和乘法運算。假定乘法運算的事件優先級高&事件也不少,對於加法運算,要求每分鐘最多處理10個事件。app
Celery Worker:
在2 臺server上部署worker,其中:
server1上的worker處理queue priority_low和priority_high上的事件
server2上的worker只處理priority_high上的事件框架
Celery Client:在應用中調用分佈式
Rabbit MQ:在server3上啓動url
Redis:在localhost啓動spa
對兩個任務加上callback的處理,若是成功,打印「----[task_id] is done」code
from celery import Celery from kombu import Queue import time app = Celery('tasks', backend='redis://127.0.0.1:6379/6') app.config_from_object('celeryconfig') class CallbackTask(Task): def on_success(self, retval, task_id, args, kwargs): print "----%s is done" % task_id def on_failure(self, exc, task_id, args, kwargs, einfo): pass @app.task(base=CallbackTask) def add(x, y): return x + y @app.task(base=CallbackTask) def multiply(x,y): return x * y
from kombu import Queue from kombu import Exchange result_serializer = 'json' broker_url = "amqp://guest:guest@192.168.xx.xxx:5672/%2f" task_queues = ( Queue('priority_low', exchange=Exchange('priority', type='direct'), routing_key='priority_low'), Queue('priority_high', exchange=Exchange('priority', type='direct'), routing_key='priority_high'), ) task_routes = ([ ('tasks.add', {'queue': 'priority_low'}), ('tasks.multiply', {'queue': 'priority_high'}), ],) task_annotations = { 'tasks.add': {'rate_limit': '10/m'} }
消費priority_high事件
celery -A tasks worker -Q priority_high --concurrency=4 -l info -E -n worker1@%h
消費priority_high和priority_low事件
celery -A tasks worker -Q priority_high,priority_low --concurrency=4 -l info -E -n worker2@%h
生產者,pushlish 事件到broker
from tasks import add from tasks import multiply for i in xrange(50): add.delay(2, 2) multiply.delay(10,10)
pip install flower
假設在server2上啓動flower,flower默認的端口是5555.
celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
在瀏覽器上輸入 http://server2_ip:5555, 能夠看到以下界面:
從queued tasks途中,能夠看出 priority_high中的task先消費完,和預期是同樣的。