celery學習筆記

celery使用在django下,官方4.2文檔   http://docs.celeryproject.org/en/latest/django/index.html, github文檔:https://github.com/celery/celery/tree/master/examples/django/html

 

 

首先生成celery的apppython

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')  #  導入django環境

app = Celery('proj')  # 生成app

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()  #  自動到每一個app下邊尋找tasks文件,並導入


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

使用@shared_task裝飾器,爲了使用它,首先在項目的__init__.py文件導入appmysql

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

建立tasks模塊:git

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

 

運行celerygithub

celery -A proj worker -l info

window 10下運行報錯,弄了很久,最後看別人的博客解決了:https://blog.csdn.net/qq_30242609/article/details/79047660, 解決辦法:redis

celery -A proj worker -l info -P eventlet

把celery的並行執行方式改成 eventlet, 而不是默認的prefork,須要安裝sql

pip install eventlet數據庫

 

把保存結果後端設爲django的結果後端,須要使用django-celery-resultsdjango

$ pip install django-celery-results
INSTALLED_APPS{
  django_celery_results,
}

 python manage.py migrate django_celery_results

數據庫json

CELERY_RESULT_BACKEND = 'django-db'

緩存

CELERY_RESULT_BACKEND = 'django-cache'

幫助:

 celery help

序列化配置,因爲4.0默認是json序列化,序列化有時候會不成功,須要進行配置

CELERY_ACCEPT_CONTENT = ['json', 'pickle'] 
CELERY_TASK_SERIALIZER = 'pickle' # 'json'默認, 改成pickle
CELERY_RESULT_SERIALIZER= 'json' # 結果序列化,默認json能夠改成pickle

 

使用redis做爲中間人

CELERY_BROKER_URL = 'redis://:password@host:port/db'

'redis://localhost/0'

簡寫:'redis://'

RabbitMQ: 'amqp://guest:guest@localhost//'

 

其餘後端

    sqlite (filename) CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’

 

# mysql CELERY_RESULT_BACKEND = ‘db+mysql://scott:tiger@localhost/foo’

 

# postgresql CELERY_RESULT_BACKEND = ‘db+postgresql://scott:tiger@localhost/mydatabase’

 

# oracle CELERY_RESULT_BACKEND = ‘db+oracle://scott:tiger@127.0.0.1:1521/sidname’

配置時區app.conf.timezone = 'Asia/Shanghai'

 

運行週期性任務:

進行配置

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 調用 test('hello') 每10秒運行
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # test('world') 每30秒
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # 每一個星期一早上 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

動態配置

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

args位置參數 元組 ,列表

kwargs關鍵字參數, 字典類型

定時任務示例

from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

文檔連接:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules

相關文章
相關標籤/搜索