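Celery is one of the most widely used asynchronous task queue frameworks for Python. It supports RabbitMQ, Redis, ZooKeeper and others as its Broker, and the abstraction over all of these message queues is provided by Kombu. Kombu is compatible with both the AMQP transport and non-AMQP transports (Redis, Amazon SQS, ZooKeeper and so on).
Kombu implements each of the AMQP concepts: Message, Producer, Exchange, Queue, Consumer, Connection and Channel. It also adds Transport, the entity that actually stores and sends messages, which distinguishes whether the underlying message queue is amqp, Redis or something else.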
The level of support varies from one Transport to another.
Let's start with the example code from the official site:
from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                     exchange=media_exchange, routing_key='video',
                     declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    #     video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel
# (a separate snippet; `connection` is an already-open Connection):
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
image_queue = Queue('image', exchange=media_exchange, routing_key='image')

with connection.Consumer([video_queue, image_queue],
                         callbacks=[process_media]) as consumer:
    while True:
        connection.drain_events()
Pretty much every role has made an appearance here. Using any of them starts with establishing a Connection.
Getting a connection is simple:
>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')
At this point the connection has not actually been established. It is only established, and then cached, when it is first needed:
@property
def connection(self):
    """The underlying connection object.

    Warning:
        This instance is transport specific, so do not
        depend on the interface of this object.
    """
    if not self._closed:
        if not self.connected:
            self.declared_entities.clear()
            self._default_channel = None
            self._connection = self._establish_connection()
            self._closed = False
        return self._connection
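The connect-on-first-use pattern in that property can be tried out in isolation. The following is a minimal sketch, not Kombu's code: the class name LazyConnection, the establish callable and the attempts counter are all hypothetical and exist only for illustration.

```python
class LazyConnection:
    """Toy stand-in for a connection that connects on first use."""

    def __init__(self, establish):
        self._establish = establish   # callable that opens the real link
        self._connection = None       # cached low-level connection
        self.attempts = 0             # how many times we actually connected

    @property
    def connected(self):
        return self._connection is not None

    @property
    def connection(self):
        # Connect only when somebody asks for the underlying object,
        # then reuse the cached result on later accesses.
        if not self.connected:
            self.attempts += 1
            self._connection = self._establish()
        return self._connection


conn = LazyConnection(establish=lambda: object())
assert not conn.connected          # nothing opened yet
first = conn.connection            # first access opens the link
second = conn.connection           # second access hits the cache
assert first is second and conn.attempts == 1
```

Creating the object is cheap; the cost of connecting is paid once, at the first access.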
You can also connect explicitly:
>>> connection.connect()
def connect(self):
    """Establish connection to server immediately."""
    self._closed = False
    return self.connection
Under the hood, of course, the connection is established by whichever Transport is in use:
conn = self.transport.establish_connection()
A connection has to be closed explicitly:
>>> connection.release()
Because Connection implements the context manager protocol:
def __enter__(self):
    return self
def __exit__(self, *args):
    self.release()
you can use a with statement so that closing the connection is never forgotten:
with Connection() as connection:
    pass  # work with connection
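To see why the with statement is the safer choice, here is a small sketch, assuming a hypothetical ManagedConnection class that only tracks whether release() was called. It shows that the release still happens when the body of the block raises.

```python
class ManagedConnection:
    """Toy connection whose release() runs when the with block exits."""

    def __init__(self):
        self.released = False

    def release(self):
        self.released = True

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Returning None lets any exception propagate after cleanup.
        self.release()


def use_and_fail():
    conn = ManagedConnection()
    try:
        with conn:
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass
    return conn


assert use_and_fail().released     # released despite the exception
```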
A Connection can be used directly to create a Producer and a Consumer. These methods simply call the respective classes:
def Producer(self, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Producer` instance."""
    from .messaging import Producer
    return Producer(channel or self, *args, **kwargs)
def Consumer(self, queues=None, channel=None, *args, **kwargs):
    """Create new :class:`kombu.Consumer` instance."""
    from .messaging import Consumer
    return Consumer(channel or self, queues, *args, **kwargs)
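Once the connection exists, you can use it to create a Producer:
producer = conn.Producer(serializer='json')
You can also create one directly from a Channel:
with connection.channel() as channel:
    producer = Producer(channel, ...)
When a Producer instance is initialized, it inspects its first argument, channel: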
self.revive(self.channel)
channel = self.channel = maybe_channel(channel)
This checks whether channel is a Connection instance. If it is, it is replaced by the default_channel attribute of that Connection instance:
def maybe_channel(channel):
    """Get channel from object.

    Return the default channel if argument is a connection instance,
    otherwise just return the channel given.
    """
    if is_connection(channel):
        return channel.default_channel
    return channel
So a Producer is always tied to a Channel in the end.
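The substitution performed by maybe_channel can be reproduced with two stub classes. This is only a sketch: FakeConnection and FakeChannel are hypothetical stand-ins, and is_connection is reduced to a plain isinstance check.

```python
class FakeChannel:
    """Hypothetical stand-in for a channel."""


class FakeConnection:
    """Hypothetical stand-in for a connection with a lazy default channel."""

    def __init__(self):
        self._default_channel = None

    @property
    def default_channel(self):
        if self._default_channel is None:
            self._default_channel = FakeChannel()
        return self._default_channel


def is_connection(obj):
    return isinstance(obj, FakeConnection)


def maybe_channel(obj):
    """Return obj.default_channel for a connection, else obj itself."""
    if is_connection(obj):
        return obj.default_channel
    return obj


connection = FakeConnection()
channel = FakeChannel()
assert maybe_channel(channel) is channel
assert maybe_channel(connection) is connection.default_channel
```

Either way the caller ends up holding a channel, which is why code that accepts "a connection or a channel" can treat both uniformly.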
The Producer sends a message:
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                 exchange=media_exchange, routing_key='video',
                 declare=[video_queue])
Most of what publish does is carried out by the Channel:
def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        maybe_declare = self.maybe_declare
        [maybe_declare(entity) for entity in declare]
    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )
The Channel assembles the message with prepare_message and sends it with basic_publish.
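The division of labour between Producer and Channel can be modelled in a few lines. The sketch below is not Kombu's implementation: InMemoryChannel and SketchProducer are hypothetical names, messages live in local deques, and the routing key is simply treated as the queue name instead of going through an exchange.

```python
import json
from collections import defaultdict, deque


class InMemoryChannel:
    """Hypothetical channel that stores messages in local deques."""

    def __init__(self):
        self.queues = defaultdict(deque)

    def prepare_message(self, body, priority=0, content_type=None,
                        content_encoding=None, headers=None, properties=None):
        properties = dict(properties or {})
        properties.setdefault('priority', priority)
        return {'body': body,
                'content-type': content_type,
                'content-encoding': content_encoding,
                'headers': headers or {},
                'properties': properties}

    def basic_publish(self, message, exchange, routing_key):
        # Simplification: treat the routing key as the queue name.
        self.queues[routing_key].appendleft(message)


class SketchProducer:
    """Hypothetical producer that delegates everything to its channel."""

    def __init__(self, channel):
        self.channel = channel

    def publish(self, body, exchange='', routing_key=''):
        payload = json.dumps(body)
        message = self.channel.prepare_message(
            payload, content_type='application/json',
            content_encoding='utf-8')
        return self.channel.basic_publish(
            message, exchange=exchange, routing_key=routing_key)


channel = InMemoryChannel()
SketchProducer(channel).publish({'name': '/tmp/lolcat1.avi'},
                                exchange='media', routing_key='video')
stored = channel.queues['video'].pop()
assert json.loads(stored['body']) == {'name': '/tmp/lolcat1.avi'}
```

The producer only serializes and delegates; everything that depends on the storage backend stays inside the channel.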
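The Channel, in turn, is created by the Transport:
chan = self.transport.create_channel(self.connection)
When a Connection is created, a hostname has to be passed in, for example:
amqp://guest:guest@localhost:5672//
The scheme of the hostname, redis for instance, is then extracted:
transport = transport or urlparse(hostname).scheme
That scheme decides which type of Transport is created.
The creation process: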
self.transport_cls = transport
transport_cls = get_transport_cls(transport_cls)
def get_transport_cls(transport=None):
    """Get transport class by name.

    The transport string is the full path to a transport class, e.g.::

        "kombu.transport.pyamqp:Transport"

    If the name does not include `"."` (is not fully qualified),
    the alias table will be consulted.
    """
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]
transport = TRANSPORT_ALIASES[transport]
TRANSPORT_ALIASES = {
    ...
    'redis': 'kombu.transport.redis:Transport',
    ...
}
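Taking Redis as the example, the Transport class lives in /kombu/transport/redis.py and inherits from the Transport class in /kombu/transport/virtual/base.py.
Creating the Channel:
channel = self.Channel(connection)
The Channel then assembles the message with prepare_message and sends it with basic_publish.
A Channel instance has several attributes that tie it to Consumers, Queues and so on. In virtual.Channel: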
class Channel(AbstractChannel, base.StdChannel):
    def __init__(self, connection, **kwargs):
        self.connection = connection
        self._consumers = set()
        self._cycle = None
        self._tag_to_queue = {}
        self._active_queues = []
        ...
Here _consumers is the set of associated consumer tags, _active_queues is the list of associated Queues, and _tag_to_queue maps each consumer tag to its Queue:
self._tag_to_queue[consumer_tag] = queue
self._consumers.add(consumer_tag)
self._active_queues.append(queue)
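Channel also has a different implementation for each underlying message queue. Taking Redis as the example:
class Channel(virtual.Channel):
    """Redis Channel."""
It inherits from virtual.Channel.
The message-assembly function prepare_message: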
def prepare_message(self, body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None):
    """Prepare message data."""
    properties = properties or {}
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority or self.default_priority)
    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties or {}}
It essentially attaches the various properties to the message.
Sending the message with basic_publish ends up calling the _put method:
def _put(self, queue, message, **kwargs):
    """Deliver message."""
    pri = self._get_message_priority(message, reverse=False)
    with self.conn_or_acquire() as client:
        client.lpush(self._q_for_pri(queue, pri), dumps(message))
client is a redis.StrictRedis connection:
def _create_client(self, asynchronous=False):
    if asynchronous:
        return self.Client(connection_pool=self.async_pool)
    return self.Client(connection_pool=self.pool)
self.Client = self._get_client()
def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis
Redis places the message on a list (lpush). The asynchronous option also decides which connection_pool is used.
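How a list can serve as a priority-aware queue is easier to see with a toy model. In the sketch below everything is an assumption made for illustration: FakeListClient imitates only the lpush and rpop list commands in memory, key_for_priority uses a made-up naming scheme, the priority steps (0, 3, 6, 9) are arbitrary, and consumers are assumed to pop from the opposite end of the list to the one producers push to.

```python
import json
from collections import defaultdict, deque


class FakeListClient:
    """Hypothetical in-memory stand-in for a Redis client's list commands."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def lpush(self, key, value):
        self.lists[key].appendleft(value)

    def rpop(self, key):
        return self.lists[key].pop() if self.lists[key] else None


def key_for_priority(queue, priority):
    # Hypothetical naming scheme: one list per (queue, priority) pair.
    return '{0}:{1}'.format(queue, priority) if priority else queue


def put(client, queue, message):
    priority = message['properties'].get('priority', 0)
    client.lpush(key_for_priority(queue, priority), json.dumps(message))


def get(client, queue, priorities=(0, 3, 6, 9)):
    # Scan the per-priority lists in a fixed order, oldest message first.
    for priority in priorities:
        raw = client.rpop(key_for_priority(queue, priority))
        if raw is not None:
            return json.loads(raw)
    return None


client = FakeListClient()
put(client, 'video', {'body': 'a', 'properties': {'priority': 3}})
put(client, 'video', {'body': 'b', 'properties': {'priority': 0}})
put(client, 'video', {'body': 'c', 'properties': {'priority': 0}})
assert [get(client, 'video')['body'] for _ in range(3)] == ['b', 'c', 'a']
```

Pushing on the left and popping on the right gives first-in, first-out order within each list, and the scan order over the lists decides which priority is served first.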
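Now that the message has been placed on the queue, how does it get consumed?
A Consumer is initialized with the Channel, the list of queues to consume from, and the list of callbacks that handle the messages:
with Consumer(connection, queues, callbacks=[process_media], accept=['json']):
    connection.drain_events(timeout=1)
When a Consumer instance is used as a context manager, its consume method is called:
def __enter__(self):
    self.consume()
    return self
The code of the consume method: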
def consume(self, no_ack=None):
    """Start consuming messages.

    Can be called multiple times, but note that while it
    will consume from new queues added since the last call,
    it will not cancel consuming from removed queues (
    use :meth:`cancel_by_queue`).

    Arguments:
        no_ack (bool): See :attr:`no_ack`.
    """
    queues = list(values(self._queues))
    if queues:
        no_ack = self.no_ack if no_ack is None else no_ack
        H, T = queues[:-1], queues[-1]
        for queue in H:
            self._basic_consume(queue, no_ack=no_ack, nowait=True)
        self._basic_consume(T, no_ack=no_ack, nowait=False)
_basic_consume is applied to every queue in the list. Only the last Queue is handled with nowait=False.
The code of _basic_consume:
def _basic_consume(self, queue, consumer_tag=None,
                   no_ack=no_ack, nowait=True):
    tag = self._active_tags.get(queue.name)
    if tag is None:
        tag = self._add_tag(queue, consumer_tag)
        queue.consume(tag, self._receive_callback,
                      no_ack=no_ack, nowait=nowait)
    return tag
It passes the consumer tag and the callback on to the Queue's consume method.
The code of the Queue's consume method:
def consume(self, consumer_tag='', callback=None,
            no_ack=None, nowait=False):
    """Start a queue consumer.

    Consumers last as long as the channel they were created on, or
    until the client cancels them.

    Arguments:
        consumer_tag (str): Unique identifier for the consumer.
            The consumer tag is local to a connection, so two clients
            can use the same consumer tags. If this field is empty
            the server will generate a unique tag.
        no_ack (bool): If enabled the broker will automatically
            ack messages.
        nowait (bool): Do not wait for a reply.
        callback (Callable): callback called for each delivered message.
    """
    if no_ack is None:
        no_ack = self.no_ack
    return self.channel.basic_consume(
        queue=self.name,
        no_ack=no_ack,
        consumer_tag=consumer_tag or '',
        callback=callback,
        nowait=nowait,
        arguments=self.consumer_arguments)
That brings us back to the Channel. The code of the Channel's basic_consume:
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
    """Consume from `queue`."""
    self._tag_to_queue[consumer_tag] = queue
    self._active_queues.append(queue)

    def _callback(raw_message):
        message = self.Message(raw_message, channel=self)
        if not no_ack:
            self.qos.append(message, message.delivery_tag)
        return callback(message)

    self.connection._callbacks[queue] = _callback
    self._consumers.add(consumer_tag)
    self._reset_cycle()
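The Channel records the Consumer's tag, the queue the Consumer wants to consume from, and the mapping between tag and queue, ready for the polling loop. In addition, the mapping from queue to callback is recorded on the Transport (self.connection here), so that the callback can be run once a message has been taken from the queue.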
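The split of bookkeeping between the two objects can be laid out in a stripped-down model. This is a sketch with hypothetical classes (SketchTransport, SketchChannel). It keeps only the four containers discussed above and adds a basic_cancel method purely to show how the registrations would be undone.

```python
class SketchTransport:
    """Hypothetical transport: remembers one callback per queue name."""

    def __init__(self):
        self._callbacks = {}


class SketchChannel:
    """Hypothetical channel: remembers tags, queues and their mapping."""

    def __init__(self, transport):
        self.connection = transport
        self._consumers = set()
        self._active_queues = []
        self._tag_to_queue = {}

    def basic_consume(self, queue, callback, consumer_tag):
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
        self._consumers.add(consumer_tag)
        # The transport, not the channel, owns the queue -> callback map.
        self.connection._callbacks[queue] = callback

    def basic_cancel(self, consumer_tag):
        # Undo everything basic_consume registered for this tag.
        queue = self._tag_to_queue.pop(consumer_tag)
        self._consumers.discard(consumer_tag)
        self._active_queues.remove(queue)
        self.connection._callbacks.pop(queue, None)


transport = SketchTransport()
channel = SketchChannel(transport)
channel.basic_consume('video', callback=print, consumer_tag='tag-1')
assert channel._tag_to_queue == {'tag-1': 'video'}
assert transport._callbacks['video'] is print
channel.basic_cancel('tag-1')
assert not channel._active_queues and not transport._callbacks
```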
The actual invocation happens on this line:
connection.drain_events(timeout=1)
This takes us to the Transport's drain_events method:
def drain_events(self, connection, timeout=None):
    time_start = monotonic()
    get = self.cycle.get
    polling_interval = self.polling_interval
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try:
            get(self._deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break
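At first glance this looks like an endless loop around get(self._deliver, timeout=timeout), but the loop ends as soon as one call to get succeeds (the else branch breaks), or when the timeout expires and socket.timeout is raised.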
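The control flow of that loop can be exercised without a broker. The function below is a simplified sketch, not the Transport method itself: drain_once and its fetch parameter are hypothetical names, and fetch stands for "try to get and deliver one message, raising Empty if there is none".

```python
import socket
from queue import Empty
from time import monotonic, sleep


def drain_once(fetch, timeout=None, polling_interval=0.01):
    """Call fetch() until it stops raising Empty or the timeout passes."""
    started = monotonic()
    while True:
        try:
            fetch()
        except Empty:
            if timeout is not None and monotonic() - started >= timeout:
                raise socket.timeout()
            sleep(polling_interval)
        else:
            return  # exactly one successful fetch ends the call


attempts = []


def flaky_fetch():
    # Pretend the queue is empty for the first two polls.
    attempts.append(1)
    if len(attempts) < 3:
        raise Empty()


drain_once(flaky_fetch, timeout=1)
assert len(attempts) == 3
```

This is also why the consuming code wraps drain_events in its own `while True`: each call handles at most one delivery.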
get is a method of self.cycle, and cycle is a FairCycle instance:
self.cycle = self.Cycle(self._drain_channel, self.channels, Empty)
@python_2_unicode_compatible
class FairCycle(object):
    """Cycle between resources.

    Consume from a set of resources, where each resource gets
    an equal chance to be consumed from.

    Arguments:
        fun (Callable): Callback to call.
        resources (Sequence[Any]): List of resources.
        predicate (type): Exception predicate.
    """

    def __init__(self, fun, resources, predicate=Exception):
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self):
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        """Get from next resource."""
        for tried in count(0):  # for infinity
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # reraise when retries exhausted.
                if tried >= len(self.resources) - 1:
                    raise
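FairCycle takes three arguments: fun is the function to execute, resources is the sequence it rotates through, handing one item to fun on each call, and predicate is the exception type that signals that a resource had nothing to offer.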
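A small experiment makes the rotation visible. The class below is a condensed re-creation of the FairCycle logic quoted above, renamed RoundRobin to make clear it is a sketch. The pending dictionary, the fetch function and the queue names are hypothetical test data.

```python
from itertools import count
from queue import Empty


class RoundRobin:
    """Condensed re-creation of the FairCycle logic for experimentation."""

    def __init__(self, fun, resources, predicate=Exception):
        self.fun, self.resources, self.predicate = fun, resources, predicate
        self.pos = 0

    def _next(self):
        while True:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs):
        for tried in count(0):
            resource = self._next()
            try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # Give up once every resource has been tried in this call.
                if tried >= len(self.resources) - 1:
                    raise


# Hypothetical data: three named queues, only some of which hold messages.
pending = {'video': ['v1', 'v2'], 'image': [], 'audio': ['a1']}
delivered = []


def fetch(queue, callback):
    if not pending[queue]:
        raise Empty()
    callback(pending[queue].pop(0), queue)


cycle = RoundRobin(fetch, ['video', 'image', 'audio'], Empty)
for _ in range(3):
    cycle.get(lambda message, queue: delivered.append((queue, message)))
assert delivered == [('video', 'v1'), ('audio', 'a1'), ('video', 'v2')]
```

Because the position is remembered between calls, the second message comes from audio rather than from video again. No single busy queue can starve the others.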
Here fun is _drain_channel and resources is channels:
def _drain_channel(self, channel, callback, timeout=None):
    return channel.drain_events(callback=callback, timeout=timeout)
So drain_events is run on each channel associated with the Transport in turn.
The code of the Channel's drain_events:
def drain_events(self, timeout=None, callback=None):
    callback = callback or self.connection._deliver
    if self._consumers and self.qos.can_consume():
        if hasattr(self, '_get_many'):
            return self._get_many(self._active_queues, timeout=timeout)
        return self._poll(self.cycle, callback, timeout=timeout)
    raise Empty()
The code of _poll:
def _poll(self, cycle, callback, timeout=None):
    """Poll a list of queues for available messages."""
    return cycle.get(callback)
And we are back at FairCycle, this time the Channel's own FairCycle instance:
def _reset_cycle(self):
    self._cycle = FairCycle(
        self._get_and_deliver, self._active_queues, Empty)
The _get_and_deliver method takes a message from the queue and then calls the _deliver method handed down by the Transport:
def _get_and_deliver(self, queue, callback):
    message = self._get(queue)
    callback(message, queue)
The code of _deliver:
def _deliver(self, message, queue):
    if not queue:
        raise KeyError(
            'Received message without destination queue: {0}'.format(
                message))
    try:
        callback = self._callbacks[queue]
    except KeyError:
        logger.warning(W_NO_CONSUMERS, queue)
        self._reject_inbound_message(message)
    else:
        callback(message)
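It looks up the callback registered for the queue and invokes it with the message. If no callback is registered, it logs a warning and rejects the message.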
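The three outcomes of that dispatch step (callback found, no consumer, no queue name) can be checked with a self-contained model. DispatchingTransport is a hypothetical class. Rejecting a message is reduced here to appending it to a list, which is an assumption made only to keep the sketch observable.

```python
import logging

logger = logging.getLogger(__name__)


class DispatchingTransport:
    """Hypothetical transport that routes a message to its queue's callback."""

    def __init__(self):
        self._callbacks = {}
        self.rejected = []

    def _deliver(self, message, queue):
        if not queue:
            raise KeyError(
                'Received message without destination queue: {0}'.format(
                    message))
        try:
            callback = self._callbacks[queue]
        except KeyError:
            # Nobody consumes from this queue: log and set the message aside.
            logger.warning('No consumer registered for queue %r', queue)
            self.rejected.append((queue, message))
        else:
            callback(message)


received = []
transport = DispatchingTransport()
transport._callbacks['video'] = received.append

transport._deliver({'body': 'clip'}, 'video')      # has a consumer
transport._deliver({'body': 'photo'}, 'image')     # nobody listening
assert received == [{'body': 'clip'}]
assert transport.rejected == [('image', {'body': 'photo'})]
```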
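As we can see, Channel and Transport are central to Kombu. The Channel records the list of queues, the set of consumer tags and the mapping between them, while the Transport records the mapping from each queue to its callback. Kombu polls the queues in _active_queues in turn, until either all of them have been checked or one of them yields a message. It then fetches that message and invokes the callback registered for that queue.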