celery+RabbitMQ 實戰記錄2—工程化使用

上篇文章中,已經介紹了celery和RabbitMQ的安裝以及基本用法。html

本文將從工程的角度介紹如何使用celery。python

1.配置和啓動RabbitMQ

請參考celery+RabbitMQ實戰記錄api

2. 安裝和使用celery

2.1 建立虛擬環境,並安裝celery

$ mkdir celery_demo
$ cd celery_demo
$ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery

項目的目錄結構說明:app

-- celery_demo
    -- api.py
    -- celeryconfig.py
    -- rocket
        -- celery.py
        -- tasks.py

2.2 配置celery

建立配置文件 celeryconfig.py,裏面包含CELERY_IMPORTSBROKER_URLCELERYD_LOG_FORMATCELERY_ROUTES.async

# celeryconfig.py

RABBIT_MQ = {
    'HOST': '127.0.0.1',
    'PORT': 5672,
    'USER': 'test',
    'PASSWORD': '123456'
}

CELERY_IMPORTS = ("rocket.tasks", )

BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT'])

CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s'

CELERY_ROUTES = {
        'rocket.tasks.add': {'queue': 'sunday'},
}

其中,參數定義以下:this

  • CELERY_IMPORTS 導入的taskspa

  • BROKER_URL指定了broker信息,即消息隊列的地址。.net

  • CELERYD_LOG_FORMAT 指定了日誌格式。日誌

  • CELERY_ROUTES 指定了路由信息,即調用rocket.tasks.add後,消息具體放入哪一個隊列,這裏是隊列名稱爲sundaycode

2.3 建立celery實例

建立目錄

mkdir rocket

在rocket目錄下,建立文件celery.py

from celery import Celery

app = Celery("orange", backend='amqp')
app.config_from_object("celeryconfig")

建立實例,並讀取配置。

2.4 定義tasks

在rocket目錄下,建立文件tasks.py

from rocket.celery import app


@app.task
def add(x, y):
        return x + y

定義tasks。

2.5 啓動celery(即消費者)

確認當前所在的目錄:

$ pwd
/workspace/celery_demo

啓動消費者

$ ./venv3/bin/celery worker  -A rocket -Q sunday --loglevel=info  -f app.log

/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
    The AMQP result backend is scheduled for deprecation in     version 4.0 and removal in version v5.0.     Please use RPC backend or a persistent backend.

  alternative='Please use RPC backend or a persistent backend.')

celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb)

Darwin-18.2.0-x86_64-i386-64bit 2019-04-22 20:53:56

[config]
.> app:         orange:0x10abdb6d8
.> transport:   amqp://test:**@127.0.0.1:5672/myvhost
.> results:     amqp://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> sunday           exchange=sunday(direct) key=sunday


[tasks]
  . rocket.tasks.add

2.6 生產者

建立文件api.py,會調用apply_async向隊列中投放消息。

# api.py
from rocket.tasks import add


print("start...")

result = add.apply_async((1, 2), expires=10)

print("result:", result)
print(result.ready())


print("end...")

執行api.py,啓動生產者

./venv3/bin/python api.py
start...
result: 6ef453e6-1797-4dd7-a2fc-bd6ef8096fde
True
end...

2.6 查看結果

查看日誌文件app.log

[2019-04-23 09:53:46,406] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost
[2019-04-23 09:53:46,420] [INFO] mingle: searching for neighbors
[2019-04-23 09:53:47,455] [INFO] mingle: all alone
[2019-04-23 09:53:47,474] [INFO] celery@admindeMacBook-Pro-2.local ready.
[2019-04-23 09:53:55,850] [INFO] Received task: rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde]   expires:[2019-04-23 01:54:05.817853+00:00]
[2019-04-23 09:53:55,876] [INFO] Task rocket.tasks.add[6ef453e6-1797-4dd7-a2fc-bd6ef8096fde] succeeded in 0.023095715092495084s: 3

參考

using celery in project

相關文章
相關標籤/搜索