Celery用戶指南中文翻譯-Application

Celery在使用前必須實例化,稱爲application或app。app是線程安全的,具備不一樣配置、組件、task的多個Celery應用能夠共存於同一個進程空間。python

# 建立Celery應用
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最後一行文本化顯示了Celery應用:包含應用所屬類的名稱,當前主模塊名,以及內存地址。惟一重要的信息是模塊名稱。正則表達式

Main Name

在Celery中發送task消息時,該消息僅包含要執行的task的名稱。每個worker維護一個task名稱和對應函數的映射,這稱爲task registryshell

當定義一個task時,該task將註冊到本地:安全

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

當Celery沒法檢測task函數屬於哪一個模塊時,使用main模塊名生成初始task名稱。app

這種方式僅適用於如下兩種場景:函數

  1. 定義task的模塊做爲程序運行
  2. app在python shell中建立
# tasks.py
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

若是直接運行tasks.py,task名將以__main__爲前綴,但若是tasks.py被其餘程序導入,task名將以tasks爲前綴。以下:lua

>>> from tasks import add
>>> add.name
tasks.add

也能夠直接指定主模塊名:線程

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

Configuration

能夠經過直接設置,或使用專用配置模塊對Celery進行配置。code

經過app.conf屬性查看或直接設置配置:orm

>>> app.conf.timezone
'Europe/London'

>>> app.conf.enable_utc = True

或用app.conf.update方法一次更新多個配置:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

config_from_object

app.config_from_object()方法從配置模塊或對象中導入配置。須要注意的是:調用config_from_object()方法將重置在這以前配置的任何設置

使用模塊名
app.config_from_object()方法接收python模塊的徹底限定名(fully qualified name)或具體到其中的某個屬性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

只要可以正常執行import celeryconfig,app就能正常配置。

使用模塊對象
也能夠傳入一個已導入的模塊對象,但不建議這樣作。

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

更推薦使用模塊名的方式,由於這樣在使用prefork pool時不須要序列化該模塊。若是在實際應用中出現配置問題或序列化錯誤,請嘗試使用模塊名的方式。

使用配置類或對象

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)

config_from_envvar

app.config_from_envvar()方法從環境變量中接收配置模塊名。

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

經過環境變量指定配置模塊:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

Censored configuration

若是要顯示Celery配置,可能須要過濾某些敏感信息如密碼、密鑰等。Celery提供了幾種用於幫助顯示配置的實用方法。

humanize()
該方法按行返回字符串形式的配置,默認只包含改動過的配置,若是要顯示內置的默認配置,設置with_defaults參數爲True:

>>> app.conf.humanize(with_defaults=False, censored=True)

table()
該方法返回字典形式的配置:

>>> app.conf.table(with_defaults=False, censored=True)

Celery不會移除全部的敏感信息,由於它使用正則表達式匹配鍵並判斷是否移除。若是用戶添加了包含敏感信息的自定義配置,能夠使用Celery可能標記爲敏感配置的名稱來命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。

Laziness

應用實例是惰性的。

建立Celery實例只會執行如下操做:

  1. 建立用於event的logical clock instance
  2. 建立task registry
  3. 設置爲當前應用(除非禁用了set_as_current參數)
  4. 調用app.on_init()回調函數(默認不執行任何操做)

app.task()裝飾器不會在task定義時當即建立task,而是在task使用時或finalized應用後建立。

下例說明了在使用task或訪問其屬性前,都不會建立task:

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

應用的Finalization指顯式地調用app.finalize()方法或隱式地訪問app.tasks屬性。

finalized應用將會:

  1. 複製必須在應用間共享的task。task默認是共享的,但若是禁用了task裝飾器的shared屬性,將屬於應用私有。
  2. 評估全部待處理的task裝飾器
  3. 確保全部task綁定到當前應用。將task綁定到某個應用,以即可以從配置中讀取默認值。

Breaking the chain

雖然能夠依賴於當前應用,但最佳實踐是將應用實例傳遞給任何須要它的對象,這個行爲能夠稱爲app chain

# 依賴於當前應用(bad)

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app
# 傳遞應用實例(good)

class Scheduler(object):

    def __init__(self, app):
        self.app = app

在開發模式設置CELERY_TRACE_APP環境變量,能夠在應用鏈斷開時拋出異常:

$ CELERY_TRACE_APP=1 celery worker -l info

Abstract Tasks

使用task()裝飾器建立的task都繼承自celery.app.task模塊的Task基類。繼承該類能夠自定義task類:

from celery import Task
# 或者 from celery.app.task import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)

若是要重寫__call__()方法,記得調用super。這樣在task直接調用時會執行基類的默認事件。

Task基類是特殊的,由於它並未綁定到任何特定的應用。一旦task綁定到應用,它將讀取配置以設置默認值等。

  1. 經過base參數指定基類

    @app.task(base=DebugTask)
    def add(x, y):
        return x + y
  2. 經過app.Task屬性指定基類

    >>> from celery import Celery, Task
    
    >>> app = Celery()
    
    >>> class MyBaseTask(Task):
    ...    queue = 'hipri'
    
    >>> app.Task = MyBaseTask
    >>> app.Task
    <unbound MyBaseTask>
    
    >>> @app.task
    ... def add(x, y):
    ...     return x + y
    
    >>> add
    <@task: __main__.add>
    
    >>> add.__class__.mro()
    [<class add of <Celery __main__:0x1012b4410>>,
     <unbound MyBaseTask>,
     <unbound Task>,
     <type 'object'>]
相關文章
相關標籤/搜索