Celery 庫在使用以前必須初始化,一個celery實例被稱爲一個應用(或者縮寫 app)。python
Celery 應用是線程安全的,因此多個不一樣配置、不一樣組件、不一樣任務的 應用能夠在一個進程空間裏共存。正則表達式
下面建立一個 celery 應用:安全
>>> from celery import Celery >>> app = Celery() >>> app <Celery __main__:0x100469fd0>
最後一行顯示的是 celery 應用的文本表示: 包含應用類的名稱(Celery),當前主模塊的名稱(main),以及應用對象的內存地址(0x100469fd0)。app
上述文本表示中只有一部分是重要的,那就是主模塊名稱。下面分析下它爲什麼重要。async
當你發送一個消息給 Celery,消息中不會包含任何源碼,而只有你想要執行的任務的名稱。這就好像因特網上的域名映射原理通常:每一個執行單元維護着一個任務名稱到實際任務函數的映射,這個映射被稱爲任務註冊表。函數
當你定義一個任務,這個任務就會被添加到本地註冊表:工具
>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.name __main__.add >>> app.tasks['__main__.add'] <@task: __main__.add>
如今,你又看到顯示 __main__
模塊名稱;當Celery不能探查到這個任務函數屬於哪一個模塊時,它將使用主模塊名稱來產生任務名稱的前綴。lua
這在有些狀況下會產生問題:
1. 定義任務的主模塊做爲一個程序運行。
2. 應用在python交互終端建立。spa
例以下面程序,定義任務的模塊還調用 app.worker_main()
來啓動一個工做單元:.net
tasks.py:
from celery import Celery app = Celery() @app.task def add(x, y): return x + y if __name__ == '__main__': app.worker_main()
當這個模塊運行,任務將之前綴 __main__
命名,可是當該模塊被其餘進程引入來運行一個任務,這個任務的名稱將之前綴 tasks
命名(即這個模塊的真實名稱)。
>>> from tasks import add >>> add.name tasks.add
你能夠給主模塊聲明另一個名稱:
>>> app = Celery('tasks') >>> app.main 'tasks' >>> @app.task ... def add(x, y): ... return x + y >>> add.name tasks.add
具體能夠查看名稱這節。
你能夠設置一些選項來改變 Celery 的工做方式。這些選項能夠直接在 app 實例上進行設置,或者也可使用一個指定的配置模塊。
配置使用 app.conf
變量保存:
>>> app.conf.timezone 'Europe/London'
你能夠直接設置配置值:
>>> app.conf.enable_utc = True
或者使用 update
方法同時更新多個配置項。
>>> app.conf.update( ... enable_utc=True, ... timezone='Europe/London', ...)
實際的配置對象由多個字典決定,配置項按如下順序查詢:
1. 運行時的配置修改
2. 配置模塊(若是聲明)
3. 默認配置(celery.app.defaults)
你還可使用 app.add_defaults()
方法添加新的默認配置源。
另外: 全部可用配置的完整列表及其默認值請參照 Configuration reference。
app.config_from_object() 方法從一個配置對象加載配置。
它能夠是一個配置模塊,或者任意包含配置屬性的對象。
注意任何前面設置的配置在調用 config_from_object
方法後都將被重置。若是你想設置附加的配置應該在調用這個方法以後。
示例1: 使用模塊名
app.config_from_object()
方法的參數能夠是一個 python 模塊的全限定名稱,或者一個 python 屬性名,例如:」celeryconfig」,」myproj.config.celery」,或者 「myproj.config:CeleryConfig」:
from celery import Celery app = Celery() app.config_from_object('celeryconfig')
celeryconfig 模塊內容以下形式:
celeryconfig.py:
enable_utc = True timezone = 'Europe/London'
只要 import celeryconfig
能正常運行,應用實例就能加載它。
示例2:傳遞一個模塊對象
你還能夠傳遞一個已經加載的模塊對象,可是不做爲常規建議。
提示:
建議使用模塊名的方式加載,由於這種狀況下當prefork池使用時,配置模塊沒必要序列化。若是遇到配置問題或者序列化錯誤,能夠嘗試使用模塊名的方式加載配置。
import celeryconfig from celery import Celery app = Celery() app.config_from_object(celeryconfig)
示例3:使用配置類/對象
from celery import Celery app = Celery() class Config: enable_utc = True timezone = 'Europe/London' app.config_from_object(Config) # or using the fully qualified name of the object: # app.config_from_object('module:Config')
app.config_from_envvar()
從環境變量中獲取配置模塊名稱。
例如,從環境變量 CELERY_CONFIG_MODULE 所聲明的模塊加載配置:
import os from celery import Celery #: Set default configuration module name os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig') app = Celery() app.config_from_envvar('CELERY_CONFIG_MODULE')
你能夠經過環境變量聲明要使用的模塊:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
若是你想打印配置信息,做爲調試信息或者相似,你也許不想暴露密碼和API祕鑰這類信息。
Celery 提供了一些有用的工具函數來展現這些配置信息,其中一個是 humanize()
函數:
>>> app.conf.humanize(with_defaults=False, censored=True)
請注意Celery不會移除全部的敏感信息,由於它只是僅僅使用一個正則表達式來匹配配置項鍵名。若是你添加包含敏感信息的定製化配置,你應該使用 celery 能識別爲敏感信息的鍵名。
若是一個配置項鍵名包含如下字符串,它將被看做是敏感的: API,TOKEN,KEY,SECRET,PASS,SIGNATURE,DATABASE
一個應用實例是延遲加載的,意味着它只有在實際調用的時候纔會被求值。
建立一個celery 實例只會作以下事情:
1. 建立一個用於事件的邏輯時鐘實例
2. 建立一個任務註冊表
3. 將本身設置爲當前應用實例(若是 set_as_current
參數被禁用將不會作此設置)
4. 調用 app.on_init()
回調函數(默認不作任何事情)
app.task()
裝飾器在任務定義時不會建立任務,而是延遲任務的建立到任務使用時,或者應用被終止時。
下面這個例子說明了直到你使用任務時或者訪問任務對象的屬性時(這裏是 repr()
)任務纔會被建立:
>>> @app.task >>> def add(x, y): ... return x + y >>> type(add) <class 'celery.local.PromiseProxy'> >>> add.__evaluated__() False >>> add # <-- causes repr(add) to happen <@task: __main__.add> >>> add.__evaluated__() True
應用的終止有兩種狀況,顯示調用 app.finalize()
終止,或者經過訪問 app.tasks
屬性隱示終止。
終止應用對象將會執行:
1. 應用間必須共享的任務的拷貝
任務默認是被共享的,可是若是任務裝飾器的共享參數被設置爲禁用時任務會爲被綁定的應用所私有。
對全部未求值的任務求值
確認全部任務都綁定到當前應用實例
任務綁定到了應用實例,因此能夠讀取配置的默認值。
note:
「默認應用實例」
celery 並非一開始有應用實例這個概念,最先只有一個模塊級別的API,爲了向後兼容老的API,這個模塊級別API會保留直到celery 5.0發佈。
celery 會建立一個特殊的應用實例 - 默認應用實例,若是沒有自定義的應用實例被初始化,這個默認應用實例將會被啓用。
例如,老的任務基類使用了許多兼容特性,其中一些與新的特性不兼容,好比任務方法。
from celery.task import Task # << OLD Task base class. from celery import Task # << NEW base class.
即便你使用老的模塊級別的API,也推薦使用新的基類。
雖然能夠依賴於當前設置的應用實例,可是將應用實例做爲參數傳遞給全部須要它的對象仍然是最佳操做實踐。
稱這種操做爲「應用實例鏈」的緣由是由於它依賴所傳遞的應用實例建立了一個鏈。
下面這個例子被認爲是差的實踐:
from celery import current_app class Scheduler(object): def run(self): app = current_app
它應用將 app 做爲一個參數傳遞:
class Scheduler(object): def __init__(self, app): self.app = app
在celery內部實現中,使用 celery.app_or_default()
函數使得模塊級別的 API 也能正常使用。
from celery.app import app_or_default class Scheduler(object): def __init__(self, app=None): self.app = app_or_default(app)
在開發環境中,能夠經過設置 CELERY_TRACE_APP
環境變量在應用實例鏈被打破時拋出一個異常:
$ CELERY_TRACE_APP=1 celery worker -l info
note:
API 的演化
celery 項目從開始建立到如今的七年多時間裏已經改變了不少。
例如,最開始可使用任何一個可調用對象做爲一個任務:
def hello(to): return 'hello {0}'.format(to) >>> from celery.execute import apply_async >>> apply_async(hello, ('world!',))
能夠建立一個任務類,設置特定屬性,或者覆蓋其餘行爲
from celery.task import Task from celery.registry import tasks class Hello(Task): queue = 'hipri' def run(self, to): return 'hello {0}'.format(to) tasks.register(Hello) >>> Hello.delay('world!')
後來,開發者以爲傳遞任意可調用對象是反模式,由於它使得很難使用除了 pickle
以外的序列化方案,所以這個特性在 2.0 就被踢除了,取而代之的是任務裝飾器:
from celery.task import task @task(queue='hipri') def hello(to): return 'hello {0}'.format(to)
全部使用 task() 裝飾器建立的任務都會繼承應用的基礎 Task
類。
你可使用裝飾器的 base 參數給任務聲明一個不一樣的基類:
@app.task(base=OtherTask): def add(x, y): return x + y
建立一個自定義的任務類,你應該繼承這個中性類:celery.Task
from celery import Task class DebugTask(Task): def __call__(self, *args, **kwargs): print('TASK STARTING: {0.name}[{0.request.id}]'.format(self)) return super(DebugTask, self).__call__(*args, **kwargs)
提示:
若是你覆蓋了任務的 __call__
方法,那麼很是重要的一點是你還須要調用父類的方法使得在任務被直接調用時基類call方法能設置好默認請求。
這個中性類比較特殊,由於它不會綁定到任意特殊應用實例。一旦任務綁定到一個應用實例,它將讀取應用的配置信息來設置默認值等等。
使用一個基類,你須要使用 app.task()
裝飾器建立一個任務:
@app.task(base=DebugTask) def add(x, y): return x + y
還能夠經過修改 app.Task
屬性來修改一個應用實例的默認基類:
>>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ... queue = 'hipri' >>> app.Task = MyBaseTask >>> app.Task <unbound MyBaseTask> >>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [<class add of <Celery __main__:0x1012b4410>>, <unbound MyBaseTask>, <unbound Task>, <type 'object'>]
轉自:https://blog.csdn.net/libing_thinking/article/details/78541171