RabbitMQ消息隊列

什麼是rabbitMQ

rabbitMQ是一款基於AMQP協議的消息中間件,它可以在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。並且使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。並且兩端可使用不一樣的語言編寫,大大提供了靈活性。python

rabbitMQ基本概念

  • exchange: producer只能將消息發送給exchange。而exchange負責將消息發送到queues。Exchange必須準確的知道怎麼處理它接受到的消息,是被髮送到一個特定的queue仍是許多quenes,仍是被拋棄,這些規則則是經過exchange type來定義。主要的type有direct,topic,headers,fanout。具體針對不一樣的場景使用不一樣的type。
  • queue: 消息隊列,消息的載體。接收來自exchange的消息,而後再由consumer取出。exchange和queue是能夠一對多的,它們經過routingKey來綁定。
  • Producer:生產者,消息的來源,消息必須發送給exchange。而不是直接給queue
  • Consumer:消費者,直接從queue中獲取消息進行消費,而不是從exchange。

從以上能夠看出Rabbitmq工做原理大體就是producer把一條消息發送給exchange。rabbitMQ根據routingKey負責將消息從exchange發送到對應綁定的queue中去,這是由rabbitMQ負責作的。而consumer只需從queue獲取消息便可。基本效果圖以下: 服務器

持久化問題

消息確認機制

這裏就會有一個問題,若是consumer在執行任務時須要花費一些時間,這個時候若是忽然掛了,消息尚未被完成,消息豈不是丟失了,爲了避免讓消息丟失,rabbitmq提供了消息確認機制,consumer在接收到,執行完消息後會發送一個ack給rabbitmq告訴它能夠從queue中移除消息了。若是沒收到ack。Rabbitmq會從新發送此條消息,若是有其餘的consumer在線,將會接收並消費這條消息。消息確認機制是默認打開的。若是想關閉它只須要設置no_ack=true。在此處咱們不須要設置。默認以下就行。負載均衡

隊列持久化

  • 除了consumer以外咱們還得確保rabbitMQ掛了以後消息不被丟失。這裏咱們就須要確保隊列queue和消息messages都得是持久化的。
  • 隊列的持久話須要設置durable屬性。
channel.queue_declare(queue= task_queue, durable=True)

 消息持久化

  消息的持久話則是經過delivery_mode屬性,設置值爲2便可。fetch

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

簡單發送模型

在rabbit MQ裏消息永遠不能被直接發送到queue。這裏咱們經過提供一個空字符串來使用默認的exchange。這個exchange是特殊的,它能夠根據routingKey把消息發送給指定的queue。因此咱們的設計看起來以下所示:ui

發送端:設計

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
 
#聲明queue
channel.queue_declare(queue='hello')
 
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

  接收端:3d

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
 
 
#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
 
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 工做隊列模式

一個生產者發送消息到隊列中,有多個消費者共享一個隊列,每一個消費者獲取的消息是惟一的。code

消息公平分發原則(相似負載均衡)

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。server

 

channel.basic_qos(prefetch_count=1)

 

  生產者端: 中間件

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

 消費者端:

import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

Publish\Subscribe(消息發佈\訂閱)之廣播模式

在前面2個示例咱們都適用默認的exchange。這裏咱們將本身定義一個exchange。並設置type爲fanout。它能夠將消息廣播給綁定的每個queue。而再也不是某一個queue。 

       

  生產者端:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

   消費者端:

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
#不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(exclusive=True) 
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

Publish\Subscribe(消息發佈\訂閱)之direct模式

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。 

  生產者端: 

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

 消費者端:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

   運行結果:

發送消息

只收到warning的消息

只收到error的消息

error和waring的都能收到

Publish\Subscribe(消息發佈\訂閱)之Topic模式

這種模型是最靈活的,相比較於direct的徹底匹配和fanout的廣播。Topic能夠用相似正則的手法更好的匹配來知足咱們的應用。下面咱們首先了解一下topic類型的exchange。

topic類型的routing_key不能夠是隨意的單詞,它必須是一系列的單詞組合,中間以點號隔開,譬如「quick.orange.rabbit」這個樣子。發送消息的routing_key必須匹配上綁定到隊列的routing_key。消息纔會被髮送。

此外還有個重要的地方要說明,在以下代碼處綁定的routing_key種能夠有*和#2種字符。它們表明的意義以下:

  • *(星號) :能夠匹配任意一個單詞
  • #(井號) :能夠匹配0到多個單詞

由圖可知,Q1匹配3個單詞中間爲orange的routing_key ,而Q2能夠匹配3個單詞最後一個單詞爲rabbit和第一個單詞爲lazy後面能夠有多個單詞的routing_key。

生產者端:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

   消費者端:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 RPC模式

當咱們須要在遠程服務器上執行一個方法並等待它的結果的時候,咱們將這種模式稱爲RPC。

在rabbit MQ中爲了能讓client收到server端的response message。須要定義一個callback queue,不過如今有一個問題,就是每次請求都會建立一個callback queue .這樣的效率是極其低下的。幸運的是咱們能夠經過correlation_id爲每個client建立一個單獨的callback queue。經過指定correlation_id咱們能夠知道callback queue中的消息屬於哪一個client。要作到這樣只需client每次發送請求時帶上這惟一的correlation_id。而後當咱們從callback queue中收到消息時,咱們能基於 correlation_id 匹配上咱們的消息。匹配不上的消息將被丟棄,看上去就像下圖這樣:

  

總結一下流程以下:

    1. client發起請求,請求中帶有2個參數reply_to和correlation_id
    2. 請求發往rpc_queue
    3. server獲取到rpc_queue中的消息,處理完畢後,將結果發往reply_to指定的callback queue
    4. client 獲取到callback queue中的消息,匹配correlation_id,若是匹配就獲取,不匹配就丟棄.

  生產者端:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
 
channel = connection.channel()
 
channel.queue_declare(queue='rpc_queue')
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()

   消費者端:

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
相關文章
相關標籤/搜索