kombu源碼Producer收穫一

celery內置了kombu庫,看了一下kombu的源碼,從官網最簡單的一個例子來分析---消息發佈,源碼以下:python

from __future__ import absolute_import, unicode_literals import datetime from kombu import Connection with Connection('redis://localhost:6379/0') as conn: simple_queue = conn.SimpleQueue('simple_queue') message = 'helloworld, sent at {0}'.format(datetime.datetime.today()) simple_queue.put(message) print('Sent: {0}'.format(message)) simple_queue.close()

運行以前開啓redis服務。這真是簡單到不能到簡單的例子-.-git

一步步分析畫出以下類圖:github

大概十七八個類。流程省略幾百萬個字。redis

 

記一下關鍵步驟:spa

一、建立生產者 messaging.Producer 時不會操做redis。設計

二、建立消息者 messaging.Consumer 時會建立exchange,及其對應的 routing_key、patter、queue(隊列名稱),具體格式像這樣:3d

 _kombu.binding.exchange_name => (routing_key\x06\x16pattern\x06\x16queue_name)
這是一個sadd操做,key是 _kombu.binding.exchange_name,前面是固定的,exchange_name是變化的;
value是 routing_key、pattern、和綁定的隊列名。\x06\x16是分隔符。這能夠從redis裏面看出:

 



生產者在publish消息時,調用的是:
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare):

能夠看到,生產者只須要知道exchange、routing_key就能夠發消息到隊列。發送到redis的消息內容以下:code

這是個lpush命令,key是隊列名、value是消息內容連同元數據:orm

lpush queue_name => [message, ... ]

 

 

生產者producer發佈消息到此結束。blog

其中kombu對redis庫作了一下簡單的封裝,裏面有個AsyncRedis類,不過貌似沒什麼卵用。

借鑑kombu裏對redis封裝的設計,我封裝了一下redis,使用簡單,絕對無公害。地址在這:Python RedisChannel

相關文章
相關標籤/搜索