上篇文章中,已經介紹了celery和RabbitMQ的安裝以及基本用法。html
本文將從工程的角度介紹如何使用celery。python
請參考celery+RabbitMQ實戰記錄。api
$ mkdir celery_demo $ cd celery_demo $ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery
項目的目錄結構說明:app
-- celery_demo -- api.py -- celeryconfig.py -- rocket -- celery.py -- tasks.py
建立配置文件 celeryconfig.py,裏面包含CELERY_IMPORTS
、BROKER_URL
、CELERYD_LOG_FORMAT
、CELERY_ROUTES
.async
# celeryconfig.py RABBIT_MQ = { 'HOST': '127.0.0.1', 'PORT': 5672, 'USER': 'test', 'PASSWORD': '123456' } CELERY_IMPORTS = ("rocket.tasks", ) BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT']) CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s' CELERY_ROUTES = { 'rocket.tasks.add': {'queue': 'sunday'}, }
其中,參數定義以下:this
CELERY_IMPORTS
導入的taskspa
BROKER_URL
指定了broker信息,即消息隊列的地址。.net
CELERYD_LOG_FORMAT
指定了日誌格式。日誌
CELERY_ROUTES
指定了路由信息,即調用rocket.tasks.add
後,消息具體放入哪一個隊列,這裏是隊列名稱爲sunday
。code
建立目錄
mkdir rocket
在rocket目錄下,建立文件celery.py
from celery import Celery app = Celery("orange", backend='amqp') app.config_from_object("celeryconfig")
建立實例,並讀取配置。
在rocket目錄下,建立文件tasks.py
from rocket.celery import app @app.task def add(x, y): return x + y
定義tasks。
確認當前所在的目錄:
$ pwd /workspace/celery_demo
啓動消費者
$ ./venv3/bin/celery worker -A rocket -Q sunday --loglevel=info -f app.log /workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend. alternative='Please use RPC backend or a persistent backend.') celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb) Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56 [config] .> app: orange:0x10abdb6d8 .> transport: amqp://test:**@127.0.0.1:5672/myvhost .> results: amqp:// .> concurrency: 8 (prefork) .> task events: OFF (enable -E to monitor tasks in this worker) [queues] .> sunday exchange=sunday(direct) key=sunday [tasks] . rocket.tasks.add
建立文件api.py,會調用apply_async
向隊列中投放消息。
# api.py from rocket.tasks import add print("start...") result = add.apply_async((1, 2), expires=10) print("result:", result) print(result.ready()) print("end...")
執行api.py,啓動生產者
./venv3/bin/python api.py start... result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde True end...
查看日誌文件app.log
[2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost [2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors [2019-04-23 09:53:47,455] [INFO] mingle: all alone [2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready. [2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] expires:[2019-04-23 01:54:05.817853+00:00] [2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3