討論一個定時任務,通常而言,須要的功能以下:html
下面分別介紹celery的這些功能實現。python
celery的task基礎類是tasks.Task()linux
綁定表明第一個參數默認是selfredis
logger = get_task_logger(__name__) @task(bind=True) def add(self, x, y): logger.info(self.request.id)
須要注意的是聲明的位置,是在把方法修飾成Task類時聲明。json
@app.task(base=MyTask) def add(x, y): #raise KeyError return x + y
每一個task實例都有一個非重複的名字,譬以下例:
app
@app.task(name='tasks.mul')
def mul(x, y):
通常沒必要要使用這一功能,特別是在task方法放在單獨的module中時,默認name就是module name+方法名(celery_tasks.mul)。
儘可能不要把任務模塊命名爲tasks.py,命名爲celery_1.py更好一些。ide
Task.max_retries 最大重試次數
Task.default_retry_delay 默認重試等待時間。
Task.ignore_result 拋棄結果,意味着不能經過 AsyncResult查看結果。函數
文檔:http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classesui
主要有四種,包括失敗/成功/重試/完成
on_failure on_success on_retry after_returnurl
# celery_tasks.py class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task done: {0}'.format(retval) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task fail, reason: {0}'.format(exc) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=MyTask) def add(x, y): return x + y
app.Task.retry() 是實現重試的方法。
# an example of using retry: @app.task(bind=True) def send_twitter_status(self, oauth, tweet): try: twitter = Twitter(oauth) twitter.update_status(tweet) except (Twitter.FailWhaleError, Twitter.LoginError) as exc: raise self.retry(exc=exc)
能夠指定重試間隔時間,默認爲180秒,下面案例指定爲1800秒。
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):
關於最大重試次數等參數是在task實例中指定。
文檔:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
celery beat is a scheduler。
默認的參數源自beat_schedule。
調度器默認使用UTC時區,固然須要修改:
timezone = 'Europe/London'
或 app.conf.timezone = 'Europe/London'
有兩種添加定時任務的方式,裝飾器和配置文件。
經常使用配置文件方式。
celery_config.py # 配置文件
!/usr/bin/env python
coding:utf-8
"""
celery configure"""
author = 'sss'
from future import absolute_import
from celery.schedules import crontab
from datetime import timedelta使用redis存儲任務隊列及結果
broker_url = 'redis://:123@192.168.199.113:6379/0'
result_backend = 'redis://:123@192.168.199.113:6379/1'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']時區
timezone = 'Asia/Shanghai'
celery默認開啓本身的日誌
False表示不關閉
worker_hijack_root_logger = False
存儲結果過時時間,過時後自動刪除
單位爲秒
result_expires = 60 * 60 * 24
導入任務所在文件
imports = [
'celery_tasks',]定時任務配置
beat_schedule = {
'test1': {
# 具體須要執行的函數
# 該函數必需要使用@app.task裝飾
'task': 'celery_tasks.test1_run',
# 定時時間
# 每分鐘執行一次,不能爲小數
'schedule': crontab(minute='/1'),
# 或者這麼寫,每小時執行一次
# "schedule": crontab(minute=0, hour="/1")
# 執行的函數須要的參數
'args': ()
},
'test2': {
'task': 'celery_tasks.test2_run',
# 設置定時的時間,10秒一次
'schedule': timedelta(seconds=10),
'args': ()
}}
celery_workder.py # 主文件
from future import absolute_import拒絕隱式引入,若是celery.py和celery模塊名字同樣,避免衝突,須要加上這條語句
該代碼中,名字是不同的,最好也要不同
celery test -- worker
from celery import Celery
def create_worker():
# app = Celery('tasks', broker=di)
'''app = Celery('tasks',
#backend=di_backend,
broker=di_broker,
include=['celery_tasks'])
'''
app = Celery()
#app.conf.update(result_expires=3600,)
app.config_from_object('celery_config')
return appapp = create_worker()
celery_tasks.py # task方法文件
from celery_worker import app
from celery.task import Task
import timetasks.py
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print('task done: {0}'.format(retval))
return super(MyTask, self).on_success(retval, task_id, args, kwargs)@app.task(base=MyTask)
def add(x, y):
#raise KeyError
print('wwww')
return x + y@app.task
def mul(x, y):
return x * y@app.task
def xsum(numbers):
return sum(numbers)@app.task
def test(arg):
print(arg)def test11():
time.sleep(1)
print('test11')....略
def on_failure(self, exc, task_id, args, kwargs, einfo): print('task fail, reason: {0}'.format(exc)) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
運行-發佈任務
celery -A celery_worker beat
運行-執行任務
celery -A celery_worker -l info -P eventlet
在上一章節中給出的案例是在配置文件中寫入參數beat_schedule;
有時這樣不太方便,須要更靈活的添加定時任務;
def setup_periodic_tasks(sender, **kwargs): # 每5s調用 test('hello') sender.add_periodic_task(5.0, test.s('hello'), name='add every 5') # 每20s調用 test('world') sender.add_periodic_task(10.0, test.s('world'), expires=7) # 每週一早上7:30 執行 test('Happy Mondays!') sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), # 可靈活修改 test.s('Happy Mondays!'), ) setup_periodic_tasks(app) print('定時任務列表', app.conf.beat_schedule)
執行命令celery -A celery_test beat -l debug
能夠比較一下定時任務列表的輸出。
沒有添加任務:
(vir_venv) E:\python\code_2>celery -A celery_test beat -l debug
定時任務列表 {}
celery beat v4.3.0 (rhubarb) is starting.
添加任務以後:
(vir_venv) E:\python\code_2>celery -A celery_test beat -l debug
定時任務列表 # 已格式化
{
"add every 5": {
"schedule": 5.0,
"task": "celery_tasks.test",
"args": [
"hello"
],
"kwargs": {},
"options": {}
},
"celery_tasks.test('world')": {
"schedule": 10.0,
"task": "celery_tasks.test",
"args": [
"world"
],
"kwargs": {},
"options": {
"expires": 7
}
},
"celery_tasks.test('Happy Mondays!')": {
"schedule": "<crontab: 30 7 1 * * (m/h/d/dM/MY)>",
"task": "celery_tasks.test",
"args": [
"HappyMondays!"
],
"kwargs": {},
"options": {}
}
}
相關啓動命令:
celery -A celery_worker worker -l info -P eventlet --logfile=c.log
celery -A celery_test beat -l debug
發佈任務
celery -A celery_task beat
執行任務
celery -A celery_task worker -l info -P eventlet
後臺啓動celery worker進程
celery multi start work_1 -A appcelery
中止worker進程,若是沒法中止,加上-A
celery multi stop WORKNAME
重啓worker進程
celery multi restart WORKNAME
查看進程數
celery status -A celery_task
複雜的定時功能可使用crontab功能,它跟linux自帶的crontab所支持的格式是同樣的,很是方便定製任務執行時間。
from celery.schedules import crontab app.conf.beat_schedule = { # Executes every Monday morning at 7:30 a.m. 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }
上面的案例是在每一個週一的7:30執行tasks.add任務。