Recently, a performance upgrade driven by business requirements led us to replace HTTP-based data exchange with a message queue. What used to be synchronous processing thus became asynchronous, which improved system performance to a degree, but debugging nearly brought me to tears, since every link in the chain now has to be inspected in detail.
As for RabbitMQ: it is a broker server for AMQP with a strictly defined communication model, in which almost every aspect of talking to the core product follows the RPC (Remote Procedure Call) pattern.
The classes and methods used when communicating with RabbitMQ map one-to-one onto the classes and methods of the AMQP protocol, so AMQP is essentially a transport mechanism for RPC.
The AMQ (Advanced Message Queuing) model was designed for broker software such as RabbitMQ. It logically defines three abstract components that determine how messages are routed:

- Exchange: the component in the message broker that routes messages to queues
- Queue: a data structure that stores messages, on disk or in memory, and delivers them in FIFO order
- Binding: a set of rules that tells an exchange which queue a message should be stored in
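To build some intuition for how these three components cooperate, here is a toy, in-memory model of direct-exchange routing. This is plain Python, not RabbitMQ itself; all class and queue names are purely illustrative:

```python
from collections import deque

class ToyBroker:
    """A toy model of the AMQ components: exchange, queue, binding."""
    def __init__(self):
        self.queues = {}    # queue name -> deque (FIFO storage)
        self.bindings = {}  # (exchange, routing_key) -> [queue names]

    def queue_declare(self, name):
        self.queues.setdefault(name, deque())

    def queue_bind(self, queue, exchange, routing_key):
        self.bindings.setdefault((exchange, routing_key), []).append(queue)

    def basic_publish(self, exchange, routing_key, body):
        # A direct exchange routes on an exact routing-key match
        for q in self.bindings.get((exchange, routing_key), []):
            self.queues[q].append(body)

broker = ToyBroker()
broker.queue_declare('alarm')
broker.queue_bind('alarm', 'spider', 'rk-alarm')
broker.basic_publish('spider', 'rk-alarm', b'hello')
broker.basic_publish('spider', 'other-key', b'dropped')  # no binding matches
print(broker.queues['alarm'].popleft())  # FIFO delivery of the routed message
```

A message whose routing key matches no binding simply never reaches a queue, which is exactly why declaring the binding is a mandatory setup step.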
Before publishing a message to a queue, several steps are required: at the very least, an exchange and a queue must be set up and then bound together. Next we will implement this AMQP workflow in Python.
I use the pika library, which can be installed with the command below. It implements the vast majority of RabbitMQ's API and exposes the relevant tuning parameters; that deserves a discussion of its own some other time.
pip install pika
The exchange plays a central role in the AMQ model, and accordingly has its own class in the AMQP specification. To declare an exchange, we could simply create it in the management console.
That is only appropriate in rare cases, though; for a developer, reaching for the mouse when the keyboard will do is just silly. Instead, we can declare (create) an exchange with the code below. The declaration is effectively idempotent: if an exchange with the same name and attributes already exists, RabbitMQ leaves it untouched; otherwise a new exchange is created.
self.channel.exchange_declare(exchange=exchange,
                              exchange_type="direct",
                              passive=False,
                              durable=True,
                              auto_delete=False)
Once the exchange has been created, sending a queue.declare command makes RabbitMQ create a queue. As before, we could also create queues in the management UI.
But the same advice applies: stick to the keyboard. The code below declares (creates) a queue; like exchange declaration, it is idempotent, so an existing queue with matching attributes is reused rather than recreated.
self.channel.queue_declare(queue=queue, durable=True)
Sending the same queue.declare command repeatedly therefore has no side effects: RabbitMQ does not recreate a queue that already exists, and the repeated declaration simply succeeds. The reply does carry useful information about the queue, namely the number of pending messages and the number of consumers on it. If, however, a queue is redeclared with attributes that differ from the existing queue, RabbitMQ closes the channel that issued the RPC request and returns a 406 (PRECONDITION_FAILED) error.
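That behaviour can be sketched with a small stand-in. Again this is a toy model in plain Python, not RabbitMQ; the exception and registry names are made up for illustration:

```python
class PreconditionFailed(Exception):
    """Stands in for RabbitMQ's 406 PRECONDITION_FAILED channel error."""

class ToyQueueRegistry:
    def __init__(self):
        self.queues = {}  # name -> (attributes, message list)

    def queue_declare(self, name, durable=True):
        attrs = {'durable': durable}
        if name not in self.queues:
            self.queues[name] = (attrs, [])
        elif self.queues[name][0] != attrs:
            # Redeclaring with different attributes is a channel error
            raise PreconditionFailed('406: inequivalent args for queue ' + name)
        _, messages = self.queues[name]
        # Declare-Ok reports pending message count and consumer count
        return len(messages), 0

registry = ToyQueueRegistry()
print(registry.queue_declare('jobs', durable=True))  # first declaration
print(registry.queue_declare('jobs', durable=True))  # idempotent repeat
try:
    registry.queue_declare('jobs', durable=False)    # mismatched attributes
except PreconditionFailed as err:
    print(err)
```

The repeated declaration returns the same counts with no side effect, while the mismatched one raises, mirroring the channel-closing error described above.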
Once the exchange and the queue have been created, they can be bound together. Like queue.declare, the Queue.Bind command that binds a queue to an exchange addresses only one queue at a time. Again, we can do this either in the management UI or in code:
self.channel.queue_bind(queue=queue,
                        exchange=exchange,
                        routing_key=rk)
When publishing to RabbitMQ, the message data sent to the server is wrapped in several frames. Before the actual message content reaches RabbitMQ, the client application sends a basic.publish method frame, a content header frame, and at least one message body frame.
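As a rough sketch of that framing (the real wire format lives inside pika; the frame_max value here is an assumed negotiated frame-size limit, and the function is purely illustrative):

```python
import math

def frames_for_publish(body: bytes, frame_max: int = 131072):
    """Return the logical frame sequence for one basic.publish.

    A method frame and a content header frame always precede the body,
    which is split into as many body frames as frame_max requires."""
    n_body_frames = max(1, math.ceil(len(body) / frame_max))
    return ['method:basic.publish', 'content-header'] + ['body'] * n_body_frames

print(frames_for_publish(b'x' * 100))          # small body fits in one body frame
print(len(frames_for_publish(b'x' * 300000)))  # large body spans several frames
```

The point is simply that one publish call on the client side expands into multiple protocol frames on the wire.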
By default, as long as no consumer is listening on a queue, messages are stored in it, and the queue grows as more messages arrive. RabbitMQ can keep these messages in memory or write them to disk.
def produce(self, body):
    self.channel.basic_publish(exchange=self.exchange,
                               routing_key=self.route_key,
                               body=body,
                               properties=pika.BasicProperties(
                                   content_type='text/plain',
                                   delivery_mode=1))
Once a published message has been routed and stored in one or more queues, all that remains is consuming it. Note that publishing and consuming are asynchronous. When consuming, the Basic.Consume command tells RabbitMQ how messages should be delivered:

- if no_ack is true, RabbitMQ keeps sending messages until the consumer sends a Basic.Cancel command or disconnects
- if no_ack is false, the consumer must send a Basic.Ack to acknowledge each message it receives
def on_message(chan, method_frame, _header_frame, body, userdata=None):
    """Called when a message is received. Log message and ack it."""
    # LOGGER.info('Userdata: %s Message body: %s', userdata, body)
    # print(" [x] Received %r" % body.decode())
    data = body.decode()
    result = alarmFun(data)
    publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
    publish.produce(result)
    # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

on_message_callback = functools.partial(on_message)
self.channel.basic_consume(on_message_callback=on_message_callback,
                           queue=self.queue,
                           auto_ack=True)
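The difference between the two acknowledgement modes can also be modelled offline. This is a toy stand-in, not pika's API; the class and its behaviour are simplified for illustration:

```python
from collections import deque

class ToyChannel:
    """Toy model of delivery with and without consumer acknowledgements."""
    def __init__(self, messages, auto_ack=False):
        self.ready = deque(messages)
        self.unacked = {}          # delivery_tag -> message
        self.auto_ack = auto_ack
        self._tag = 0

    def deliver(self):
        msg = self.ready.popleft()
        self._tag += 1
        if not self.auto_ack:
            self.unacked[self._tag] = msg   # held until Basic.Ack arrives
        return self._tag, msg

    def basic_ack(self, delivery_tag):
        self.unacked.pop(delivery_tag)

    def disconnect(self):
        # Unacknowledged messages are requeued when the consumer drops
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))

chan = ToyChannel([b'a', b'b'], auto_ack=False)
tag, msg = chan.deliver()
chan.disconnect()        # b'a' was delivered but never acked...
print(list(chan.ready))  # ...so it is back at the head of the queue
```

With auto_ack=True (the mode used in the snippet above), nothing is ever held in the unacked set, so a crash between delivery and processing loses the message; that is the trade-off the no_ack flag controls.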
With the theory covered, it is time to connect it to practice: let's implement the publisher role and the consumer role in Python.
Following the setup flow, we initialize the connection and configure the exchange, the queue, and the binding; only then can we publish messages to the queue over that connection.
import logging
import os
from random import randint

import pika

BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD,
                                            BROKER_IP, BROKER_PORT, BROKER_VHOST)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
#               '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)


class Publish(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info('Connecting to %s', BROKER_URL)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        # Configure a backup set of parameters to keep the connection stable
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST,
                                      self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST,
                                      self.credentials,
                                      connection_attempts=5, retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.exchange_declare(exchange=exchange,
                                      exchange_type="direct",
                                      passive=False,
                                      durable=True,
                                      auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.route_key = rk

    def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange,
                                   routing_key=self.route_key,
                                   body=body,
                                   properties=pika.BasicProperties(
                                       content_type='text/plain',
                                       delivery_mode=1))

    def close(self):
        self.connection.close()


def test():
    publish = Publish(exchange='test_yerik', queue='test_test',
                      rk='rk-test_test')
    for i in range(1, 10000):
        publish.produce(randint(1, 100).__str__())
    publish.close()


if __name__ == '__main__':
    test()
The consumer is initialized much like the producer: establish the connection, open the channel, declare the exchange and queue, and bind them. The main difference lies in how messages are consumed.
import functools
import logging
import os

import pika

from apps.alarm.alarmfun import alarmFun
from apps.utils.rabbitmq.publish import Publish

BROKER_USER = os.environ.get('BROKER_USER', 'guest')
BROKER_PASSWD = os.environ.get('BROKER_PASSWD', 'guest')
BROKER_IP = os.environ.get('BROKER_IP', '127.0.0.1')
BROKER_PORT = os.environ.get('BROKER_PORT', '5672')
BROKER_VHOST = os.environ.get('BROKER_VHOST', 'my_vhost')
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
BROKER_URL = 'amqp://{}:{}@{}:{}/{}'.format(BROKER_USER, BROKER_PASSWD,
                                            BROKER_IP, BROKER_PORT, BROKER_VHOST)

# print('pika version: %s' % pika.__version__)
# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
#               '-35s %(lineno) -5d: %(message)s')
# LOGGER = logging.getLogger(__name__)


class Consummer(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info('Connecting to %s', BROKER_URL)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST,
                                      self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST,
                                      self.credentials,
                                      connection_attempts=5, retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.queue = queue
        self.channel.exchange_declare(exchange=exchange,
                                      exchange_type="direct",
                                      passive=False,
                                      durable=True,
                                      auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.channel.queue_bind(queue=queue,
                                exchange=exchange,
                                routing_key=rk)
        self.channel.basic_qos(prefetch_count=1)

    def consum_message(self):
        # LOGGER.info('Comsummer by {}'.format(name))
        def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info('Userdata: %s Message body: %s', userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange='spider', queue='alarm', rk='rk-alarm')
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

        on_message_callback = functools.partial(on_message)
        self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True)
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()

    def cancel(self):
        self.connection.close()


def test():
    consummer = Consummer('test_yerik', 'test_test', 'rk-test_test')
    consummer.consum_message()


if __name__ == '__main__':
    test()
References:
- RabbitMQ in Depth, Gavin M. Roy (Chinese translation by 汪佳南 and 鄭天民)