Django的celery配置(包括定時任務、隊列)

1、安裝celery

Django項目不須要安裝celery這個包,能夠直接使用django-celery這個包,,先來安裝它,在終端中輸入:python

pip install django-celery

2、安裝rabbitmq,創建celery隊列

我作的項目用的就是rabbitmq,按道理來講,也是能夠用redis做爲消息隊列的,可是rabbitmq更好,此處不作詳細解釋,有興趣的同窗的能夠去研究下。 ubuntu環境下,在終端中輸入:redis

sudo apt-get install rabbitmq-server

3、配置settings.py

首先要在INSTALLED_APPS中添加對djcelery的引用shell

INSTALLED_APPS = [
    'decelery',
]

再在settings.py中添加如下代碼django

import djcelery
djcelery.setup_loader()

BROKER_URL = 'amqp://guest:guest@localhost:5672/'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

4、添加celery異步任務

在common/tasks/task1.py中添加如下代碼,就定義了celery異步任務。 由於celery任務可能會不少,爲了便於管理,咱們就在項目下的common/tasks文件夾中建立了許多task.py文件,如task1.py、task2.py等。 在task1.py中添加如下代碼:ubuntu

from celery.task import task

@task
def function1(a, b):
    print a + b

注意,想讓這個celery任務能執行,還必需要在settings.py中加一段配置,這個在上面已經添加了,這裏再特別提醒下,以下:服務器

# common、tasks是文件夾,task一、task2是tasks文件夾下面的py文件
# CELERY_IMPORTS:是導入目標任務文件
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

要調用這個異步任務的話,就用app

from common.tasks.task1 import function

function.delay(a, b)

是否是很方便?工具的好處就是把事情簡單化,可是太依賴工具,不懂的底層原理的話容易把人搞傻。 好比其實還能夠用threading來建立新線程來執行異步任務,此處再也不贅述,不然又是一套長篇大論,有興趣的童鞋能夠去學習一下如何使用threading庫。frontend

5、啓動celery

由於咱們這裏用的是django-celery,而不是直接使用celery這個庫,因此啓動celery的命令與celery官網裏面介紹的是不同的,新人極可能由於這個掉進坑裏,因此這裏我特別提醒一下。 終端命令以下:異步

# 先啓動服務器
python manage.py runserver
# 再啓動worker(--concurrency=2是開4個worker進程,不加也能夠啓動,只不過在生產環境仍是應該多開幾個進程的)
python manage.py celery worker --concurrency=2 -l info

6、celery定時任務的配置

在項目中有時不只僅使用以上的異步任務,有時候須要建立不少定時任務,這樣celery又能夠大顯身手了。在settings.py中添加如下配置,就能夠添加定時任務函數

from celery.schedules import crontab

# 下方的common和tasks依然是文件夾
# function二、function3分別是tasks文件夾中的task1.py、task2.py文件的函數的函數名
CELERYBEAT_SCHEDULE = {
    'function2': {
        'task': 'common.tasks.task1',
        'schedule': crontab(minute='*/50'), # 每50分鐘執行一次
    },
    'function3': {
        'task': 'common.tasks.task2',
        'schedule': crontab(minute=0, hour='8,13'), # 天天的8點0分和13點0分各執行一次
    },
}

下面是common/tasks/task1.py中的函數

from celery.task import task

@task
def function2():
    print '='*40
    print 'This is function2, celery is great!'
    print '='*40

下面是common/tasks/task2.py中的函數,跟task1.py中是同樣的使用方法。

from celery.task import task

@task
def function3():
    print '='*40
    print 'This is function3, celery is great!'
    print 'Fuck celery start failure!'
    print '='*40

7、啓動celery定時任務

得先啓動第六節中介紹的命令後,再執行這個命令,定時任務才能執行。由於beat只是分配定時任務給celery的worker,因此只有worker啓動後,定時任務才能異步執行。 顧名思義,worker就是幹苦力的民工,累活髒或都給它幹。beat能夠形象的理解爲工地作計劃的人,到了要幹活的時候就分配任務給民工,多是包工頭,也多是通常的管理工程進度的小弟。反正都是苦命的人,屌絲何須難爲屌絲。 (╥╯^╰╥) 終端命令以下:

python manage.py celery beat -l info

8、celery隊列的配置

在項目中celery的異步任務不少的時候,這個時候咱們就須要將不一樣的任務分配到不一樣的隊列(queue)中去執行,若是隻有一個默認隊列的話,全部異步任務都會在這個隊列中執行(是須要排隊的,先來的先執行),任務不少的時候,就無法同時執行不少任務了,甚至形成任務的擁堵。將不一樣的任務分配到不一樣的隊列就能夠保證同一時刻能夠同時運行不一樣隊列中的任務,互補干擾,而且每一個隊列能夠單獨開好幾個進程。 進程數最好不要超過CPU的核數,由於CPU只有4個核的話,你開5個進程,同一時間仍是隻能執行4個進程。

咱們能夠在項目中設置三個隊列(default, frontend, backend),隊列的名字能夠本身任意取。如下是在settings.py中添加隊列的配置:

from kombu import Exchange, Queue

# 默認隊列是default
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

# x-priority是任務的優先級
# 優先級就是哪一個隊列優先執行,比較緊迫的須要立刻執行的任務優先級能夠設置爲最高
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default', consumer_arguments={'x-priority': 5}),
    Queue('frontend', Exchange('frontend'), routing_key='frontend', consumer_arguments={'x-priority': 10}),
    Queue('backend', Exchange('backend'), routing_key='backend', consumer_arguments={'x-priority': 8}),
)

# 特別須要注意的是,異步任務的路徑必須精確到函數名(好比下方的common、tasks是文件夾,task一、task2是py文件,function1就是task1.py中的定義的異步任務的函數名),否則的話異步任務就無法執行
CELERY_ROUTES = {
    "common.tasks.task1.function1": {'queue': "frontend", 'routing_key': 'frontend'},
    "common.tasks.task1.function2": {'queue': "backend", 'routing_key': 'backend'},
    "common.tasks.task2.function3": {'queue': "default", 'routing_key': 'default'},
}

9、啓動celery隊列

配置了隊列的話,若是執行python manage.py celery worker --concurrency=2 -l info的話就只會建立一個默認的隊列,而咱們須要建立多個隊列,這樣咱們就不須要運行這個命令了,咱們須要在終端中分別運行如下命令:

# -Q 後面加的是配置的隊列名,concurrency(進程數)設置爲幾就由本身定了,只要不超過CPU核數就好了
python manage.py celery worker -l info -Q default --concurrency=1
python manage.py celery worker -l info -Q frontend --concurrency=2
python manage.py celery worker -l info -Q backend --concurrency=4

10、補充

Django下要查看其餘celery的命令,包括參數配置、啓動多worker進程的方式均可以經過python manage.py celery --help來查看,一下是終端輸入命令後出來的提示信息:

Usage: manage.py celery <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
  -b BROKER, --broker=BROKER
                        url to broker.  default is 'amqp://guest@localhost//'
  --loader=LOADER       name of custom loader class to use.
  --config=CONFIG       Name of the configuration module
  --workdir=WORKING_DIRECTORY
                        Optional directory to change to after detaching.
  -C, --no-color        
  -q, --quiet           
  --version             show program's version number and exit
  -h, --help            show this help message and exit

---- -- - - ---- Commands- -------------- --- ------------

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status
 
|    celery inspect --help
|    celery inspect active
|    celery inspect active_queues
|    celery inspect clock
|    celery inspect conf None
|    celery inspect memdump
|    celery inspect memsample
|    celery inspect objgraph None
|    celery inspect ping
|    celery inspect registered
|    celery inspect report
|    celery inspect reserved
|    celery inspect revoked
|    celery inspect scheduled
|    celery inspect stats
 
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max] [min]
|    celery control cancel_consumer <queue>
|    celery control disable_events
|    celery control enable_events
|    celery control pool_grow [N=1]
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery migrate
|    celery call
|    celery result
|    celery report
---- -- - - --------- -- - -------------- --- ------------

Type 'celery <command> --help' for help using a specific command.
相關文章
相關標籤/搜索