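Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing that also supports task scheduling. This article looks at what a Celery task actually is, and, if we wanted to build a task mechanism from scratch, which points would need attention and how they should be handled.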
Because tasks are closely tied to consumption by the Consumer, this article repeats part of the previous one for clarity; please bear with the overlap.
Let us first roughly consider a few questions, which serve as the starting points and focus of the analysis below.
We will answer these questions one by one below.
The server-side example code is below; it uses a decorator to wrap the task to be executed.
A task is user-defined business code; here the task simply adds two numbers.
from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379')

@app.task
def add(x, y):
    print(x + y)
    return x + y

if __name__ == '__main__':
    app.worker_main(argv=['worker'])
The sending code is:
from myTest import add

re = add.apply_async((2, 17))
To understand what a task is, let us first print the runtime variables. The main member variables are shown here:
self = {add} <@task: myTest.add of myTest at 0x7faf35f0a208>
 Request = {str} 'celery.worker.request:Request'
 Strategy = {str} 'celery.worker.strategy:default'
 app = {Celery} <Celery myTest at 0x7faf35f0a208>
 backend = {DisabledBackend} <celery.backends.base.DisabledBackend object at 0x7faf364aea20>
 from_config = {tuple: 9} (('serializer', 'task_serializer'), ('rate_limit', 'task_default_rate_limit'), ('priority', 'task_default_priority'), ('track_started', 'task_track_started'), ('acks_late', 'task_acks_late'), ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'), ('reject_on_worker_lost', 'task_reject_on_worker_lost'), ('ignore_result', 'task_ignore_result'), ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'))
 name = {str} 'myTest.add'
 priority = {NoneType} None
 request = {Context} <Context: {}>
 request_stack = {_LocalStack: 0} <celery.utils.threads._LocalStack object at 0x7faf36405e48>
 serializer = {str} 'json'
As you can see, 'myTest.add' is a Task instance.
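So we need to look at what a Task is. You will find Task implemented in two places in Celery:
one in celery/app/task.py;
the other in celery/task/base.py.
The two are related. celery/app/task.py holds the Task class that the app actually uses, which is the one examined later in this article. celery/task/base.py holds the legacy base class that is kept for backward compatibility with the old task API; it is what sets the __v2_compat__ flag seen in the Task definition.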
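Tasks are an indispensable part of Celery, and a task can be any callable. Each task is identified by a unique name, which the worker uses to look the task up. Tasks are registered with the app.task decorator. When a function has several decorators, app.task must be the outermost one (listed first) for Celery to work correctly.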
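The following sketch shows why the registering decorator has to be outermost. A decorator that records functions by name stores whatever function it is handed. If it is applied first (innermost), the registry ends up holding the bare function, and the other decorators are bypassed. The `register` decorator here is a hypothetical stand-in for app.task and is not Celery's code.

```python
import functools

registry = {}  # hypothetical name -> callable registry (stands in for app._tasks)


def register(fun):
    """Hypothetical stand-in for @app.task: record the function by name."""
    registry[fun.__name__] = fun
    return fun


def logged(fun):
    """An ordinary wrapper decorator that records each call's arguments."""
    calls = []

    @functools.wraps(fun)          # keeps __name__, so the registry key stays the same
    def wrapper(*args, **kwargs):
        calls.append(args)
        return fun(*args, **kwargs)

    wrapper.calls = calls
    return wrapper


@register          # outermost: registers the *wrapped* function
@logged
def good(x, y):
    return x + y


@logged
@register          # innermost: registers the bare function, bypassing @logged
def bad(x, y):
    return x + y


# A worker would look tasks up by name in the registry:
registry['good'](1, 2)
registry['bad'](1, 2)
```

Only `good` records the call. The worker calls the registered object, and for `bad` that object is not the decorated function.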
The role of a Task in a Celery application is to start the corresponding message consumer.
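In its most basic form, a task is a function. The most direct way to publish a task is for the client to package the code of the function to run and publish it to the broker. The distributed computing framework Spark works this way; its idea is simple: move the computation, not the data. Celery before 2.0 also supported publishing tasks like this.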
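The obvious drawback of this approach is that the amount of data sent to the broker can be large. The fix is easy to come up with: tell the worker about the task code in advance. That is the purpose of the global set and of decorator-based registration.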
When we "tell the worker about our custom tasks in advance", a task is defined like this:
@app.task(name='hello_task')
def hello():
    print('hello')
Here app is the worker-side application, and the task function is registered through the decorator.
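The app maintains a dictionary. Its key is the task name (here hello_task), and its value is the task object built from this function. Task names must be unique, but the name argument is optional. If it is omitted, Celery generates a task name automatically from the module path and the function name.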
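Below is a minimal sketch of the name-keyed registry described above. It assumes a simplified naming rule of `<module>.<function>` when no name is given; Celery's real `gen_task_name` has extra handling, for example for `__main__`. `MiniApp` and its methods are hypothetical. To make the uniqueness rule visible, this sketch rejects duplicate names outright. Celery behaves differently: `_task_from_fun` returns the already registered task.

```python
class MiniApp:
    """Hypothetical miniature app: a dict mapping task names to callables."""

    def __init__(self):
        self.tasks = {}

    def gen_task_name(self, fun):
        # Simplified naming rule: "<module>.<function name>"
        return f'{fun.__module__}.{fun.__name__}'

    def task(self, name=None):
        def decorator(fun):
            task_name = name or self.gen_task_name(fun)
            if task_name in self.tasks:
                raise ValueError(f'duplicate task name: {task_name}')
            self.tasks[task_name] = fun
            return fun
        return decorator


app = MiniApp()


@app.task(name='hello_task')
def hello():
    return 'hello'


@app.task()          # no name given: one is generated from module + function
def add(x, y):
    return x + y
```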
This way, a client publishing a task only needs to provide the task name and its arguments, not the task code:
# client side
app.send_task('hello_task')
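Once the client publishes a task, the task is written to the broker queue as a message carrying the task name and related arguments. It then waits there for a worker to fetch it. Publishing is completely independent of the worker: the message is written to the queue even if no worker is running.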
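The decoupling can be sketched with an in-memory `queue.Queue` standing in for the broker. This is an assumption for illustration only: a real broker is Redis or RabbitMQ, and Celery's message format carries far more fields. The client serializes only a task name and its arguments. The message waits in the queue whether or not a worker exists.

```python
import json
import queue

broker = queue.Queue()  # stand-in for the broker queue


def send_task(name, args=(), kwargs=None):
    """Client side: publish only the task name and arguments, never code."""
    message = json.dumps({'task': name, 'args': list(args), 'kwargs': kwargs or {}})
    broker.put(message)


def run_worker_once(registry):
    """Worker side: fetch one message and look the task up by name."""
    message = json.loads(broker.get_nowait())
    fun = registry[message['task']]
    return fun(*message['args'], **message['kwargs'])


# Publish while no worker is running: the message simply waits in the queue.
send_task('add', (2, 17))
pending_before_worker = broker.qsize()

# Later, a worker that registered 'add' in advance consumes it.
result = run_worker_once({'add': lambda x, y: x + y})
```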
This approach also has an obvious drawback. All task code must be registered on the worker side in advance, which couples client and worker more tightly.
So we need to start from the moment the Celery application starts.
The first stop when Celery starts is celery/_state.py.
It creates a global set that collects all tasks. More precisely, the set holds the callbacks that will create them:
#: Global set of functions to call whenever a new app is finalized.
#: Shared tasks, and built-in tasks are created by adding callbacks here.
_on_app_finalizers = set()
At startup, tasks are added by calling the following function:
def connect_on_app_finalize(callback):
    """Connect callback to be called when any app is finalized."""
    _on_app_finalizers.add(callback)
    return callback
First, celery/app/builtins.py defines many built-in tasks, and each one is added to the global callback set:
@connect_on_app_finalize
def add_map_task(app):
    from celery.canvas import signature

    @app.task(name='celery.map', shared=False, lazy=False)
    def xmap(task, it):
        task = signature(task, app=app).type
        return [task(item) for item in it]
    return xmap
Next, the flow reaches our custom task, which is also registered in the global callback set.
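Put another way: when the module that defines the tasks is imported, every function decorated with @app.task has its creation callback registered in the global callback set.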
@app.task
def add(x, y):
    print(x + y)
    return x + y
Following @app.task leads us into the Celery application itself.
The code is in celery/app/base.py.
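@app.task returns _create_task_cls. This builds a task proxy and appends it to the application's pending queue (_pending). It also registers the cons callback in the global callback set via connect_on_app_finalize(cons).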
_create_task_cls = {function} <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls at 0x7ff1a7b118c8>
The code is:
def task(self, *args, **opts):
    if USING_EXECV and opts.get('lazy', True):
        from . import shared_task
        return shared_task(*args, lazy=False, **opts)

    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        _filt = filter

        def _create_task_cls(fun):
            if shared:
                def cons(app):
                    return app._task_from_fun(fun, **opts)
                cons.__name__ = fun.__name__
                connect_on_app_finalize(cons)  # key point: add to the global callback set
            if not lazy or self.finalized:
                ret = self._task_from_fun(fun, **opts)
            else:
                # return a proxy object that evaluates on first use
                ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                   __doc__=fun.__doc__)
                self._pending.append(ret)  # add to the app's pending queue
            if _filt:
                return _filt(ret)
            return ret

        return _create_task_cls

    if len(args) == 1:
        if callable(args[0]):
            return inner_create_task_cls(**opts)(*args)
    return inner_create_task_cls(**opts)
For the call in our example, Celery returns a Proxy instance (a PromiseProxy). It wraps self._task_from_fun, with fun as its argument.
Now let us look at the Proxy class, which is in celery/local.py.
class Proxy(object):
    """Proxy to another object."""

    # Code stolen from werkzeug.local.Proxy.
    __slots__ = ('__local', '__args', '__kwargs', '__dict__')

    def __init__(self, local,
                 args=None, kwargs=None, name=None, __doc__=None):
        object.__setattr__(self, '_Proxy__local', local)          # store the local argument in _Proxy__local
        object.__setattr__(self, '_Proxy__args', args or ())      # positional arguments
        object.__setattr__(self, '_Proxy__kwargs', kwargs or {})  # keyword arguments
        if name is not None:
            object.__setattr__(self, '__custom_name__', name)
        if __doc__ is not None:
            object.__setattr__(self, '__doc__', __doc__)
    ...

    def _get_current_object(self):
        """Get current object.

        This is useful if you want the real
        object behind the proxy at a time for performance reasons or because
        you want to pass the object into a different context.
        """
        loc = object.__getattribute__(self, '_Proxy__local')  # the local passed to __init__
        if not hasattr(loc, '__release_local__'):              # no __release_local__ attribute
            return loc(*self.__args, **self.__kwargs)          # call it with the stored arguments
        try:  # pragma: no cover
            # not sure what this is about
            return getattr(loc, self.__name__)                 # read the attribute named __name__
        except AttributeError:  # pragma: no cover
            raise RuntimeError('no object bound to {0.__name__}'.format(self))
    ...

    def __getattr__(self, name):
        if name == '__members__':
            return dir(self._get_current_object())
        return getattr(self._get_current_object(), name)  # delegate attribute lookup

    def __setitem__(self, key, value):
        self._get_current_object()[key] = value  # set item

    def __delitem__(self, key):
        del self._get_current_object()[key]  # delete item

    def __setslice__(self, i, j, seq):
        self._get_current_object()[i:j] = seq  # slice assignment

    def __delslice__(self, i, j):
        del self._get_current_object()[i:j]

    def __setattr__(self, name, value):
        setattr(self._get_current_object(), name, value)  # set attribute

    def __delattr__(self, name):
        delattr(self._get_current_object(), name)  # delete attribute
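Only some of the members are shown. As the comments indicate, the decision depends on the local that was passed in. If the local lacks __release_local__, it is treated as a function and called with the stored arguments. Otherwise, an attribute is read from it.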
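The deferred evaluation reduces to a few lines. `LazyProxy` below is a hypothetical, simplified analogue of Celery's PromiseProxy. It skips the `__release_local__` branch and most magic methods. It stores a factory and its arguments, calls the factory only on the first attribute access, and caches the result.

```python
class LazyProxy:
    """Simplified PromiseProxy-style object: evaluate on first use, then cache."""

    __slots__ = ('_factory', '_args', '_kwargs', '_thing')

    def __init__(self, factory, args=(), kwargs=None):
        object.__setattr__(self, '_factory', factory)
        object.__setattr__(self, '_args', args)
        object.__setattr__(self, '_kwargs', kwargs or {})

    def _get_current_object(self):
        try:
            return object.__getattribute__(self, '_thing')   # already evaluated
        except AttributeError:
            thing = self._factory(*self._args, **self._kwargs)
            object.__setattr__(self, '_thing', thing)
            return thing

    def __getattr__(self, name):
        # Only reached for names not found normally, i.e. attributes of the target.
        return getattr(self._get_current_object(), name)


created = []


def make_task(name):
    created.append(name)
    return type('Task', (), {'name': name})()


proxy = LazyProxy(make_task, ('myTest.add',))
calls_before_use = len(created)   # the factory has not run yet
first = proxy.name                # first access triggers evaluation
second = proxy.name               # served from the cache
```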
In the code above, the following line adds the task to the Celery application's pending queue:
self._pending.append(ret)
_pending is defined as follows; it is just a deque:
class Celery:
    """Celery application."""

    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, tasks=None, broker=None, include=None,
                 changes=None, config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, namespace=None, strict_typing=True,
                 **kwargs):
        self._pending = deque()
At this point the global set looks like this:
_on_app_finalizers = {set: 10}
 {function} <function add_chunk_task at 0x7fc200a81400>
 {function} <function add_backend_cleanup_task at 0x7fc200a81048>
 {function} <function add_starmap_task at 0x7fc200a81488>
 {function} <function add_group_task at 0x7fc200a812f0>
 {function} <function add_map_task at 0x7fc200a81510>
 {function} <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons at 0x7fc200af4510>
 {function} <function add_accumulate_task at 0x7fc200aa0158>
 {function} <function add_chain_task at 0x7fc200a81378>
 {function} <function add_unlock_chord_task at 0x7fc200a81598>
 {function} <function add_chord_task at 0x7fc200aa01e0>
The logic is illustrated below:
                          +------------------------------+
                          | _on_app_finalizers = set()   |
                          +--------------+---------------+
                                         |
                 connect_on_app_finalize |
+------------+                           |
| builtins.py| +-----------------------> |
+------------+                           |
                                         |
                 connect_on_app_finalize |
+-------------+                          |
|User Function| +----------------------> |
+-------------+                          |
                                         v
+-----------------------------------------------------------------------------------------------
| _on_app_finalizers
|
|   <function add_chunk_task>
|   <function add_backend_cleanup_task>
|   <function add_starmap_task>
|   <function add_group_task>
|   <function add_map_task>
|   <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>
|   <function add_accumulate_task>
|   <function add_chain_task>
|   <function add_unlock_chord_task>
|   <function add_chord_task>
|
+-----------------------------------------------------------------------------------------------
At this point we have a global set, _on_app_finalizers, that collects all the tasks.
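Celery now knows which tasks exist and has collected them, but it does not yet know what they mean in logical terms. Put differently, Celery knows which classes exist but has no instances of them.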
Since consuming tasks is Celery's core function, we have to revisit Worker startup. This time we focus on the task-related parts of the worker.
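In essence, this means processing the global set _on_app_finalizers above and associating these not-yet-meaningful tasks with the Celery application. Concretely, this happens in the Worker: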
The Worker here is the worker instance Celery uses for consumption.
So let us go straight to the worker.
The code is in celery/bin/worker.py:
@click.pass_context
@handle_preload_options
def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
           loglevel=None, logfile=None, pidfile=None, statedb=None,
           **kwargs):
    """Start worker instance."""
    app = ctx.obj.app
    worker = app.Worker(
        hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
        logfile=logfile,  # node format handled by celery.app.log.setup
        pidfile=node_format(pidfile, hostname),
        statedb=node_format(statedb, hostname),
        no_color=ctx.obj.no_color,
        **kwargs)  # execution reaches here
    worker.start()
    return worker.exitcode
Inside worker = app.Worker(...), WorkController is called indirectly.
Execution now reaches celery/worker/worker.py.
Some initialization happens here; let us keep digging.
class WorkController:
    """Unmanaged worker instance."""

    def __init__(self, app=None, hostname=None, **kwargs):
        self.app = app or self.app
        self.hostname = default_nodename(hostname)
        self.startup_time = datetime.utcnow()
        self.app.loader.init_worker()
        self.on_before_init(**kwargs)  # execution reaches here
Execution now reaches celery/apps/worker.py.
Here trace.setup_worker_optimizations is called, and we are about to see the tasks.
class Worker(WorkController):
    """Worker as a program."""

    def on_before_init(self, quiet=False, **kwargs):
        self.quiet = quiet
        trace.setup_worker_optimizations(self.app, self.hostname)
Execution now reaches celery/app/trace.py.
app.finalize() is called here so that all tasks are settled before the worker starts.
def setup_worker_optimizations(app, hostname=None):
    """Setup worker related optimizations."""
    global trace_task_ret

    hostname = hostname or gethostname()

    # make sure custom Task.__call__ methods that calls super
    # won't mess up the request/task stack.
    _install_stack_protection()

    app.set_default()

    # evaluate all task classes by finalizing the app.
    app.finalize()
After all that effort, we finally reach the key logic.
app.finalize() adds the tasks to the Celery application.
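The system has already collected all tasks into the global set _on_app_finalizers, but the tasks in that set have no logical meaning yet. They must be connected to the Celery application, and this is where that connection is made.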
The call stack is:
_task_from_fun, base.py:450
_create_task_cls, base.py:425
add_chunk_task, builtins.py:128
_announce_app_finalized, _state.py:52
finalize, base.py:511
setup_worker_optimizations, trace.py:643
on_before_init, worker.py:90
__init__, worker.py:95
worker, worker.py:326
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
The code is:
def finalize(self, auto=False):
    """Finalize the app.

    This loads built-in tasks, evaluates pending task decorators,
    reads configuration, etc.
    """
    with self._finalize_mutex:
        if not self.finalized:
            if auto and not self.autofinalize:
                raise RuntimeError('Contract breach: app not finalized')
            self.finalized = True
            _announce_app_finalized(self)  # the key step: make the connection

            pending = self._pending
            while pending:
                maybe_evaluate(pending.popleft())

            for task in self._tasks.values():
                task.bind(self)

            self.on_after_finalize.send(sender=self)
_announce_app_finalized(self) runs the callbacks in the global callback set _on_app_finalizers to obtain the task instances. It then adds them to Celery's task registry, so that a task instance can be looked up by its name.
def _announce_app_finalized(app):
    callbacks = set(_on_app_finalizers)
    for callback in callbacks:
        callback(app)
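Put together, finalization does three things. It marks the app as finalized exactly once, under a lock. It runs every registered finalizer callback against this app. Finally, it drains the pending deque. The compact sketch below reproduces that ordering. The names are hypothetical, plain callables stand in for task proxies, and the `task.bind` loop and the `on_after_finalize` signal are omitted.

```python
import threading
from collections import deque

_on_app_finalizers = set()


class MiniApp:
    def __init__(self):
        self.finalized = False
        self._finalize_mutex = threading.Lock()
        self._pending = deque()   # deferred task constructors
        self._tasks = {}
        self.log = []

    def finalize(self):
        with self._finalize_mutex:
            if self.finalized:          # idempotent: only the first call does work
                return
            self.finalized = True
            for callback in set(_on_app_finalizers):   # like _announce_app_finalized
                callback(self)
            while self._pending:                       # like maybe_evaluate(popleft())
                self._pending.popleft()()


def builtin(app):
    app._tasks['celery.chain'] = 'chain-task'
    app.log.append('builtin')


_on_app_finalizers.add(builtin)

app = MiniApp()
app._pending.append(lambda: app.log.append('pending add'))
app.finalize()
app.finalize()  # second call is a no-op
```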
For our custom task, the callback is the cons function defined inside _create_task_cls, so running it adds the task:
def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
    _filt = filter

    def _create_task_cls(fun):
        if shared:
            def cons(app):
                return app._task_from_fun(fun, **opts)
            cons.__name__ = fun.__name__
            connect_on_app_finalize(cons)
        if not lazy or self.finalized:
            ret = self._task_from_fun(fun, **opts)  # here
So during initialization, adding the task to each app calls app._task_from_fun(fun, **options).
Inside _task_from_fun, the following line adds the task to Celery, which establishes the connection:
self._tasks[task.name] = task
So self._tasks becomes:
_tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x25da0ca0d88>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x25da0ca0d88>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x25da0ca0d88>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x25da0ca0d88>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x25da0ca0d88>
 'celery.group' = {group} <@task: celery.group of myTest at 0x25da0ca0d88>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x25da0ca0d88>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x25da0ca0d88>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x25da0ca0d88>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x25da0ca0d88>
 __len__ = {int} 10
The code is:
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    if not self.finalized and not self.autofinalize:
        raise RuntimeError('Contract breach: app not finalized')
    name = name or self.gen_task_name(fun.__name__, fun.__module__)
    base = base or self.Task

    if name not in self._tasks:
        run = fun if bind else staticmethod(fun)
        task = type(fun.__name__, (base,), dict({
            'app': self,
            'name': name,
            'run': run,
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            '__annotations__': fun.__annotations__,
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()
        self._tasks[task.name] = task
        task.bind(self)  # connects task to this app
        add_autoretry_behaviour(task, **options)
    else:
        task = self._tasks[name]
    return task
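The core trick in `_task_from_fun` is to build a subclass of the Task base on the fly with `type()`. The function is stored as a `staticmethod` named `run`, unless `bind=True`; in that case it stays a plain function and receives the task instance as `self`. The class is then instantiated once. Below is a stripped-down sketch of that idea. `BaseTask`, `task_from_fun` and the `_tasks` dict are hypothetical simplifications, not Celery's classes.

```python
class BaseTask:
    name = None

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


_tasks = {}


def task_from_fun(fun, name=None, bind=False, base=BaseTask, **options):
    name = name or f'{fun.__module__}.{fun.__name__}'
    if name in _tasks:                 # already registered: reuse the instance
        return _tasks[name]
    run = fun if bind else staticmethod(fun)
    cls = type(fun.__name__, (base,), dict({
        'name': name,
        'run': run,
        '__doc__': fun.__doc__,
        '__module__': fun.__module__,
    }, **options))
    task = cls()                       # one instance per task name
    _tasks[name] = task
    return task


def add(x, y):
    """Add two numbers."""
    return x + y


def whoami(self):                      # bind=True: receives the task instance
    return self.name


add_task = task_from_fun(add, name='myTest.add')
bound_task = task_from_fun(whoami, name='myTest.whoami', bind=True)
```

Registering the same name a second time returns the existing instance, which mirrors the `else` branch above.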
By default the task base class is celery.app.task:Task. After the task instance is created dynamically, task.bind(self) is called, which sets up the various app-related attributes:
@classmethod
def bind(cls, app):
    was_bound, cls.__bound__ = cls.__bound__, True
    cls._app = app                  # set the class's _app attribute
    conf = app.conf                 # the app's configuration
    cls._exec_options = None        # clear option cache

    if cls.typing is None:
        cls.typing = app.strict_typing

    for attr_name, config_name in cls.from_config:      # fill in class defaults
        if getattr(cls, attr_name, None) is None:       # attribute not set on the class
            setattr(cls, attr_name, conf[config_name])  # use the default from the app config

    # decorate with annotations from config.
    if not was_bound:
        cls.annotate()

        from celery.utils.threads import LocalStack
        cls.request_stack = LocalStack()  # thread-local stack for request data

    # PeriodicTask uses this to add itself to the PeriodicTask schedule.
    cls.on_bound(app)

    return app
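The `from_config` loop in `bind` is a simple fallback. Any class attribute that is still `None` is filled from the app configuration, while attributes set explicitly on the task class win. The sketch below uses a plain dict in place of `app.conf`; the configuration values are made up for illustration.

```python
class Task:
    serializer = None
    acks_late = None
    priority = None
    from_config = (
        ('serializer', 'task_serializer'),
        ('acks_late', 'task_acks_late'),
        ('priority', 'task_default_priority'),
    )

    @classmethod
    def bind(cls, conf):
        for attr_name, config_name in cls.from_config:
            if getattr(cls, attr_name, None) is None:    # only unset attributes
                setattr(cls, attr_name, conf[config_name])


class AddTask(Task):
    acks_late = True   # set explicitly on the task: the config must not override it


conf = {'task_serializer': 'json', 'task_acks_late': False,
        'task_default_priority': None}
AddTask.bind(conf)
```

Because `setattr` targets the class `bind` was called on, the base `Task` keeps its `None` defaults.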
Execution returns to Celery; the code is in celery/app/base.py.
The variables are:
pending = {deque: 1} deque([<@task: myTest.add of myTest at 0x7fd907623550>])
self = {Celery} <Celery myTest at 0x7fd907623550>
After a task is taken out of pending, it is evaluated. As mentioned earlier, some tasks still have deferred work, and this is where that work runs.
The code is in celery/local.py:
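def __maybe_evaluate__(self):
    return self._get_current_object()

def _get_current_object(self):
    try:
        return object.__getattribute__(self, '__thing')  # already evaluated: cached object
    except AttributeError:
        return self.__evaluate__()  # first use: evaluate the wrapped factory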
Here self is the task itself:
self = {add} <@task: myTest.add of myTest at 0x7fa09ee1e320>
What is returned is the myTest.add task itself.
We now have all the tasks, and each one has its own callable instance.
Since consuming tasks involves multiple processes, let us first take a rough look at how the process pool is started.
Let us continue with Celery Worker startup.
During Celery Worker startup, several bootsteps are started. For the Worker, the steps are [<step: Hub>, <step: Pool>, <step: Consumer>].
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
The code is in celery/bootsteps.py:
def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self.started = i + 1
        step.start(parent)
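The loop starts every non-None step in order and records how many steps have started. The toy version below uses hypothetical `Step`, `Blueprint` and `Worker` classes whose steps only log their names. A `None` entry is simply skipped, as the generator expression in the original does.

```python
RUN = 1


class Step:
    def __init__(self, name, log):
        self.name, self.log = name, log

    def start(self, parent):
        self.log.append(f'start {self.name}')


class Blueprint:
    def __init__(self):
        self.state = None
        self.started = 0

    def start(self, parent):
        self.state = RUN
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)


class Worker:
    def __init__(self, log):
        self.steps = [Step('Hub', log), None, Step('Pool', log), Step('Consumer', log)]


log = []
blueprint = Blueprint()
blueprint.start(Worker(log))
```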
The variables are:
parent.steps = {list: 3}
 0 = {Hub} <step: Hub>
 1 = {Pool} <step: Pool>
 2 = {Consumer} <step: Consumer>
 __len__ = {int} 3
The startup of the actual task-processing logic lives in Pool.
In Pool(bootsteps.StartStopStep), the line w.process_task = w._process_task configures the callback for the concrete pool. That is, when the pool is notified that a task can run, it knows which callback to use to fetch and execute it.
class Pool(bootsteps.StartStopStep):
    """Bootstep managing the worker pool.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker start-up/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency
    """

    def create(self, w):
        procs = w.min_concurrency
        w.process_task = w._process_task  # configure the callback here
The method is below. As expected, it will later hand the task to the process pool through req.execute_using_pool(self.pool):
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    req.execute_using_pool(self.pool)
The variables at this point are:
self = {Pool} <step: Pool>
 semaphore = {NoneType} None
 threaded = {bool} False
w = {Worker} celery
In the end we get the following. This TaskRegistry is used when tasks are executed:
self._tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fb652da5fd0>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fb652da5fd0>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fb652da5fd0>
 'celery.group' = {group} <@task: celery.group of myTest at 0x7fb652da5fd0>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fb652da5fd0>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fb652da5fd0>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fb652da5fd0>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fb652da5fd0>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fb652da5fd0>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fb652da5fd0>
 __len__ = {int} 10
The diagram is as follows:
                          +------------------------------+
                          | _on_app_finalizers = set()   |
                          +--------------+---------------+
                                         |
                 connect_on_app_finalize |
+------------+                           |
| builtins.py| +-----------------------> |
+------------+                           |
                                         |
                 connect_on_app_finalize |
+-------------+                          |
|User Function| +----------------------> |
+-------------+                          |
                                         v
+-----------------------------------------------------------------------------------------------
| _on_app_finalizers
|
|   <function add_chunk_task>
|   <function add_backend_cleanup_task>
|   <function add_starmap_task>
|   <function add_group_task>
|   <function add_map_task>
|   <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>
|   <function add_accumulate_task>
|   <function add_chain_task>
|   <function add_unlock_chord_task>
|   <function add_chord_task>
|
+----------------------------------------+------------------------------------------------------
                                         |
                                         | finalize
                                         v
+----------------------------------------+------------------------------------------------------
| TaskRegistry
|
|   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
|   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>
|   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest>
|   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>
|   'celery.group' = {group} <@task: celery.group of myTest>
|   'celery.map' = {xmap} <@task: celery.map of myTest>
|   'celery.chain' = {chain} <@task: celery.chain of myTest>
|   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>
|   'celery.chord' = {chord} <@task: celery.chord of myTest>
|   'myTest.add' = {add} <@task: myTest.add of myTest>
|   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>
|
+----------------------------------------+------------------------------------------------------
                                         ^
                                         | _tasks
                                         |
                          +--------------+---------------+
                          | Celery                       |
                          |                              |
                          |   process_task +-------------+----> _process_task
                          |                              |
                          +------------------------------+
Alternatively, we can rearrange the diagram and look at it from another angle.
+------------------------------+
| _on_app_finalizers = set()   |
+--------------+---------------+
               |
               |   connect_on_app_finalize      +------------+
               | <----------------------------+ | builtins.py|
               |                                +------------+
               |
               |   connect_on_app_finalize      +-------------+
               | <----------------------------+ |User Function|
               |                                +-------------+
               v
+-----------------------------------------------------------------------------------------------
| _on_app_finalizers
|
|   <function add_chunk_task>
|   <function add_backend_cleanup_task>
|   <function add_starmap_task>
|   <function add_group_task>
|   <function add_map_task>
|   <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons>
|   <function add_accumulate_task>
|   <function add_chain_task>
|   <function add_unlock_chord_task>
|   <function add_chord_task>
|
+--------------+--------------------------------------------------------------------------------
               |
               | finalize
               v
+--------------+---------------+
| Celery                       |
|                              |
|   _tasks                     |
|   +                          |
|   |                          |
+---+--------------------------+
    |
    v
+-----------------------------------------------------------------------------------------------
| TaskRegistry
|
|   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
|   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>
|   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest>
|   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>
|   'celery.group' = {group} <@task: celery.group of myTest>
|   'celery.map' = {xmap} <@task: celery.map of myTest>
|   'celery.chain' = {chain} <@task: celery.chain of myTest>
|   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>
|   'celery.chord' = {chord} <@task: celery.chord of myTest>
|   'myTest.add' = {add} <@task: myTest.add of myTest>
|   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>
|
+-----------------------------------------------------------------------------------------------
The Task class is defined in celery/app/task.py.
Its member variables show the rough functional categories clearly:
Basic information, such as:
Error-handling information, such as:
Business control, such as:
Task control, such as:
The full definition is:
@abstract.CallableTask.register
class Task:
    __trace__ = None
    __v2_compat__ = False  # set by old base in celery.task.base

    MaxRetriesExceededError = MaxRetriesExceededError
    OperationalError = OperationalError

    Strategy = 'celery.worker.strategy:default'
    Request = 'celery.worker.request:Request'

    _app = None
    name = None
    typing = None
    max_retries = 3
    default_retry_delay = 3 * 60
    rate_limit = None
    ignore_result = None
    trail = True
    send_events = True
    store_errors_even_if_ignored = None
    serializer = None
    time_limit = None
    soft_time_limit = None
    backend = None
    autoregister = True
    track_started = None
    acks_late = None
    acks_on_failure_or_timeout = None
    reject_on_worker_lost = None
    throws = ()
    expires = None
    priority = None
    resultrepr_maxsize = 1024
    request_stack = None
    _default_request = None
    abstract = True
    _exec_options = None
    __bound__ = False

    from_config = (
        ('serializer', 'task_serializer'),
        ('rate_limit', 'task_default_rate_limit'),
        ('priority', 'task_default_priority'),
        ('track_started', 'task_track_started'),
        ('acks_late', 'task_acks_late'),
        ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'),
        ('reject_on_worker_lost', 'task_reject_on_worker_lost'),
        ('ignore_result', 'task_ignore_result'),
        ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'),
    )

    _backend = None  # set by backend property.
Tasks are invoked through the Consumer, so we need to look at the task-related parts of the Consumer. This is the code that links tasks to the consumer, so that the Consumer can actually invoke them.
When the Consumer starts, it also runs several steps:
parent.steps = {list: 8}
 0 = {Connection} <step: Connection>
 1 = {Events} <step: Events>
 2 = {Heart} <step: Heart>
 3 = {Mingle} <step: Mingle>
 4 = {Gossip} <step: Gossip>
 5 = {Tasks} <step: Tasks>
 6 = {Control} <step: Control>
 7 = {Evloop} <step: event loop>
 __len__ = {int} 8
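The consumer starts the Tasks bootstep. This step configures each task's callback strategy, for example 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>, and creates the amqp TaskConsumer. As a result, each task's callback is linked to amqp.Consumer, and the message path is complete.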
The code is in celery/worker/consumer/tasks.py:
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        c.task_consumer = c.qos = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()  # configure the callback for each task

        # - RabbitMQ 3.3 completely redefines how basic_qos works..
        # This will detect if the new qos semantics is in effect,
        # and if so make sure the 'apply_global' flag is set on qos updates.
        qos_global = not c.connection.qos_semantics_matches_spec

        # set initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )  # this links tasks to amqp.Consumer

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
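Running tasks also requires a scheduling strategy, which can be seen as a form of load balancing. The constants below belong to the prefork pool (celery/concurrency/asynpool.py) and are selected with the worker's -O option. They decide how tasks are handed to child processes: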
SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4

SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_FAIR,
    'default': SCHED_STRATEGY_FAIR,
    'fast': SCHED_STRATEGY_FCFS,
    'fcfs': SCHED_STRATEGY_FCFS,
    'fair': SCHED_STRATEGY_FAIR,
}
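The following is a toy model, not Celery's actual asynpool code, that illustrates the difference between the two strategies. With FCFS-style hand-off, tasks are written to child processes even if a child is still busy (simplified here to round-robin), so short tasks can queue behind a long one. With fair hand-off, a task only goes to a child that is free. Durations are made-up time units.

```python
import heapq

SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4


def makespan(durations, processes, strategy):
    """Return when the last task finishes under a toy scheduling model."""
    if strategy == SCHED_STRATEGY_FCFS:
        # Toy FCFS: tasks are pushed round-robin into each child's buffer,
        # regardless of whether that child is still busy.
        finish = [0] * processes
        for i, d in enumerate(durations):
            finish[i % processes] += d
        return max(finish)
    # Toy FAIR: each task goes to whichever child becomes free first.
    free_at = [(0, p) for p in range(processes)]
    heapq.heapify(free_at)
    end = 0
    for d in durations:
        t, p = heapq.heappop(free_at)
        heapq.heappush(free_at, (t + d, p))
        end = max(end, t + d)
    return end


durations = [10, 1, 1, 1]   # one long task followed by short ones
fcfs = makespan(durations, 2, SCHED_STRATEGY_FCFS)
fair = makespan(durations, 2, SCHED_STRATEGY_FAIR)
```

Under toy FCFS, one short task ends up behind the long one in the same child. Under toy FAIR, all short tasks go to the idle child.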
update_strategies configures the callback strategy and callback method for each task, for example 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>.
The call stack is:
update_strategies, consumer.py:523
start, tasks.py:26
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
The code is in celery/worker/consumer/consumer.py:
def update_strategies(self):
    loader = self.app.loader                    # the app's loader
    for name, task in items(self.app.tasks):    # iterate over all tasks
        # key: task name; value: whatever task.start_strategy returns
        self.strategies[name] = task.start_strategy(self.app, self)
        # function that runs the task and handles its result
        task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                      app=self.app)
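What `update_strategies` builds is essentially a dispatch table. Each task name maps to a message-handler closure, which the strategy factory creates once per task. When a message arrives, the consumer looks up the handler by the task name in the message. Below is a minimal sketch of that idea. `default_strategy`, `on_message` and the dict message format are hypothetical and much simpler than Celery's.

```python
def default_strategy(task, handled):
    """Hypothetical strategy factory: returns a handler bound to one task."""
    def task_message_handler(message):
        handled.append((task.__name__, message['args']))
        return task(*message['args'])
    return task_message_handler


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


tasks = {'myTest.add': add, 'myTest.mul': mul}
handled = []
# Built once at startup, like self.strategies[name] = task.start_strategy(...)
strategies = {name: default_strategy(task, handled) for name, task in tasks.items()}


def on_message(message):
    # Look up the per-task handler by the name carried in the message.
    return strategies[message['task']](message)


result = on_message({'task': 'myTest.add', 'args': [2, 17]})
```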
app.tasks is shown below; these are all the tasks currently registered in Celery:
self.app.tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fc5a36e8160>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fc5a36e8160>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fc5a36e8160>
 'celery.group' = {group} <@task: celery.group of myTest at 0x7fc5a36e8160>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fc5a36e8160>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fc5a36e8160>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fc5a36e8160>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fc5a36e8160>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fc5a36e8160>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fc5a36e8160>
 __len__ = {int} 10
Next, let us look at the task.start_strategy function:
def start_strategy(self, app, consumer, **kwargs):
    # resolve self.Strategy and call it; returns the message handler for this task
    return instantiate(self.Strategy, self, app, consumer, **kwargs)
Here self.Strategy defaults to celery.worker.strategy:default:
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    hostname = consumer.hostname                    # consumer information
    connection_errors = consumer.connection_errors  # connection error types
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher
    events = eventer and eventer.enabled
    send_event = eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    body_can_be_buffer = consumer.pool.body_can_be_buffer
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # returns a Request class

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None:
            body, headers, decoded, utc = (
                message.body, message.headers, False, True,
            )
            if not body_can_be_buffer:
                body = bytes(body) if isinstance(body, buffer_t) else body
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)  # parse the received data

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )  # instantiate the request
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )  # send task-received if events are enabled

        if req.eta:  # ETA handling
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, timezone.local)
            except (OverflowError, ValueError):
                ...  # ETA conversion failed; error handling omitted in this excerpt
            else:
                consumer.qos.increment_eventually()
                call_at(eta, apply_eta_task, (req,), priority=6)
        else:
            if rate_limits_enabled:  # rate limiting
                bucket = get_bucket(task.name)
                if bucket:
                    return limit_task(req, bucket, 1)
            task_reserved(req)
            if callbacks:
                [callback(req) for callback in callbacks]
            handle(req)  # handle the received request
    return task_message_handler
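Stripped of events and buffers, `task_message_handler` decides among three paths. A revoked request is dropped. A request with an ETA is scheduled on a timer. Otherwise, after an optional rate-limit check, the request is passed on to `handle`. The sketch below shows only that control flow. The `Request` tuple, the timer list and the token-count dict are hypothetical stand-ins. In Celery, `limit_task` delays a rate-limited request until a token is available; this sketch only reports it.

```python
from collections import namedtuple

Request = namedtuple('Request', 'id name eta')


def make_handler(revoked, timer, buckets, handled):
    def task_message_handler(req):
        if req.id in revoked:                 # revoked: drop it
            return 'revoked'
        if req.eta is not None:               # ETA: defer via the timer
            timer.append((req.eta, req))
            return 'scheduled'
        tokens = buckets.get(req.name)
        if tokens is not None:                # task has a rate limit
            if tokens == 0:
                return 'rate-limited'         # Celery would delay it instead
            buckets[req.name] = tokens - 1
        handled.append(req.id)                # like handle(req) -> pool
        return 'handled'
    return task_message_handler


timer, handled = [], []
handler = make_handler(revoked={'r1'}, timer=timer,
                       buckets={'myTest.add': 1}, handled=handled)
outcomes = [
    handler(Request('r1', 'myTest.add', None)),
    handler(Request('e1', 'myTest.add', 1700000000.0)),
    handler(Request('a1', 'myTest.add', None)),
    handler(Request('a2', 'myTest.add', None)),
]
```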
The handler here (handle = consumer.on_task_request) is the w.process_task that was passed in when the consumer was initialized:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    req.execute_using_pool(self.pool)
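After this, every task has its callback strategy, so when the process pool is involved it knows how to invoke the task. For each of our current tasks, once a task message is fetched from the broker, task_message_handler is called.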
strategies = {dict: 10}
 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
 'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878400>
 'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878598>
 'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878840>
 'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878ae8>
 'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878d90>
 'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b0d0>
 'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b378>
 'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b620>
 'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b8c8>
 __len__ = {int} 10
In celery.worker.strategy:default, this line deserves a closer look:
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # returns a Request class
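In the strategy, the purpose of the line below is to build a Request class based on the task instance. That class ties together the broker message, the consumer, and the process pool.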
Concretely, Request.execute_using_pool is where the link to multiprocessing begins, for example to the consumer's process pool.
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
The task instance here is:
myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]
The code that builds the Request class is:
def create_request_cls(base, task, pool, hostname, eventer,
                       ref=ref, revoked_tasks=revoked_tasks,
                       task_ready=task_ready, trace=trace_task_ret):
    default_time_limit = task.time_limit
    default_soft_time_limit = task.soft_time_limit
    apply_async = pool.apply_async
    acks_late = task.acks_late
    events = eventer and eventer.enabled

    class Request(base):

        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id
            if (self.expires or task_id in revoked_tasks) and self.revoked():
                raise TaskRevokedError(task_id)

            time_limit, soft_time_limit = self.time_limits
            result = apply_async(
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result

        def on_success(self, failed__retval__runtime, **kwargs):
            failed, retval, runtime = failed__retval__runtime
            if failed:
                if isinstance(retval.exception, (
                        SystemExit, KeyboardInterrupt)):
                    raise retval.exception
                return self.on_failure(retval, return_ok=True)
            task_ready(self)

            if acks_late:
                self.acknowledge()

            if events:
                self.send_event(
                    'task-succeeded', result=retval, runtime=runtime,
                )

    return Request
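`create_request_cls` is a class factory. It reads per-task settings once (time limits, `acks_late`, the pool's `apply_async`) and closes over them in a freshly defined `Request` subclass, so `execute_using_pool` does not have to look them up on the task for every message. Below is a stripped-down sketch of that pattern. `BaseRequest`, `FakeTask`, and the `submit` callable are hypothetical stand-ins, not Celery classes.

```python
class BaseRequest:
    def __init__(self, task_id, time_limit=None):
        self.task_id = task_id
        self.time_limit = time_limit  # per-message override, may be None


def create_request_cls(base, task, submit):
    # Read task-level settings once; the nested class closes over these locals.
    default_time_limit = task.time_limit
    acks_late = task.acks_late

    class Request(base):
        def execute_using_pool(self):
            # A per-message limit wins; otherwise fall back to the task default.
            timeout = self.time_limit or default_time_limit
            return submit(self.task_id, timeout=timeout, acks_late=acks_late)

    return Request


class FakeTask:
    time_limit = 30
    acks_late = True


if __name__ == "__main__":
    Req = create_request_cls(BaseRequest, FakeTask, lambda tid, **kw: (tid, kw))
    print(Req("t1").execute_using_pool())
    # ('t1', {'timeout': 30, 'acks_late': True})
    print(Req("t2", time_limit=5).execute_using_pool())
    # ('t2', {'timeout': 5, 'acks_late': True})
```

The factory runs once per task when strategies are built. Each message then only instantiates the already-specialized class.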
The `task_message_handler` callback shown earlier contains `req = Req(...)`. This is where multiprocessing gets involved, through the Request class:
def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )  # instantiate the request
    if req.eta:
        ...  # ETA handling (omitted here)
    else:
        task_reserved(req)
        if callbacks:
            [callback(req) for callback in callbacks]
        handle(req)  # handle the received request
return task_message_handler
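In `task_message_handler`, a request that carries an ETA is not run immediately. It is scheduled on the event loop timer (`call_at(eta, apply_eta_task, ...)`), while a request without an ETA goes straight to `handle`. Below is a minimal sketch of that split using a `heapq`-based timer. `EtaScheduler` and its methods are hypothetical, and timestamps are plain floats rather than the timezone-aware datetimes Celery converts.

```python
import heapq
import itertools


class EtaScheduler:
    """Tiny timer: keeps (eta, seq, req) in a heap and releases due requests."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker so requests are never compared

    def call_at(self, eta, req):
        heapq.heappush(self._heap, (eta, next(self._seq), req))

    def pop_due(self, now):
        """Return all requests whose ETA is <= now, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


def task_message_handler(req, scheduler, handle):
    if req.get("eta") is not None:
        scheduler.call_at(req["eta"], req)  # run later
    else:
        handle(req)                         # run now


if __name__ == "__main__":
    sched, handled = EtaScheduler(), []
    for r in ({"id": "a", "eta": 10.0}, {"id": "b"}, {"id": "c", "eta": 5.0}):
        task_message_handler(r, sched, handled.append)
    print([r["id"] for r in handled])                 # ['b']
    print([r["id"] for r in sched.pop_due(now=7.0)])  # ['c']
```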
Note: the `handle` function called by `handle(req)` is `w.process_task`, which was passed in when the consumer was initialized:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    req.execute_using_pool(self.pool)
So `handle(req)` actually calls the Request's `execute_using_pool` method, and that is where the work crosses over into the process pool.
The code is:
class Request(base):

    def execute_using_pool(self, pool, **kwargs):
        task_id = self.task_id  # get the task id
        # refuse to run if the task has expired or been revoked
        if (self.expires or task_id in revoked_tasks) and self.revoked():
            raise TaskRevokedError(task_id)

        time_limit, soft_time_limit = self.time_limits  # get the time limits
        # submit the task (wrapped by trace) to the pool; returns an async result handle
        result = apply_async(
            trace,
            args=(self.type, task_id, self.request_dict, self.body,
                  self.content_type, self.content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or default_soft_time_limit,
            timeout=time_limit or default_time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        # pylint: disable=attribute-defined-outside-init
        self._apply_result = maybe(ref, result)
        return result
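`execute_using_pool` does not run the task itself. It submits `trace` together with the request data to the pool's `apply_async` and registers `on_success`/`on_failure` as callbacks. The sketch below reproduces that shape with the standard library's `multiprocessing.pool.ThreadPool`. A thread pool is used only so the demo needs no pickling; Celery's worker uses its own pool implementations. The `trace` function here is a hypothetical simplification that returns the same `(failed, retval, runtime)` triple that `on_success` unpacks in `create_request_cls`.

```python
import time
from multiprocessing.pool import ThreadPool


def trace(fn, args):
    """Run fn(*args) and report (failed, retval, runtime) instead of raising."""
    start = time.monotonic()
    try:
        return False, fn(*args), time.monotonic() - start
    except Exception as exc:  # the failure travels back as data, not as an exception
        return True, exc, time.monotonic() - start


class Request:
    def __init__(self, task_id, fn, args, results):
        self.task_id, self.fn, self.args, self.results = task_id, fn, args, results

    def execute_using_pool(self, pool):
        # Submit the wrapped call; the pool invokes on_success when trace returns.
        return pool.apply_async(
            trace,
            args=(self.fn, self.args),
            callback=self.on_success,
            error_callback=self.on_failure,
        )

    def on_success(self, failed__retval__runtime):
        failed, retval, runtime = failed__retval__runtime
        if failed:
            return self.on_failure(retval)
        self.results[self.task_id] = ("SUCCESS", retval)

    def on_failure(self, exc):
        self.results[self.task_id] = ("FAILURE", repr(exc))


if __name__ == "__main__":
    results = {}
    with ThreadPool(2) as pool:
        reqs = [Request("t1", lambda x, y: x + y, (2, 17), results),
                Request("t2", lambda x: 1 / x, (0,), results)]
        for r in reqs:
            r.execute_using_pool(pool).wait()
    print(results["t1"], results["t2"][0])  # ('SUCCESS', 19) FAILURE
```

Because `trace` converts exceptions into a `failed=True` result, the pool's `error_callback` only fires if `trace` itself breaks. This mirrors how `on_success` above routes task failures to `on_failure`.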
Because there is a lot of information here, it is split across three diagrams.

The strategy logic:
+--------------------+        +------------------------+
| Celery             |        | Consumer               |
|                    |        |                        |        +---------------+
|   consumer  +-------------->|   task_consumer  +------------->| amqp.Consumer |
|                    |        |                        |        +---------------+
|   _tasks           |        |   strategies           |
|     +              |        |     +                  |
+-----|--------------+        +-----|------------------+
      |                             |
      |                             v
      |    +--------------------------------------------------------------------+
      |    | strategies = {dict: 10}                                            |
      |    |   'celery.chunks'          = default.<locals>.task_message_handler |
      |    |   'celery.backend_cleanup' = default.<locals>.task_message_handler |
      |    |   'celery.chord_unlock'    = default.<locals>.task_message_handler |
      |    |   'celery.group'           = default.<locals>.task_message_handler |
      |    |   'celery.map'             = default.<locals>.task_message_handler |
      |    |   'celery.chain'           = default.<locals>.task_message_handler |
      |    |   'celery.starmap'         = default.<locals>.task_message_handler |
      |    |   'celery.chord'           = default.<locals>.task_message_handler |
      |    |   'myTest.add'             = default.<locals>.task_message_handler |
      |    |   'celery.accumulate'      = default.<locals>.task_message_handler |
      |    +--------------------------------------------------------------------+
      |
      v
+------------------------------------------------------------------------------------------+
| TaskRegistry                                                                             |
|   NotRegistered            = {type} <class 'celery.exceptions.NotRegistered'>            |
|   'celery.chunks'          = {chunks} <@task: celery.chunks of myTest>                   |
|   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest> |
|   'celery.chord_unlock'    = {unlock_chord} <@task: celery.chord_unlock of myTest>       |
|   'celery.group'           = {group} <@task: celery.group of myTest>                     |
|   'celery.map'             = {xmap} <@task: celery.map of myTest>                        |
|   'celery.chain'           = {chain} <@task: celery.chain of myTest>                     |
|   'celery.starmap'         = {xstarmap} <@task: celery.starmap of myTest>                |
|   'celery.chord'           = {chord} <@task: celery.chord of myTest>                     |
|   'myTest.add'             = {add} <@task: myTest.add of myTest>                         |
|   'celery.accumulate'      = {accumulate} <@task: celery.accumulate of myTest>           |
+------------------------------------------------------------------------------------------+
How tasks are registered in the Celery application:
+------------------------------+
| _on_app_finalizers = set()   |
+--------------+---------------+
               |
               |   connect_on_app_finalize    +-------------+
               | <--------------------------+ | builtins.py |
               |                              +-------------+
               |   connect_on_app_finalize    +---------------+
               | <--------------------------+ | User Function |
               |                              +---------------+
               v
+-------------------------------------------------------------------------------------------------+
| _on_app_finalizers                                                                              |
|   <function add_chunk_task>                                                                     |
|   <function add_backend_cleanup_task>                                                           |
|   <function add_starmap_task>                                                                   |
|   <function add_group_task>                                                                     |
|   <function add_map_task>                                                                       |
|   <function Celery.task.<locals>.inner_create_task_cls.<locals>._create_task_cls.<locals>.cons> |
|   <function add_accumulate_task>                                                                |
|   <function add_chain_task>                                                                     |
|   <function add_unlock_chord_task>                                                              |
|   <function add_chord_task>                                                                     |
+--------------+----------------------------------------------------------------------------------+
               |
               | finalize
               v
+------------------------------------------------------------------------------------------+
| TaskRegistry                                                                             |
|   NotRegistered            = {type} <class 'celery.exceptions.NotRegistered'>            |
|   'celery.chunks'          = {chunks} <@task: celery.chunks of myTest>                   |
|   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest> |
|   'celery.chord_unlock'    = {unlock_chord} <@task: celery.chord_unlock of myTest>       |
|   'celery.group'           = {group} <@task: celery.group of myTest>                     |
|   'celery.map'             = {xmap} <@task: celery.map of myTest>                        |
|   'celery.chain'           = {chain} <@task: celery.chain of myTest>                     |
|   'celery.starmap'         = {xstarmap} <@task: celery.starmap of myTest>                |
|   'celery.chord'           = {chord} <@task: celery.chord of myTest>                     |
|   'myTest.add'             = {add} <@task: myTest.add of myTest>                         |
|   'celery.accumulate'      = {accumulate} <@task: celery.accumulate of myTest>           |
+--------------+---------------------------------------------------------------------------+
               ^
               | _tasks
+--------------+------------+
| Celery                    |
|                           |         +---------------+
|   task_consumer +------------------>| amqp.Consumer |
|                           |         +---------------+
|   process_task  +------------------> _process_task
+---------------------------+
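The registration diagram shows a two-phase scheme. Built-in modules and user task functions first register callbacks via `connect_on_app_finalize`, and only when the app is finalized are those callbacks run to fill the `TaskRegistry`. Below is a minimal sketch of that deferred pattern. The `App` class, its `task` decorator, and the module-level `_on_app_finalizers` set are simplified hypothetical stand-ins, not Celery's real implementation; the registry is a plain dict mapping names to callables.

```python
_on_app_finalizers = set()  # shared by every app, like the set() in the diagram


def connect_on_app_finalize(callback):
    """Remember a callback to run when an app is finalized."""
    _on_app_finalizers.add(callback)
    return callback


class App:
    def __init__(self, main):
        self.main = main
        self._tasks = {}  # stand-in for TaskRegistry: name -> callable
        self.finalized = False

    def task(self, fn):
        """Decorator: defer registration until finalize() (like the `cons` entry)."""
        name = f"{self.main}.{fn.__name__}"

        def cons(app):
            app._tasks[name] = fn

        connect_on_app_finalize(cons)
        return fn

    def finalize(self):
        """Run every pending finalizer exactly once for this app."""
        if not self.finalized:
            self.finalized = True
            for callback in list(_on_app_finalizers):
                callback(self)


@connect_on_app_finalize
def add_backend_cleanup_task(app):
    # Built-in tasks register themselves the same way (body is a placeholder).
    app._tasks["celery.backend_cleanup"] = lambda: None


if __name__ == "__main__":
    app = App("myTest")

    @app.task
    def add(x, y):
        return x + y

    print(sorted(app._tasks))  # []
    app.finalize()
    print(sorted(app._tasks))  # ['celery.backend_cleanup', 'myTest.add']
```

Deferring registration lets tasks be declared before the app is fully configured. The registry is only populated once `finalize()` runs.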
After a message is fetched from the broker, the task is processed as follows:
  Consumer
                  +
                  | message
                  v
         +--------+---------+    strategy   +-------------------------------------+
         | on_task_received | <-----------+ | strategies                          |
         |                  |               | [myTest.add : task_message_handler] |
         +--------+---------+               +-------------------------------------+
                  |
                  | strategy
                  v             Request [myTest.add]
      +-----------+----------+                    +--------------------+
      | task_message_handler | <----------------+ | create_request_cls |
      +-----------+----------+                    +--------------------+
                  |
                  | _process_task_sem
------------------+------------------------------------------------------------
  Worker          |
                  | req[{Request} myTest.add]
                  v
        +---------+----------+
        | WorkController     |
        |                    |
        |   pool +----------------------------+
        +---------+----------+                |
                  |                           |
                  | apply_async               v
      +-----------+----------+          +-----+----+
      | {Request} myTest.add | +------> | TaskPool |
      +----------------------+          +----------+
                              myTest.add
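This concludes our analysis of Celery startup. Next, we will walk through a complete example: how a message travels from being sent to being consumed.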