消息隊列之RabbitMQ

  消息隊列產品有不少,好比 ActiveMQ、RabbitMQ ,ZeroMQ ,RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種,本文先講 RabbitMQ ,在此以前先看下消息隊列的相關概念。

什麼是消息隊列

  消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。html

  消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。redis

RabbitMQ特色

  RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:數據庫

  1. 可靠性(Reliability)
    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。服務器

  2. 靈活的路由(Flexible Routing)
    在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。網絡

  3. 消息集羣(Clustering)
    多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。分佈式

  4. 高可用(Highly Available Queues)
    隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。ide

  5. 多種協議(Multi-protocol)
    RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。fetch

  6. 多語言客戶端(Many Clients)
    RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。加密

  7. 管理界面(Management UI)
    RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。spa

  8. 跟蹤機制(Tracing)
    若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。

  9. 插件機制(Plugin System)
    RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。

RabbitMQ的安裝能夠參考博客:

  https://www.cnblogs.com/ericli-ericli/p/5902270.html

RabbitMQ的概念模型

基本模型

全部 MQ 產品從模型抽象上來講都是同樣的過程:
  消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。

基本概念

  • Message
    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
  • Publisher
    消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
  • Exchange
    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
  • Binding
    綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
  • Queue
    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
  • Connection
    網絡鏈接,好比一個TCP鏈接。
  • Channel
    信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
  • Consumer
    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  • Virtual Host
    虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。
  • Broker
    表示消息隊列服務器實體。

RabbitMQ實現消息傳遞實例

最簡單的隊列通訊
接收端
import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello1')

def callback(ch, method, properties, body):
    print('>>',ch,method,properties)
    time.sleep(30)
    print(" [x] Received %r" % body)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello1',
                      no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

發送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello1')

channel.basic_publish(exchange='',
                      routing_key='hello1',
                      body='Hello World!')
connection.close()

PS:遠程鏈接rabbitmq server的話,須要配置權限

可參考博客:https://www.cnblogs.com/alex3714/articles/5248247.html

 

  在這種工做模式下,若是啓動了多個接收端,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),至關於輪詢。

  可是上面這樣寫的話,隊列裏面的消息若是沒有被所有取出來,並且同時RabbitMQ又down機了,那這樣的話隊列裏面的消息就都沒有了,因此咱們須要作一個消息的持久化

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello',durable=True)

def callback(ch, method, properties, body):
    print('>>',ch,method,properties)
    time.sleep(30)
    print(" [x] Received %r" % body)


channel.basic_consume(callback,
                      queue='hello',
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive端
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 聲明queue
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
send端
消息公平分發

  若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(60)
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()
接收端
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,
                      ))
connection.close()
發送端
Publish\Subscribe(消息發佈\訂閱) 

  以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了。

fanout: 全部bind到此exchange的queue均可以接收消息

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
publisher
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',exchange_type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s info warning error\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

 

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

PS:表達式符號說明:#表明收全部,*表明任何字符。(在終端運行,運行命令後面跟上符合條件的topic)

相關文章
相關標籤/搜索