Celery在使用前必須實例化,稱爲application或app。app是線程安全的,具備不一樣配置、組件、task的多個Celery應用能夠共存於同一個進程空間。python
# 建立Celery應用 >>> from celery import Celery >>> app = Celery() >>> app <Celery __main__:0x100469fd0>
最後一行文本化顯示了Celery應用:包含應用所屬類的名稱,當前主模塊名,以及內存地址。惟一重要的信息是模塊名稱。正則表達式
在Celery中發送task消息時,該消息僅包含要執行的task的名稱。每個worker維護一個task名稱和對應函數的映射,這稱爲task registry
。shell
當定義一個task時,該task將註冊到本地:安全
>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.name __main__.add >>> app.tasks['__main__.add'] <@task: __main__.add>
當Celery沒法檢測task函數屬於哪一個模塊時,使用main模塊名生成初始task名稱。app
這種方式僅適用於如下兩種場景:函數
# tasks.py from celery import Celery app = Celery() @app.task def add(x, y): return x + y if __name__ == '__main__': app.worker_main()
若是直接運行tasks.py,task名將以__main__
爲前綴,但若是tasks.py被其餘程序導入,task名將以tasks
爲前綴。以下:lua
>>> from tasks import add >>> add.name tasks.add
也能夠直接指定主模塊名:線程
>>> app = Celery('tasks') >>> app.main 'tasks' >>> @app.task ... def add(x, y): ... return x + y >>> add.name tasks.add
能夠經過直接設置,或使用專用配置模塊對Celery進行配置。code
經過app.conf
屬性查看或直接設置配置:orm
>>> app.conf.timezone 'Europe/London' >>> app.conf.enable_utc = True
或用app.conf.update
方法一次更新多個配置:
>>> app.conf.update( ... enable_utc=True, ... timezone='Europe/London', ...)
app.config_from_object()
方法從配置模塊或對象中導入配置。須要注意的是:調用config_from_object()方法將重置在這以前配置的任何設置。
使用模塊名
app.config_from_object()方法接收python模塊的徹底限定名(fully qualified name
)或具體到其中的某個屬性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":
from celery import Celery app = Celery() app.config_from_object('celeryconfig')
只要可以正常執行import celeryconfig
,app就能正常配置。
使用模塊對象
也能夠傳入一個已導入的模塊對象,但不建議這樣作。
import celeryconfig from celery import Celery app = Celery() app.config_from_object(celeryconfig)
更推薦使用模塊名的方式,由於這樣在使用prefork pool時不須要序列化該模塊。若是在實際應用中出現配置問題或序列化錯誤,請嘗試使用模塊名的方式。
使用配置類或對象
from celery import Celery app = Celery() class Config: enable_utc = True timezone = 'Europe/London' app.config_from_object(Config)
app.config_from_envvar()
方法從環境變量中接收配置模塊名。
import os from celery import Celery #: Set default configuration module name os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig') app = Celery() app.config_from_envvar('CELERY_CONFIG_MODULE')
經過環境變量指定配置模塊:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
若是要顯示Celery配置,可能須要過濾某些敏感信息如密碼、密鑰等。Celery提供了幾種用於幫助顯示配置的實用方法。
humanize()
該方法按行返回字符串形式的配置,默認只包含改動過的配置,若是要顯示內置的默認配置,設置with_defaults
參數爲True:
>>> app.conf.humanize(with_defaults=False, censored=True)
table()
該方法返回字典形式的配置:
>>> app.conf.table(with_defaults=False, censored=True)
Celery不會移除全部的敏感信息,由於它使用正則表達式匹配鍵並判斷是否移除。若是用戶添加了包含敏感信息的自定義配置,能夠使用Celery可能標記爲敏感配置的名稱來命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。
應用實例是惰性的。
建立Celery實例只會執行如下操做:
logical clock instance
task registry
set_as_current
參數)app.on_init()
回調函數(默認不執行任何操做)app.task()
裝飾器不會在task定義時當即建立task,而是在task使用時或finalized
應用後建立。
下例說明了在使用task或訪問其屬性前,都不會建立task:
>>> @app.task >>> def add(x, y): ... return x + y >>> type(add) <class 'celery.local.PromiseProxy'> >>> add.__evaluated__() False >>> add # <-- causes repr(add) to happen <@task: __main__.add> >>> add.__evaluated__() True
應用的Finalization
指顯式地調用app.finalize()
方法或隱式地訪問app.tasks屬性。
finalized
應用將會:
shared
屬性,將屬於應用私有。雖然能夠依賴於當前應用,但最佳實踐是將應用實例傳遞給任何須要它的對象,這個行爲能夠稱爲app chain
。
# 依賴於當前應用(bad) from celery import current_app class Scheduler(object): def run(self): app = current_app
# 傳遞應用實例(good) class Scheduler(object): def __init__(self, app): self.app = app
在開發模式設置CELERY_TRACE_APP環境變量,能夠在應用鏈斷開時拋出異常:
$ CELERY_TRACE_APP=1 celery worker -l info
使用task()
裝飾器建立的task都繼承自celery.app.task
模塊的Task
基類。繼承該類能夠自定義task類:
from celery import Task # 或者 from celery.app.task import Task class DebugTask(Task): def __call__(self, *args, **kwargs): print('TASK STARTING: {0.name}[{0.request.id}]'.format(self)) return super(DebugTask, self).__call__(*args, **kwargs)
若是要重寫__call__()
方法,記得調用super。這樣在task直接調用時會執行基類的默認事件。
Task
基類是特殊的,由於它並未綁定到任何特定的應用。一旦task綁定到應用,它將讀取配置以設置默認值等。
經過base
參數指定基類
@app.task(base=DebugTask) def add(x, y): return x + y
經過app.Task
屬性指定基類
>>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ... queue = 'hipri' >>> app.Task = MyBaseTask >>> app.Task <unbound MyBaseTask> >>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [<class add of <Celery __main__:0x1012b4410>>, <unbound MyBaseTask>, <unbound Task>, <type 'object'>]