[源碼分析] 消息隊列 Kombu 之 Producer

[源碼分析] 消息隊列 Kombu 之 Producer

0x00 摘要

本系列咱們介紹消息隊列 Kombu。Kombu 的定位是一個兼容 AMQP 協議的消息隊列抽象。經過本文,你們能夠了解 Kombu 中的 Producer 概念。html

0x01 示例代碼

下面使用以下代碼來進行說明。java

本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。python

def main(arguments):
    hub = Hub()
    exchange = Exchange('asynt_exchange')
    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')

    def on_message(message):
        print('received: {0!r}'.format(message.body))
        message.ack()
        # hub.stop()  # <-- exit after one message

    conn = Connection('redis://localhost:6379')
    conn.register_with_event_loop(hub)

    def p_message():
        print(' kombu ')

    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        hub.timer.call_repeatedly(3, p_message)
        hub.run_forever()

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

0x02 來由

前文已經完成了構建部分,Consumer部分,下面來到了Producer部分,即以下代碼:redis

def send_message(conn):
		producer = Producer(conn)
    producer.publish('hello world', exchange=exchange, routing_key='asynt')
   	print('message sent')

咱們知道,Transport須要把Channel與文件信息聯繫起來,可是此時Transport信息以下,文件信息依然沒有,這是咱們之後須要留意的promise

transport = {Transport} <kombu.transport.redis.Transport object at 0x7f9056a26f98>
 Channel = {type} <class 'kombu.transport.redis.Channel'>
 Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
 Management = {type} <class 'kombu.transport.virtual.base.Management'>
 channel_max = {int} 65535
 channels = {list: 2} [<kombu.transport.redis.Channel object at 0x7f9056a57278>, <kombu.transport.redis.Channel object at 0x7f9056b79cc0>]
 client = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
 cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
  after_read = {set: 0} set()
  eventflags = {int} 25
  fds = {dict: 0} {}
  poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
 default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}
 default_port = {int} 6379
 driver_name = {str} 'redis'
 driver_type = {str} 'redis'
 implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}
 manager = {Management} <kombu.transport.virtual.base.Management object at 0x7f9056b79be0>
 polling_interval = {NoneType} None
 state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7f9056a9ec50>

0x03 創建

3.1 定義

Producer中,主要變量是:服務器

  • _channel :就是channel;
  • exchange :exchange;

可是本文示例沒有傳入exchange,這就有些奇怪,咱們須要繼續看看框架

class Producer:
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method.  Disabled by default.
    compression = None

    #: By default, if a defualt exchange is set,
    #: that exchange will be declare when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

3.2 init

init代碼以下。async

def __init__(self, channel, exchange=None, routing_key=None,
             serializer=None, auto_declare=None, compression=None,
             on_return=None):
    self._channel = channel
    self.exchange = exchange
    self.routing_key = routing_key or self.routing_key
    self.serializer = serializer or self.serializer
    self.compression = compression or self.compression
    self.on_return = on_return or self.on_return
    self._channel_promise = None
    if self.exchange is None:
        self.exchange = Exchange('')
    if auto_declare is not None:
        self.auto_declare = auto_declare

    if self._channel:
        self.revive(self._channel)

3.2.1 轉換channel

這裏有個重要轉換。函數

  • 最開始是把輸入參數 Connection 賦值到 self._channel。
  • 而後 revive 方法作了轉換爲 channel,即 self._channel 最終是 channel 類型。

可是 exchange 依然沒有意義,是 direct 類型。oop

代碼以下:

def revive(self, channel):
    """Revive the producer after connection loss."""
    if is_connection(channel):
        connection = channel
        self.__connection__ = connection
        channel = ChannelPromise(lambda: connection.default_channel)
    if isinstance(channel, ChannelPromise):
        self._channel = channel
        self.exchange = self.exchange(channel)
    else:
        # Channel already concrete
        self._channel = channel
        if self.on_return:
            self._channel.events['basic_return'].add(self.on_return)
        self.exchange = self.exchange(channel)

此時變量爲:

producer = {Producer} 
 auto_declare = {bool} True
 channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
 compression = {NoneType} None
 connection = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
 exchange = {Exchange} Exchange ''(direct)
 on_return = {NoneType} None
 routing_key = {str} ''
 serializer = {NoneType} None

邏輯如圖:

+----------------------+               +-------------------+
| Producer             |               | Channel           |
|                      |               |                   |        +-----------------------------------------------------------+
|                      |               |    client  +-------------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
|      channel   +------------------>  |                   |        +-----------------------------------------------------------+
|                      |               |    pool           |
|      exchange        |   +---------> |                   | <------------------------------------------------------------+
|                      |   |           |                   |                                                              |
|      connection      |   |    +----> |    connection +---------------+                                                  |
|             +        |   |    |      |                   |           |                                                  |
+--+-------------------+   |    |      +-------------------+           |                                                  |
   |          |            |    |                                      v                                                  |
   |          |            |    |      +-------------------+       +---+-----------------+       +--------------------+   |
   |          |            |    |      | Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
   |          +----------------------> |                   |       |                     |       |                    |   |
   |                       |    |      |                   |       |                     |       |     _channels +--------+
   |                       |    |      |                   |       |        cycle +------------> |     _fd_to_chan    |
   |                       |    |      |     transport +---------> |                     |       |     _chan_to_sock  |
   |             +-------->+    |      |                   |       |                     |    +------+ poller         |
   |             |              |      +-------------------+       +---------------------+    |  |     after_read     |
   |             |              |                                                             |  |                    |
   |             |              |                                                             |  +--------------------+
   |             |              |      +------------------+                   +---------------+
   |             |              |      | Hub              |                   |
   |             |              |      |                  |                   v
   |             |              |      |                  |            +------+------+
   |             |              |      |      poller +---------------> | _poll       |
   | publish     |              |      |                  |            |             |         +-------+
   +--------------------------------+  |                  |            |    _poller+---------> |  poll |
                 |              |   |  +------------------+            |             |         +-------+
                 |              |   |                                  +-------------+
    +-------------------+       |   +-----> +----------------+
    | Queue      |      |       |           | Exchange       |
    |      _channel     |       +---------+ |                |
    |                   |                   |                |
    |      exchange +-------------------->  |     channel    |
    |                   |                   |                |
    |                   |                   |                |
    +-------------------+                   +----------------+

手機如圖:

0x04 發送

發送消息是經過producer.publish完成。

def send_message(conn):
    producer = Producer(conn)
    producer.publish('hello world', exchange=exchange, routing_key='asynt')
    print('message sent')

此時傳入exchange做爲參數。原來若是沒有 Exchange,是能夠在這裏進行補救

producer.publish繼續調用到以下,能夠看到分爲兩步:

  • 調用channel的組裝消息函數prepare_message
  • 調用channel的發送消息basic_publish

所以,最終發送消息仍是經過channel完成。

def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        maybe_declare = self.maybe_declare
        [maybe_declare(entity) for entity in declare]

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )

4.1 組裝消息 in channel

channel 的組裝消息函數prepare_message完成組裝功能,基本上是爲消息添加各類屬性。

def prepare_message(self, body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None):
    """Prepare message data."""
    properties = properties or {}
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority or self.default_priority)

    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties or {}}

消息以下:

message = {dict: 5} 
 'body' = {str} 'aGVsbG8gd29ybGQ='
 'content-encoding' = {str} 'utf-8'
 'content-type' = {str} 'text/plain'
 'headers' = {dict: 0} {}
  __len__ = {int} 0
 'properties' = {dict: 5} 
  'delivery_mode' = {int} 2
  'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}
  'priority' = {int} 0
  'body_encoding' = {str} 'base64'
  'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'
  __len__ = {int} 5

4.2 發送消息 in channel

channel的發送消息basic_publish完成發送功能。此時使用了傳入的參數exchange。

發送消息basic_publish方法是調用_put方法:

def basic_publish(self, message, exchange, routing_key, **kwargs):
    """Publish message."""
    self._inplace_augment_message(message, exchange, routing_key)
    if exchange:
        return self.typeof(exchange).deliver(
            message, exchange, routing_key, **kwargs
        )
    # anon exchange: routing_key is the destination queue
    return self._put(routing_key, message, **kwargs)

4.3 deliver in exchange

self.typeof(exchange).deliver代碼接着來到exchange。本文是DirectExchange。

注意,這裏用到了self.channel._put。就是Exchange的成員變量channel。

class DirectExchange(ExchangeType):
    """Direct exchange.

    The `direct` exchange routes based on exact routing keys.
    """

    type = 'direct'

    def lookup(self, table, exchange, routing_key, default):
        return {
            queue for rkey, _, queue in table
            if rkey == routing_key
        }

    def deliver(self, message, exchange, routing_key, **kwargs):
        _lookup = self.channel._lookup
        _put = self.channel._put
        for queue in _lookup(exchange, routing_key):
            _put(queue, message, **kwargs)

4.4 binding 轉換

咱們知道,Exchange的做用只是將發送的 routing_key 轉化爲 queue 的名字。這樣發送就知道發到哪一個 queue

所以依據_lookup方法獲得對應的queue

def _lookup(self, exchange, routing_key, default=None):
    """Find all queues matching `routing_key` for the given `exchange`.

    Returns:
        str: queue name -- must return the string `default`
            if no queues matched.
    """
    if default is None:
        default = self.deadletter_queue
    if not exchange:  # anon exchange
        return [routing_key or default]

    try:
        R = self.typeof(exchange).lookup(
            self.get_table(exchange),
            exchange, routing_key, default,
        )
    except KeyError:
        R = []

    if not R and default is not None:
        warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
            exchange=exchange, routing_key=routing_key)),
        )
        self._new_queue(default)
        R = [default]
    return R

此處具體邏輯爲:

第一,調用到channel的方法。這裏的 exchange 名字爲 asynt_exchange。

def get_table(self, exchange):
    key = self.keyprefix_queue % exchange
    with self.conn_or_acquire() as client:
        values = client.smembers(key)
        if not values:
            raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
        return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

咱們看看Redis內容,發現集合內容以下:

127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"

第二,所以獲得對應binding爲:

{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

即從 exchange 獲得 routing_key ---> queue 的規則,而後再依據 routing_key 獲得 queue。就知道 Consumer 和 Producer 須要依據哪一個 queue 交換消息。

邏輯以下:

+---------------------------------+
                                  |         exchange                |
                                  |                                 |
                 1 routing_key x  |                                 |
+----------+                      |                                 |      +------------+
| Producer |  +-----------------> |   routing_key x --->  queue x   |      |  Consumer  |
+--------+-+                      |                                 |      +------------+
         |                        |   routing_key y --->  queue y   |
         |                        |                                 |           ^
         |                        |   routing_key z --->  queue z   |           |
         |                        |                                 |           |
         |                        +---------------------------------+           |
         |                                                                      |
         |                                                                      |
         |                                                                      |
         |                                                                      |
         |                                                                      |
         |                                                                      |
         |                                                                      |
         |                                                                      |
         |                                  +-----------+                       |
         |        2 message                 |           |        3 message      |
         +------------------------------->  |  queue X  |  +--------------------+
                                            |           |
                                            +-----------+

4.5 _put in channel

channel的_put 方法被用來繼續處理,能夠看到其最終調用到了client.lpush。

client爲:

Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

代碼爲:

def _put(self, queue, message, **kwargs):
    """Deliver message."""
    pri = self._get_message_priority(message, reverse=False)

    with self.conn_or_acquire() as client:
        client.lpush(self._q_for_pri(queue, pri), dumps(message))

redis怎麼區別不一樣的queue?

實際是每一個 queue 被賦予一個字符串 name,這個 name 就是 redis 對應的 list 的 key。知道應該向哪一個 list 放消息,後續就是向此 list 中 lpush 消息。

以下方法完成轉換功能。

def _q_for_pri(self, queue, pri):
    pri = self.priority(pri)
    if pri:
        return f"{queue}{self.sep}{pri}"
    return queue

如今發消息以後,redis內容以下,咱們能夠看出來,消息做爲list 的item,放入到之中。

127.0.0.1:6379> lrange asynt_queue 0 -1
1) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"
127.0.0.1:6379>

0x05 總結

如今咱們總結以下:

  • Producers: 發送消息的抽象類;
  • Consumers:接受消息的抽象類,consumer須要聲明一個queue,並將queue與指定的exchange綁定,而後從queue裏面接收消息;
  • Exchange:MQ 路由,消息發送者將消息發至Exchange,Exchange負責將消息分發至隊列;
  • Queue:對應的 queue 抽象,存儲着即將被應用消費掉的消息,Exchange負責將消息分發Queue,消費者從Queue接收消息;
  • Channel:與AMQP中概念相似,能夠理解成共享一個Connection的多個輕量化連,就是真實redis鏈接;

因而邏輯鏈已經造成,大約是這樣的:

  • Producer的publish方法接受參數Exchange,因而就發送消息到此Exchange;
  • Producer調用channel的組裝消息函數prepare_message爲消息添加各類屬性;
  • Producer調用channel的發送消息basic_publish發送消息,此時使用了傳入的參數exchange。
  • basic_publish方法調用exchange.deliver(exchange, routing_key)來發送消息;
  • Exchange中有成員變量Channel,也有成員變量Queues,每一個queue對應一個routing_key;
  • deliver使用_lookup方法依據key獲得對應的queue;
  • deliver使用Exchange成員變量Channel的_put方法來向queue中投放消息;
  • Channel拿到本身的redis鏈接池,即client爲Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>;因而能夠基於此進行redis操做;
  • redis怎麼區別不一樣的queue,實際是每一個queue被賦予一個字符串name,這就是redis對應的list的key;
  • 既然獲得了名字爲queue的list,則向此list中lpush消息。
  • Consumer去Queue取消息;

動態邏輯以下:

+------------+                        +------------+               +------------+      +-----------------------+
       |  producer  |                        |  channel   |               |  exchange  |      | Redis<ConnectionPool> |
       +---+--------+                        +----+-------+               +-------+----+      +----------+------------+
           |                                      |                               |                      |
           |                                      |                               |                      |
publish('', exchange, routing_key)                |                               |                      |
           |                                      |                               |                      |
           |   prepare_message                    |                               |                      |
           |                                      |                               |                      |
           | +----------------------------------> |                               |                      |
           |                                      |                               |                      |
           | basic_publish (exchange, routing_key)|                               |                      |
           |                                      |                               |                      |
           | +----------------------------------> |                               |                      |
           |                                      |                               |                      |
           |                                      | deliver(exchange, routing_key)|                      |
           |                                      |                               |                      |
           |                                      +-----------------------------> |                      |
           |                                      |                               |                      |
           |                                      |                               |                      |
           |                                      |                _lookup(exchange, routing_key)        |
           |                                      |                               |                      |
           |                                      |                               |                      |
           |                                      |    _put(queue, message)       |                      |
           |                                      v                               |                      |
           |                                      | <---------------------------+ |                      |
           |                                      |                               |                      |
           |                                _q_for_pri(queue, pri)                |                      |
           |                                      +                               |                      |
           v                                      |                               |                      |
           |                                      |     client.lpush              |                      |
           |                                      | +--------------------------------------------------> |
           |                                      |                               |                      |
           v                                      v                               v                      v

手機以下:

0xFF 參考

celery 7 優秀開源項目kombu源碼分析之registry和entrypoint

放棄pika,選擇kombu

kombu消息框架<二>

AMQP中的概念

AMQP的基本概念

深刻理解AMQP協議

kombu和消息隊列總結

關於epoll版服務器的理解(Python實現)

相關文章
相關標籤/搜索