基於Django與Celery實現異步對列任務 - Python - 伯樂在線 http://python.jobbole.com/81953/ 更新於2015-08-26php
注:本文根據官方文檔結合具體例子整理,供celery入門學習,更多內容請移步參考資料一節。最近更新於2014-10-19
html
celery是一個異步任務隊列/基於分佈式消息傳遞的做業隊列。Celery經過消息(message)進行通訊,使用代理(broker)在客戶端和工做執行者之間進行交互。當開始一個任務時,客戶端發送消息到隊列並由代理將其發往響應的工做執行者處。python
在這裏使用RabbitMQ做爲消息代理(broker),Django數據庫做爲結果存儲(ResultStore)。git
首先是到ERLang官網去下載ERlang可執行文件 地址:http://www.erlang.org/download.htmlgithub
而後安裝ERLang。ajax
而後設置ERLang的環境變量。redis
在環境變量中加入 ERL_HOME = erlang安裝目錄數據庫
在path中添加 %ERL_HOME%\bindjango
從http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5.exe下載rabbitmq-server 並安裝。windows
點擊開始菜單中的 rabbitmq-start ,rabbitmq-server就啓動了,在管理工具-服務中能夠看到相關信息的。
和大多數Python第三方包同樣,用 pip安裝celery和djcelery兩個包。djcelery依賴於djcelery,因此只要執行pip install djcelery命令便可。
主要步驟
在settings配置相關參數
定義任務
執行任務,能夠在程序中調用執行,也可交給後臺週期性執行
下面是djcelery的有關配置,定義在Django項目的settings模塊內。
#... ... INSTALLED_APPS = ( 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.sites', 'django.contrib.messages', 'django.contrib.staticfiles', # Uncomment the next line to enable the admin: 'django.contrib.admin', # Uncomment the next line to enable admin documentation: # 'django.contrib.admindocs', 'djcelery', #添加djcelery 'mrs_app', #本身的APP ) # ... .... # 配置djcelery相關參數,ResultStore默認存儲在數據庫可沒必要重寫 , import djcelery djcelery.setup_loader() BROKER_URL = 'amqp://guest:guest@localhost:5672//' #任務定義所在的模塊 CELERY_IMPORTS = ('mrs_app.my_celery.tasks', ) # 使用和Django同樣的時區 CELERY_TIMEZONE = TIME_ZONE #以上爲基本配置,如下爲週期性任務定義,以celerybeat_開頭的 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' #CELERYBEAT_SCHEDULE = { # 'add-every-3-minutes': { # 'task': 'mrs_app.my_celery.tasks.monthly_reading_task', # 'schedule': timedelta(minutes=3) # }, #}
首先導出djcelery模塊,並調用setup_loader方法加載有關配置。
broker_url | 類URL格式代表使用哪一種消息代理 | 格式參見這裏 |
celery_imports | 任務定義在哪一個模塊,最好以tasks.py命名 | |
celery_timezone | 時間時區,等於TIMEZONE即和Django同樣的時區。 | |
celerybeat_scheduler | 週期性任務,用Django數據庫保存最後運行時間等信息 | |
celerybeat_schedule | 定義任務,上面註釋表示每隔3分鐘執行monthly_reading_task任務 |
有兩種格式
類定義:一個繼承了celery.app.task的類並實現了run方法
函數定義:@task裝飾的函數
經過實現task相關方法能夠實現更多的邏輯,好比成功回調、錯誤處理、重試機制等,如下是最基本的定義方式。
#mrs_app.my_celery.tasks.py from celery import task #第一種,函數方式 @task(name='monthly_reading') def monthly_reading_task(): task_obj = MonthlyReading(debug=False) task_obj.start() #第二種,類定義 class MonthlyReadingTask(Task): name='monthly_reading' def run(*args, **kwargs): task_obj = MonthlyReading(debug=False) task_obj.start()
啓動 python manage.py celery worker -l info
若是有定時任務的話,還須要啓動心跳
另開一個cmd窗口 python manage.py celery beat (windows下-B選項不可用)
本身在代碼中的調用,支持延遲/同步/異步調用,可參考task類定義,例子見參考資料的《使用django+celery+RabbitMQ實現異步執行》。
這種由djcelery調用,因此須要在settings.CELERYBEAT_SCHEDULER設置一個調度器,這裏使用數據庫。
djcelery提供了一些Model(定義在djcelery/models.py文件),數據庫模型以下,
periodictask | 描述定時任務。重要字段有: name: 字符串,標識符 task,字符串,任務函數/類所在的路徑,通常是celery_imports + function name。 interval:外鍵指向intervalschedule,表示每隔多少時間執行 crontab:外鍵指向crontabschedule,表示在某一時刻執行。 enabled:是否生效 |
|
intervalschedule | 表示時間間隔,有兩個參數: every:正數;period間隔單位。好比intervalschedule(every=2, period='day')表示每隔2天 |
|
crontabschedule | 表示某一時刻,有minute、hour、day_of-week、day_of_month、day_of_year它們的組合意義,參見cron時間表示法,好比0 0 * 10 * 表示每月的10號凌晨。 |
說明:
任務和定時任務的區別:定時任務 = 任務 + intervalschedule/crontabschedule 。兩個定時任務能夠執行同一個任務。
任務沒有相應的Model,用字符串表示,即periodictask模型的task字段
定時任務有相應的Model即periodictask。
djcelery在初始化中主要完成兩件:
在settings.CELERY_IMPORTS定義下的模塊搜索全部任務。這個對數據庫沒有任何改變,只是用Admin添加定時任務時periodictask.task字段變成選擇框,列出了全部定義的任務。
從settings.CELERYBEAT_SCHEDULE建立定時任務,這個會建立數據記錄,至關於celery_models.PeriodicTask.objects.create(..)語句。
經過它提供的Model Query API來操做,同日常的數據庫查詢同樣。
from djcelery import models as celery_models celery_models.PeriodicTask.objects.create(...) celery_models.PeriodicTask.ojects.get(name='add') ....
djcelery提供了admin管理界面,訪問http://localhost:8000/admin/djcelery/ 便可,在這裏能夠對定時任務進行增刪改查,具體和Django admin同樣。
(圖待補充)
注:當咱們修改任務的設置後,好比關閉、更改時間後不用重啓celery服務等。
admin畢竟是給後臺管理人員使用的,它全部的參數都暴露給使用者了。下面是一個實際使用的例子。
需求:ajax實現月度定時任務monthly_reading_task的執行和控制,即
每月的某一天執行該任務;能夠選擇開啓或者關閉該定時任務;可以選擇任務在哪一天(1-28日)執行。
界面看起來是這樣的:
基本上就是Model的增刪改查。就不是經過admin來操做了。
查詢任務信息
def read(self, request, *args, **kwargs): try: task = celery_models.PeriodicTask.objects.get(name=self.TASK_NAME) if task.enabled: return { 'enabled': True, 'day_of_month': int(task.crontab.day_of_month), 'last_run_at': task.last_run_at if task.last_run_at else '0' } else: return {'enabled': False} except celery_models.PeriodicTask.DoesNotExist: return {'enabled': False}
更新日期
def create(self, request, *args, **kwargs): enabled = request.POST.get('enabled', None) if enabled not in [self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE]: return self.operate_fail('無效參數') if enabled == self.DISABLED_POST_VALUE: self.disable_task(self.TASK_NAME) return self.operate_success() else: try: day_of_month = int(request.POST.get('day_of_month', '')) if day_of_month > 28 or day_of_month < 1: return self.operate_fail('日期必須在1-28日之間') task, created = celery_models.PeriodicTask.objects.get_or_create(name="monthly_reading", task="mrs_app.my_celery.tasks.monthly_reading_task") if created: crontab = celery_models.CrontabSchedule.objects.create(day_of_month=day_of_month, hour=0, minute=0) crontab.save() task.crontab = crontab task.enabled = True task.save() else: task.crontab.day_of_month = day_of_month task.crontab.save() task.enabled = True task.save() return self.operate_success() except ValueError: return self.operate_fail('抄表日不能爲空')
關閉定時
def disable_task(self, name): try: task = celery_models.PeriodicTask.objects.get(name=name) task.enabled = False task.save() return True except celery_models.PeriodicTask.DoesNotExist: return True
後臺控制界面
官方文檔:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
cron時間格式:http://www.nncron.ru/help/EN/working/cron-format.htm
celery最佳實踐:http://my.oschina.net/siddontang/blog/284107
使用django+celery+RabbitMQ實現異步執行:http://blog.charlee.li/django-celery-rabbitmq-intro/
RabbitMQ文檔: http://www.rabbitmq.com/documentation.html
Django/Celery Quickstart (or, how I learned to stop using cron and love celery) http://chase-seibert.github.io/blog/2010/07/09/djangocelery-quickstart-or-how-i-learned-to-stop-using-cron-and-love-celery.html
Using Celery to handle asynchronous tasks in Django – Sebastian Dahlgren http://sebastiandahlgren.se/2012/11/13/using-celery-for-asynchronous-messages-in-django/
Django-celery配置及使用指南 http://zhujinliang.cn/django/2013/07/18/Django-celery%E9%85%8D%E7%BD%AE%E5%8F%8A%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97/
Introducing Celery for Python+Django - LINUX For You http://www.opensourceforu.com/2013/12/introducing-celery-pythondjango/
Django中如何使用django-celery完成異步任務 (1) | 上海味股達信息科技有限公司 http://www.weiguda.com/blog/73/
Django Celery Architecture | langoor.mobi Blog http://blog.langoor.mobi/django-celery-redis-vs-rabbitmq-message-broker/django_celery_architecture/
Queueing Messages using Celery with RabbitMQ Message Broker Server - 2014 http://www.bogotobogo.com/python/RabbitMQ_Celery/python_Queueing%20using_Celery_with_RabbitMQ_Message_Broker_Server.php
AMQP, RabbitMQ and Celery - A Visual Guide For Dummies | Abhishek Tiwari http://abhishek-tiwari.com/post/amqp-rabbitmq-and-celery-a-visual-guide-for-dummies
Tracing Celery Performance For Web Applications - 推酷 http://www.tuicool.com/articles/ZbyUnu