django celery異步框架

描述:實現運維平臺的異步執行與定時任務,如下簡單描述了安裝過程及使用。
 
安裝django、celery與django-celery
pip install django
pip install celery 
pip install django-celery
 
新建一個項目名爲news
root@bogon:~# django-admin startproject news  
 
查看目錄樹
root@bogon:~# tree news/
news/
├── manage.py
└── news
├── __init__.py
├── settings.py
├── urls.py
└── wsgi.py
 
定義一個celery實例 news/news/celery.py
from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
# Must happen BEFORE importing django.conf.settings below, otherwise
# Django would not know which settings module to load.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'news.settings')

from django.conf import settings

# The Celery application instance for the 'news' project.
app = Celery('news')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# Look for a tasks.py module in every installed Django app.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
 
 
@app.task(bind=True)
def debug_task(self):
    """Diagnostic task: print the current task request context."""
    # bind=True makes Celery pass the task instance as `self`,
    # giving access to self.request (id, args, delivery info, ...).
    print('Request: %r' % (self.request,))
 
在django中加載應用news/news/__init__.py
from __future__ import absolute_import
 
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

 

celery基本配置
import djcelery
# Let django-celery take over Celery's configuration/loader integration.
djcelery.setup_loader()

# Use the Django database as the message broker (djcelery's dev transport).
BROKER_URL = 'django://'
# RabbitMQ broker configuration (alternative to the Django transport):
# BROKER_URL = "amqp://guest:guest@localhost:5672//"

# NOTE(review): in a real settings.py these entries are ADDED to the
# existing INSTALLED_APPS, not assigned as a fresh tuple like here.
INSTALLED_APPS = (
  'djcelery',
  'kombu.transport.django',
)
 
celery隊列配置以及詳細說明
# Standard Redis configuration (broker + result backend).
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'


# RabbitMQ configuration.
# The official optimization docs also recommend the C librabbitmq client.
# NOTE(review): this overrides the Redis result backend set above —
# in a real settings file keep only one of the two.
CELERY_RESULT_BACKEND = "amqp"
# How long (seconds) task results are kept before expiring.
CELERY_TASK_RESULT_EXPIRES = 1200
# Number of concurrent worker processes/threads.
CELERYD_CONCURRENCY = 50
# How many messages each worker prefetches from RabbitMQ at a time.
CELERYD_PREFETCH_MULTIPLIER = 4
# Recycle a worker child process after it has executed this many tasks
# (guards against memory leaks in long-running workers).
CELERYD_MAX_TASKS_PER_CHILD = 40
# Use django-celery's database-backed scheduler: periodic task schedules
# are stored in the Django ORM database you configured.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# Hard time limit per task. The author's RabbitMQ/Celery monitoring is
# thorough, so this timeout is kept short — only half an hour (1800 s).
CELERYD_TASK_TIME_LIMIT = 1800
# Default queue: any message that matches no other route lands here.
CELERY_DEFAULT_QUEUE = "default_dongwm"

## Queue definitions.
CELERY_QUEUES = {
  "default_dongwm": { # the default queue declared above
    "exchange": "default_dongwm",
    "exchange_type": "direct",
    "routing_key": "default_dongwm"
  },
  "topicqueue": { # a topic queue: any routing key starting with "topictest" is delivered here
    "routing_key": "topictest.#",
    "exchange": "topic_exchange",
    "exchange_type": "topic",
  },
  "test2": { # test and test2 are two fanout queues — note they share the same exchange
    "exchange": "broadcast_tasks",
    "exchange_type": "fanout",
    "binding_key": "broadcast_tasks",
  },
  "test": {
    "exchange": "broadcast_tasks",
    "exchange_type": "fanout",
    # fanout exchanges ignore binding/routing keys, so the differing
    # binding_key values here are harmless.
    "binding_key": "broadcast_tasks2",
  },
}

class MyRouter(object):
    """Route tasks to queues/exchanges by task-name prefix.

    Fixed: the original snippet had the whole class collapsed onto a
    single line, which is not valid Python; reformatted with standard
    indentation. Logic is unchanged.
    """

    def route_for_task(self, task, args=None, kwargs=None):
        # Any task whose name starts with 'topictest' goes to the topic queue.
        if task.startswith('topictest'):
            return {
                'queue': 'topicqueue',
            }
        # The dongwm.tasks module has two tasks whose names start with "test";
        # they are broadcast through the fanout exchange.
        elif task.startswith('webui.tasks.test'):
            return {
                "exchange": "broadcast_tasks",
            }
        # Everything else: return None so it falls through to the default queue.
        else:
            return None

# CELERY_ROUTES could also be one big dict of per-task dicts, but a router
# class lets us match task names by prefix instead of listing each one.
CELERY_ROUTES = (MyRouter(), )
 
建立一個應用和異步任務
root@bogon:~# django-admin startapp webui
root@bogon:~# cat webui/tasks.py
from __future__ import absolute_import
 
from celery import shared_task
 
@shared_task
def add(x, y):
    """Celery shared task: return the sum of the two operands."""
    total = x + y
    return total
 
建立一個定時任務
# Run this task once every minute.
@periodic_task(run_every=crontab(minute="*/1"))
def check_ping():
    """Periodic heartbeat task: prints 'Pong' once a minute.

    Fixed: `print 'Pong'` was Python 2 statement syntax, inconsistent
    with the print() function style used elsewhere in this article.
    """
    print('Pong')

  

 
更多詳細的配置請參考官方文檔(celeryproject.org 舊域名已遷移至 celeryq.dev)。
https://docs.celeryq.dev/en/latest/index.html
相關文章
相關標籤/搜索