pip install django
pip install celery
pip install django-celery
root@bogon:~# django-admin startproject news
root@bogon:~# tree news/
news/
├── manage.py
└── news
    ├── __init__.py
    ├── settings.py
    ├── urls.py
    └── wsgi.py
from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'news.settings')

from django.conf import settings

app = Celery('news')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
import djcelery
djcelery.setup_loader()

# Use the Django database as the broker
BROKER_URL = 'django://'
# A RabbitMQ broker would be configured like this instead:
# BROKER_URL = "amqp://guest:guest@localhost:5672//"

INSTALLED_APPS = (
    'djcelery',
    'kombu.transport.django',
)
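Both djcelery and the kombu.transport.django broker keep their state in the Django database, so the corresponding tables must exist before anything can be queued. A minimal sketch, assuming a Django version that still uses syncdb (on projects with migrations, run manage.py migrate instead):

root@bogon:~# python manage.py syncdb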
# Standard Redis configuration
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'

# RabbitMQ configuration
# The official optimization guide also recommends the C-based librabbitmq
CELERY_RESULT_BACKEND = "amqp"
# How long task results are kept before they expire
CELERY_TASK_RESULT_EXPIRES = 1200
# Number of concurrent worker processes
CELERYD_CONCURRENCY = 50
# How many tasks each worker prefetches from RabbitMQ at a time
CELERYD_PREFETCH_MULTIPLIER = 4
# A worker process is replaced after it has executed this many tasks
CELERYD_MAX_TASKS_PER_CHILD = 40
# django-celery's database scheduler: periodic task schedules are stored
# in the ORM database you point it at
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# My RabbitMQ/Celery monitoring is quite thorough, so the task time limit
# is short: only half an hour
CELERYD_TASK_TIME_LIMIT = 1800
# Default queue; any message that matches no other queue ends up here
CELERY_DEFAULT_QUEUE = "default_dongwm"

# Queue configuration
CELERY_QUEUES = {
    "default_dongwm": {  # The default queue declared above
        "exchange": "default_dongwm",
        "exchange_type": "direct",
        "routing_key": "default_dongwm"
    },
    "topicqueue": {  # A topic queue: every routing key starting with "topictest" lands here
        "routing_key": "topictest.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "test2": {  # test and test2 are two fanout queues; note they share the same exchange
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks",
    },
    "test": {
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks2",
    },
}
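A worker only consumes the queues it is told to listen on. A sketch of starting one worker for the default queue and one for the topic queue with the django-celery management command; the queue names come from CELERY_QUEUES above:

root@bogon:~# python manage.py celery worker -Q default_dongwm -l info
root@bogon:~# python manage.py celery worker -Q topicqueue -l info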
class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('topictest'):
            return {
                'queue': 'topicqueue',
            }
        # webui.tasks has two tasks whose names start with "test"
        elif task.startswith('webui.tasks.test'):
            return {
                "exchange": "broadcast_tasks",
            }
        # Everything else falls through to the default queue
        else:
            return None

# CELERY_ROUTES could also be one big dict of per-task dicts, but matching
# on the task name with a router is more direct
CELERY_ROUTES = (MyRouter(), )
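The router matches on the task name, but routing can also be overridden for a single call with Celery's apply_async options. This is only a sketch; some_task and the topictest.ping routing key are illustrative names, not part of the project above:

# Send one call straight to the topic exchange; any routing key matching
# "topictest.#" is bound to topicqueue in CELERY_QUEUES
some_task.apply_async(args=(2, 3), exchange='topic_exchange', routing_key='topictest.ping')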
root@bogon:~# django-admin startapp webui
root@bogon:~# cat webui/tasks.py
from __future__ import absolute_import

from celery import shared_task


@shared_task
def add(x, y):
    return x + y
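With one of the brokers from the settings above running and a worker started, the task can be called asynchronously. A minimal sketch from the Django shell; result.get() blocks until the worker has finished:

>>> from webui.tasks import add
>>> result = add.delay(2, 3)
>>> result.get(timeout=10)
5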
from celery.schedules import crontab
from celery.task import periodic_task


# Run this task every minute
@periodic_task(run_every=crontab(minute="*/1"))
def check_ping():
    print('Pong')
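Periodic tasks are only dispatched while the beat scheduler runs alongside the worker. A sketch using the django-celery management command, which picks up the DatabaseScheduler configured earlier:

root@bogon:~# python manage.py celery beat -l info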