django開發-使用celery搭建分佈式(多節點)任務隊列

今天介紹一下如何在django項目中使用celery搭建一個有兩個節點的任務隊列(一個主節點一個子節點;主節點發布任務,子節點收到任務並執行。搭建3個或者以上的節點就相似了),使用到了celery,rabbitmq。這裏不會單獨介紹celery和rabbitmq中的知識了。python

1.項目基礎環境:
兩個ubuntu18.04虛擬機、python3.6.五、django2.0.四、celery3.1.26post2redis

2.主節點django項目結構:
圖片描述django

3.settings.py中關於celery的配置:json

import djcelery
# 此處的Queue和Exchange都涉及到RabbitMQ中的概念,這裏不作介紹
from kombu import Queue, Exchange
djcelery.setup_loader()
BROKER_URL = 'amqp://test:test@192.168.43.6:5672/testhost'
CELERY_RESULT_BACKEND = 'amqp://test:test@192.168.43.6:5672/testhost'

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER='json'
CELERY_RESULT_SERIALIZER='json'
# CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'msgpack', 'yaml']

CELERY_DEFAULT_EXCHANGE = 'train'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERY_IMPORTS = ("proj.celery1.tasks", )

CELERY_QUEUES = (
  Queue('train', routing_key='train'),
  Queue('predict', routing_key='predict'),
)

4.celery.py中的配置:ubuntu

# coding:utf8
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks(settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

5.proj/init.py中的配置:api

from __future__ import absolute_import
from .celery import app as celery_app

6.celery1/tasks.py:(主節點中的任務不會執行,只執行子節點中的任務)app

from __future__ import absolute_import
from celery import task


@task
def do_train(x, y):
    return x + y

7.celery1/views.py:async

from .tasks import do_train
class Test1View(APIView):
    def get(self, request):
        try:
            # 這裏的queue和routing_key也涉及到RabiitMQ中的知識
            # 關鍵,在這裏控制向哪一個queue中發送任務,子節點經過這個執行對應queue中的任務
            ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train")
            # 獲取結果
            data = ret.get()
        except Exception as e:
            return Response(dict(msg=str(e), code=10001))
        return Response(dict(msg="OK", code=10000, data=data))

8.子節點目錄結構:
圖片描述post

9.子節點中celery1/celery.py:url

from __future__ import absolute_import
from celery import Celery
CELERY_IMPORTS = ("celery1.tasks", )
app = Celery('myapp',
             # 此處涉及到RabbitMQ的知識,RabbitMQ是對應主節點上的
             broker='amqp://test:test@192.168.43.6:5672/testhost',
             backend='amqp://test:test@192.168.43.6:5672/testhost',
             include=['celery1.tasks'])

app.config_from_object('celery1.config')

if __name__ == '__main__':
  app.start()

10.子節點中celery1/config.py:

from __future__ import absolute_import
from kombu import Queue,Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER='json'
CELERY_RESULT_SERIALIZER='json'
CELERY_ACCEPT_CONTENT = ['json','pickle','msgpack','yaml']

CELERY_DEFAULT_EXCHANGE = 'train'
# exchange type能夠看RabbitMQ中的相關內容
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERT_QUEUES =  (
  Queue('train',exchange='train',routing_key='train'),
)

11.子節點celery1/tasks.py:(這個是要真正執行的task,每一個節點能夠不一樣)

from __future__ import absolute_import
from celery1.celery import app


import time
from celery import task


@task
def do_train(x, y):
    """
    訓練
    :param data:
    :return:
    """
    time.sleep(3)
    return dict(data=str(x+y),msg="train")

12.啓動子節點中的celery:
celery1是項目,-Q train表示從train這個queue中接收任務

celery -A celery1 worker -l info -Q train

13.啓動主節點中的django項目:

python manage.py runserver

14.使用Postman請求對應的view

請求url:http://127.0.0.1:8000/api/v1/celery1/test/
返回的結果是:
{
    "msg": "OK",
    "code": 10000,
    "data": {
        "data": "6",
        "msg": "train"
    }
}

15.遇到的問題:
1)celery隊列報錯: AttributeError: ‘str’ object has no attribute ‘items’
解決:將redis庫從3.0回退到了2.10,pip install redis==2.10
解決方法參考連接:https://stackoverflow.com/que...

今天就說到這裏,若有疑問,歡迎交流。

相關文章
相關標籤/搜索