今天介紹一下如何在django項目中使用celery搭建一個有兩個節點的任務隊列(一個主節點一個子節點;主節點發布任務,子節點收到任務並執行。搭建3個或者以上的節點就相似了),使用到了celery,rabbitmq。這裏不會單獨介紹celery和rabbitmq中的知識了。python
1.項目基礎環境:
兩個ubuntu18.04虛擬機、python3.6.五、django2.0.四、celery3.1.26post2redis
2.主節點django項目結構:
django
3.settings.py中關於celery的配置:json
import djcelery # 此處的Queue和Exchange都涉及到RabbitMQ中的概念,這裏不作介紹 from kombu import Queue, Exchange djcelery.setup_loader() BROKER_URL = 'amqp://test:test@192.168.43.6:5672/testhost' CELERY_RESULT_BACKEND = 'amqp://test:test@192.168.43.6:5672/testhost' CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_RESULT_SERIALIZER='json' # CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'msgpack', 'yaml'] CELERY_DEFAULT_EXCHANGE = 'train' CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' CELERY_IMPORTS = ("proj.celery1.tasks", ) CELERY_QUEUES = ( Queue('train', routing_key='train'), Queue('predict', routing_key='predict'), )
4.celery.py中的配置:ubuntu
# coding:utf8 from __future__ import absolute_import import os from celery import Celery from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings') # app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.autodiscover_tasks(settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
5.proj/init.py中的配置:api
from __future__ import absolute_import from .celery import app as celery_app
6.celery1/tasks.py:(主節點中的任務不會執行,只執行子節點中的任務)app
from __future__ import absolute_import from celery import task @task def do_train(x, y): return x + y
7.celery1/views.py:async
from .tasks import do_train class Test1View(APIView): def get(self, request): try: # 這裏的queue和routing_key也涉及到RabiitMQ中的知識 # 關鍵,在這裏控制向哪一個queue中發送任務,子節點經過這個執行對應queue中的任務 ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train") # 獲取結果 data = ret.get() except Exception as e: return Response(dict(msg=str(e), code=10001)) return Response(dict(msg="OK", code=10000, data=data))
8.子節點目錄結構:
post
9.子節點中celery1/celery.py:url
from __future__ import absolute_import from celery import Celery CELERY_IMPORTS = ("celery1.tasks", ) app = Celery('myapp', # 此處涉及到RabbitMQ的知識,RabbitMQ是對應主節點上的 broker='amqp://test:test@192.168.43.6:5672/testhost', backend='amqp://test:test@192.168.43.6:5672/testhost', include=['celery1.tasks']) app.config_from_object('celery1.config') if __name__ == '__main__': app.start()
10.子節點中celery1/config.py:
from __future__ import absolute_import from kombu import Queue,Exchange from datetime import timedelta CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_RESULT_SERIALIZER='json' CELERY_ACCEPT_CONTENT = ['json','pickle','msgpack','yaml'] CELERY_DEFAULT_EXCHANGE = 'train' # exchange type能夠看RabbitMQ中的相關內容 CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' CELERT_QUEUES = ( Queue('train',exchange='train',routing_key='train'), )
11.子節點celery1/tasks.py:(這個是要真正執行的task,每一個節點能夠不一樣)
from __future__ import absolute_import from celery1.celery import app import time from celery import task @task def do_train(x, y): """ 訓練 :param data: :return: """ time.sleep(3) return dict(data=str(x+y),msg="train")
12.啓動子節點中的celery:
celery1是項目,-Q train表示從train這個queue中接收任務
celery -A celery1 worker -l info -Q train
13.啓動主節點中的django項目:
python manage.py runserver
14.使用Postman請求對應的view
請求url:http://127.0.0.1:8000/api/v1/celery1/test/ 返回的結果是: { "msg": "OK", "code": 10000, "data": { "data": "6", "msg": "train" } }
15.遇到的問題:
1)celery隊列報錯: AttributeError: ‘str’ object has no attribute ‘items’
解決:將redis庫從3.0回退到了2.10,pip install redis==2.10
解決方法參考連接:https://stackoverflow.com/que...
今天就說到這裏,若有疑問,歡迎交流。