djcelery入門:實現運行定時任務


基於Django與Celery實現異步對列任務 - Python - 伯樂在線 http://python.jobbole.com/81953/ 更新於2015-08-26php


注:本文根據官方文檔結合具體例子整理,供celery入門學習,更多內容請移步參考資料一節。最近更新於2014-10-19
html

1 什麼是celery

celery是一個異步任務隊列/基於分佈式消息傳遞的做業隊列。Celery經過消息(message)進行通訊,使用代理(broker)在客戶端和工做執行者之間進行交互。當開始一個任務時,客戶端發送消息到隊列並由代理將其發往響應的工做執行者處。python

2 Windows環境配置

在這裏使用RabbitMQ做爲消息代理(broker),Django數據庫做爲結果存儲(ResultStore)。git

2.1 安裝ERLang

  1. 首先是到ERLang官網去下載ERlang可執行文件  地址:http://www.erlang.org/download.htmlgithub

  2. 而後安裝ERLang。ajax

  3. 而後設置ERLang的環境變量。redis

  4. 在環境變量中加入 ERL_HOME = erlang安裝目錄數據庫

  5. 在path中添加 %ERL_HOME%\bindjango


2.2 安裝rabbitmq

http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5.exe下載rabbitmq-server 並安裝。windows

 點擊開始菜單中的 rabbitmq-start ,rabbitmq-server就啓動了,在管理工具-服務中能夠看到相關信息的。

2.3 安裝djcelery

和大多數Python第三方包同樣,用 pip安裝celery和djcelery兩個包。djcelery依賴於djcelery,因此只要執行pip install djcelery命令便可。

3 配置Djcelery

主要步驟

  1. 在settings配置相關參數

  2. 定義任務

  3. 執行任務,能夠在程序中調用執行,也可交給後臺週期性執行

3.1 基本配置

下面是djcelery的有關配置,定義在Django項目的settings模塊內。

#... ... 
 
INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Uncomment the next line to enable the admin:
    'django.contrib.admin',
    # Uncomment the next line to enable admin documentation:
    # 'django.contrib.admindocs',
    'djcelery',  #添加djcelery
     'mrs_app',  #本身的APP
) 
 
# ... .... 
# 配置djcelery相關參數,ResultStore默認存儲在數據庫可沒必要重寫 ,
import djcelery
djcelery.setup_loader()
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
#任務定義所在的模塊
CELERY_IMPORTS = ('mrs_app.my_celery.tasks', )
# 使用和Django同樣的時區
CELERY_TIMEZONE = TIME_ZONE

#以上爲基本配置,如下爲週期性任務定義,以celerybeat_開頭的  

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

#CELERYBEAT_SCHEDULE = {
#    'add-every-3-minutes': {
#        'task': 'mrs_app.my_celery.tasks.monthly_reading_task',
#        'schedule': timedelta(minutes=3)
#    },
#}

首先導出djcelery模塊,並調用setup_loader方法加載有關配置。

broker_url 類URL格式代表使用哪一種消息代理 格式參見這裏
celery_imports 任務定義在哪一個模塊,最好以tasks.py命名
celery_timezone 時間時區,等於TIMEZONE即和Django同樣的時區。
celerybeat_scheduler 週期性任務,用Django數據庫保存最後運行時間等信息
celerybeat_schedule 定義任務,上面註釋表示每隔3分鐘執行monthly_reading_task任務

 3.2 定義任務

有兩種格式

  • 類定義:一個繼承了celery.app.task的類並實現了run方法

  • 函數定義:@task裝飾的函數

經過實現task相關方法能夠實現更多的邏輯,好比成功回調、錯誤處理、重試機制等,如下是最基本的定義方式。

#mrs_app.my_celery.tasks.py

from celery import task

#第一種,函數方式  
 @task(name='monthly_reading')
def monthly_reading_task():
    task_obj = MonthlyReading(debug=False)
    task_obj.start()
    
#第二種,類定義
class MonthlyReadingTask(Task):
    name='monthly_reading'
    def run(*args, **kwargs): 
        task_obj = MonthlyReading(debug=False)
        task_obj.start()

3.3 啓動

啓動 python manage.py celery worker -l info

若是有定時任務的話,還須要啓動心跳

 另開一個cmd窗口 python manage.py celery beat  (windows下-B選項不可用)

4 執行任務

4.1 直接調用

本身在代碼中的調用,支持延遲/同步/異步調用,可參考task類定義,例子見參考資料的《使用django+celery+RabbitMQ實現異步執行》。

4.2 週期性調用

這種由djcelery調用,因此須要在settings.CELERYBEAT_SCHEDULER設置一個調度器,這裏使用數據庫。

djcelery提供了一些Model(定義在djcelery/models.py文件),數據庫模型以下,

periodictask

描述定時任務。重要字段有:

name: 字符串,標識符

task,字符串,任務函數/類所在的路徑,通常是celery_imports + function name。

interval:外鍵指向intervalschedule,表示每隔多少時間執行

crontab:外鍵指向crontabschedule,表示在某一時刻執行。

enabled:是否生效


intervalschedule

表示時間間隔,有兩個參數:

every:正數;period間隔單位。好比intervalschedule(every=2, period='day')表示每隔2天


crontabschedule 表示某一時刻,有minute、hour、day_of-week、day_of_month、day_of_year它們的組合意義,參見cron時間表示法,好比0 0 * 10 * 表示每月的10號凌晨。

說明:

  • 任務和定時任務的區別:定時任務 = 任務 + intervalschedule/crontabschedule 兩個定時任務能夠執行同一個任務。

  • 任務沒有相應的Model,用字符串表示,即periodictask模型的task字段

  • 定時任務有相應的Model即periodictask。

djcelery在初始化中主要完成兩件:

在settings.CELERY_IMPORTS定義下的模塊搜索全部任務。這個對數據庫沒有任何改變,只是用Admin添加定時任務時periodictask.task字段變成選擇框,列出了全部定義的任務。

從settings.CELERYBEAT_SCHEDULE建立定時任務,這個會建立數據記錄,至關於celery_models.PeriodicTask.objects.create(..)語句。

4.3 建立定時任務

經過它提供的Model Query API來操做,同日常的數據庫查詢同樣。

from djcelery import models as celery_models

celery_models.PeriodicTask.objects.create(...)
celery_models.PeriodicTask.ojects.get(name='add')
....

djcelery提供了admin管理界面,訪問http://localhost:8000/admin/djcelery/ 便可,在這裏能夠對定時任務進行增刪改查,具體和Django admin同樣。

(圖待補充)

注:當咱們修改任務的設置後,好比關閉、更改時間後不用重啓celery服務等。

4.4 示例

admin畢竟是給後臺管理人員使用的,它全部的參數都暴露給使用者了。下面是一個實際使用的例子。

需求:ajax實現月度定時任務monthly_reading_task的執行和控制,即

每月的某一天執行該任務;能夠選擇開啓或者關閉該定時任務;可以選擇任務在哪一天(1-28日)執行。

界面看起來是這樣的:

基本上就是Model的增刪改查。就不是經過admin來操做了。

查詢任務信息

    def read(self, request, *args, **kwargs):
        try:
            task = celery_models.PeriodicTask.objects.get(name=self.TASK_NAME)
            if task.enabled:
                return {
                    'enabled': True,
                    'day_of_month': int(task.crontab.day_of_month),
                    'last_run_at': task.last_run_at if task.last_run_at else '0'
                }
            else:
                return {'enabled': False}
        except celery_models.PeriodicTask.DoesNotExist:
            return {'enabled': False}

更新日期

    def create(self, request, *args, **kwargs):
        enabled = request.POST.get('enabled', None)
        if enabled not in [self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE]:
            return self.operate_fail('無效參數')
        if enabled == self.DISABLED_POST_VALUE:
            self.disable_task(self.TASK_NAME)
            return self.operate_success()
        else:
            try:
                day_of_month = int(request.POST.get('day_of_month', ''))
                if day_of_month > 28 or day_of_month < 1:
                    return self.operate_fail('日期必須在1-28日之間')
                task, created = celery_models.PeriodicTask.objects.get_or_create(name="monthly_reading",
                                                                                 task="mrs_app.my_celery.tasks.monthly_reading_task")
                if created:
                    crontab = celery_models.CrontabSchedule.objects.create(day_of_month=day_of_month,
                                                                           hour=0,
                                                                           minute=0)
                    crontab.save()
                    task.crontab = crontab
                    task.enabled = True
                    task.save()
                else:
                    task.crontab.day_of_month = day_of_month
                    task.crontab.save()
                    task.enabled = True
                    task.save()
                return self.operate_success()
            except ValueError:
                return self.operate_fail('抄表日不能爲空')

關閉定時

    def disable_task(self, name):
        try:
            task = celery_models.PeriodicTask.objects.get(name=name)
            task.enabled = False
            task.save()
            return True
        except celery_models.PeriodicTask.DoesNotExist:
            return True

後臺控制界面



5 參考資料&進階

相關文章
相關標籤/搜索