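Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.
The earlier articles in this series (linked at the end) covered how Celery starts up and how Task works. This article looks at the consumption flow inside Celery (including Kombu) after a task is received, up to the point just before multiprocessing takes over.
The goal is an interim summary. It organizes our current understanding and prepares for analyzing multiprocessing in the next stage.
Because this is a detailed walkthrough of the flow, it includes many stack traces and runtime variables. Please bear with them.
In the earlier analysis of the Celery worker startup process, we saw that Celery ends by starting a loop that waits for tasks to consume. The callback defined at startup is on_task_received. A condensed stack trace follows.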
on_task_received, consumer.py:542
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_brpop_read, redis.py:748
on_readable, redis.py:358
create_loop, hub.py:361
asynloop, loops.py:81
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
main, core.py:782
start, base.py:358
worker_main, base.py:374
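From this stack we can roughly make out a logical flow.
A stack alone does not give an overall picture, though, so in this article we look at how Celery consumes messages.
We start from poll: a new task message arrives in Redis, and the FD used by Celery's BRPOP receives a poll event.
We begin with Kombu.
First, here is the overall logic diagram of the Kombu part, as an intuitive overview: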
Kombu
hub (poll)
   |
   | 1  cb(fileno), registered in readers[fileno]
   v
Transport.on_readable(fileno)
   |
   | 2  cycle.on_readable(fileno)
   v
MultiChannelPoller: _fd_to_chan[fileno] -> chan.handlers[type]
   |
   | 3
   v
Channel._brpop_read
   |
   | 4  connection._deliver(message, queue)
   v
Transport._deliver -> _callbacks[queue]
   |
   | 5  callback(message)
   v
Channel: callback(message)
   |
   v
Consumer: on_message / _receive_callback
   |
   | 6  on_m(message)
---+-----------------------------------------------------
Celery
   v
Consumer: on_task_received
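The diagram above is divided into two domains, Kombu and Celery. The message starts in Kombu and then reaches Celery.
We start with the message loop, hub.
The relevant code is in kombu/asynchronous/hub.py.
When poll reports an event, the loop calls the cb registered in readers[fd]. Here fd is the file descriptor of the Redis socket.
A simplified version of the code: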
def create_loop(self,
                generator=generator, sleep=sleep, min=min, next=next,
                Empty=Empty, StopIteration=StopIteration,
                KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
    readers, writers = self.readers, self.writers
    poll = self.poller.poll
    while 1:
        # ... (timer handling that computes poll_timeout is elided)
        if readers or writers:
            to_consolidate = []
            try:
                events = poll(poll_timeout)
            except ValueError:
                return
            for fd, event in events or ():
                if event & READ:
                    # readers[fd] holds the (callback, args) registered for this fd
                    cb, cbargs = readers[fd]
                    cb(*cbargs)
            # ... (write and error handling elided)
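The readers[fd] → cb(*cbargs) dispatch above can be illustrated with a minimal sketch. It uses Python's standard selectors module rather than Kombu's own poller. MiniHub, add_reader, run_once and demo are hypothetical names used only for illustration. The sketch keeps just one pattern: a dict from fd to (callback, args) that is consulted on every READ event.

```python
import selectors
import socket


class MiniHub:
    """Hypothetical, stripped-down hub: maps each fd to (callback, args)."""

    def __init__(self):
        self.poller = selectors.DefaultSelector()
        self.readers = {}  # fd -> (callback, args), like hub.readers

    def add_reader(self, fd, callback, *args):
        self.readers[fd] = (callback, args)
        self.poller.register(fd, selectors.EVENT_READ)

    def run_once(self, timeout=1.0):
        # One loop iteration: poll, then call the callback registered
        # for every fd that became readable.
        for key, events in self.poller.select(timeout):
            if events & selectors.EVENT_READ:
                cb, cbargs = self.readers[key.fd]
                cb(*cbargs)


def demo():
    """Make one socket readable (a stand-in for Redis replying to BRPOP)."""
    received = []
    a, b = socket.socketpair()
    hub = MiniHub()
    try:
        # Plays the role of registering Transport.on_readable for the fd.
        hub.add_reader(b.fileno(), lambda sock: received.append(sock.recv(1024)), b)
        a.sendall(b"new task")
        hub.run_once()
    finally:
        hub.poller.close()
        a.close()
        b.close()
    return received
```

Calling demo() returns [b'new task']. Writing to one end of the socket pair makes the other end readable, and run_once dispatches to the registered callback.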
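The callback registered in readers[fd] is the Transport's on_readable, so control passes to Transport.
Its job is to hand the event to MultiChannelPoller.
The code is in kombu/transport/redis.py. Here self is the Transport, and self.cycle is its MultiChannelPoller.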
def on_readable(self, fileno):
    """Handle AIO event for one of our file descriptors."""
    self.cycle.on_readable(fileno)
The variables at this point:
fileno = {int} 34
self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
The logic has now reached Transport:
hub (poll)
   |
   | cb(fileno)
   v
Transport.on_readable
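Next the code reaches MultiChannelPoller. Earlier articles in this series showed that MultiChannelPoller links Channels with the poller. Here its job is to pass the event to the Channel that corresponds to the polled fd.
As the code shows, each fd maps to one Channel. poll only tells Celery that a given fd has data. Actually reading the message is left to further processing by the Channel.
Celery tasks are consumed with the Redis BRPOP command, so the handler obtained here is the BRPOP callback, _brpop_read.
The code is in kombu/transport/redis.py.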
def on_readable(self, fileno):
    chan, type = self._fd_to_chan[fileno]
    if chan.qos.can_consume():
        chan.handlers[type]()
The variables at this point show each of the logical pieces involved:
chan.handlers[type] = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>
chan = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
fileno = {int} 34
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fcbaddfd048>
type = {str} 'BRPOP'
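The routing step in MultiChannelPoller.on_readable can be sketched in isolation. FakeQoS, FakeChannel and MiniPoller are hypothetical stand-ins. The prefetch rule (0 means unlimited) is a simplification of Kombu's QoS. The point is that the fd selects a (channel, command type) pair, and the handler only runs when the channel's QoS allows more messages.

```python
class FakeQoS:
    """Hypothetical QoS: consuming is allowed while below the prefetch limit."""

    def __init__(self, prefetch_limit=0):
        self.prefetch_limit = prefetch_limit  # 0 means "no limit"
        self.unacked = 0

    def can_consume(self):
        return not self.prefetch_limit or self.unacked < self.prefetch_limit


class FakeChannel:
    def __init__(self, prefetch_limit=0):
        self.qos = FakeQoS(prefetch_limit)
        self.reads = 0
        # Handler table keyed by command type, like chan.handlers.
        self.handlers = {'BRPOP': self._brpop_read}

    def _brpop_read(self):
        self.reads += 1


class MiniPoller:
    """Hypothetical MultiChannelPoller: maps fd -> (channel, command type)."""

    def __init__(self):
        self._fd_to_chan = {}

    def register(self, fd, chan, type_):
        self._fd_to_chan[fd] = (chan, type_)

    def on_readable(self, fileno):
        chan, type_ = self._fd_to_chan[fileno]
        if chan.qos.can_consume():
            chan.handlers[type_]()
```

If a channel's prefetch limit is already reached, on_readable returns without reading. The message stays in Redis until the QoS allows consumption again.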
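Now the code reaches Channel, in kombu/transport/redis.py.
This part of Channel uses the Redis client to read the message. It parses the reply to extract the queue name (the dest variable in the code), which determines which consumer, the one bound to that queue, should handle the message. It then dispatches the message with self.connection._deliver.
The _brpop_read code: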
def _brpop_read(self, **options):
    try:
        try:
            dest__item = self.client.parse_response(self.client.connection,
                                                    'BRPOP',
                                                    **options)
        except self.connection_errors:
            # if there's a ConnectionError, disconnect so the next
            # iteration will reconnect automatically.
            self.client.connection.disconnect()
            raise
        if dest__item:
            dest, item = dest__item
            dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
            self._queue_cycle.rotate(dest)
            self.connection._deliver(loads(bytes_to_str(item)), dest)  # dispatch the message
            return True
        else:
            raise Empty()
    finally:
        self._in_poll = None
The variables:
dest = {str} 'celery'
dest__item = {tuple: 2}
  0 = {bytes: 6} b'celery'
  1 = {bytes: 861} b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-
item = b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-
self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
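The sketch below checks two things: what _brpop_read does with the reply, and what the base64 body in the dump contains. parse_brpop_reply and decode_task_body are hypothetical helpers. The SEP value is an assumption about how Kombu's Redis transport suffixes priority queue keys. The [args, kwargs, embed] body layout is Celery's task message protocol v2, which matches the dump.

```python
import base64
import json

# Assumption: priority queues use keys like "<queue><SEP><priority>";
# the separator value here is for illustration.
SEP = '\x06\x16'


def parse_brpop_reply(reply, sep=SEP):
    """Turn a BRPOP reply (key, item) into (queue_name, message_dict)."""
    key, item = reply
    queue = key.decode('utf-8').rsplit(sep, 1)[0]  # drop any priority suffix
    return queue, json.loads(item.decode('utf-8'))


def decode_task_body(message):
    """Decode a protocol-v2 task body into (args, kwargs, embed)."""
    body = message['body']
    if message['properties'].get('body_encoding') == 'base64':
        body = base64.b64decode(body)
    args, kwargs, embed = json.loads(body)
    return args, kwargs, embed
```

Applied to the body in the dump above, decode_task_body yields args [2, 17], empty kwargs, and an embed dict whose callbacks, errbacks, chain and chord are all None. This matches the argsrepr '(2, 17)' in the headers.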
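Control returns to Transport.
Here the code looks up the callback stored in self._callbacks for the queue and calls it. self._callbacks records one callback per queue.
The callback found is the _callback closure defined inside Channel.basic_consume, shown in the dump as <function Channel.basic_consume.<locals>._callback ...>.
We can also see the concrete format and content of the task message. For example, {'exchange': '', 'routing_key': 'celery'} tells us which queue the message belongs to.
The code is in kombu/transport/virtual/base.py.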
def _deliver(self, message, queue):
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)
The variables follow. Celery has three different callbacks here, each serving a different purpose:
celeryev.c755f81c-415e-478f-bb51-def341a96c0c handles events;
celery@.celery.pidbox handles remote control;
celery handles normal task consumption.
self._callbacks = {dict: 3}
  'celeryev.c755f81c-415e-478f-bb51-def341a96c0c' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef23048>
  'celery@.celery.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56488>
  'celery' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>
message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
queue = {str} 'celery'
self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
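The queue-keyed dispatch in _deliver reduces to the following sketch. MiniTransport, its rejected list and the warning text are hypothetical. In Kombu the missing-consumer branch logs W_NO_CONSUMERS and calls _reject_inbound_message, whose details are not reproduced here. The three queue names in the usage below mirror the three entries of self._callbacks in the dump.

```python
import logging

logger = logging.getLogger(__name__)


class MiniTransport:
    """Hypothetical sketch of Transport._deliver: route a message by queue."""

    def __init__(self):
        self._callbacks = {}  # queue name -> callback registered by a consumer
        self.rejected = []    # messages that arrived for a queue nobody consumes

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            logger.warning('Received message for queue %r without consumers', queue)
            self.rejected.append(message)
        else:
            callback(message)
```

For example, after registering callbacks for 'celery', 'celery@.celery.pidbox' and the celeryev queue, a message delivered to 'celery' reaches only the task callback. A message for an unregistered queue lands in rejected.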
The logic diagram at this point:
hub (poll)
   |
   | 1  cb(fileno), registered in readers[fileno]
   v
Transport.on_readable(fileno)
   |
   | 2  cycle.on_readable(fileno)
   v
MultiChannelPoller: _fd_to_chan[fileno] -> chan.handlers[type]
   |
   | 3
   v
Channel._brpop_read
   |
   | 4  connection._deliver(message, queue)
   v
Transport._deliver -> _callbacks[queue]
The callback then continues in kombu/transport/virtual/base.py, in the per-queue callback set up by basic_consume.
The Channel now holds the Redis message for this queue, so it calls the callback registered for that queue, which is the Consumer's callback.
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)

    self._reset_cycle()
The variables at this point:
callback = {method} <bound method Consumer._receive_callback of <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>>
message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'd
raw_message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
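The closure registered by basic_consume can be imitated with the sketch below. MiniChannel, MiniMessage and the unacked dict are hypothetical simplifications; Kombu keeps unacknowledged messages in its QoS object instead. What the sketch preserves is the capture: queue, no_ack and the consumer's callback are bound when basic_consume runs and are used later for each raw message.

```python
class MiniMessage:
    """Hypothetical wrapper around the raw message dict."""

    def __init__(self, raw, channel):
        self.raw = raw
        self.channel = channel
        self.delivery_tag = raw['properties']['delivery_tag']


class MiniChannel:
    """Hypothetical sketch of Channel.basic_consume's per-queue closure."""

    def __init__(self, connection_callbacks):
        self.connection_callbacks = connection_callbacks  # shared: queue -> callback
        self.unacked = {}                                  # delivery_tag -> message

    def basic_consume(self, queue, no_ack, callback):
        def _callback(raw_message):
            # Wrap the raw dict, track it for a later ack, then hand it on.
            message = MiniMessage(raw_message, channel=self)
            if not no_ack:
                self.unacked[message.delivery_tag] = message
            return callback(message)

        self.connection_callbacks[queue] = _callback
```

The shared connection_callbacks dict plays the role of Transport._callbacks. It is the table that _deliver consults.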
The logic diagram at this point:
hub (poll)
   |
   | 1  cb(fileno), registered in readers[fileno]
   v
Transport.on_readable(fileno)
   |
   | 2  cycle.on_readable(fileno)
   v
MultiChannelPoller: _fd_to_chan[fileno] -> chan.handlers[type]
   |
   | 3
   v
Channel._brpop_read
   |
   | 4  connection._deliver(message, queue)
   v
Transport._deliver -> _callbacks[queue]
   |
   | 5  callback(message)
   v
Channel: callback(message)
   |
   v
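The Kombu Consumer callback code is in kombu/messaging.py.
It calls the callback that the user registered on the Kombu Consumer. Note that the user of the Kombu Consumer is Celery, so this immediately calls the callback Celery registered earlier.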
def _receive_callback(self, message):
    accept = self.accept
    on_m, channel, decoded = self.on_message, self.channel, None
    try:
        m2p = getattr(channel, 'message_to_python', None)
        if m2p:
            message = m2p(message)
        if accept is not None:
            message.accept = accept
        if message.errors:
            return message._reraise_error(self.on_decode_error)
        decoded = None if on_m else message.decode()
    except Exception as exc:
        if not self.on_decode_error:
            raise
        self.on_decode_error(message, exc)
    else:
        return on_m(message) if on_m else self.receive(decoded, message)
The variables:
on_m = {function} <function Consumer.create_task_handler.<locals>.on_task_received at 0x7fcbaef562f0>
accept = {set: 1} {'application/json'}
channel = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
m2p = {method} <bound method Channel.message_to_python of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>
message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
self = {Consumer} <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
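The branch at the end of _receive_callback decides who decodes the message. In the sketch below, MiniConsumer is hypothetical and messages are plain dicts with a JSON body. When on_message is set, as Celery does with on_task_received, the undecoded message is passed straight through. Only when it is not set is the body decoded and handed to receive().

```python
import json


class MiniConsumer:
    """Hypothetical sketch of Kombu Consumer._receive_callback's branching."""

    def __init__(self, on_message=None, callbacks=()):
        self.on_message = on_message      # Celery installs on_task_received here
        self.callbacks = list(callbacks)  # used only when on_message is None

    def receive(self, body, message):
        for cb in self.callbacks:
            cb(body, message)

    def _receive_callback(self, message):
        on_m = self.on_message
        # Decode eagerly only when no on_message hook is installed; the hook
        # gets the raw message object and decides on decoding itself.
        decoded = None if on_m else json.loads(message['body'])
        return on_m(message) if on_m else self.receive(decoded, message)
```

This is why on_task_received receives a Message rather than a decoded body. As the comment in create_task_handler notes, with protocol v2 Celery defers deserializing the body to the pool.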
The detailed logic:
Kombu
hub (poll)
   |
   | 1  cb(fileno), registered in readers[fileno]
   v
Transport.on_readable(fileno)
   |
   | 2  cycle.on_readable(fileno)
   v
MultiChannelPoller: _fd_to_chan[fileno] -> chan.handlers[type]
   |
   | 3
   v
Channel._brpop_read
   |
   | 4  connection._deliver(message, queue)
   v
Transport._deliver -> _callbacks[queue]
   |
   | 5  callback(message)
   v
Channel: callback(message)
   |
   v
Consumer: on_message / _receive_callback
   |
   | 6  on_m(message)
---+-----------------------------------------------------
   v
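Now that Celery's previously registered callback is being called, we have effectively entered Celery's territory.
Let us recall when Celery configured this callback.
celery/worker/loops.py contains the following code, which makes the consumer call back into Celery: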
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""
    # ... (setup elided)
    on_task_received = obj.create_task_handler()
    # ... (elided)
    consumer.on_message = on_task_received
The callback is in celery/worker/consumer/consumer.py.
create_task_handler returns on_task_received, and that is the callback.
def create_task_handler(self, promise=promise):
    strategies = self.strategies
    on_unknown_message = self.on_unknown_message
    on_unknown_task = self.on_unknown_task
    on_invalid_task = self.on_invalid_task
    callbacks = self.on_task_message
    call_soon = self.call_soon

    def on_task_received(message):
        # payload will only be set for v1 protocol, since v2
        # will defer deserializing the message body to the pool.
        payload = None
        try:
            type_ = message.headers['task']                # protocol v2
        except TypeError:
            return on_unknown_message(None, message)
        except KeyError:
            try:
                payload = message.decode()
            except Exception as exc:  # pylint: disable=broad-except
                return self.on_decode_error(message, exc)
            try:
                type_, payload = payload['task'], payload  # protocol v1
            except (TypeError, KeyError):
                return on_unknown_message(payload, message)
        try:
            strategy = strategies[type_]
        except KeyError as exc:
            return on_unknown_task(None, message, exc)
        else:
            try:
                strategy(
                    message, payload,
                    promise(call_soon, (message.ack_log_error,)),
                    promise(call_soon, (message.reject_log_error,)),
                    callbacks,
                )
            except (InvalidTaskError, ContentDisallowed) as exc:
                return on_invalid_task(payload, message, exc)
            except DecodeError as exc:
                return self.on_decode_error(message, exc)

    return on_task_received
The variables at this point:
call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>
callbacks = {set: 0} set()
message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>
on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>
on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>
self = {Consumer} <Consumer: celery@ demini (running)>
strategies = {dict: 10} {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
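The routing in on_task_received reduces to two steps. First, read the task name from the headers (protocol v2), or failing that from the decoded body (protocol v1). Second, look up a strategy by that name. make_task_handler below is a hypothetical, dict-based sketch of this logic. The real code's several error paths are collapsed into a single on_unknown_task callback.

```python
import json


def make_task_handler(strategies, on_unknown_task):
    """Hypothetical sketch of create_task_handler's routing logic."""

    def on_task_received(message):
        payload = None
        try:
            type_ = message['headers']['task']      # protocol v2: name in headers
        except KeyError:
            payload = json.loads(message['body'])   # protocol v1: name in body
            type_ = payload['task']
        try:
            strategy = strategies[type_]
        except KeyError:
            return on_unknown_task(type_)
        return strategy(message, payload)

    return on_task_received
```

As in the real handler, payload stays None for protocol v2 messages. The strategy therefore receives the undecoded message, and the body is left for later stages.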
The logic at this point:
Kombu
hub (poll)
   |
   | 1  cb(fileno), registered in readers[fileno]
   v
Transport.on_readable(fileno)
   |
   | 2  cycle.on_readable(fileno)
   v
MultiChannelPoller: _fd_to_chan[fileno] -> chan.handlers[type]
   |
   | 3
   v
Channel._brpop_read
   |
   | 4  connection._deliver(message, queue)
   v
Transport._deliver -> _callbacks[queue]
   |
   | 5  callback(message)
   v
Channel: callback(message)
   |
   v
Consumer: on_message / _receive_callback
   |
   | 6  on_m(message)
---+-----------------------------------------------------
Celery
   v
Consumer: on_task_received
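From this point on, we are operating inside Celery.
First we reach Celery's Consumer component, which is conceptually the entry point for consumption.
The Celery Consumer code is in celery/worker/consumer/consumer.py. It:
parses the message and takes the task name from its headers, for example 'myTest.add';
looks up the strategy for that task name;
calls the strategy.
The code is the same create_task_handler (with its inner on_task_received) listed in the previous section.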
The variables:
self.app.tasks = {TaskRegistry: 10} {'celery.chunks': <@task: celery.chunks of myTest at 0x7fcbade229e8>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of myTest at 0x7fcbade229e8>, 'celery.chord_unlock': <@task: celery.chord_unlock of myTest at 0x7fcbade229e8>, 'celery.group': <@
call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>
callbacks = {set: 0} set()
message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>
on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>
on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>
self = {Consumer} <Consumer: celery@ demini (running)>
strategies = {dict: 10} {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
The following code determines which task is needed, here 'myTest.add':
type_ = message.headers['task']
message.headers follows. It also shows which aspects go into defining a task message.
message.headers = {dict: 15}
  'lang' = {str} 'py'
  'task' = {str} 'myTest.add'
  'id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
  'shadow' = {NoneType} None
  'eta' = {NoneType} None
  'expires' = {NoneType} None
  'group' = {NoneType} None
  'group_index' = {NoneType} None
  'retries' = {int} 0
  'timelimit' = {list: 2} [None, None]
  'root_id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
  'parent_id' = {NoneType} None
  'argsrepr' = {str} '(2, 17)'
  'kwargsrepr' = {str} '{}'
  'origin' = {str} 'gen19806@ demini'
  __len__ = {int} 15
Based on the task name, here 'myTest.add', the matching callback is fetched from strategies. That callback is what starts processing the task message.
strategies = {dict: 10}
  'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef230d0>
  'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23620>
  'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef238c8>
  'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23b70>
  'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23e18>
  'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48158>
  'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48400>
  'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef486a8>
  'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48950>
  'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>
  __len__ = {int} 10
The strategy obtained here is, for example:
<function default.<locals>.task_message_handler at 0x7fcbaef48950>
It is called as follows:
strategy(
    message, payload,
    promise(call_soon, (message.ack_log_error,)),
    promise(call_soon, (message.reject_log_error,)),
    callbacks,
)
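The ack and reject arguments are not direct calls. Each wraps Consumer.call_soon, so invoking one only schedules the real ack_log_error or reject_log_error for later. The sketch below imitates this deferral, with functools.partial standing in for vine's promise. MiniLoop, MiniMessage and demo are hypothetical. The sketch shows only the deferral mechanism, not Celery's acknowledgement semantics.

```python
from collections import deque
from functools import partial


class MiniLoop:
    """Hypothetical event loop with a ready queue (the role of call_soon)."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, fun, *args):
        self._ready.append(partial(fun, *args))

    def run_ready(self):
        while self._ready:
            self._ready.popleft()()


class MiniMessage:
    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


def demo():
    loop = MiniLoop()
    message = MiniMessage()
    # Similar in spirit to promise(call_soon, (message.ack_log_error,)).
    on_ack = partial(loop.call_soon, message.ack)
    on_ack()                 # only queues the ack
    before = message.acked
    loop.run_ready()         # the loop performs it on its next pass
    return before, message.acked
```

demo() returns (False, True). Calling on_ack leaves the message unacknowledged until the loop drains its ready queue.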
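Strategy is a middle layer between Consumer and Worker. It handles each message differently depending on its conditions, which is what the name "strategy" means.
The code is in celery/worker/strategy.py.
Specifically, task_message_handler:
builds a Request (req) from the message, converting protocol v1 or hybrid bodies to protocol v2 when needed;
drops the task if it has expired or been revoked;
sends the task_received signal and, if task events are enabled, a 'task-received' event;
if the task has an ETA and/or a rate limit, schedules it with call_at or passes it through its rate-limit bucket instead of running it right away;
otherwise marks it as reserved (task_reserved), runs any on_task_message callbacks, and calls handle(req).
The code: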
# task_message_handler is a closure defined inside strategy.default(),
# which returns it at the end.
def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
    if body is None and 'args' not in message.payload:
        body, headers, decoded, utc = (
            message.body, message.headers, False, app.uses_utc_timezone(),
        )
    else:
        if 'args' in message.payload:
            body, headers, decoded, utc = hybrid_to_proto2(message,
                                                           message.payload)
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)

    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )
    if (req.expires or req.id in revoked_tasks) and req.revoked():
        return

    signals.task_received.send(sender=consumer, request=req)

    if task_sends_events:
        send_event(
            'task-received',
            uuid=req.id, name=req.name,
            args=req.argsrepr, kwargs=req.kwargsrepr,
            root_id=req.root_id, parent_id=req.parent_id,
            retries=req.request_dict.get('retries', 0),
            eta=req.eta and req.eta.isoformat(),
            expires=req.expires and req.expires.isoformat(),
        )

    bucket = None
    eta = None
    if req.eta:
        try:
            if req.utc:
                eta = to_timestamp(to_system_tz(req.eta))
            else:
                eta = to_timestamp(req.eta, app.timezone)
        except (OverflowError, ValueError) as exc:
            error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                  req.eta, exc, req.info(safe=True), exc_info=True)
            req.reject(requeue=False)
    if rate_limits_enabled:
        bucket = get_bucket(task.name)

    if eta and bucket:
        consumer.qos.increment_eventually()
        return call_at(eta, limit_post_eta, (req, bucket, 1),
                       priority=6)
    if eta:
        consumer.qos.increment_eventually()
        call_at(eta, apply_eta_task, (req,), priority=6)
        return task_message_handler
    if bucket:
        return limit_task(req, bucket, 1)

    task_reserved(req)
    if callbacks:
        [callback(req) for callback in callbacks]
    handle(req)  # here: hand the request over to the worker

return task_message_handler  # (in default())
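Stripped of signals, events and timestamp conversion, the branching at the end of task_message_handler chooses among a few outcomes. choose_path below is a hypothetical condensation of that branching over a plain dict. Set membership stands in for a rate-limit bucket. The returned strings only label which call in the real code would run.

```python
import time


def choose_path(req, revoked_ids, rate_limited_tasks, now=None):
    """Hypothetical condensation of task_message_handler's branches."""
    now = time.time() if now is None else now
    expires = req.get('expires')
    if req['id'] in revoked_ids or (expires is not None and expires <= now):
        return 'revoked'          # the handler returns without running it
    eta = req.get('eta')
    bucket = req['name'] in rate_limited_tasks
    if eta and bucket:
        return 'eta+rate_limit'   # call_at(eta, limit_post_eta, ...)
    if eta:
        return 'eta'              # call_at(eta, apply_eta_task, ...)
    if bucket:
        return 'rate_limit'       # limit_task(req, bucket, 1)
    return 'execute'              # task_reserved(req); handle(req)
```

The myTest.add message in this article has no ETA, no expiry and no rate limit. It therefore takes the 'execute' path, which ends in handle(req).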
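Let us look at the details.
In Strategy, the following line builds a Request class for the task. This class ties together the broker message, the consumer and the multiprocessing pool.
In particular, Request.execute_using_pool is where processing starts to connect with multiprocessing, namely with the consumer's process pool.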
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
The task in question here is:
myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]
The code that creates the Request class (create_request_cls):
def create_request_cls(base, task, pool, hostname, eventer,
                       ref=ref, revoked_tasks=revoked_tasks,
                       task_ready=task_ready, trace=trace_task_ret):
    default_time_limit = task.time_limit
    default_soft_time_limit = task.soft_time_limit
    apply_async = pool.apply_async
    acks_late = task.acks_late
    events = eventer and eventer.enabled

    class Request(base):

        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id
            if (self.expires or task_id in revoked_tasks) and self.revoked():
                raise TaskRevokedError(task_id)

            time_limit, soft_time_limit = self.time_limits
            result = apply_async(
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result

        def on_success(self, failed__retval__runtime, **kwargs):
            failed, retval, runtime = failed__retval__runtime
            if failed:
                if isinstance(retval.exception, (
                        SystemExit, KeyboardInterrupt)):
                    raise retval.exception
                return self.on_failure(retval, return_ok=True)
            task_ready(self)

            if acks_late:
                self.acknowledge()

            if events:
                self.send_event(
                    'task-succeeded', result=retval, runtime=runtime,
                )

    return Request
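create_request_cls is a class factory. The per-task settings (time limits, pool.apply_async, acks_late, events) are read once and captured by the methods of a newly built Request subclass. The sketch below shows only that pattern. make_request_cls, BaseRequest and the submit callable are hypothetical.

```python
class BaseRequest:
    def __init__(self, task_id, time_limit=None):
        self.task_id = task_id
        self.time_limit = time_limit


def make_request_cls(base, task_time_limit, submit):
    """Hypothetical sketch of create_request_cls: build a subclass whose
    methods close over per-task defaults captured once, at build time."""
    default_time_limit = task_time_limit

    class Request(base):
        def execute_using_pool(self):
            # A per-request limit wins; otherwise fall back to the task default.
            return submit(self.task_id,
                          timeout=self.time_limit or default_time_limit)

    return Request
```

With this pattern the task's defaults are read when the class is built, not on every message. Each task type gets its own Request class, in line with strategies holding one handler per task name.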
The logic at this point:
Consumer
message
   |
   v
on_task_received  <--- strategy ---  strategies [myTest.add : task_message_handler]
   |
---+-------------------------------------------------------------
   | strategy
   v
task_message_handler  <--- Request [myTest.add] ---  create_request_cls
task_message_handler finally calls handle(req), which hands the request over for execution.
handle is actually bound to WorkController._process_task_sem.
The relevant code (abridged):
def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )
    # ... (revocation, events, ETA and rate-limit handling elided)
    task_reserved(req)
    if callbacks:
        [callback(req) for callback in callbacks]
    handle(req)

return task_message_handler  # (in default())
The Request:
req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]
  acknowledged = {bool} False
  app = {Celery} <Celery myTest at 0x7fcbade229e8>
  args = {list: 2} [2, 17]
  argsrepr = {str} '(2, 17)'
  body = {bytes: 82} b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
  chord = {NoneType} None
  connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
  content_encoding = {str} 'utf-8'
  content_type = {str} 'application/json'
  correlation_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
  delivery_info = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
  errbacks = {NoneType} None
  eta = {NoneType} None
  eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x7fcbaeef31d0>
  expires = {NoneType} None
  group = {NoneType} None
  group_index = {NoneType} None
  hostname = {str} 'celery@ demini'
  id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
  kwargs = {dict: 0} {}
  kwargsrepr = {str} '{}'
  message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
  name = {str} 'myTest.add'
  on_ack = {promise} <promise@0x7fcbaeecc210 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
  on_reject = {promise} <promise@0x7fcbaeeccf20 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
  parent_id = {NoneType} None
  reply_to = {str} 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93'
  request_dict = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
  root_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
  store_errors = {bool} True
  task = {add} <@task: myTest.add of myTest at 0x7fcbade229e8>
  task_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
  task_name = {str} 'myTest.add'
  time_limits = {list: 2} [None, None]
  time_start = {NoneType} None
  type = {str} 'myTest.add'
  tzlocal = {NoneType} None
  utc = {bool} True
  worker_pid = {NoneType} None
handle and headers:
handle = {method} <bound method WorkController._process_task_sem of <Worker: celery@ demini (running)>>
headers = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
The logic at this point is as follows:
Consumer
          | message
          v
+---------+---------+   strategy   +--------------------------------------+
| on_task_received  | <----------- | strategies                           |
|                   |              | [myTest.add : task_message_handler]  |
+---------+---------+              +--------------------------------------+
          |
----------+-------------------------------------------------------- Strategy
          |
          v                             Request [myTest.add]
+---------+----------------+                          +--------------------+
| task_message_handler     | <----------------------- | create_request_cls |
+---------+----------------+                          +--------------------+
          | _process_task_sem
          |
----------+-------------------------------------------------------- Worker
          | req[{Request} myTest.add]
          v
+---------+------+
| WorkController |
+----------------+
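The dispatch in the diagram can be modelled as a lookup table keyed by task name. The following is a minimal sketch under that assumption, not Celery's code: `strategies`, `make_handler`, `SimpleRequest` and the `handle` callback are hypothetical stand-ins for the consumer's strategies table, task_message_handler, the Request class built by create_request_cls, and WorkController._process_task_sem.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SimpleRequest:
    """Hypothetical stand-in for the Request class built by create_request_cls."""
    name: str
    task_id: str
    args: List[Any] = field(default_factory=list)


def make_handler(handle: Callable[[SimpleRequest], Any]) -> Callable[[dict], Any]:
    """Build a per-task strategy (plays the role of task_message_handler)."""
    def task_message_handler(message: dict) -> Any:
        req = SimpleRequest(message["task"], message["id"], message["args"])
        return handle(req)  # pass the request on to the worker
    return task_message_handler


def on_task_received(strategies: Dict[str, Callable[[dict], Any]], message: dict) -> Any:
    """Pick the strategy registered for this task name and run it."""
    strategy = strategies.get(message["task"])
    if strategy is None:
        raise LookupError(f"unregistered task: {message['task']}")
    return strategy(message)


# Usage: the "worker" here just collects the requests it is handed.
processed: List[SimpleRequest] = []
strategies = {"myTest.add": make_handler(processed.append)}
on_task_received(strategies, {"task": "myTest.add", "id": "863cf9b2", "args": [2, 17]})
```

Building one handler per task name up front keeps the per-message cost to a dictionary lookup plus a call.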
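The program now reaches the Worker in Celery, which is where tasks are actually executed.
The code is in celery/worker/worker.py.
As the code shows, _process_task_sem acquires a slot through _quick_acquire and then calls _process_task, which hands the request to the pool: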
class WorkController:
    """Unmanaged worker instance."""

    def register_with_event_loop(self, hub):
        self.blueprint.send_all(
            self, 'register_with_event_loop', args=(hub,),
            description='hub.register',
        )

    def _process_task_sem(self, req):
        return self._quick_acquire(self._process_task, req)

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            ...  # remaining error handling omitted in this excerpt
The variables are:
req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]
self = {Worker} celery
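_process_task_sem only forwards the request once `_quick_acquire` grants a slot. As an assumption for illustration: when pool put-locks are enabled, Celery binds `_quick_acquire` to the acquire method of a callback-style semaphore (kombu's LaxBoundedSemaphore). `ToyLaxSemaphore` below is a hypothetical re-implementation of that idea, not kombu's code. acquire runs the callback at once if a slot is free and queues it otherwise, and release hands the freed slot to the oldest waiter.

```python
from collections import deque
from typing import Any, Callable, Deque, Tuple


class ToyLaxSemaphore:
    """Callback-style semaphore: run now if a slot is free, else queue the call."""

    def __init__(self, value: int) -> None:
        self.value = value
        self._waiting: Deque[Tuple[Callable[..., Any], tuple]] = deque()

    def acquire(self, callback: Callable[..., Any], *args: Any) -> bool:
        if self.value <= 0:
            self._waiting.append((callback, args))  # no free slot: defer the call
            return False
        self.value -= 1
        callback(*args)
        return True

    def release(self) -> None:
        self.value += 1
        if self._waiting:
            callback, args = self._waiting.popleft()
            self.acquire(callback, *args)  # oldest waiter takes the freed slot


sem = ToyLaxSemaphore(1)
ran = []
sem.acquire(ran.append, "task-1")  # a slot is free: runs now
sem.acquire(ran.append, "task-2")  # no slot left: queued
sem.release()                      # task-1 done; task-2 takes the slot
```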
Next the program reaches the Request, still inside the Celery worker. The code is in celery/worker/request.py.
Because the enclosing create_request_cls contains:
apply_async = pool.apply_async
the call goes to pool.apply_async.
The variables are:
apply_async = {method} <bound method BasePool.apply_async of <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>>
pool = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>
revoked_tasks = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
self = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]
The code is:
class Request(base):

    def execute_using_pool(self, pool, **kwargs):
        # apply_async (= pool.apply_async), revoked_tasks, trace and the default
        # time limits are bound in the enclosing create_request_cls.
        task_id = self.task_id  # get the task id
        # refuse to run a task that has expired or been revoked
        if (self.expires or task_id in revoked_tasks) and self.revoked():
            raise TaskRevokedError(task_id)

        time_limit, soft_time_limit = self.time_limits  # hard and soft time limits
        # submit trace(...) to the pool; this returns an async result, not the value
        result = apply_async(
            trace,
            args=(self.type, task_id, self.request_dict, self.body,
                  self.content_type, self.content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or default_soft_time_limit,
            timeout=time_limit or default_time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        # pylint: disable=attribute-defined-outside-init
        self._apply_result = maybe(ref, result)
        return result
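execute_using_pool uses apply_async, revoked_tasks, trace and the default time limits without `self.`, because they are captured once by the enclosing factory. A minimal sketch of that closure pattern follows; `FakePool`, `TaskRevoked`, `trace` and this cut-down `create_request_cls` are hypothetical simplifications, not Celery's signatures.

```python
from typing import Any, Callable, List, Tuple


class TaskRevoked(Exception):
    """Hypothetical stand-in for celery.exceptions.TaskRevokedError."""


class FakePool:
    """Hypothetical pool that records submissions instead of running them."""

    def __init__(self) -> None:
        self.jobs: List[Tuple[Callable[..., Any], tuple, dict]] = []

    def apply_async(self, target: Callable[..., Any], args=(), **options: Any) -> int:
        self.jobs.append((target, tuple(args), options))
        return len(self.jobs) - 1  # stand-in for an async result handle


def create_request_cls(pool, revoked_ids, trace, default_time_limit=None):
    # Look up pool.apply_async once; every Request created below reuses it.
    apply_async = pool.apply_async

    class Request:
        def __init__(self, task_id: str, name: str, body: bytes, time_limit=None) -> None:
            self.task_id, self.name, self.body = task_id, name, body
            self.time_limit = time_limit

        def execute_using_pool(self) -> int:
            if self.task_id in revoked_ids:
                raise TaskRevoked(self.task_id)
            return apply_async(
                trace,
                args=(self.name, self.task_id, self.body),
                timeout=self.time_limit or default_time_limit,
            )

    return Request


def trace(name, task_id, body):  # placeholder; FakePool never calls it
    return name, task_id, body


pool = FakePool()
Request = create_request_cls(pool, revoked_ids={"revoked-id"}, trace=trace, default_time_limit=30)
handle = Request("863cf9b2", "myTest.add", b"[[2, 17], {}, {}]").execute_using_pool()
```

Since the factory runs once per task type, the attribute lookups are not repeated for every incoming message.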
The logic at this point is:
Consumer
          | message
          v
+---------+---------+   strategy   +--------------------------------------+
| on_task_received  | <----------- | strategies                           |
|                   |              | [myTest.add : task_message_handler]  |
+---------+---------+              +--------------------------------------+
          |
----------+-------------------------------------------------------- Strategy
          |
          v                             Request [myTest.add]
+---------+----------------+                          +--------------------+
| task_message_handler     | <----------------------- | create_request_cls |
+---------+----------------+                          +--------------------+
          | _process_task_sem
          |
----------+-------------------------------------------------------- Worker
          | req[{Request} myTest.add]
          v
+---------+----------+
| WorkController     |
|               pool +----------------------+
+---------+----------+                      |
          |                                 |
          | apply_async                     v
+---------+------------+              +-----+-----+
| {Request} myTest.add | -----------> | TaskPool  |
+----------------------+  myTest.add  +-----------+
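apply_async now reaches Celery's Pool. Note that this is not yet the actual multiprocessing implementation; it is only the entry point to it.
At this point the task information is handed to the Pool, for example: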
args = {tuple: 6}
    0 = {str} 'myTest.add'
    1 = {str} 'af6ed084-efc6-4608-a13a-d3065f457cd5'
    2 = {dict: 21} {'lang': 'py', 'task': 'myTest.add', 'id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen1100@DESKTOP-0GO3RPO', 'reply_to': 'afb85541-d08c-3191-b89d-918e15f9e0bf', 'correlation_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}
    3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
    4 = {str} 'application/json'
    5 = {str} 'utf-8'
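These six elements are what trace receives: task name, task id, request dict, raw body, content type and content encoding. With JSON content the body decodes into [args, kwargs, embed], as in Celery's task message protocol 2. The sketch below assumes JSON only and uses the stdlib json module in place of kombu's serializer registry; `decode_body` is a hypothetical helper, and the request dict is trimmed.

```python
import json
from typing import Any, Dict, List, Tuple


def decode_body(body: bytes, content_type: str,
                content_encoding: str) -> Tuple[List[Any], Dict[str, Any], Dict[str, Any]]:
    """Split a protocol-2 style body into (args, kwargs, embed); JSON only."""
    if content_type != "application/json":
        raise ValueError(f"this sketch only handles JSON, got {content_type!r}")
    args, kwargs, embed = json.loads(body.decode(content_encoding))
    return args, kwargs, embed


job_args = (
    "myTest.add",
    "af6ed084-efc6-4608-a13a-d3065f457cd5",
    {"task": "myTest.add", "args": [2, 8], "kwargs": {}},  # trimmed request dict
    b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]',
    "application/json",
    "utf-8",
)
name, task_id, request, body, content_type, content_encoding = job_args
args, kwargs, embed = decode_body(body, content_type, content_encoding)
```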
The code is in celery/concurrency/base.py:
class BasePool:
    """Task pool."""

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.
        """
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args

        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             callbacks_propagate=self.callbacks_propagate,
                             **options)
The variables at this point are:
options = {dict: 7} {'accept_callback': <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>, 'timeout_callback': <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>
self = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>
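BasePool.apply_async only normalises args and kwargs and then delegates to on_apply, so each concrete pool decides how the work runs. As far as I can tell, in the prefork TaskPool on_apply is bound to the billiard pool's apply_async, which is why the next step lands in billiard. The toy classes below (`ToyBasePool`, `ToyInlinePool`) are hypothetical; the subclass runs the target inline just to show the delegation.

```python
from typing import Any, Callable, Optional


class ToyBasePool:
    """Mirrors BasePool.apply_async: normalise arguments, then delegate to on_apply."""

    putlocks = True

    def apply_async(self, target: Callable[..., Any], args=None, kwargs=None,
                    **options: Any) -> Any:
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args
        return self.on_apply(target, args, kwargs, waitforslot=self.putlocks, **options)

    def on_apply(self, target, args, kwargs, **options):
        raise NotImplementedError("concrete pools decide how the work runs")


class ToyInlinePool(ToyBasePool):
    """Hypothetical pool that simply runs the target in the calling thread."""

    def on_apply(self, target, args, kwargs,
                 callback: Optional[Callable[[Any], Any]] = None, **options: Any) -> Any:
        result = target(*args, **kwargs)  # waitforslot arrives in **options and is ignored
        if callback is not None:
            callback(result)
        return result


results = []
value = ToyInlinePool().apply_async(lambda a, b: a + b, args=(2, 17), callback=results.append)
```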
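The apply_async code is in billiard/pool.py (billiard is the multiprocessing fork that Celery depends on; the self seen here is celery.concurrency.asynpool.AsynPool, a subclass of billiard's Pool).
Here, in __init__, self._initargs = initargs, which is (<Celery myTest at 0x2663db3fe48>, 'celery@DESKTOP-0GO3RPO'); this is how the Celery app is passed into the pool.
Depending on the operating system (in the code, the branch is on self.threads), it calls either self._taskqueue.put or self._quick_put to send the task message to the multiprocess pool.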
def apply_async(self, func, args=(), kwds={},
                callback=None, error_callback=None, accept_callback=None,
                timeout_callback=None, waitforslot=None,
                soft_timeout=None, timeout=None, lost_worker_timeout=None,
                callbacks_propagate=(),
                correlation_id=None):
    '''
    Asynchronous equivalent of `apply()` method.

    Callback is called when the functions return value is ready.
    The accept callback is called when the job is accepted to be executed.

    Simplified the flow is like this:

        >>> def apply_async(func, args, kwds, callback, accept_callback):
        ...     if accept_callback:
        ...         accept_callback()
        ...     retval = func(*args, **kwds)
        ...     if callback:
        ...         callback(retval)

    '''
    if self._state == RUN:
        waitforslot = self.putlocks if waitforslot is None else waitforslot
        if waitforslot and self._putlock is not None:
            self._putlock.acquire()
        result = ApplyResult(
            self._cache, callback, accept_callback, timeout_callback,
            error_callback, soft_timeout, timeout, lost_worker_timeout,
            on_timeout_set=self.on_timeout_set,
            on_timeout_cancel=self.on_timeout_cancel,
            callbacks_propagate=callbacks_propagate,
            send_ack=self.send_ack if self.synack else None,
            correlation_id=correlation_id,
        )
        if timeout or soft_timeout:
            # start the timeout handler thread when required.
            self._start_timeout_handler()
        if self.threads:
            self._taskqueue.put(([(TASK, (result._job, None,
                                func, args, kwds))], None))
        else:
            self._quick_put((TASK, (result._job, None, func, args, kwds)))
        return result
The variables are:
accept_callback = {method} <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
args = {tuple: 6} ('myTest.add', '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8')
callback = {method} <bound method create_request_cls.<locals>.Request.on_success of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
error_callback = {method} <bound method Request.on_failure of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fcbaee2ea20>
timeout_callback = {method} <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
waitforslot = {bool} False
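The threads branch wraps each job as ([(TASK, (job, None, func, args, kwds))], None) and drops it on _taskqueue for a handler thread to forward. The toy below models only that envelope and hand-off; `ToyPool` and the `TASK` value are hypothetical, and where billiard would write the task to a child process's pipe, this sketch simply runs it on the handler thread.

```python
import itertools
import queue
import threading
from typing import Any, Callable, Dict, Optional

TASK = 0  # hypothetical message tag for this sketch


class ToyPool:
    """Toy threads-mode path: apply_async -> _taskqueue -> handler thread."""

    def __init__(self) -> None:
        self._taskqueue: "queue.Queue[Any]" = queue.Queue()
        self._cache: Dict[int, Callable[[Any], Any]] = {}
        self._jobs = itertools.count()
        self._handler = threading.Thread(target=self._handle_tasks, daemon=True)
        self._handler.start()

    def apply_async(self, func, args=(), kwds=None,
                    callback: Optional[Callable[[Any], Any]] = None) -> int:
        job = next(self._jobs)
        self._cache[job] = callback if callback is not None else (lambda _result: None)
        # Same envelope shape as the threads branch shown above.
        self._taskqueue.put(([(TASK, (job, None, func, args, kwds or {}))], None))
        return job

    def _handle_tasks(self) -> None:
        # Loop until the None sentinel arrives.
        for taskseq, _set_length in iter(self._taskqueue.get, None):
            for tag, (job, _i, func, args, kwds) in taskseq:
                if tag == TASK:
                    # billiard would send this to a child here; the toy runs it inline.
                    self._cache.pop(job)(func(*args, **kwds))

    def close(self) -> None:
        self._taskqueue.put(None)  # queue is FIFO, so earlier jobs finish first
        self._handler.join()


results = []
pool = ToyPool()
pool.apply_async(lambda a, b: a + b, (2, 17), callback=results.append)
pool.close()
```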
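Let us first look at how some of these variables were set up earlier.
For example, the code below contains
inq, outq, synq = self.get_process_queues() and self._process_register_queues(w, (inq, outq, synq)),
which set up the pipes between the parent process and each child process.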
def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w
The pipes themselves are created here, for example:
def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv
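A SimpleQueue is backed by a one-way multiprocessing pipe (a _reader end and a _writer end), so _quick_put and _quick_get are just the send and recv methods of those ends. The sketch below reproduces the pairing with multiprocessing.Pipe(duplex=False); for brevity both ends stay in one process, whereas in the pool the reading side of the inqueue lives in the child. The task tuple layout is illustrative only.

```python
from multiprocessing import Pipe

# Pipe(duplex=False) returns (receive-only end, send-only end).
in_reader, in_writer = Pipe(duplex=False)    # parent -> child, like the inqueue
out_reader, out_writer = Pipe(duplex=False)  # child -> parent, like the outqueue

quick_put = in_writer.send   # cf. self._quick_put = self._inqueue._writer.send
quick_get = out_reader.recv  # cf. self._quick_get = self._outqueue._reader.recv

# Parent: send a task tuple down the inqueue.
quick_put(("TASK", (0, None, "myTest.add", (2, 17), {})))

# "Child" (kept in this process for brevity): read the task, reply on the outqueue.
tag, (job, _i, name, args, kwds) = in_reader.recv()
out_writer.send((job, sum(args)))

# Parent: collect the reply.
reply = quick_get()

for conn in (in_reader, in_writer, out_reader, out_writer):
    conn.close()
```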
The file-descriptor mappings for those pipes are picked up when the write handlers are created:
def _create_write_handlers(self, hub,
                           pack=pack, dumps=_pickle.dumps,
                           protocol=HIGHEST_PROTOCOL):
    """Create handlers used to write data to child processes."""
    fileno_to_inq = self._fileno_to_inq
    fileno_to_synq = self._fileno_to_synq
    outbound = self.outbound_buffer
    pop_message = outbound.popleft
    put_message = outbound.append
So the preset variables end up as follows:
self._taskqueue = {Queue} <queue.Queue object at 0x7fcbaee57b00>
self._quick_put = {function} <function AsynPool._create_write_handlers.<locals>.send_job at 0x7fcbaef569d8>
self._outqueue = {NoneType} None
self._inqueue = {NoneType} None
self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
self._quick_get = {NoneType} None
self._fileno_to_inq = {dict: 0} {}
self.outbound_buffer = {deque: 1} deque([<%s: 0 ack:False ready:False>])
self = {Pool} <billiard.pool.Pool object at 0x000002663FD6E948>
    ResultHandler = {type} <class 'billiard.pool.ResultHandler'>
    SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
    Supervisor = {type} <class 'billiard.pool.Supervisor'>
    TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
    TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
    Worker = {type} <class 'billiard.pool.Worker'>
On Windows, it is:
if self.threads:
    self._taskqueue.put(([(TASK, (result._job, None,
                        func, args, kwds))], None))
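On *nix, the job is built and sent here: send_job pickles the task tuple, prefixes it with a 4-byte length header, and put_message(job) appends it to the outbound buffer, from which it is written to a child process's pipe once that pipe becomes writable.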
def send_job(tup):
    # Schedule writing job request for when one of the process
    # inqueues are writable.
    body = dumps(tup, protocol=protocol)
    body_size = len(body)
    header = pack('>I', body_size)
    # index 1,0 is the job ID.
    job = get_job(tup[1][0])
    job._payload = buf_t(header), buf_t(body), body_size
    put_message(job)
self._quick_put = send_job
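The framing used by send_job is a 4-byte big-endian length header (pack('>I', ...)) followed by the pickled body, so the reader knows exactly how many bytes belong to each job. The sketch below rebuilds that framing with the stdlib, plus a hypothetical read_frame decoder; it uses plain bytes instead of billiard's buffer objects and skips the job objects, so the names here are illustrative only.

```python
import pickle
import struct
from collections import deque
from typing import Any, Deque, Tuple

outbound: Deque[Tuple[bytes, bytes, int]] = deque()
put_message = outbound.append  # mirrors put_message = outbound.append


def send_job(tup: Any) -> None:
    """Frame a job as a 4-byte big-endian length header + pickled body and buffer it."""
    body = pickle.dumps(tup, protocol=pickle.HIGHEST_PROTOCOL)
    header = struct.pack(">I", len(body))
    put_message((header, body, len(body)))


def read_frame(buf: bytes) -> Tuple[Any, bytes]:
    """Decode one frame from the front of buf; return (message, remaining bytes)."""
    (size,) = struct.unpack(">I", buf[:4])
    body = buf[4:4 + size]
    if len(body) < size:
        raise ValueError("incomplete frame")
    return pickle.loads(body), buf[4 + size:]


send_job(("TASK", (0, None, "myTest.add", (2, 17), {})))
send_job(("TASK", (1, None, "myTest.add", (2, 8), {})))
# The pool writes buffered frames when a child's pipe is writable; here we just concatenate.
wire = b"".join(header + body for header, body, _size in outbound)
first, rest = read_frame(wire)
second, rest = read_frame(rest)
```

The length prefix is what lets several jobs share one byte stream without any delimiter escaping.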
The logic at this point is:
Consumer
          | message
          v
+---------+---------+   strategy   +--------------------------------------+
| on_task_received  | <----------- | strategies                           |
|                   |              | [myTest.add : task_message_handler]  |
+---------+---------+              +--------------------------------------+
          |
----------+-------------------------------------------------------- Strategy
          |
          v                             Request [myTest.add]
+---------+----------------+                          +--------------------+
| task_message_handler     | <----------------------- | create_request_cls |
+---------+----------------+                          +--------------------+
          | _process_task_sem
          |
----------+-------------------------------------------------------- Worker
          | req[{Request} myTest.add]
          v
+---------+----------+
| WorkController     |
|               pool +----------------------+
+---------+----------+                      |
          |                                 |
          | apply_async                     v
+---------+------------+              +-----+-----+
| {Request} myTest.add | -----------> | TaskPool  |
+----------------------+  myTest.add  +-----+-----+
                                            |
--------------------------------------------+---------------------- Pool
                                            v
                                 +----------+---------+
                                 | billiard.pool.Pool |
                                 +----------+---------+
                                            |
+---------------------------+               |
| TaskHandler               |               | self._taskqueue.put
|                           |               |
|              _taskqueue   | <-------------+
+-------------+-------------+
              | put(task)
--------------+---------------------------------------------------- Sub process
              v
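Starting with the next article, we will look at how the multiprocess pool actually handles these messages.
The articles in this series so far:
[Source code analysis] Parallel distributed framework Celery: worker startup (1)
[Source code analysis] Parallel distributed framework Celery: worker startup (2)
[Source code analysis] Distributed task queue Celery: starting the Consumer
[Source code analysis] Parallel distributed task queue Celery: what is a Task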