[Source Code Analysis] Celery, a Parallel Distributed Task Queue: The Dynamic Consumption Flow

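0x00 Abstract

Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.

In earlier articles of this series (links at the end of the article) we covered how Celery starts up and introduced Task. This article follows what happens inside Celery (including Kombu) after a task message is received, up to the point just before the multiprocessing stage.

The goal is an interim summary that consolidates the analysis so far and prepares for the next stage, the analysis of multiprocessing.

Because this is a walkthrough of a concrete flow, it includes a fair number of stack traces and runtime variable dumps. Please bear with them.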

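0x01 Background

When we analyzed the startup of the Celery worker, we noted that Celery ends by starting a loop that waits for tasks to consume, and that the callback registered at startup is on_task_received. An abridged stack trace follows.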

on_task_received, consumer.py:542
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_brpop_read, redis.py:748
on_readable, redis.py:358
create_loop, hub.py:361
asynloop, loops.py:81
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
main, core.py:782
start, base.py:358
worker_main, base.py:374

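From it we can make out the rough logical flow:

  • Within Kombu:
    • the message is picked up in the message loop (hub);
    • it passes through the broker abstraction (Transport) and the execution engine (MultiChannelPoller), after which the message is parsed;
    • Celery's callback is then invoked;
  • Within Celery:
    • starting from the callback, which is the logical entry point, a Strategy is called first, and it handles the message differently depending on conditions;
    • the work is then delegated to a Worker, which runs the user's task;
    • to improve throughput the work is processed concurrently, that is, multiple workers run in parallel;

The stack trace alone does not give the whole picture, so this article looks at how Celery consumes a message.

We start from poll: a new task message has arrived in Redis, and the FD that Celery's BRPOP is waiting on has received a poll event.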

0x02 Logic in Kombu

We start with Kombu.

Here is the overall logic diagram for the Kombu part first, to give an intuitive overview:

 +-------------+        +-------------------+                                +-------------------------+
 | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
 |             | fileno |                   |      cycle.on_readable(fileno) |                         |
 |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
 |             |        |                   |                                |                         |
 |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
 +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                        |        +          |                 |              +-------------------------+           |
                        |        |          |                 |                                                    |
                        +-------------------+                 |                                                    |
                                 |                            |                                                    |
                                 |                            |              +-----------------------+             |
                                 |                            |              | Channel               |      3      |
                                 |                            |              |                       | _brpop_read |
                                 |                            |              |                       |             |
                                 |                            +----------------+ connection          +<------------+
                                 |                   _deliver(message, queue)|                       |
                                 |        5                      4           |                       |
                                 |     callback(message)                     |                       |
                                 +----------------------------------------------> callback(message)+---------------+
                                                                             +-----------------------+             |
                                                                                                                   |
                                                                             +----------------------+              |
                                                                             | Consumer             |              |
                                                             on_m(message)   |                      |              |
                                                      +---------------------------+  on_message     | <------------+
                                                      |                      |                      |  _receive_callback
    kombu                                             |                      +----------------------+          6
                                                      |
+-----------------------------------------------------------------------------------------------------------------------+
                                                      |
    Celery                                            |
                                         +---------------------------+
                                         | Consumer   |              |
                                         |            |              |
                                         |            v              |
                                         |      on_task_received     |
                                         |                           |
                                         |                           |
                                         +---------------------------+

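As the diagram above shows, the logic divides into a Kombu part and a Celery part: the message starts in Kombu and then arrives in Celery.

2.1 Message loop -- hub in kombu

We begin with the message loop, the hub.

The relevant code is in kombu/asynchronous/hub.py.

When poll reports an event, the cb registered in readers[fd] is called. The fd here is the fd of the Redis socket.

An abridged version of the code: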

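def create_loop(self,
                generator=generator, sleep=sleep, min=min, next=next,
                Empty=Empty, StopIteration=StopIteration,
                KeyError=KeyError, READ=READ, WRITE=WRITE, ERR=ERR):
    # Abridged excerpt: timer handling, the WRITE/ERR branches and
    # most error handling are omitted.
    readers, writers = self.readers, self.writers
    poll = self.poller.poll

    while 1:
        # ... (poll_timeout is computed from the scheduled timers here)
        if readers or writers:
            to_consolidate = []
            try:
                events = poll(poll_timeout)
            except ValueError:
                return

            for fd, event in events or ():
                if event & READ:
                    # look up the callback registered for this fd
                    cb, cbargs = readers[fd]
                # ... (WRITE / ERR handling omitted)
                cb(*cbargs)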
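
To make the dispatch pattern concrete, here is a minimal sketch built on the standard library's selectors module rather than on Kombu's own poller. The names run_once, readers and on_readable are illustrative assumptions, and a local socket pair stands in for the Redis connection; the only point is that one poll pass maps each readable fd to its registered (callback, args) pair.

```python
import selectors
import socket


def run_once(sel, readers, timeout=1.0):
    """Poll once and invoke the callback registered for each readable fd."""
    fired = []
    for key, events in sel.select(timeout):
        if events & selectors.EVENT_READ:
            cb, cbargs = readers[key.fd]      # same lookup as readers[fd] above
            fired.append(cb(*cbargs))
    return fired


# A connected socket pair stands in for the Redis connection (assumption).
left, right = socket.socketpair()
sel = selectors.DefaultSelector()
sel.register(right, selectors.EVENT_READ)


def on_readable(fileno):
    # Like Transport.on_readable, the callback only receives the fd.
    return fileno, right.recv(1024)


readers = {right.fileno(): (on_readable, (right.fileno(),))}

left.sendall(b"task")
result = run_once(sel, readers)

sel.close()
left.close()
right.close()
```

The callback receives nothing but the file descriptor, which is why the next layers need their own fd-to-channel bookkeeping.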

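2.2 Broker abstraction -- Transport in kombu

What is registered in readers[fd] is the on_readable callback of the Transport class, so execution moves to Transport.

Its job is to hand the event over to MultiChannelPoller.

The code is in kombu/transport/redis.py. Here self.cycle is the MultiChannelPoller.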

def on_readable(self, fileno):
    """Handle AIO event for one of our file descriptors."""
    self.cycle.on_readable(fileno)

The variables at this point:

fileno = {int} 34

self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>

Execution has now reached Transport, as shown below:

+-------------+         +---------------+
|   hub       |         |   Transport   |
|             | fileno  |               |
|       cb +--------------> on_readable |
|             |         |               |
|      poll   |         |               |
+-------------+         +---------------+

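2.3 Execution engine --- MultiChannelPoller in kombu

Execution now reaches MultiChannelPoller. As earlier articles in this series explained, MultiChannelPoller ties Channels and poll together. Here its job is to look up the Channel that owns the polled fd and let it continue processing.

The code shows that each fd maps to one Channel. poll only reports that some fd has data; how to actually read the message still requires further processing.

Because Celery task delivery is implemented with the Redis BRPOP operation, the handler retrieved here is the BRPOP callback, _brpop_read.

The code is in kombu/transport/redis.py.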

def on_readable(self, fileno):
    chan, type = self._fd_to_chan[fileno]
    if chan.qos.can_consume():
        chan.handlers[type]()

The variables at this point are shown below; each piece of the logic is visible:

chan.handlers[type] = {method} <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>

chan = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>

fileno = {int} 34

self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fcbaddfd048>

type = {str} 'BRPOP'
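
The lookup chain fd -> (channel, type) -> handler can be sketched in isolation. The classes below (FakeQoS, FakeChannel, FakePoller) are hypothetical stand-ins written for this illustration, not Kombu classes; only the shape of on_readable follows the snippet above.

```python
class FakeQoS:
    """Stand-in for the channel's QoS object (prefetch gate)."""

    def __init__(self, allowed=True):
        self.allowed = allowed

    def can_consume(self):
        return self.allowed


class FakeChannel:
    """Stand-in channel with one handler per command type."""

    def __init__(self, qos):
        self.qos = qos
        self.calls = []
        self.handlers = {"BRPOP": self._brpop_read}

    def _brpop_read(self):
        self.calls.append("BRPOP")


class FakePoller:
    """Maps each fd to the (channel, command type) waiting on it."""

    def __init__(self):
        self._fd_to_chan = {}

    def register(self, fd, chan, type_):
        self._fd_to_chan[fd] = (chan, type_)

    def on_readable(self, fileno):
        chan, type_ = self._fd_to_chan[fileno]
        if chan.qos.can_consume():        # skip while the prefetch limit is hit
            chan.handlers[type_]()


ready = FakeChannel(FakeQoS())
busy = FakeChannel(FakeQoS(allowed=False))
poller = FakePoller()
poller.register(34, ready, "BRPOP")
poller.register(35, busy, "BRPOP")
poller.on_readable(34)
poller.on_readable(35)
```

In this sketch the channel on fd 35 is left untouched because its QoS gate refuses, which is the role can_consume() plays in the real on_readable.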

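2.4 Parsing the message -- Channel in kombu

Execution now reaches Channel. The code is in kombu/transport/redis.py.

This part of Channel uses the Redis client to read the message and parses it to extract the queue (the dest variable in the snippet), which determines which consumer (the one registered for that queue) should handle the message. It then dispatches the message through self.connection._deliver.

The _brpop_read code is: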

def _brpop_read(self, **options):
    try:
        try:
            dest__item = self.client.parse_response(self.client.connection,
                                                    'BRPOP',
                                                    **options)
        except self.connection_errors:
            # if there's a ConnectionError, disconnect so the next
            # iteration will reconnect automatically.
            self.client.connection.disconnect()
            raise
        if dest__item:
            dest, item = dest__item
            dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
            self._queue_cycle.rotate(dest)
            self.connection._deliver(loads(bytes_to_str(item)), dest)  # dispatch the message
            return True
        else:
            raise Empty()
    finally:
        self._in_poll = None

The variables at this point:

dest = {str} 'celery'

dest__item = {tuple: 2} 
 0 = {bytes: 6} b'celery'
 1 = {bytes: 861} b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-

item = b'{"body": "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "myTest.add", "id": "863cf9b2-

self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
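
The parsing step in _brpop_read can be tried out without Redis. The sketch below is an illustration under assumptions: parse_brpop_reply is a hypothetical helper, the payload is a shortened made-up message, and SEP is assumed to be the separator that the Redis transport appends before a priority suffix on queue keys.

```python
import json

# Assumed value of the channel's `sep` attribute (priority suffix separator).
SEP = "\x06\x16"


def parse_brpop_reply(reply):
    """Split a (key, payload) BRPOP reply into (queue_name, message_dict)."""
    if not reply:
        return None
    dest, item = reply
    # Strip an optional priority suffix, as rsplit(self.sep, 1)[0] does above.
    queue = dest.decode().rsplit(SEP, 1)[0]
    return queue, json.loads(item.decode())


reply = (
    b"celery",
    b'{"headers": {"task": "myTest.add"}, '
    b'"properties": {"delivery_info": {"routing_key": "celery"}}}',
)
queue, message = parse_brpop_reply(reply)

# A key carrying a priority suffix resolves to the same queue name.
prio_queue, _ = parse_brpop_reply((b"celery\x06\x163", b"{}"))
```

Both calls yield the queue name celery, which is the key later used to look up the callback in _deliver.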

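2.5 Starting the callback -- Transport in kombu

Execution returns to Transport.

The code here calls the callback stored in self._callbacks, which records one callback per queue.

_callback is: <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>.

The concrete format and content of the task message are also visible, for example {'exchange': '', 'routing_key': 'celery'}, which tells us the target queue.

The code is in kombu/transport/virtual/base.py.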

def _deliver(self, message, queue):
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)

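The variables are shown below. Celery has three callbacks registered at this point, one per function:

  • celeryev.c755f81c-415e-478f-bb51-def341a96c0c handles events;
  • celery@.celery.pidbox handles control commands;
  • celery is for normal task message consumption;
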
self._callbacks = {dict: 3} 
 'celeryev.c755f81c-415e-478f-bb51-def341a96c0c' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef23048>
 'celery@.celery.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56488>
 'celery' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fcbaef56d08>

message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}

queue = {str} 'celery'
    
self = {Transport} <kombu.transport.redis.Transport object at 0x7fcbaeeb6710>
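
The per-queue dispatch in _deliver boils down to a dictionary lookup with a fallback. A minimal sketch, assuming a hypothetical MiniTransport class that records rejected messages in a list instead of logging and calling _reject_inbound_message:

```python
class MiniTransport:
    """Illustrative stand-in: one callback per queue name."""

    def __init__(self):
        self._callbacks = {}
        self.rejected = []

    def _deliver(self, message, queue):
        try:
            callback = self._callbacks[queue]
        except KeyError:
            # No consumer is registered for this queue.
            self.rejected.append((queue, message))
        else:
            callback(message)


received = []
transport = MiniTransport()
transport._callbacks["celery"] = lambda m: received.append(("task", m))
transport._callbacks["celery.pidbox"] = lambda m: received.append(("control", m))

transport._deliver({"id": 1}, "celery")
transport._deliver({"id": 2}, "celery.pidbox")
transport._deliver({"id": 3}, "no-such-queue")
```

The queue names other than celery are placeholders; the real keys are the ones listed in the variable dump above.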

The logic diagram at this point:

+-------------+        +-------------------+                                +-------------------------+
| hub         |    1   | Transport         |                2               |MultiChannelPoller       |
|             | fileno |                   |      cycle.on_readable(fileno) |                         |
|       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
|             |        |                   |                                |                         |
|      poll   |        |                   +-<---------------+              |  chan.handlers[type]+------------+
+-------------+        |  _callbacks[queue]|                 |              |                         |        |
                       |                   |                 |              +-------------------------+        |
                       |                   |                 |                                                 |
                       +-------------------+                 |                                                 |
                                                             |                                                 |
                                                             |              +-----------------+                |
                                                             |              | Channel         |        3       |
                                                             |              |                 |   _brpop_read  |
                                                             |              |                 |                |
                                                             +----------------+ connection    | <--------------+
                                                    _deliver(message, queue)|                 |
                                                                4           |                 |
                                                                            +-----------------+

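2.6 Starting the callback -- Channel in kombu

The callback chain continues in kombu/transport/virtual/base.py.

The callback is the _callback closure that basic_consume registered for the queue. Since the channel has now obtained the Redis message for that queue, it calls the queue's callback, which in turn calls the Consumer's callback.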

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)

    self._reset_cycle()

The variables at this point:

callback = {method} <bound method Consumer._receive_callback of <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>>
    
message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'd

raw_message = {dict: 5} {'body': 'W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==', 'content-encoding': 'utf-8', 'content-type': 'application/json', 'headers': {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini'}, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc'}}
                                                                    
self = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>
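
What basic_consume contributes is a closure: it captures the consumer's callback and the no_ack flag, and stores the wrapper under the queue name. The sketch below illustrates that idea with a hypothetical MiniChannel; it keeps unacknowledged messages in a plain dict instead of a QoS object and passes the raw dict through instead of building a Message.

```python
class MiniChannel:
    """Illustrative stand-in for the registration done by basic_consume."""

    def __init__(self, callbacks):
        self._callbacks = callbacks      # shared queue -> callback mapping
        self.unacked = {}                # delivery_tag -> message awaiting ack

    def basic_consume(self, queue, no_ack, callback):
        def _callback(raw_message):
            tag = raw_message["properties"]["delivery_tag"]
            if not no_ack:
                # Remember the message until the consumer acknowledges it.
                self.unacked[tag] = raw_message
            return callback(raw_message)

        self._callbacks[queue] = _callback


callbacks = {}
seen = []
channel = MiniChannel(callbacks)
channel.basic_consume("celery", no_ack=False, callback=seen.append)

raw = {"body": "...", "properties": {"delivery_tag": "tag-1"}}
callbacks["celery"](raw)
```

After the call, the message has reached the consumer callback and is also tracked as unacknowledged under its delivery tag.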

The logic diagram at this point:

+-------------+        +-------------------+                                +-------------------------+
| hub         |    1   | Transport         |                2               |MultiChannelPoller       |
|             | fileno |                   |      cycle.on_readable(fileno) |                         |
|       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
|             |        |                   |                                |                         |
|      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
+-------------+        |  _callbacks[queue]|                 |              |                         |           |
                       |        +          |                 |              +-------------------------+           |
                       |        |          |                 |                                                    |
                       +-------------------+                 |                                                    |
                                |                            |                                                    |
                                |                            |              +-----------------------+             |
                                |                            |              | Channel               |      3      |
                                |                            |              |                       | _brpop_read |
                                |                            |              |                       |             |
                                |                            +----------------+ connection          +<------------+
                                |                   _deliver(message, queue)|                       |
                                |        5                      4           |                       |
                                |     callback(message)                     |                       |
                                +----------------------------------------------> callback(message)+--------------->
                                                                            +-----------------------+

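2.7 Invoking the callback -- Consumer in kombu

The Kombu Consumer callback code is in kombu/messaging.py.

It calls the callback that the user registered on the Kombu Consumer. Note that the user of the Kombu Consumer is Celery, so this immediately invokes the callback Celery registered earlier.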

def _receive_callback(self, message):
    accept = self.accept
    on_m, channel, decoded = self.on_message, self.channel, None
    try:
        m2p = getattr(channel, 'message_to_python', None)
        if m2p:
            message = m2p(message)
        if accept is not None:
            message.accept = accept
        if message.errors:
            return message._reraise_error(self.on_decode_error)
        decoded = None if on_m else message.decode()
    except Exception as exc:
        if not self.on_decode_error:
            raise
        self.on_decode_error(message, exc)
    else:
        return on_m(message) if on_m else self.receive(decoded, message)

The variables:

on_m = {function} <function Consumer.create_task_handler.<locals>.on_task_received at 0x7fcbaef562f0>

accept = {set: 1} {'application/json'}

channel = {Channel} <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>

m2p = {method} <bound method Channel.message_to_python of <kombu.transport.redis.Channel object at 0x7fcbaeeb68d0>>

message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>

self = {Consumer} <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
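
Note that _receive_callback skips message.decode() when on_message is set, so the body stays encoded at this point. To see what the body in the dumps above actually contains, the sketch below decodes it by hand using only the fields shown in the raw message (body_encoding, content-encoding, content-type). decode_body is a hypothetical helper for this illustration, not Kombu's decoding path.

```python
import base64
import json

raw_message = {
    "body": (
        "W1syLCAxN10sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGws"
        "ICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ=="
    ),
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "properties": {"body_encoding": "base64"},
}


def decode_body(msg):
    """Decode a base64 + JSON body; other encodings are out of scope here."""
    if msg["properties"].get("body_encoding") != "base64":
        raise ValueError("this sketch only handles base64-encoded bodies")
    if msg["content-type"] != "application/json":
        raise ValueError("this sketch only handles JSON payloads")
    data = base64.b64decode(msg["body"])
    return json.loads(data.decode(msg["content-encoding"]))


args, kwargs, embed = decode_body(raw_message)
```

The decoded positional arguments are [2, 17], which matches the argsrepr header '(2, 17)' in the dump.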

The logic in detail:

 +-------------+        +-------------------+                                +-------------------------+
 | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
 |             | fileno |                   |      cycle.on_readable(fileno) |                         |
 |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
 |             |        |                   |                                |                         |
 |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
 +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                        |        +          |                 |              +-------------------------+           |
                        |        |          |                 |                                                    |
                        +-------------------+                 |                                                    |
                                 |                            |                                                    |
                                 |                            |              +-----------------------+             |
                                 |                            |              | Channel               |      3      |
                                 |                            |              |                       | _brpop_read |
                                 |                            |              |                       |             |
                                 |                            +----------------+ connection          +<------------+
                                 |                   _deliver(message, queue)|                       |
                                 |        5                      4           |                       |
                                 |     callback(message)                     |                       |
                                 +----------------------------------------------> callback(message)+---------------+
                                                                             +-----------------------+             |
                                                                                                                   |
                                                                             +----------------------+              |
                                                                             | Consumer             |              |
                                                             on_m(message)   |                      |              |
                                                      +---------------------------+  on_message     | <------------+
                                                      |                      |                      |  _receive_callback
                                                      |                      +----------------------+          6
                                                      |
+-----------------------------------------------------------------------------------------------------------------------+
                                                      |
                                                      v

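2.8 Entering Celery -- Consumer in Celery

Since the callback registered by Celery has been invoked, we are now in Celery territory.

2.8.1 Registering the callback

Recall when Celery registers the callback.

celery/worker/loops.py contains the following code, which is what lets the consumer call back into Celery: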

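def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""
    # Abridged: only the lines that wire up the task callback are shown.
    on_task_received = obj.create_task_handler()
    consumer.on_message = on_task_received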

2.8.2 The callback function

The callback lives in celery/worker/consumer/consumer.py.

create_task_handler returns on_task_received, and that is the callback.

def create_task_handler(self, promise=promise):
    strategies = self.strategies
    on_unknown_message = self.on_unknown_message
    on_unknown_task = self.on_unknown_task
    on_invalid_task = self.on_invalid_task
    callbacks = self.on_task_message
    call_soon = self.call_soon

    def on_task_received(message):
        # payload will only be set for v1 protocol, since v2
        # will defer deserializing the message body to the pool.
        payload = None
        try:
            type_ = message.headers['task']                # protocol v2
        except TypeError:
            return on_unknown_message(None, message)
        except KeyError:
            try:
                payload = message.decode()
            except Exception as exc:  # pylint: disable=broad-except
                return self.on_decode_error(message, exc)
            try:
                type_, payload = payload['task'], payload  # protocol v1
            except (TypeError, KeyError):
                return on_unknown_message(payload, message)
        try:
            strategy = strategies[type_]
        except KeyError as exc:
            return on_unknown_task(None, message, exc)
        else:
            try:
                strategy(
                    message, payload,
                    promise(call_soon, (message.ack_log_error,)),
                    promise(call_soon, (message.reject_log_error,)),
                    callbacks,
                )
            except (InvalidTaskError, ContentDisallowed) as exc:
                return on_invalid_task(payload, message, exc)
            except DecodeError as exc:
                return self.on_decode_error(message, exc)

    return on_task_received

The variables at this point:

call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>

callbacks = {set: 0} set()

message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>

on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>

on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>

on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>

self = {Consumer} <Consumer: celery@ demini (running)>

strategies = {dict: 10} {'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}
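
The core of on_task_received is a lookup from task name to strategy. The sketch below is a simplified illustration, not Celery's implementation: make_task_handler is a hypothetical name, messages are plain dicts, the protocol v1 fallback is omitted, and the strategy takes only the message rather than the ack/reject promises.

```python
def make_task_handler(strategies, on_unknown_task, on_unknown_message):
    """Return a callback that routes a message to the strategy for its task."""

    def on_task_received(message):
        try:
            type_ = message["headers"]["task"]      # protocol v2 style header
        except (TypeError, KeyError):
            return on_unknown_message(message)
        try:
            strategy = strategies[type_]
        except KeyError as exc:
            return on_unknown_task(message, exc)
        return strategy(message)

    return on_task_received


handled, unknown = [], []
handler = make_task_handler(
    {"myTest.add": lambda m: handled.append(m["headers"]["id"])},
    on_unknown_task=lambda m, exc: unknown.append(("task", m["headers"]["task"])),
    on_unknown_message=lambda m: unknown.append(("message", None)),
)

handler({"headers": {"task": "myTest.add", "id": "demo-1"}})
handler({"headers": {"task": "myTest.mul", "id": "demo-2"}})   # not registered
handler({"headers": None})                                     # malformed
```

Building the handler once and closing over the strategies dict keeps the per-message path down to two dictionary lookups.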

The logic at this point:

 +-------------+        +-------------------+                                +-------------------------+
 | hub         |    1   | Transport         |                2               |MultiChannelPoller       |
 |             | fileno |                   |      cycle.on_readable(fileno) |                         |
 |       cb +--------------> on_readable   +-------------------------------------> _fd_to_chan[fileno] |
 |             |        |                   |                                |                         |
 |      poll   |        |                   +-<---------------+              |  chan.handlers[type]+---------------+
 +-------------+        |  _callbacks[queue]|                 |              |                         |           |
                        |        +          |                 |              +-------------------------+           |
                        |        |          |                 |                                                    |
                        +-------------------+                 |                                                    |
                                 |                            |                                                    |
                                 |                            |              +-----------------------+             |
                                 |                            |              | Channel               |      3      |
                                 |                            |              |                       | _brpop_read |
                                 |                            |              |                       |             |
                                 |                            +----------------+ connection          +<------------+
                                 |                   _deliver(message, queue)|                       |
                                 |        5                      4           |                       |
                                 |     callback(message)                     |                       |
                                 +----------------------------------------------> callback(message)+---------------+
                                                                             +-----------------------+             |
                                                                                                                   |
                                                                             +----------------------+              |
                                                                             | Consumer             |              |
                                                             on_m(message)   |                      |              |
                                                      +---------------------------+  on_message     | <------------+
                                                      |                      |                      |  _receive_callback
    kombu                                             |                      +----------------------+          6
                                                      |
+-----------------------------------------------------------------------------------------------------------------------+
                                                      |
    Celery                                            |
                                         +---------------------------+
                                         | Consumer   |              |
                                         |            |              |
                                         |            v              |
                                         |      on_task_received     |
                                         |                           |
                                         |                           |
                                         +---------------------------+

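0x03 Logic in Celery

From here on we are inside Celery.

3.1 Logical entry point --- consumer in Celery

We first arrive at Celery's Consumer component, which is conceptually the logical entry point for consumption.

The Celery Consumer code is in celery/worker/consumer/consumer.py. It does the following:

  • parses the message and takes the task name from the headers, for example 'myTest.add';

  • looks up the strategy by task name;

  • calls the strategy;

The code: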

def create_task_handler(self, promise=promise):
    strategies = self.strategies
    on_unknown_message = self.on_unknown_message
    on_unknown_task = self.on_unknown_task
    on_invalid_task = self.on_invalid_task
    callbacks = self.on_task_message
    call_soon = self.call_soon

    def on_task_received(message):
        # payload will only be set for v1 protocol, since v2
        # will defer deserializing the message body to the pool.
        payload = None
        try:
            type_ = message.headers['task']                # protocol v2
        except TypeError:
            return on_unknown_message(None, message)
        except KeyError:
            try:
                payload = message.decode()
            except Exception as exc:  # pylint: disable=broad-except
                return self.on_decode_error(message, exc)
            try:
                type_, payload = payload['task'], payload  # protocol v1
            except (TypeError, KeyError):
                return on_unknown_message(payload, message)
        try:
            strategy = strategies[type_]
        except KeyError as exc:
            return on_unknown_task(None, message, exc)
        else:
            try:
                strategy(
                    message, payload,
                    promise(call_soon, (message.ack_log_error,)),
                    promise(call_soon, (message.reject_log_error,)),
                    callbacks,
                )
            except (InvalidTaskError, ContentDisallowed) as exc:
                return on_invalid_task(payload, message, exc)
            except DecodeError as exc:
                return self.on_decode_error(message, exc)

    return on_task_received

The variables are:

self.app.tasks = {TaskRegistry: 10} {'celery.chunks': <@task: celery.chunks of myTest at 0x7fcbade229e8>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of myTest at 0x7fcbade229e8>, 'celery.chord_unlock': <@task: celery.chord_unlock of myTest at 0x7fcbade229e8>, 'celery.group': <@

                                     call_soon = {method} <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>
                                     
callbacks = {set: 0} set()
                                     
message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
                                     
on_invalid_task = {method} <bound method Consumer.on_invalid_task of <Consumer: celery@ demini (running)>>
                                     
on_unknown_message = {method} <bound method Consumer.on_unknown_message of <Consumer: celery@ demini (running)>>
                                     
on_unknown_task = {method} <bound method Consumer.on_unknown_task of <Consumer: celery@ demini (running)>>    
                                     
self = {Consumer} <Consumer: celery@ demini (running)>
                                     
strategies = {dict: 10} 
{'celery.chunks': <function default.<locals>.task_message_handler at 0x7fcbaef230d0>, 'celery.backend_cleanup': <function default.<locals>.task_message_handler at 0x7fcbaef23620>, 'celery.chord_unlock': <function default.<locals>.task_message_handler at 0x7fcbaef238c8>, 'celery.group': <function default.<locals>.task_message_handler at 0x7fcbaef23b70>, 'celery.map': <function default.<locals>.task_message_handler at 0x7fcbaef23e18>, 'celery.chain': <function default.<locals>.task_message_handler at 0x7fcbaef48158>, 'celery.starmap': <function default.<locals>.task_message_handler at 0x7fcbaef48400>, 'celery.chord': <function default.<locals>.task_message_handler at 0x7fcbaef486a8>, 'myTest.add': <function default.<locals>.task_message_handler at 0x7fcbaef48950>, 'celery.accumulate': <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>}

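3.1.1 Parsing the message

The following line determines which task the message targets. Here it is 'myTest.add'.

type_ = message.headers['task']

message.headers is shown below. It illustrates which aspects a message definition has to cover.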

message.headers = {dict: 15} 
 'lang' = {str} 'py'
 'task' = {str} 'myTest.add'
 'id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
 'shadow' = {NoneType} None
 'eta' = {NoneType} None
 'expires' = {NoneType} None
 'group' = {NoneType} None
 'group_index' = {NoneType} None
 'retries' = {int} 0
 'timelimit' = {list: 2} [None, None]
 'root_id' = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
 'parent_id' = {NoneType} None
 'argsrepr' = {str} '(2, 17)'
 'kwargsrepr' = {str} '{}'
 'origin' = {str} 'gen19806@ demini'
 __len__ = {int} 15
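
The lookup order used by on_task_received (headers first for protocol v2, decoded body as the fallback for protocol v1) can be reduced to a small self-contained sketch. FakeMessage and resolve_task_name are hypothetical names introduced only for illustration. They are not part of Celery or Kombu, and the error handling of the real function is left out.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for a kombu Message; only what the sketch needs."""

    def __init__(self, headers, body):
        self.headers = headers
        self.body = body

    def decode(self):
        # The real Message.decode() honours content_type; JSON is assumed here.
        return json.loads(self.body)


def resolve_task_name(message):
    """Return (task_name, payload) using the v2-then-v1 lookup order."""
    try:
        # Protocol v2: the task name travels in the headers, the body stays raw.
        return message.headers['task'], None
    except KeyError:
        # Protocol v1: the task name is inside the decoded body.
        payload = message.decode()
        return payload['task'], payload


v2 = FakeMessage({'task': 'myTest.add'}, b'[[2, 17], {}, {}]')
v1 = FakeMessage({}, b'{"task": "myTest.add", "args": [2, 17]}')
print(resolve_task_name(v2))
print(resolve_task_name(v1))
```

In the v2 case the payload stays None, which matches the comment in the original code that deserializing the body is deferred to the pool.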

3.1.2 Obtaining the strategy

Using the task name, here 'myTest.add', the corresponding callback function is looked up in strategies. This callback is what starts processing the task message.

strategies = {dict: 10} 
 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef230d0>
 'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23620>
 'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef238c8>
 'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23b70>
 'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef23e18>
 'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48158>
 'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48400>
 'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef486a8>
 'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48950>
 'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fcbaef48bf8>
 __len__ = {int} 10

3.1.3 Calling the strategy

Now that the strategy has been found, for example:

<function default.<locals>.task_message_handler at 0x7fcbaef48950>

the function is called as follows:

strategy(
    message, payload,
    promise(call_soon, (message.ack_log_error,)),
    promise(call_soon, (message.reject_log_error,)),
    callbacks,
)
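
The two promise(call_soon, (...)) arguments wrap the acknowledgement and the rejection so that invoking them later only schedules the real call. The sketch below imitates that idea with the standard library alone. DeferredCall, the toy call_soon, and the scheduled list are hypothetical stand-ins, not the promise class or the Consumer.call_soon method that Celery actually uses.

```python
class DeferredCall:
    """Hypothetical, heavily reduced stand-in for a promise object."""

    def __init__(self, fun, args=()):
        self.fun = fun
        self.args = tuple(args)

    def __call__(self, *extra):
        # Pre-bound arguments come first, call-time arguments after them.
        return self.fun(*(self.args + extra))


scheduled = []


def call_soon(callback, *args):
    """Toy scheduler: remember the callback instead of running it now."""
    scheduled.append((callback, args))


def ack_log_error():
    """Toy acknowledgement; a real one would talk to the broker."""
    return 'acked'


on_ack = DeferredCall(call_soon, (ack_log_error,))
on_ack()  # nothing is acknowledged yet, the ack is only scheduled

# A later loop iteration would drain the scheduled callbacks.
results = [callback(*args) for callback, args in scheduled]
print(results)
```

The point of the indirection is that the code which decides to acknowledge a message does not have to be the code that performs the broker I/O.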

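3.2 Strategy --- strategy

Strategy acts as an intermediate layer between the Consumer and the Worker. It handles a message differently depending on the conditions, which is what the word strategy implies.

3.2.1 Logic in strategy

The code is in celery/worker/strategy.py.

Specifically, it:

  • Parses the message further;
  • Builds the internal Req from the message;
  • Sends the 'task-received' event if events are enabled;
  • Handles the ETA;
  • Handles QoS and rate limits;
  • Hands the Req over, which leads to the Worker.

The details are as follows: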

def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
    if body is None and 'args' not in message.payload:
        body, headers, decoded, utc = (
            message.body, message.headers, False, app.uses_utc_timezone(),
        )
    else:
        if 'args' in message.payload:
            body, headers, decoded, utc = hybrid_to_proto2(message,
                                                           message.payload)
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)

    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )
  
    if (req.expires or req.id in revoked_tasks) and req.revoked():
        return

    signals.task_received.send(sender=consumer, request=req)

    if task_sends_events:
        send_event(
            'task-received',
            uuid=req.id, name=req.name,
            args=req.argsrepr, kwargs=req.kwargsrepr,
            root_id=req.root_id, parent_id=req.parent_id,
            retries=req.request_dict.get('retries', 0),
            eta=req.eta and req.eta.isoformat(),
            expires=req.expires and req.expires.isoformat(),
        )

    bucket = None
    eta = None
    if req.eta:
        try:
            if req.utc:
                eta = to_timestamp(to_system_tz(req.eta))
            else:
                eta = to_timestamp(req.eta, app.timezone)
        except (OverflowError, ValueError) as exc:
            error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                  req.eta, exc, req.info(safe=True), exc_info=True)
            req.reject(requeue=False)
    if rate_limits_enabled:
        bucket = get_bucket(task.name)

    if eta and bucket:
        consumer.qos.increment_eventually()
        return call_at(eta, limit_post_eta, (req, bucket, 1),
                       priority=6)
    if eta:
        consumer.qos.increment_eventually()
        return call_at(eta, apply_eta_task, (req,), priority=6)
    if bucket:
        return limit_task(req, bucket, 1)

    task_reserved(req)
    if callbacks:
        [callback(req) for callback in callbacks]
        
    handle(req)  # the request is handed to the Worker here
    
return task_message_handler
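
The tail of task_message_handler is a four-way decision on whether an ETA and a rate-limit bucket are present. As a rough sketch, that decision can be written as a pure function. route is a hypothetical name, and the returned strings only name the calls seen in the listing above; nothing is scheduled or executed.

```python
def route(eta, bucket):
    """Mirror the order of the checks at the end of task_message_handler."""
    if eta and bucket:
        # Delayed and rate limited: wait for the ETA, then apply the limit.
        return 'call_at -> limit_post_eta'
    if eta:
        # Delayed only: wait for the ETA, then run.
        return 'call_at -> apply_eta_task'
    if bucket:
        # Rate limited only.
        return 'limit_task'
    # Neither: reserve the request and hand it to the worker immediately.
    return 'task_reserved -> handle'


for eta, bucket in [(1700000000.0, object()), (1700000000.0, None),
                    (None, object()), (None, None)]:
    print(bool(eta), bool(bucket), route(eta, bucket))
```

The request traced in this article has eta = None and no bucket, so it takes the last branch.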

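Let us look at the details.

3.2.2 Obtaining the instance

In Strategy, the line below builds a Request class from the task instance, which ties together the broker message, the consumer, and the multiprocessing pool.

Request.execute_using_pool is where the connection to multiprocessing begins, for example through the consumer's pool.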

Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)

The task instance is:

myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]

The code that builds the Request class is:

def create_request_cls(base, task, pool, hostname, eventer,
                       ref=ref, revoked_tasks=revoked_tasks,
                       task_ready=task_ready, trace=trace_task_ret):
    default_time_limit = task.time_limit
    default_soft_time_limit = task.soft_time_limit
    apply_async = pool.apply_async
    acks_late = task.acks_late
    events = eventer and eventer.enabled

    class Request(base):

        def execute_using_pool(self, pool, **kwargs):
            task_id = self.task_id
            if (self.expires or task_id in revoked_tasks) and self.revoked():
                raise TaskRevokedError(task_id)

            time_limit, soft_time_limit = self.time_limits
            result = apply_async(
                trace,
                args=(self.type, task_id, self.request_dict, self.body,
                      self.content_type, self.content_encoding),
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=soft_time_limit or default_soft_time_limit,
                timeout=time_limit or default_time_limit,
                correlation_id=task_id,
            )
            # cannot create weakref to None
            # pylint: disable=attribute-defined-outside-init
            self._apply_result = maybe(ref, result)
            return result

        def on_success(self, failed__retval__runtime, **kwargs):
            failed, retval, runtime = failed__retval__runtime
            if failed:
                if isinstance(retval.exception, (
                        SystemExit, KeyboardInterrupt)):
                    raise retval.exception
                return self.on_failure(retval, return_ok=True)
            task_ready(self)

            if acks_late:
                self.acknowledge()

            if events:
                self.send_event(
                    'task-succeeded', result=retval, runtime=runtime,
                )

    return Request
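
create_request_cls is a class factory: it captures per-task values such as pool.apply_async in local variables once, and the generated Request subclass uses them through the closure. A minimal sketch of the pattern follows. ToyPool, BaseRequest, and add are hypothetical, the pool runs the target synchronously, and the signature is reduced to three parameters.

```python
class ToyPool:
    """Hypothetical pool that runs the target in the calling thread."""

    def apply_async(self, target, args=(), callback=None):
        result = target(*args)
        if callback:
            callback(result)
        return result


class BaseRequest:
    def __init__(self, task_id, args):
        self.task_id = task_id
        self.args = args
        self.outcome = None

    def on_success(self, value):
        self.outcome = value


def create_request_cls(base, task, pool):
    # Looked up once, when the class is built, not on every request.
    apply_async = pool.apply_async

    class Request(base):
        def execute_using_pool(self, pool=None):
            # Uses the captured apply_async; the pool argument is ignored
            # in this sketch.
            return apply_async(task, args=self.args,
                               callback=self.on_success)

    return Request


def add(x, y):
    return x + y


Req = create_request_cls(BaseRequest, add, ToyPool())
req = Req('863cf9b2', (2, 17))
req.execute_using_pool()
print(req.outcome)  # 19
```

One plausible motivation for the pattern is to avoid repeated attribute lookups on a hot path, since one Request is created per incoming message.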

The logic at this point is:

+
  Consumer               |
                 message |
                         v         strategy  +------------------------------------+
            +------------+------+            | strategies                         |
            | on_task_received  | <--------+ |                                    |
            |                   |            |[myTest.add : task_message_handler] |
            +------------+------+            +------------------------------------+
                         |
                         |
+---------------------------------------------------------------------------------------+
                         |
 strategy                |
                         |
                         v                Request [myTest.add]
            +------------+-------------+                       +---------------------+
            | task_message_handler     | <-------------------+ | create_request_cls  |
            |                          |                       |                     |
            +--------------------------+                       +---------------------+

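3.2.3 Calling the instance

task_message_handler ultimately calls handle(req), which starts invoking the instance.

The handle function actually corresponds to WorkController._process_task_sem.

The code is as follows: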

def task_message_handler(message, body, ack, reject, callbacks,
                         to_timestamp=to_timestamp):
    # ... message parsing, ETA and rate-limit handling omitted ...

    req = Req(
        message,
        on_ack=ack, on_reject=reject, app=app, hostname=hostname,
        eventer=eventer, task=task, connection_errors=connection_errors,
        body=body, headers=headers, decoded=decoded, utc=utc,
    )

    task_reserved(req)

    if callbacks:
        [callback(req) for callback in callbacks]

    handle(req)

return task_message_handler

The Request is:

req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  
 acknowledged = {bool} False
 app = {Celery} <Celery myTest at 0x7fcbade229e8>
 args = {list: 2} [2, 17]
 argsrepr = {str} '(2, 17)'
 body = {bytes: 82} b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
 chord = {NoneType} None
 connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
 content_encoding = {str} 'utf-8'
 content_type = {str} 'application/json'
 correlation_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
 delivery_info = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
 errbacks = {NoneType} None
 eta = {NoneType} None
 eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x7fcbaeef31d0>
 expires = {NoneType} None
 group = {NoneType} None
 group_index = {NoneType} None
 hostname = {str} 'celery@ demini'
 id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
 kwargs = {dict: 0} {}
 kwargsrepr = {str} '{}'
 message = {Message} <Message object at 0x7fcbaef3eaf8 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'cfa3a261-c9b4-4d7e-819c-37608c0bb0cc', 'body_length': 82, 'properties': {'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'}, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}}>
 name = {str} 'myTest.add'
 on_ack = {promise} <promise@0x7fcbaeecc210 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
 on_reject = {promise} <promise@0x7fcbaeeccf20 --> <bound method Consumer.call_soon of <Consumer: celery@ demini (running)>>>
 parent_id = {NoneType} None
 reply_to = {str} 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93'
 request_dict = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
 root_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
 store_errors = {bool} True
 task = {add} <@task: myTest.add of myTest at 0x7fcbade229e8>
 task_id = {str} '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f'
 task_name = {str} 'myTest.add'
 time_limits = {list: 2} [None, None]
 time_start = {NoneType} None
 type = {str} 'myTest.add'
 tzlocal = {NoneType} None
 utc = {bool} True
 worker_pid = {NoneType} None

handle is:

handle = {method} <bound method WorkController._process_task_sem of <Worker: celery@ demini (running)>>
headers = {dict: 25} {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}

The logic at this point is:

+
  Consumer               |
                 message |
                         v         strategy  +------------------------------------+
            +------------+------+            | strategies                         |
            | on_task_received  | <--------+ |                                    |
            |                   |            |[myTest.add : task_message_handler] |
            +------------+------+            +------------------------------------+
                         |
                         |
 +------------------------------------------------------------------------------------+
 strategy                |
                         |
                         |
                         v                Request [myTest.add]
            +------------+-------------+                       +---------------------+
            | task_message_handler     | <-------------------+ | create_request_cls  |
            |                          |                       |                     |
            +------------+-------------+                       +---------------------+
                         | _process_task_sem
                         | 
+--------------------------------------------------------------------------------------+
 Worker                  | req[{Request} myTest.add]
                         v
                +--------+-------+
                | WorkController |
                +----------------+


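3.3 The worker -- Worker in Celery

Execution now reaches the Worker in Celery. The Worker is where the task is actually executed.

The code is in celery/worker/worker.py.

As the code shows:

  • _process_task_sem calls _process_task;
  • _process_task calls req.execute_using_pool(self.pool).

The details are as follows: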

class WorkController:
    """Unmanaged worker instance."""

    def register_with_event_loop(self, hub):
        self.blueprint.send_all(
            self, 'register_with_event_loop', args=(hub,),
            description='hub.register',
        )

    def _process_task_sem(self, req):
        return self._quick_acquire(self._process_task, req)

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            try:
                self._quick_release()   # Issue 877
            except AttributeError:
                pass
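
_quick_acquire itself is not shown in the excerpt. Assuming it behaves like a non-blocking, callback-based semaphore acquire (run the callback now if a slot is free, otherwise park it until a slot is released), the idea can be sketched as follows. ToySemaphore is a hypothetical class written for this illustration and is not the semaphore implementation Celery uses.

```python
from collections import deque


class ToySemaphore:
    """Hypothetical callback-based semaphore that never blocks the caller."""

    def __init__(self, value):
        self.value = value
        self._waiting = deque()

    def acquire(self, callback, *args):
        if self.value <= 0:
            # No free slot: park the call instead of blocking the event loop.
            self._waiting.append((callback, args))
            return False
        self.value -= 1
        callback(*args)
        return True

    def release(self):
        if self._waiting:
            # The freed slot goes straight to the oldest parked call.
            callback, args = self._waiting.popleft()
            callback(*args)
        else:
            self.value += 1


started = []
sem = ToySemaphore(1)
sem.acquire(started.append, 'req-1')  # runs at once
sem.acquire(started.append, 'req-2')  # parked, no free slot
before_release = list(started)
sem.release()                         # 'req-1' finished, 'req-2' starts
print(before_release, started)
```

Under this assumption, _process_task_sem would keep the number of requests handed to the pool bounded without ever blocking the consumer's event loop.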

The variables are:

req = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]  

self = {Worker} celery

3.3.1 Request in Celery

Execution now reaches the Request in Celery. The code is in celery/worker/request.py.

Because of:

apply_async = pool.apply_async

the call ends up in pool.apply_async.

The variables are:

apply_async = {method} <bound method BasePool.apply_async of <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>>

pool = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>

revoked_tasks = {LimitedSet: 0} <LimitedSet(0): maxlen=50000, expires=10800, minlen=0>
    
self = {Request} myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f]

The code is:

class Request(base):

    def execute_using_pool(self, pool, **kwargs):
        task_id = self.task_id  # task id
        if (self.expires or task_id in revoked_tasks) and self.revoked():  # expired or revoked?
            raise TaskRevokedError(task_id)

        time_limit, soft_time_limit = self.time_limits  # per-request time limits
        result = apply_async(  # submit trace to the pool; returns a result handle
            trace,
            args=(self.type, task_id, self.request_dict, self.body,
                  self.content_type, self.content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or default_soft_time_limit,
            timeout=time_limit or default_time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        # pylint: disable=attribute-defined-outside-init
        self._apply_result = maybe(ref, result)
        return result

The logic at this point is:

+
  Consumer               |
                 message |
                         v         strategy  +------------------------------------+
            +------------+------+            | strategies                         |
            | on_task_received  | <--------+ |                                    |
            |                   |            |[myTest.add : task_message_handler] |
            +------------+------+            +------------------------------------+
                         |
                         |
 +------------------------------------------------------------------------------------+
 strategy                |
                         |
                         |
                         v                Request [myTest.add]
            +------------+-------------+                       +---------------------+
            | task_message_handler     | <-------------------+ | create_request_cls  |
            |                          |                       |                     |
            +------------+-------------+                       +---------------------+
                         | _process_task_sem
                         |
+--------------------------------------------------------------------------------------+
 Worker                  | req[{Request} myTest.add]
                         v
                +--------+-----------+
                | WorkController     |
                |                    |
                |            pool +-------------------------+
                +--------+-----------+                      |
                         |                                  |
                         |               apply_async        v
             +-----------+----------+                   +---+-------+
             |{Request} myTest.add  | +---------------> | TaskPool  |
             +----------------------+                   +-----------+
                                        myTest.add


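3.3.2 BasePool in Celery

apply_async now reaches Celery's Pool. Note that this is not yet the actual multiprocessing implementation, only its entry point.

At this point the task information is passed to the Pool, for example: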

args = {tuple: 6} 
 0 = {str} 'myTest.add'
 1 = {str} 'af6ed084-efc6-4608-a13a-d3065f457cd5'
 2 = {dict: 21} {'lang': 'py', 'task': 'myTest.add', 'id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen1100@DESKTOP-0GO3RPO', 'reply_to': 'afb85541-d08c-3191-b89d-918e15f9e0bf', 'correlation_id': 'af6ed084-efc6-4608-a13a-d3065f457cd5', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}
 3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
 4 = {str} 'application/json'
 5 = {str} 'utf-8'

The file is celery/concurrency/base.py:

class BasePool:
    """Task pool."""

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.
        """
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args

        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             callbacks_propagate=self.callbacks_propagate,
                             **options)
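
BasePool.apply_async only normalizes its arguments and delegates to on_apply, which each concrete pool supplies. The sketch below reproduces that delegation with a hypothetical in-process backend. ToyBasePool and InlinePool are illustrative names, and running the target synchronously is an assumption made to keep the example self-contained.

```python
class ToyBasePool:
    """Reduced copy of the delegation shown above."""

    putlocks = False
    callbacks_propagate = ()

    def on_apply(self, target, args, kwargs, **options):
        raise NotImplementedError('concrete pools provide on_apply')

    def apply_async(self, target, args=None, kwargs=None, **options):
        kwargs = {} if not kwargs else kwargs
        args = [] if not args else args
        return self.on_apply(target, args, kwargs,
                             waitforslot=self.putlocks,
                             callbacks_propagate=self.callbacks_propagate,
                             **options)


class InlinePool(ToyBasePool):
    """Hypothetical backend that runs the target in the calling thread."""

    def on_apply(self, target, args, kwargs, callback=None, **options):
        # waitforslot and callbacks_propagate arrive in options and are
        # ignored by this toy backend.
        result = target(*args, **kwargs)
        if callback:
            callback(result)
        return result


seen = []
value = InlinePool().apply_async(lambda x, y: x + y, args=(2, 17),
                                 callback=seen.append)
print(value, seen)
```

Keeping apply_async in the base class means the Request code does not need to know which concurrency backend is behind the pool.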

The variables at this point are:

options = {dict: 7} {'accept_callback': <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>, 'timeout_callback': <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>

self = {TaskPool} <celery.concurrency.prefork.TaskPool object at 0x7fcbaddfa2e8>

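3.3.3 AsynPool in Celery

The apply_async code is in billiard/pool.py (billiard is a separate package maintained by the Celery project).

In __init__, self._initargs = initargs is (<Celery myTest at 0x2663db3fe48>, 'celery@DESKTOP-0GO3RPO'), which is how the Celery application is passed in.

Depending on the operating system (through the self.threads flag), either self._taskqueue.put or self._quick_put is called to send the task message to the multiprocessing pool.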

def apply_async(self, func, args=(), kwds={},
                callback=None, error_callback=None, accept_callback=None,
                timeout_callback=None, waitforslot=None,
                soft_timeout=None, timeout=None, lost_worker_timeout=None,
                callbacks_propagate=(),
                correlation_id=None):
    '''
    Asynchronous equivalent of `apply()` method.

    Callback is called when the functions return value is ready.
    The accept callback is called when the job is accepted to be executed.

    Simplified the flow is like this:

        >>> def apply_async(func, args, kwds, callback, accept_callback):
        ...     if accept_callback:
        ...         accept_callback()
        ...     retval = func(*args, **kwds)
        ...     if callback:
        ...         callback(retval)

    '''

    if self._state == RUN:
        waitforslot = self.putlocks if waitforslot is None else waitforslot
        if waitforslot and self._putlock is not None:
            self._putlock.acquire()
        result = ApplyResult(
            self._cache, callback, accept_callback, timeout_callback,
            error_callback, soft_timeout, timeout, lost_worker_timeout,
            on_timeout_set=self.on_timeout_set,
            on_timeout_cancel=self.on_timeout_cancel,
            callbacks_propagate=callbacks_propagate,
            send_ack=self.send_ack if self.synack else None,
            correlation_id=correlation_id,
        )
        if timeout or soft_timeout:
            # start the timeout handler thread when required.
            self._start_timeout_handler()
        if self.threads:
            self._taskqueue.put(([(TASK, (result._job, None,
                                func, args, kwds))], None))
        else:
            self._quick_put((TASK, (result._job, None, func, args, kwds)))
        return result

The variables are:

accept_callback = {method} <bound method Request.on_accepted of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
args = {tuple: 6} ('myTest.add', '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', {'lang': 'py', 'task': 'myTest.add', 'id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'parent_id': None, 'argsrepr': '(2, 17)', 'kwargsrepr': '{}', 'origin': 'gen19806@ demini', 'reply_to': 'ef1b446d-e3a9-3345-b027-b7bd8a93aa93', 'correlation_id': '863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f', 'hostname': 'celery@ demini', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 17], 'kwargs': {}, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 17], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8')

callback = {method} <bound method create_request_cls.<locals>.Request.on_success of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>

error_callback = {method} <bound method Request.on_failure of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>

self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7fcbaee2ea20>

timeout_callback = {method} <bound method Request.on_timeout of <Request: myTest.add[863cf9b2-8440-4ea2-8ac4-06b3dcd2fd1f] (2, 17) {}>>
waitforslot = {bool} False
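
3.3.3.1 Variables set up in advance

Let us first look at how some of these variables were set up earlier.

For example, the code below contains:

inq, outq, synq = self.get_process_queues() and self._process_register_queues(w, (inq, outq, synq)), which set up the pipes between the parent process and the child process.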

def _create_worker_process(self, i):
    sentinel = self._ctx.Event() if self.allow_restart else None
    
    inq, outq, synq = self.get_process_queues()
    on_ready_counter = self._ctx.Value('i')
    
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    self._pool.append(w)
    self._process_register_queues(w, (inq, outq, synq))
    w.name = w.name.replace('Process', 'PoolWorker')
    w.daemon = True
    w.index = i
    w.start()
    self._poolctrl[w.pid] = sentinel
    self._on_ready_counters[w.pid] = on_ready_counter
    if self.on_process_up:
        self.on_process_up(w)
    return w

For example, the pipes are created as follows.

def _setup_queues(self):
    self._inqueue = self._ctx.SimpleQueue()
    self._outqueue = self._ctx.SimpleQueue()
    self._quick_put = self._inqueue._writer.send
    self._quick_get = self._outqueue._reader.recv
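
_setup_queues binds _quick_put and _quick_get directly to the send and recv methods of the underlying pipe ends. The effect of such a binding can be tried out with a plain multiprocessing.Pipe. This is only a sketch: it uses a single one-way pipe inside one process, whereas the code above uses separate inbound and outbound queues shared with child processes, and the tuple contents are made up.

```python
import multiprocessing

# duplex=False gives a one-way pipe: (receiving end, sending end).
reader, writer = multiprocessing.Pipe(duplex=False)

quick_put = writer.send  # bound method, called to submit a message
quick_get = reader.recv  # bound method, called to fetch a message

# The message is pickled on send and unpickled on recv.
quick_put(('TASK', ('job-0', 'myTest.add', (2, 17))))
received = quick_get()
print(received)

reader.close()
writer.close()
```

Binding the methods once avoids going through the queue object on every task, which is presumably why the attributes carry the prefix quick.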

The file descriptors associated with the pipes are established as well.

def _create_write_handlers(self, hub,
                           pack=pack, dumps=_pickle.dumps,
                           protocol=HIGHEST_PROTOCOL):
    """Create handlers used to write data to child processes."""
    fileno_to_inq = self._fileno_to_inq
    fileno_to_synq = self._fileno_to_synq
    outbound = self.outbound_buffer
    pop_message = outbound.popleft
    put_message = outbound.append

So the preset variables end up as follows:

self._taskqueue = {Queue} <queue.Queue object at 0x7fcbaee57b00>
self._quick_put = {function} <function AsynPool._create_write_handlers.<locals>.send_job at 0x7fcbaef569d8>
self._outqueue = {NoneType} None
self._inqueue = {NoneType} None
self._fileno_to_synq = {dict: 1} {None: <ForkProcess(ForkPoolWorker-4, started daemon)>}
self._quick_get = {NoneType} None
self._fileno_to_inq = {dict: 0} {}
self.outbound_buffer = {deque: 1} deque([<%s: 0 ack:False ready:False>])


self = {Pool} <billiard.pool.Pool object at 0x000002663FD6E948>
 ResultHandler = {type} <class 'billiard.pool.ResultHandler'>
 SoftTimeLimitExceeded = {type} <class 'billiard.exceptions.SoftTimeLimitExceeded'>
 Supervisor = {type} <class 'billiard.pool.Supervisor'>
 TaskHandler = {type} <class 'billiard.pool.TaskHandler'>
 TimeoutHandler = {type} <class 'billiard.pool.TimeoutHandler'>
 Worker = {type} <class 'billiard.pool.Worker'>

3.3.3.2 Sending to the child process

On Windows it is:

if self.threads:
    self._taskqueue.put(([(TASK, (result._job, None,
                        func, args, kwds))], None))

On *nix it is send_job: a job is built here and sent, that is, put_message(job) queues the message for the child process pipe.

def send_job(tup):
    # Schedule writing job request for when one of the process
    # inqueues are writable.
    body = dumps(tup, protocol=protocol)
    body_size = len(body)
    header = pack('>I', body_size)
    # index 1,0 is the job ID.
    job = get_job(tup[1][0])
    job._payload = buf_t(header), buf_t(body), body_size
    put_message(job)
    
self._quick_put = send_job
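
send_job frames each job as a 4-byte big-endian length header followed by the pickled body, so that the reader on the other end of the pipe knows how many bytes belong to one message. A minimal sketch of this length-prefixed framing, using only the standard library, is shown below. frame and unframe are hypothetical helper names and the job tuple is made up; the actual write to the pipe is left out.

```python
import pickle
import struct


def frame(obj):
    """Serialize obj and prefix it with a 4-byte big-endian length."""
    body = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    header = struct.pack('>I', len(body))
    return header + body


def unframe(data):
    """Inverse of frame(): read the length, then exactly that many bytes."""
    (size,) = struct.unpack('>I', data[:4])
    return pickle.loads(data[4:4 + size])


job = ('TASK', (0, None, 'myTest.add', (2, 17), {}))
wire = frame(job)
print(len(wire), unframe(wire) == job)
```

The '>I' format limits a single body to 2**32 - 1 bytes, which struct.pack enforces by raising struct.error for larger values.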

The logic at this point is:

+
    Consumer               |
                   message |
                           v         strategy  +------------------------------------+
              +------------+------+            | strategies                         |
              | on_task_received  | <--------+ |                                    |
              |                   |            |[myTest.add : task_message_handler] |
              +------------+------+            +------------------------------------+
                           |
                           |
   +------------------------------------------------------------------------------------+
   strategy                |
                           |
                           |
                           v                Request [myTest.add]
              +------------+-------------+                       +---------------------+
              | task_message_handler     | <-------------------+ | create_request_cls  |
              |                          |                       |                     |
              +------------+-------------+                       +---------------------+
                           | _process_task_sem
                           |
  +------------------------------------------------------------------------------------+
   Worker                  | req[{Request} myTest.add]
                           v
                  +--------+-----------+
                  | WorkController     |
                  |                    |
                  |            pool +-------------------------+
                  +--------+-----------+                      |
                           |                                  |
                           |               apply_async        v
               +-----------+----------+                   +---+-------------------+
               |{Request} myTest.add  | +---------------> | TaskPool              |
               +----------------------+                   +----+------------------+
                                          myTest.add           |
                                                               |
+--------------------------------------------------------------------------------------+
                                                               |
                                                               v
                                                          +----+------------------+
                                                          | billiard.pool.Pool    |
                                                          +-------+---------------+
                                                                  |
                                                                  |
 Pool              +---------------------------+                  |
                   | TaskHandler               |                  |
                   |                           |                  |  self._taskqueue.put
                   |              _taskqueue   |  <---------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
+--------------------------------------------------------------------------------------+
                                |
 Sub process                    |
                                v
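
The Pool part of the diagram above (apply_async puts a batch on _taskqueue, and a handler thread drains it and calls put(task) for each task) can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not billiard's TaskHandler: task_handler and run_demo are hypothetical names, 'TASK' is a placeholder constant, and a list's append method stands in for the function that writes to the child process.

```python
import queue
import threading

TASK = 'TASK'  # placeholder for the real message-type constant


def task_handler(taskqueue, put):
    """Drain (task_sequence, set_length) batches until a None sentinel arrives."""
    for taskseq, _set_length in iter(taskqueue.get, None):
        for task in taskseq:
            put(task)  # hand each task on toward a child process


def run_demo():
    """Queue one job the way the threads branch does, then stop the handler."""
    taskqueue = queue.Queue()
    delivered = []  # stands in for the pipe to the child process
    handler = threading.Thread(
        target=task_handler, args=(taskqueue, delivered.append), daemon=True)
    handler.start()

    # Same shape as the tuple passed to self._taskqueue.put(...)
    taskqueue.put(([(TASK, (0, None, 'myTest.add', (2, 3), {}))], None))
    taskqueue.put(None)  # sentinel: tell the handler to exit
    handler.join(timeout=5)
    return delivered


delivered = run_demo()
```

The queue decouples the caller of apply_async from the actual write to the child process, so the caller can return as soon as the job has been enqueued.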


So, starting with the next article, we formally move on to how the multiprocess layer handles messages.

0xEE Articles in this series

The articles in this series so far:

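[Source Code Analysis] Message Queue Kombu: mailbox

[Source Code Analysis] Message Queue Kombu: Hub

[Source Code Analysis] Message Queue Kombu: Consumer

[Source Code Analysis] Message Queue Kombu: Producer

[Source Code Analysis] Message Queue Kombu: Startup Process

[Source Code Analysis] Message Queue Kombu: Basic Architecture

[Source Code Analysis] Parallel Distributed Framework Celery: Architecture (1)

[Source Code Analysis] Parallel Distributed Framework Celery: Architecture (2)

[Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (1)

[Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (2)

[Source Code Analysis] Distributed Task Queue Celery: Starting the Consumer

[Source Code Analysis] Parallel Distributed Task Queue Celery: What Is a Task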

0xFF References

Celery Source Code Analysis series:

1: Worker startup overview

2: The Worker's execution engine

3: Implementation of the Task object

4: Implementation of scheduled tasks

5: Remote control and management

6: Implementation of Events

7: Interaction between Workers
