分佈式任務隊列Celery入門與進階

1、簡介

  Celery是由Python開發、簡單、靈活、可靠的分佈式任務隊列,其本質是生產者消費者模型,生產者發送任務到消息隊列,消費者負責處理任務。Celery側重於實時操做,但對調度支持也很好,其天天能夠處理數以百萬計的任務。特色:html

  • 簡單:熟悉celery的工做流程後,配置使用簡單
  • 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery會自動嘗試從新執行任務
  • 快速:一個單進程的celery每分鐘可處理上百萬個任務
  • 靈活:幾乎celery的各個組件均可以被擴展及自定製

應用場景舉例:python

  1.web應用:當用戶在網站進行某個操做須要很長時間完成時,咱們能夠將這種操做交給Celery執行,直接返回給用戶,等到Celery執行完成之後通知用戶,大大提好網站的併發以及用戶的體驗感。web

  2.任務場景:好比在運維場景下須要批量在幾百臺機器執行某些命令或者任務,此時Celery能夠輕鬆搞定。redis

  3.定時任務:向定時導數據報表、定時發送通知相似場景,雖然Linux的計劃任務能夠幫我實現,可是很是不利於管理,而Celery能夠提供管理接口和豐富的API。json

2、架構&工做原理

  Celery由如下三部分構成:消息中間件(Broker)、任務執行單元Worker、結果存儲(Backend),以下圖:api

  

工做原理:promise

  1. 任務模塊Task包含異步任務和定時任務。其中,異步任務一般在業務邏輯中被觸發併發往消息隊列,而定時任務由Celery Beat進程週期性地將任務發往消息隊列;
  2. 任務執行單元Worker實時監視消息隊列獲取隊列中的任務執行;
  3. Woker執行完任務後將結果保存在Backend中;

消息中間件Broker

  消息中間件Broker官方提供了不少備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ。架構

任務執行單元Worker

  Worker是任務執行單元,負責從消息隊列中取出任務執行,它能夠啓動一個或者多個,也能夠啓動在不一樣的機器節點,這就是其實現分佈式的核心。併發

結果存儲Backend

  Backend結果存儲官方也提供了諸多的存儲方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。app

3、安裝使用 

  這裏我使用的redis做爲消息中間件,redis安裝能夠參考https://www.cnblogs.com/wdliu/p/9360286.html。

Celery安裝: 

pip3 install celery

簡單使用

  目錄結構:

project/
├── __init__.py  
├── config.py
└── tasks.py

各目錄文件說明:

__init__.py:初始化Celery以及加載配置文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from celery import Celery
app = Celery('project')                                # 建立 Celery 實例
app.config_from_object('project.config')               # 加載配置模塊

config.py:  Celery相關配置文件,更多配置參考:http://docs.celeryproject.org/en/latest/userguide/configuration.html

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis

CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間

CELERY_TIMEZONE='Asia/Shanghai'   # 時區配置

CELERY_IMPORTS = (     # 指定導入的任務模塊,能夠指定多個
    'project.tasks',
)

tasks.py :任務定義文件

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
@app.task
def show_name(name):
    return name

啓動Worker:

celery worker -A project -l debug

各個參數含義:

  worker: 表明第啓動的角色是work固然還有beat等其餘角色;

  -A :項目路徑,這裏個人目錄是project

  -l:啓動的日誌級別,更多參數使用celery --help查看

查看日誌輸出,會發現咱們定義的任務,以及相關配置:

 

  雖然啓動了worker,可是咱們還須要經過delay或apply_async來將任務添加到worker中,這裏咱們經過交互式方法添加任務,並返回AsyncResult對象,經過AsyncResult對象獲取結果:

AsyncResult除了get方法用於經常使用獲取結果方法外還提如下經常使用方法或屬性:

  • state: 返回任務狀態;
  • task_id: 返回任務id;
  • result: 返回任務結果,同get()方法;
  • ready(): 判斷任務是否以及有結果,有結果爲True,不然False;
  • info(): 獲取任務信息,默認爲結果;
  • wait(t): 等待t秒後獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則wait期間一直阻塞,直到超時報錯;
  • successfu(): 判斷任務是否成功,成功爲True,不然爲False;

4、進階使用

  對於普通的任務來講可能知足不了咱們的任務需求,因此還須要瞭解一些進階用法,Celery提供了諸多調度方式,例如任務編排、根據任務狀態執行不一樣的操做、重試機制等,如下會對經常使用高階用法進行講述。

定時任務&計劃任務

  Celery的提供的定時任務主要靠schedules來完成,經過beat組件週期性將任務發送給woker執行。在示例中,新建文件period_task.py,並添加任務到配置文件中:

period_task.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.schedules import crontab

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒執行add
    sender.add_periodic_task(
        crontab(hour=16, minute=56, day_of_week=1),      #每週一下午四點五十六執行sayhai
        sayhi.s('wd'),name='say_hi'
    )



@app.task
def add(x,y):
    print(x+y)
    return x+y


@app.task
def sayhi(name):
    return 'hello %s' % name

config.py

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis

CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間

CELERY_TIMEZONE='Asia/Shanghai'   # 時區配置

CELERY_IMPORTS = (     # 指定導入的任務模塊,能夠指定多個
    'project.tasks',
    'project.period_task', #定時任務
)

啓動worker和beat:

celery worker -A project -l debug #啓動work
celery beat -A  project.period_task -l  debug #啓動beat,注意此時對應的文件路徑

咱們能夠觀察worker日誌:

還能夠經過配置文件方式指定定時和計劃任務,此時的配置文件以下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis

CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間

CELERY_TIMEZONE='Asia/Shanghai'   # 時區配置

CELERY_IMPORTS = (     # 指定導入的任務模塊,能夠指定多個
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
    'period_add_task': {    # 計劃任務
        'task': 'project.period_task.add',  #任務路徑
        'schedule': crontab(hour=18, minute=16, day_of_week=1),
        'args': (3, 4),
    },
'add-every-30-seconds': {          # 每10秒執行
        'task': 'project.period_task.sayhi',  #任務路徑
        'schedule': 10.0,
        'args': ('wd',)
    },
}

此時的period_task.py只須要註冊到woker中就好了,以下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    print(x+y)
    return x+y


@app.task
def sayhi(name):
    return 'hello %s' % name

一樣啓動worker和beat結果和第一種方式同樣。更多詳細的內容請參考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules

任務綁定

  Celery可經過任務綁定到實例獲取到任務的上下文,這樣咱們能夠在任務運行時候獲取到任務的狀態,記錄相關日誌等。

修改任務中的period_task.py,以下:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger
logger
= get_task_logger(__name__) @app.task(bind=True) # 綁定任務 def add(self,x,y): logger.info(self.request.__dict__) #打印日誌 try: a=[] a[10]==1 except Exception as e: raise self.retry(exc=e, countdown=5, max_retries=3) # 出錯每5秒嘗試一次,總共嘗試3次 return x+y

在以上代碼中,經過bind參數將任務綁定,self指任務的上下文,經過self獲取任務狀態,同時在任務出錯時進行任務重試,咱們觀察日誌:

內置鉤子函數

  Celery在執行任務時候,提供了鉤子方法用於在任務執行完成時候進行對應的操做,在Task源碼中提供了不少狀態鉤子函數如:on_success(成功後執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行),在進行使用是咱們只須要重寫這些方法,完成相應的操做便可。

在如下示例中,咱們繼續修改period_task.py,分別定義三個任務來演示任務失敗、重試、任務成功後執行的操做:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app
from celery.utils.log import get_task_logger
from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):

    def on_success(self, retval, task_id, args, kwargs):   # 任務成功執行
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))



    def on_failure(self, exc, task_id, args, kwargs, einfo):  #任務失敗執行
        logger.info('task id:{} , arg:{} , failed ! erros : {}' .format(task_id,args,exc))


    def on_retry(self, exc, task_id, args, kwargs, einfo):    #任務重試執行
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

@app.task(base=demotask,bind=True)
def add(self,x,y):
    try:
        a=[]
        a[10]==1
    except Exception as e:
        raise self.retry(exc=e, countdown=5, max_retries=1) # 出錯每5秒嘗試一次,總共嘗試1次
    return x+y

@app.task(base=demotask)
def sayhi(name):
    a=[]
    a[10]==1
    return 'hi {}'.format(name)

@app.task(base=demotask)
def sum(a,b):
    return 'a+b={} '.format(a+b)

此時的配置文件config.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from project import app
from celery.schedules import crontab

BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis做爲消息中間件

CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這裏使用redis

CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間

CELERY_TIMEZONE='Asia/Shanghai'   # 時區配置

CELERY_IMPORTS = (     # 指定導入的任務模塊,能夠指定多個
    'project.tasks',
    'project.period_task',
)

app.conf.beat_schedule = {
'add': {          # 每10秒執行
        'task': 'project.period_task.add',  #任務路徑
        'schedule': 10.0,
        'args': (10,12),
    },
'sayhi': {          # 每10秒執行
        'task': 'project.period_task.sayhi',  #任務路徑
        'schedule': 10.0,
        'args': ('wd',),
    },
'sum': {          # 每10秒執行
        'task': 'project.period_task.sum',  #任務路徑
        'schedule': 10.0,
        'args': (1,3),
    },
}

而後重啓worker和beat,查看日誌:

 

任務編排

  在不少狀況下,一個任務須要由多個子任務或者一個任務須要不少步驟才能完成,Celery一樣也能實現這樣的任務,完成這類型的任務經過如下模塊完成:

  • group: 並行調度任務

  • chain: 鏈式任務調度

  • chord: 相似group,但分header和body2個部分,header能夠是一個group任務,執行完成後調用body的任務

  • map: 映射調度,經過輸入多個入參來屢次調度同一個任務

  • starmap: 相似map,入參相似*args

  • chunks: 將任務按照必定數量進行分組

 

修改tasks.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from project import app

@app.task
def add(x,y):
    return x+y


@app.task
def mul(x,y):
    return x*y


@app.task
def sum(data_list):
    res=0
    for i in data_list:
        res+=i
    return res

 

group: 組任務,組內每一個任務並行執行

和project同級目錄新建consumer.py以下:

from celery import group
from project.tasks import add,mul,sum
res = group(add.s(1,2),add.s(1,2))()  # 任務 [1+2,1+2] 
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

結果:

 

chain:鏈式任務

鏈式任務中,默認上一個任務的返回結果做爲參數傳遞給子任務

from celery import chain
from project.tasks import add,mul,sum
res = chain(add.s(1,2),add.s(3),mul.s(3))()  # 任務((1+2)+3)*3
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break
#結果
#res:18

還可使用|表示鏈式任務,上面任務也能夠表示爲:

res = (add.s(1,2) | add.s(3) | (mul.s(3)))()
res.get()

 

chord:任務分割,分爲header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果做爲參數傳遞給body

from celery import chord
from project.tasks import add,mul,sum
res = chord(header=[add.s(1,2),mul.s(3,4)],body=sum.s())()  # 任務(1+2)+(3*4)
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

#結果:
#res:15

 

chunks:任務分組,按照任務的個數分組

from project.tasks import add,mul,sum
res = add.chunks(zip(range(5),range(5)),4)()  # 4 表明每組的任務的個數
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

結果:

 

delay &apply_async

  對於delay和apply_async均可以用來進行任務的調度,本質上是delay對apply_async進行了再一次封裝(或者能夠說是快捷方式),二者都返回AsyncResult對象,如下是兩個方法源碼。

    def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)
delay源碼
    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        """Apply tasks asynchronously by sending a message.

        Arguments:
            args (Tuple): The positional arguments to pass on to the task.

            kwargs (Dict): The keyword arguments to pass on to the task.

            countdown (float): Number of seconds into the future that the
                task should execute.  Defaults to immediate execution.

            eta (~datetime.datetime): Absolute time and date of when the task
                should be executed.  May not be specified if `countdown`
                is also supplied.

            expires (float, ~datetime.datetime): Datetime or
                seconds in the future for the task should expire.
                The task won't be executed after the expiration time.

            shadow (str): Override task name used in logs/monitoring.
                Default is retrieved from :meth:`shadow_name`.

            connection (kombu.Connection): Re-use existing broker connection
                instead of acquiring one from the connection pool.

            retry (bool): If enabled sending of the task message will be
                retried in the event of connection loss or failure.
                Default is taken from the :setting:`task_publish_retry`
                setting.  Note that you need to handle the
                producer/connection manually for this to work.

            retry_policy (Mapping): Override the retry policy used.
                See the :setting:`task_publish_retry_policy` setting.

            queue (str, kombu.Queue): The queue to route the task to.
                This must be a key present in :setting:`task_queues`, or
                :setting:`task_create_missing_queues` must be
                enabled.  See :ref:`guide-routing` for more
                information.

            exchange (str, kombu.Exchange): Named custom exchange to send the
                task to.  Usually not used in combination with the ``queue``
                argument.

            routing_key (str): Custom routing key used to route the task to a
                worker server.  If in combination with a ``queue`` argument
                only used to specify custom routing keys to topic exchanges.

            priority (int): The task priority, a number between 0 and 9.
                Defaults to the :attr:`priority` attribute.

            serializer (str): Serialization method to use.
                Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
                serialization method that's been registered
                with :mod:`kombu.serialization.registry`.
                Defaults to the :attr:`serializer` attribute.

            compression (str): Optional compression method
                to use.  Can be one of ``zlib``, ``bzip2``,
                or any custom compression methods registered with
                :func:`kombu.compression.register`.
                Defaults to the :setting:`task_compression` setting.

            link (Signature): A single, or a list of tasks signatures
                to apply if the task returns successfully.

            link_error (Signature): A single, or a list of task signatures
                to apply if an error occurs while executing the task.

            producer (kombu.Producer): custom producer to use when publishing
                the task.

            add_to_parent (bool): If set to True (default) and the task
                is applied while executing another task, then the result
                will be appended to the parent tasks ``request.children``
                attribute.  Trailing can also be disabled by default using the
                :attr:`trail` attribute

            publisher (kombu.Producer): Deprecated alias to ``producer``.

            headers (Dict): Message headers to be included in the message.

        Returns:
            celery.result.AsyncResult: Promise of future evaluation.

        Raises:
            TypeError: If not enough arguments are passed, or too many
                arguments are passed.  Note that signature checks may
                be disabled by specifying ``@task(typing=False)``.
            kombu.exceptions.OperationalError: If a connection to the
               transport cannot be made, or if the connection is lost.

        Note:
            Also supports all keyword arguments supported by
            :meth:`kombu.Producer.publish`.
        """
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )
apply_async源碼

對於其使用,apply_async支持經常使用參數:

  • eta:指定任務執行時間,類型爲datetime時間類型;
  • countdown:倒計時,單位秒,浮點類型;
  • expires:任務過時時間,若是任務在超過過時時間還未執行則回收任務,浮點類型獲取datetime類型;
  • retry:任務執行失敗時候是否嘗試,布爾類型。;
  • serializer:序列化方案,支持pickle、json、yaml、msgpack;
  • priority:任務優先級,有0~9優先級可設置,int類型;
  • retry_policy:任務重試機制,其中包含幾個重試參數,類型是dict以下:
max_retries:最大重試次數

interval_start:重試等待時間

interval_step:每次重試疊加時長,假設第一重試等待1s,第二次等待1+n秒

interval_max:最大等待時間

####示例
 add.apply_async((1, 3), retry=True, retry_policy={
        'max_retries': 1,
        'interval_start': 0,
        'interval_step': 0.8,
        'interval_max': 5,
    })
View Code

更多參數參考:http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async

 

  

 5、管理與監控

  Celery管理和監控功能是經過flower組件實現的,flower組件不只僅提供監控功能,還提供HTTP API可實現對woker和task的管理。

安裝使用

pip3 install flower

啓動

 flower -A project --port=5555   
# -A :項目目錄
#--port 指定端口

訪問http:ip:5555

api使用,例如獲取woker信息:

curl http://127.0.0.1:5555/api/workers

結果:

更多api參考:https://flower.readthedocs.io/en/latest/api.html

相關文章
相關標籤/搜索