celery task - 2

# celery task

前言

討論一個定時任務,通常而言,須要的功能以下:html

  1. 封裝成對象,獨立執行;
  2. 對象有一些接口,便於瞭解它的狀態;
  3. 定時調用;
  4. 行爲控制,包括重試,成功/失敗回調等;

下面分別介紹celery的這些功能實現。python

1.task basic

celery的task基礎類是tasks.Task()linux

1.1 bound tasks

綁定表明第一個參數默認是selfredis

logger = get_task_logger(__name__)

@task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)

1.2 Task類繼承

須要注意的是聲明的位置,是在把方法修飾成Task類時聲明。json

@app.task(base=MyTask)
def add(x, y):
    #raise KeyError
    return x + y

1.3 names

每一個task實例都有一個非重複的名字,譬以下例:
app


@app.task(name='tasks.mul')
def mul(x, y):

通常沒必要要使用這一功能,特別是在task方法放在單獨的module中時,默認name就是module name+方法名(celery_tasks.mul)。
儘可能不要把任務模塊命名爲tasks.py,命名爲celery_1.py更好一些。ide

1.4 其它屬性

Task.max_retries 最大重試次數
Task.default_retry_delay 默認重試等待時間。
Task.ignore_result 拋棄結果,意味着不能經過 AsyncResult查看結果。函數

2.task 自定義任務行爲

文檔:http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classesui

主要有四種,包括失敗/成功/重試/完成
on_failure on_success on_retry after_returnurl

# celery_tasks.py
class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print 'task done: {0}'.format(retval)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print 'task fail, reason: {0}'.format(exc)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=MyTask)
def add(x, y):
    return x + y

2.1 retry

app.Task.retry() 是實現重試的方法。

# an example of using retry:
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

能夠指定重試間隔時間,默認爲180秒,下面案例指定爲1800秒。


@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):

關於最大重試次數等參數是在task實例中指定。

3. 定時執行 periodic task

文檔:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

celery beat is a scheduler。

默認的參數源自beat_schedule。

3.1 時區

調度器默認使用UTC時區,固然須要修改:
timezone = 'Europe/London'
或 app.conf.timezone = 'Europe/London'

3.2 entries

有兩種添加定時任務的方式,裝飾器和配置文件。
經常使用配置文件方式。

3.2.1 從配置文件中讀取定時任務

celery_config.py # 配置文件

!/usr/bin/env python

coding:utf-8

"""
celery configure

"""

author = 'sss'

from future import absolute_import
from celery.schedules import crontab
from datetime import timedelta

使用redis存儲任務隊列及結果

broker_url = 'redis://:123@192.168.199.113:6379/0'
result_backend = 'redis://:123@192.168.199.113:6379/1'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']

時區

timezone = 'Asia/Shanghai'

celery默認開啓本身的日誌

False表示不關閉

worker_hijack_root_logger = False

存儲結果過時時間,過時後自動刪除

單位爲秒

result_expires = 60 * 60 * 24

導入任務所在文件

imports = [
'celery_tasks',]

定時任務配置

beat_schedule = {
'test1': {
# 具體須要執行的函數
# 該函數必需要使用@app.task裝飾
'task': 'celery_tasks.test1_run',
# 定時時間
# 每分鐘執行一次,不能爲小數
'schedule': crontab(minute='/1'),
# 或者這麼寫,每小時執行一次
# "schedule": crontab(minute=0, hour="
/1")
# 執行的函數須要的參數
'args': ()
},
'test2': {
'task': 'celery_tasks.test2_run',
# 設置定時的時間,10秒一次
'schedule': timedelta(seconds=10),
'args': ()
}}

celery_workder.py # 主文件


from future import absolute_import

拒絕隱式引入,若是celery.py和celery模塊名字同樣,避免衝突,須要加上這條語句

該代碼中,名字是不同的,最好也要不同

celery test -- worker

from celery import Celery

def create_worker():
# app = Celery('tasks', broker=di)
'''app = Celery('tasks',
#backend=di_backend,
broker=di_broker,
include=['celery_tasks'])
'''
app = Celery()
#app.conf.update(result_expires=3600,)
app.config_from_object('celery_config')
return app

app = create_worker()

celery_tasks.py # task方法文件

from celery_worker import app
from celery.task import Task
import time

tasks.py

class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print('task done: {0}'.format(retval))
return super(MyTask, self).on_success(retval, task_id, args, kwargs)


@app.task(base=MyTask)
def add(x, y):
#raise KeyError
print('wwww')
return x + y

@app.task
def mul(x, y):
return x * y

@app.task
def xsum(numbers):
return sum(numbers)

@app.task
def test(arg):
print(arg)

def test11():
time.sleep(1)
print('test11')

....略

def on_failure(self, exc, task_id, args, kwargs, einfo): print('task fail, reason: {0}'.format(exc)) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

運行-發佈任務
celery -A celery_worker beat

運行-執行任務
celery -A celery_worker -l info -P eventlet

3.2.2 裝飾器添加任務-動態添加

在上一章節中給出的案例是在配置文件中寫入參數beat_schedule;
有時這樣不太方便,須要更靈活的添加定時任務;

def setup_periodic_tasks(sender, **kwargs):
    # 每5s調用 test('hello')
    sender.add_periodic_task(5.0, test.s('hello'), name='add every 5')

    # 每20s調用 test('world')
    sender.add_periodic_task(10.0, test.s('world'), expires=7)

    # 每週一早上7:30 執行 test('Happy Mondays!')
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1), # 可靈活修改
        test.s('Happy Mondays!'),
    )


setup_periodic_tasks(app)
print('定時任務列表', app.conf.beat_schedule)

執行命令celery -A celery_test beat -l debug

能夠比較一下定時任務列表的輸出。
沒有添加任務:


(vir_venv) E:\python\code_2>celery -A celery_test beat -l debug
定時任務列表 {}
celery beat v4.3.0 (rhubarb) is starting.

添加任務以後:


(vir_venv) E:\python\code_2>celery -A celery_test beat -l debug
定時任務列表 # 已格式化
{
"add every 5": {
"schedule": 5.0,
"task": "celery_tasks.test",
"args": [
"hello"
],
"kwargs": {},
"options": {}
},
"celery_tasks.test('world')": {
"schedule": 10.0,
"task": "celery_tasks.test",
"args": [
"world"
],
"kwargs": {},
"options": {
"expires": 7
}
},
"celery_tasks.test('Happy Mondays!')": {
"schedule": "<crontab: 30 7 1 * * (m/h/d/dM/MY)>",
"task": "celery_tasks.test",
"args": [
"HappyMondays!"
],
"kwargs": {},
"options": {}
}
}

相關啓動命令:
celery -A celery_worker worker -l info -P eventlet --logfile=c.log
celery -A celery_test beat -l debug

4. celery相關命令

發佈任務
celery -A celery_task beat

執行任務
celery -A celery_task worker -l info -P eventlet

後臺啓動celery worker進程
celery multi start work_1 -A appcelery

中止worker進程,若是沒法中止,加上-A
celery multi stop WORKNAME

重啓worker進程
celery multi restart WORKNAME

查看進程數
celery status -A celery_task

5. 指定時間格式

複雜的定時功能可使用crontab功能,它跟linux自帶的crontab所支持的格式是同樣的,很是方便定製任務執行時間。

from celery.schedules import crontab
 
app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

上面的案例是在每一個週一的7:30執行tasks.add任務。

相關文章
相關標籤/搜索