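This series introduces the message queue library Kombu. Kombu positions itself as an AMQP-compatible messaging abstraction. This article explains the Producer concept in Kombu.
The following code is used for illustration.
The example comes from the series at https://liqiang.io/post/kombu-source-code-analysis-part-5, with many thanks to its author.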
    import sys

    from kombu import Connection, Consumer, Exchange, Producer, Queue
    from kombu.asynchronous import Hub


    def main(arguments):
        hub = Hub()
        exchange = Exchange('asynt_exchange')
        queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

        def send_message(conn):
            producer = Producer(conn)
            producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
            print('message sent')

        def on_message(message):
            print('received: {0!r}'.format(message.body))
            message.ack()
            # hub.stop()  # <-- exit after one message

        conn = Connection('redis://localhost:6379')
        conn.register_with_event_loop(hub)

        def p_message():
            print(' kombu ')

        with Consumer(conn, [queue], on_message=on_message):
            send_message(conn)
            hub.timer.call_repeatedly(3, p_message)
            hub.run_forever()


    if __name__ == '__main__':
        sys.exit(main(sys.argv[1:]))
The previous articles covered the setup and the Consumer. We now come to the Producer, that is, the following code:

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')
We know that the Transport has to associate each Channel with file descriptor information. At this point, however, the Transport looks as shown below and still holds no file descriptors (fds is empty). This is something to keep in mind for later:

    transport = {Transport} <kombu.transport.redis.Transport object at 0x7f9056a26f98>
     Channel = {type} <class 'kombu.transport.redis.Channel'>
     Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
     Management = {type} <class 'kombu.transport.virtual.base.Management'>
     channel_max = {int} 65535
     channels = {list: 2} [<kombu.transport.redis.Channel object at 0x7f9056a57278>, <kombu.transport.redis.Channel object at 0x7f9056b79cc0>]
     client = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
     cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
      after_read = {set: 0} set()
      eventflags = {int} 25
      fds = {dict: 0} {}
      poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
     default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}
     default_port = {int} 6379
     driver_name = {str} 'redis'
     driver_type = {str} 'redis'
     implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}
     manager = {Management} <kombu.transport.virtual.base.Management object at 0x7f9056b79be0>
     polling_interval = {NoneType} None
     state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7f9056a9ec50>
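The main member variables of Producer appear in the class definition below: the channel it publishes through, plus a default exchange and a default routing key.
The example in this article, however, does not pass an exchange to the constructor, which looks a little odd, so we need to read on.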
    class Producer:
        """Message Producer.

        Arguments:
            channel (kombu.Connection, ChannelT): Connection or channel.
            exchange (kombu.entity.Exchange, str): Optional default exchange.
            routing_key (str): Optional default routing key.
        """

        #: Default exchange
        exchange = None

        #: Default routing key.
        routing_key = ''

        #: Default serializer to use. Default is JSON.
        serializer = None

        #: Default compression method.  Disabled by default.
        compression = None

        #: By default, if a default exchange is set,
        #: that exchange will be declared when publishing a message.
        auto_declare = True

        #: Basic return callback.
        on_return = None

        #: Set if channel argument was a Connection instance (using
        #: default_channel).
        __connection__ = None
The __init__ code is as follows. Note that when no exchange is passed in, it falls back to Exchange(''):

    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        self._channel = channel
        self.exchange = exchange
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        self._channel_promise = None
        if self.exchange is None:
            self.exchange = Exchange('')
        if auto_declare is not None:
            self.auto_declare = auto_declare

        if self._channel:
            self.revive(self._channel)
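An important conversion happens here, in revive. If the channel argument is actually a Connection, it is wrapped in a ChannelPromise that resolves to the connection's default_channel, and the exchange is then bound to that channel.
The exchange itself, however, still carries no real meaning: it is the unnamed Exchange(''), of type direct.
The code is as follows:

    def revive(self, channel):
        """Revive the producer after connection loss."""
        if is_connection(channel):
            connection = channel
            self.__connection__ = connection
            channel = ChannelPromise(lambda: connection.default_channel)
        if isinstance(channel, ChannelPromise):
            self._channel = channel
            self.exchange = self.exchange(channel)
        else:
            # Channel already concrete
            self._channel = channel
            if self.on_return:
                self._channel.events['basic_return'].add(self.on_return)
            self.exchange = self.exchange(channel)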
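The promise idea used in revive can be illustrated with a minimal sketch. It is not Kombu's ChannelPromise: LazyChannel and FakeConnection are hypothetical names, and the sketch assumes the promise caches its result after the first call, so the channel is only created when it is first needed.

```python
class LazyChannel:
    """Hypothetical stand-in for a channel promise: wraps a zero-argument factory."""

    def __init__(self, factory):
        self._factory = factory
        self._value = None
        self._resolved = False

    def __call__(self):
        # Resolve on first use, then keep returning the cached channel.
        if not self._resolved:
            self._value = self._factory()
            self._resolved = True
        return self._value


class FakeConnection:
    """Hypothetical connection that counts how often a channel is created."""

    def __init__(self):
        self.created = 0

    @property
    def default_channel(self):
        self.created += 1
        return f"channel-{self.created}"


connection = FakeConnection()
promise = LazyChannel(lambda: connection.default_channel)
created_before_use = connection.created  # the factory has not run yet
first = promise()
second = promise()
```

Building the promise does not touch the connection; only the first call does, and later calls reuse the same channel.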
At this point the variables are:

    producer = {Producer}
     auto_declare = {bool} True
     channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
     compression = {NoneType} None
     connection = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
     exchange = {Exchange} Exchange ''(direct)
     on_return = {NoneType} None
     routing_key = {str} ''
     serializer = {NoneType} None
The relationships between the objects are as follows:

    Producer.channel            --> Channel
    Producer.exchange           --> Exchange
    Producer.connection         --> Connection
    Channel.client              --> Redis<ConnectionPool<Connection<host=localhost,port=6379>
    Channel.connection          --> redis.Transport
    Connection.transport        --> redis.Transport
    redis.Transport.cycle       --> MultiChannelPoller (_fd_to_chan, _chan_to_sock, poller, after_read)
    MultiChannelPoller.poller   --> _poll
    Hub.poller                  --> _poll
    _poll._poller               --> poll
    Queue._channel              --> Channel
    Queue.exchange              --> Exchange
    Exchange.channel            --> Channel
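Messages are sent through producer.publish.

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')

This time the exchange is passed in as an argument. So a Producer created without an Exchange can still be given one at publish time.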
producer.publish goes on to call _publish, shown below, which works in two steps: first it calls the channel's prepare_message to assemble the message, then it calls the channel's basic_publish to send it. In the end, therefore, the message is still sent through the channel.

    def _publish(self, body, priority, content_type, content_encoding,
                 headers, properties, routing_key, mandatory,
                 immediate, exchange, declare):
        channel = self.channel
        message = channel.prepare_message(
            body, priority, content_type,
            content_encoding, headers, properties,
        )
        if declare:
            maybe_declare = self.maybe_declare
            [maybe_declare(entity) for entity in declare]

        # handle autogenerated queue names for reply_to
        reply_to = properties.get('reply_to')
        if isinstance(reply_to, Queue):
            properties['reply_to'] = reply_to.name
        return channel.basic_publish(
            message,
            exchange=exchange, routing_key=routing_key,
            mandatory=mandatory, immediate=immediate,
        )
The channel's prepare_message function assembles the message, which essentially means attaching the various properties to it.

    def prepare_message(self, body, priority=None, content_type=None,
                        content_encoding=None, headers=None, properties=None):
        """Prepare message data."""
        properties = properties or {}
        properties.setdefault('delivery_info', {})
        properties.setdefault('priority', priority or self.default_priority)

        return {'body': body,
                'content-encoding': content_encoding,
                'content-type': content_type,
                'headers': headers or {},
                'properties': properties or {}}

The message looks as follows:

    message = {dict: 5}
     'body' = {str} 'aGVsbG8gd29ybGQ='
     'content-encoding' = {str} 'utf-8'
     'content-type' = {str} 'text/plain'
     'headers' = {dict: 0} {}
      __len__ = {int} 0
     'properties' = {dict: 5}
      'delivery_mode' = {int} 2
      'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}
      'priority' = {int} 0
      'body_encoding' = {str} 'base64'
      'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'
      __len__ = {int} 5
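The dump above contains body_encoding, delivery_tag and a filled-in delivery_info, none of which prepare_message sets. The sketch below reproduces the same shape in plain Python so the origin of each field is easier to see. It assumes these fields are added in a later augmentation step (augment_message is a hypothetical helper, not Kombu's function) and that the body is base64-encoded, as the body_encoding field suggests.

```python
import base64
import uuid


def prepare_message(body, priority=0, content_type=None,
                    content_encoding=None, headers=None, properties=None):
    """Simplified mirror of the dict layout returned by prepare_message."""
    properties = dict(properties or {})
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority)
    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties}


def augment_message(message, exchange, routing_key):
    """Hypothetical helper: encode the body and fill in the delivery fields."""
    raw = message['body'].encode(message['content-encoding'] or 'utf-8')
    message['body'] = base64.b64encode(raw).decode('ascii')
    props = message['properties']
    props['body_encoding'] = 'base64'
    props['delivery_tag'] = str(uuid.uuid4())  # unique per message
    props['delivery_info'].update(exchange=exchange, routing_key=routing_key)
    return message


message = augment_message(
    prepare_message('hello world', content_type='text/plain',
                    content_encoding='utf-8',
                    properties={'delivery_mode': 2}),
    exchange='asynt_exchange', routing_key='asynt_routing_key')
```

With these inputs the body becomes 'aGVsbG8gd29ybGQ=', the same value shown in the dump.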
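The channel's basic_publish performs the actual send, and this is where the exchange argument passed to publish is used.
If an exchange is given, basic_publish delegates to the deliver method of the exchange type. For the anonymous exchange it calls _put directly, using the routing key as the destination queue:

    def basic_publish(self, message, exchange, routing_key, **kwargs):
        """Publish message."""
        self._inplace_augment_message(message, exchange, routing_key)
        if exchange:
            return self.typeof(exchange).deliver(
                message, exchange, routing_key, **kwargs
            )
        # anon exchange: routing_key is the destination queue
        return self._put(routing_key, message, **kwargs)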
Through self.typeof(exchange).deliver, execution moves on to the exchange type, which in this article is DirectExchange.
Note that deliver uses self.channel._put, where channel is the member variable held by the exchange type object.

    class DirectExchange(ExchangeType):
        """Direct exchange.

        The `direct` exchange routes based on exact routing keys.
        """

        type = 'direct'

        def lookup(self, table, exchange, routing_key, default):
            return {
                queue for rkey, _, queue in table
                if rkey == routing_key
            }

        def deliver(self, message, exchange, routing_key, **kwargs):
            _lookup = self.channel._lookup
            _put = self.channel._put
            for queue in _lookup(exchange, routing_key):
                _put(queue, message, **kwargs)
We know that the role of an Exchange is simply to turn the routing_key of a published message into the name of a queue, so that the sender knows which queue to deliver to.
The _lookup method is therefore used to obtain the matching queue.

    def _lookup(self, exchange, routing_key, default=None):
        """Find all queues matching `routing_key` for the given `exchange`.

        Returns:
            str: queue name -- must return the string `default`
                if no queues matched.
        """
        if default is None:
            default = self.deadletter_queue
        if not exchange:  # anon exchange
            return [routing_key or default]

        try:
            R = self.typeof(exchange).lookup(
                self.get_table(exchange),
                exchange, routing_key, default,
            )
        except KeyError:
            R = []

        if not R and default is not None:
            warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
                exchange=exchange, routing_key=routing_key)),
            )
            self._new_queue(default)
            R = [default]
        return R
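The three branches of _lookup (anonymous exchange, exact match, fallback to a default queue) can be tried out with a small stand-alone sketch. The in-memory BINDINGS table and the 'dead_letter' default name are assumptions made for illustration and are not taken from Kombu.

```python
import warnings

# Hypothetical binding table: exchange name -> [(routing_key, pattern, queue)].
BINDINGS = {
    'asynt_exchange': [('asynt_routing_key', '', 'asynt_queue')],
}


def direct_lookup(table, routing_key):
    """Exact-match routing, as a direct exchange does."""
    return {queue for rkey, _, queue in table if rkey == routing_key}


def lookup(exchange, routing_key, default='dead_letter'):
    """Return the queue names for a routing key, falling back to `default`."""
    if not exchange:
        # Anonymous exchange: the routing key itself names the queue.
        return [routing_key or default]
    queues = direct_lookup(BINDINGS.get(exchange, []), routing_key)
    if not queues:
        warnings.warn(f'no route for {exchange!r} / {routing_key!r}')
        return [default]
    return sorted(queues)


matched = lookup('asynt_exchange', 'asynt_routing_key')
anonymous = lookup('', 'some_queue')
```

Here matched holds the bound queue name and anonymous simply echoes the routing key, which mirrors the "anon exchange" comment in the code above.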
The concrete logic here is as follows.
First, the channel's get_table method is called. The exchange name here is asynt_exchange.

    def get_table(self, exchange):
        key = self.keyprefix_queue % exchange
        with self.conn_or_acquire() as client:
            values = client.smembers(key)
            if not values:
                raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
            return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

Looking at Redis, the set contains:

    127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
    1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"
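Second, the binding obtained is therefore:

    {b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

In other words, the exchange yields the routing_key ---> queue rules, and the routing_key then selects the queue. This determines which queue the Consumer and the Producer use to exchange messages.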
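As a small illustration, the binding string above can be split the same way get_table does. This sketch assumes the separator is '\x06\x16', as seen in the Redis output, and that the three fields are routing key, pattern and queue name; parse_binding is a hypothetical helper.

```python
SEP = '\x06\x16'  # separator observed in the Redis output above


def parse_binding(raw):
    """Split one stored binding into (routing_key, pattern, queue)."""
    text = raw.decode() if isinstance(raw, bytes) else raw
    return tuple(text.split(SEP))


members = {b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}
table = [parse_binding(value) for value in members]
routing_key, pattern, queue = table[0]
```

The middle field is empty here because the two separators are adjacent: a direct binding carries no pattern.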
The logic is as follows:

                                      +---------------------------------+
                                      |            exchange             |
                                      |                                 |
                    1 routing_key x   |                                 |
    +----------+                      |                                 |     +------------+
    | Producer | +------------------> |   routing_key x ---> queue x    |     |  Consumer  |
    +--------+-+                      |                                 |     +------------+
             |                        |   routing_key y ---> queue y    |
             |                        |                                 |           ^
             |                        |   routing_key z ---> queue z    |           |
             |                        |                                 |           |
             |                        +---------------------------------+           |
             |                                                                      |
             |                                                                      |
             |                                  +-----------+                       |
             |  2 message                       |           |      3 message        |
             +--------------------------------> |  queue X  | +---------------------+
                                                |           |
                                                +-----------+
The channel's _put method then takes over, and we can see that it finally calls client.lpush.
The client is:

    Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

The code is:

    def _put(self, queue, message, **kwargs):
        """Deliver message."""
        pri = self._get_message_priority(message, reverse=False)

        with self.conn_or_acquire() as client:
            client.lpush(self._q_for_pri(queue, pri), dumps(message))
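How does Redis tell different queues apart?
Each queue is given a string name, and that name is the key of the corresponding Redis list. Once the target list is known, the message is simply pushed onto it with lpush.
The following method performs the conversion from queue name and priority to list key.

    def _q_for_pri(self, queue, pri):
        pri = self.priority(pri)
        if pri:
            return f"{queue}{self.sep}{pri}"
        return queue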
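A stand-alone sketch of this key derivation is shown below. The priority steps [0, 3, 6, 9] and the rounding with bisect are assumptions about the Redis transport's defaults and may differ between Kombu versions; the separator is the one seen in the binding output earlier.

```python
from bisect import bisect

SEP = '\x06\x16'
PRIORITY_STEPS = [0, 3, 6, 9]  # assumed default steps


def priority(n):
    """Round a non-negative priority down to the nearest configured step."""
    return PRIORITY_STEPS[bisect(PRIORITY_STEPS, n) - 1]


def q_for_pri(queue, pri):
    """Return the Redis list key for a queue at the given priority."""
    pri = priority(pri)
    if pri:
        return f'{queue}{SEP}{pri}'
    return queue


default_key = q_for_pri('asynt_queue', 0)
stepped_key = q_for_pri('asynt_queue', 5)
```

Priority 0 maps to the plain queue name, which is why the example message ends up in the list asynt_queue. Under the assumed steps, priority 5 would go to a separate list whose key ends in the separator followed by 3.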
After the message has been sent, Redis contains the following. We can see that the message is stored as an item of the list asynt_queue.

    127.0.0.1:6379> lrange asynt_queue 0 -1
    1) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"
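To check what the stored item contains, it can be decoded by hand. The sketch below uses only the standard library and a copy of the JSON shown above (the delivery_tag is left out for brevity). It assumes the body_encoding field tells the reader how to decode the body.

```python
import base64
import json

stored = ('{"body": "aGVsbG8gd29ybGQ=", "content-encoding": "utf-8", '
          '"content-type": "text/plain", "headers": {}, '
          '"properties": {"delivery_mode": 2, '
          '"delivery_info": {"exchange": "asynt_exchange", '
          '"routing_key": "asynt_routing_key"}, '
          '"priority": 0, "body_encoding": "base64"}}')

message = json.loads(stored)
body = message['body']
if message['properties'].get('body_encoding') == 'base64':
    # Undo the transport-level encoding, then decode the text itself.
    body = base64.b64decode(body).decode(message['content-encoding'])
print(body)
```

The decoded body is the original 'hello world' string passed to publish.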
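To summarize, the logic chain is now complete and runs roughly like this: Producer.publish accepts an exchange and a routing key. It calls the channel's prepare_message to attach the various properties to the message, then calls the channel's basic_publish to send it. basic_publish hands the message to the deliver method of the exchange type, which uses _lookup to turn the routing key into queue names and calls the channel's _put for each queue. _put obtains the client, Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, so Redis operations can be performed on it, and pushes the message with lpush. The dynamic flow is as follows: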
    +------------+                        +------------+                +------------+                  +-----------------------+
    |  producer  |                        |  channel   |                |  exchange  |                  | Redis<ConnectionPool> |
    +---+--------+                        +----+-------+                +-------+----+                  +----------+------------+
        |                                      |                                |                                  |
        |  publish('', exchange, routing_key)  |                                |                                  |
        |                                      |                                |                                  |
        |  prepare_message                     |                                |                                  |
        +------------------------------------->|                                |                                  |
        |                                      |                                |                                  |
        | basic_publish(exchange, routing_key) |                                |                                  |
        +------------------------------------->|                                |                                  |
        |                                      | deliver(exchange, routing_key) |                                  |
        |                                      +------------------------------->|                                  |
        |                                      |                                |                                  |
        |                                      |                                | _lookup(exchange, routing_key)   |
        |                                      |                                |                                  |
        |                                      | _put(queue, message)           |                                  |
        |                                      |<-------------------------------+                                  |
        |                                      |                                |                                  |
        |                                      | _q_for_pri(queue, pri)         |                                  |
        |                                      |                                |                                  |
        |                                      | client.lpush                   |                                  |
        |                                      +------------------------------------------------------------------>|
        v                                      v                                v                                  v
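The whole chain can also be exercised without a Redis server. The sketch below is an in-memory imitation, not Kombu code: FakeRedis and FakeChannel are hypothetical classes that keep only the calls discussed in this article (prepare_message, basic_publish, _lookup, _put, lpush), and the binding table is passed in directly instead of being read from a Redis set.

```python
import json
from collections import defaultdict


class FakeRedis:
    """Hypothetical in-memory client offering only lpush and rpop on lists."""

    def __init__(self):
        self.lists = defaultdict(list)

    def lpush(self, key, value):
        self.lists[key].insert(0, value)  # push at the head of the list

    def rpop(self, key):
        return self.lists[key].pop() if self.lists[key] else None


class FakeChannel:
    """Hypothetical channel that mirrors the call sequence in the diagram."""

    def __init__(self, client, bindings):
        self.client = client
        self.bindings = bindings  # exchange -> [(routing_key, queue)]

    def prepare_message(self, body):
        return {'body': body, 'headers': {}, 'properties': {}}

    def basic_publish(self, message, exchange, routing_key):
        message['properties']['delivery_info'] = {
            'exchange': exchange, 'routing_key': routing_key}
        if exchange:
            for queue in self._lookup(exchange, routing_key):
                self._put(queue, message)
        else:
            # Anonymous exchange: the routing key is the queue name.
            self._put(routing_key, message)

    def _lookup(self, exchange, routing_key):
        table = self.bindings.get(exchange, [])
        return [queue for rkey, queue in table if rkey == routing_key]

    def _put(self, queue, message):
        self.client.lpush(queue, json.dumps(message))


def publish(channel, body, exchange, routing_key):
    """Two steps, as in _publish: assemble the message, then send it."""
    message = channel.prepare_message(body)
    channel.basic_publish(message, exchange, routing_key)


client = FakeRedis()
channel = FakeChannel(
    client, {'asynt_exchange': [('asynt_routing_key', 'asynt_queue')]})
publish(channel, 'hello world', 'asynt_exchange', 'asynt_routing_key')
received = json.loads(client.rpop('asynt_queue'))
```

After publish returns, the list asynt_queue holds exactly one JSON item, and popping it from the other end returns the message a consumer would read.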