celery 使用交流

celery簡介

celery 是一個基於分佈消息傳遞的異步任務隊列,他可讓任務的執行脫離主程序,也能夠分配到其餘得主機上運行。咱們能夠用它實現異步任務和定時任務。
複製代碼

使用場景

1.你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,
你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行進行時,你能夠繼續作其它的事情。
2.你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福
複製代碼

celery模塊

任務模塊 Task
包含異步任務和定時任務,異步任務一般在業務邏輯中被觸發並分發往任務隊列,定時任務是由celery beat 進程週期性地姜任務發往任務隊列
複製代碼

消息中間件 Broker

celery使用消息進行通訊,broker也就是消息傳遞機制, 接手任務生產者發來的任務, 將任務存入隊列中。

附:可以使用的broker,官方目前推薦的穩定性較高的主要是RabbitMQ和Redis:
複製代碼

任務執行單元 Worker

worker是任務執行單元,他實時監控消息隊列,獲取隊列中調度的任務並執行
複製代碼

任務結果存儲 Backend

Backend 用於存儲任務的執行結果,以供查詢。同消息中間件同樣,存儲也可以使用 RabbitMQ, Redis 和 MongoDB 等
複製代碼

celery 基本工做流程簡述

celery的簡單使用

建立一個celery的異步任務能夠簡單的分爲三步:(這裏使用redis做爲broker)

一、建立celery實例

二、啓動worker

三、調用celery異步任務
複製代碼

建立 celery 實例

首先安裝celeryhtml

pip install celery
複製代碼

建立 celery應用 tasks.py,在這裏broker使用的是redis,關於redis的使用就不在這裏說明了python

# -*- coding: utf-8 -*-

import time

from celery import Celery


# celery實例,在這裏個人broker和backend都是設置的redis
app = Celery('tasks', backend='redis://:@127.0.0.1:6379/2', broker='redis://:@127.0.0.1:6379/2')

@app.task
def add(x, y):
    return x + y


@app.task
def sleep_task(timeout=20):
    for i in range(timeout):
        print i
        time.sleep(1)
    return timeout
複製代碼

啓動worker,這裏的worker啓動的就是以前寫的tasks.py文件

celery -A tasks worker --loglevel=info
複製代碼

進入Python環境 redis

delay()是celery的任務調用方式,其中delay 是使用 apply_async 的快捷方式。apply_async 支持更多的參數,它的通常形式以下

ready()能夠獲取任務的完成狀態 get()能夠獲取任務的結果 get(propagate=True),能夠在任務異常時覆蓋這一行爲 result.traceback,能夠獲取異常shell

celery 工做流

首先看一下 signature() 與 chain()

首先引入兩個包
from tasks import add
from celery import signature, chain
複製代碼

利用signature建立一個任務,能夠看到這個任務也支持delay(),也能夠直接調用 數據庫

Simple chaindjango

chain能夠將 signature任務連接在一塊兒,造成一條任務鏈,將多個任務串聯在一塊兒相繼調用,同時可使下一個任務使用上一個任務的返回值。注:s是signature的簡寫json

Immutable signatures

不可變簽名,能夠看到每一個簽名任務都與上一個任務沒有關係,都是獨立的任務,也能夠用chain連接在一塊兒bash

add.signature((2, 2), immutable=True)
# 能夠簡寫爲
add.si(2, 2)
res = chain(add.si(2, 2) | add.si(4, 4) | add.si(8, 8))()
複製代碼

django 中使用celery

想要在django中使用celery,首先須要配置一個celery的應用,官方建議的是在settings文件的同目錄下建立一個celery.py的實例 app

# 絕對導入,防止與celery衝突
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 設置Django的環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
# celery app
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
# 配置前綴
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 自動掃描app下的tasks文件
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
複製代碼

而後須要在init文件下引入這個模塊,確保django啓動時加載文件異步

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']
複製代碼

接下來是在settings的配置參數

# Celery settings
from __future__ import absolute_import, unicode_literals

# broker
CELERY_BROKER_URL = 'redis://:@127.0.0.1:6379/3'
# 內容類型,默認全部類型
CELERY_ACCEPT_CONTENT = ['application/json']
# 任務結果
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 更多配置能夠查看
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-accept_content

# 定時任務配置
CELERY_BEAT_SCHEDULE = {

}
複製代碼

若是 CELERY_RESULT_BACKEND 設置了django,注意須要INSTALLED_APPS配置下

配置下數據庫,在數據庫中就能夠查看任務的執行狀況了
python manage.py migrate django_celery_results
複製代碼
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
複製代碼

任務繼承

繼承Task,能夠更好的控制任務,在這裏重寫了Task的幾個方法

# -*- coding: utf-8 -*-
# ___author__ = 'gwx'

import celery


class BaseTask(celery.Task):
    """ celery 基類, 繼承Task """

    def __call__(self, *args, **kwargs):
        """ :param args: :param kwargs: :return: """
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(BaseTask, self).__call__(*args, **kwargs)

    # 任務成功
    def on_success(self, retval, task_id, args, kwargs):
        print(self.request)
        print("success")
        print(retval)
        print(task_id)
        print(args)
        print(kwargs)

        return super(BaseTask, self).on_success(retval, task_id, args, kwargs)

    # 任務失敗
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failure")
        print(exc)
        print(task_id)
        print(args)
        print(kwargs)
        print(einfo)
        # 失敗重試
        self.retry(exc=exc)

    # finally
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        print(status, retval, task_id, args, kwargs, einfo)
        print('task %s is finished' % self.name)

複製代碼

建立celery任務,這裏建立了幾種不一樣類型的任務

# -*- coding: utf-8 -*-
# ___author__ = 'gwx'

import celery
from workflows.base import BaseTask


@celery.task
def add(x, y):
    return x + y


@celery.task(bind=True, base=BaseTask, name='add_success')
def add_success(self, x, y):
    """ success 演示 """
    s = x + y
    print("success: this result is %s" % s)
    return 'add success return'


@celery.task(bind=True, base=BaseTask, name='add_failure')
def add_failure(self, msg='this is a error task', max_retry=2):
    """ failure 演示 """
    print("task retries %s" % self.request.retries)
    if self.request.retries >= int(max_retry):
        return 'add failure return'
    raise Exception("add_failure message: %s" % msg)


@celery.task(bind=True, base=BaseTask, name='end_task')
def end_task(self):
    """ 最後的任務 """
    print('this is end task')
    return 'end task'


@celery.task(bind=True, base=BaseTask, name="work_flow_task")
def work_flow_task(self):
    """ 任務流, 使用chain將 si類型 任務串接起來,這樣能夠模板完成對主任務的拆分, 而自定義基類,能夠更好的控制子任務 :param self: :return: """
    success_si = add_success.si(2, 4)
    failure_si = add_failure.si(msg='retry task', max_retry=0)
    end_si = end_task.si()
    celery.chain(success_si, failure_si, end_si)()
複製代碼

啓動worker,進入項目的根目錄

# -l celery日誌等級, -c worker數,
# 你們能夠經過 celery -A proj.celery worker -- help查找更多適合項目的配置
celery -A proj.celery worker -l info -c 10
複製代碼

調用任務,我這裏是使用django的shell調用的 進入項目的根目錄

python manage.py shell
# 引入了tasks
from workflows.tasks import *
# 調用add_success的celery任務
add_success.apply_async((2, 4), pri=0)
複製代碼

worker執行結果

add_failure任務你們能夠嘗試本身執行下,不一樣的是增長了一個重試的功能。

鏈式任務

多個子任務的輸出結果

定時任務

celery支持定時任務, 在setting中配置

""" add 是此任務的lable name task 是要執行的定時任務,必須是celery修飾的 schedule 是定時設置,一下是3s一次 當引入時,也能夠這樣設置: 'schedule': timedelta(seconds=5) 同時也支持crontab() 'schedule': crontab(hour=7, minute=30, day_of_week=1) 更多可查看 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html """
from datetime import timedelta

# 定時任務配置
CELERY_BEAT_SCHEDULE = {
    'add': {
        'task': 'demoapp.tasks.add',
        'schedule': 3,
        'args': (2, 3)
    }
}
複製代碼

定時啓動和worker相似

celery beat -A proj.celery -l info
複製代碼

在worker中查看結果,能夠看到沒隔三秒鐘都會執行一邊add task

當任務更新時,須要從新啓動worker或者定時worker,定時至關於worker中的一個進程

相關文章
相關標籤/搜索