RabbitMQ基本使用

RabbitMQ

RabbitMQ能夠說是目前較爲火熱的一款消息中間件,其自己由Erlang語言進行編寫,部署簡單操做方便,是必備的一門技術棧。python

RabbitMQ官網docker

它支持各類主流語言的驅動,以下所示:函數

image-20210409172537673

那麼如今本章將用Python來探究一下RabbitMQ的使用。fetch

RabbitMQ官方提供多種安裝方式,具體可參照官網,這裏將採用Docker部署,版本爲3.8.14:this

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

同時咱們還須要爲Python安裝對應操縱RabbitMQ的驅動模塊,名爲pika,可直接經過pip進行安裝:code

pip3 install pika

基礎的p2p

簡單模式

基礎的p2p在RabbitMQ中被稱爲簡單模式,即一個生產者的信息僅能被一個消費者所接收,整個流程步驟以下:orm

  • 生產者/消費者連接RabbitMQ服務
  • 生產者/消費者建立消息隊列
  • 生產者產生消息,放入消息隊列中
  • 消費者得到消息,而且消費該消息

image-20210409183509514

生產者代碼以下:中間件

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 建立/獲取隊列
channel.queue_declare(queue="q1")

# exchange = "": 普通的p2p模式
# routing_key:放進那個隊列
# body:消息主體
channel.basic_publish(
    exchange="",
    routing_key="q1",
    body="this is a message",
)

print("The message is sent to q1!")

消費者代碼以下:對象

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 建立/獲取隊列
channel.queue_declare(queue="q1")


# 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式
def callback(ch, method, properties, body):
    print(body.decode("utf8"))
    print("The consumer successfully gets the message from the q1 queue!")


# queue:監聽的隊列
# auto_ack:自動回覆ack確認
channel.basic_consume(
    queue="q1",
    auto_ack=True,
    on_message_callback=callback,
)

# 開始監聽隊列,會一直進行監聽
channel.start_consuming()

多個消費

若是僅有一個生產者,而有多個消費者想要獲取數據,那這些消費者則會輪詢的依次的從隊列中得到數據,以下代碼可對其進行驗證,你只須要並行的多開幾個消費者便可:blog

image-20210409184725668

生產者代碼 以下:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()
channel.queue_declare(queue="q1")

for i in range(5):
    channel.basic_publish(
        exchange="",
        routing_key="q1",
        body="this is a message{0}".format(i),
    )

    print("The message{0} is sent to q1".format(i))

消費者代碼以下:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()
channel.queue_declare(queue="q1")


def callback(ch, method, properties, body):
    print(body.decode("utf8"))
    print("The consumer successfully gets the message from the q1 queue!")


channel.basic_consume(
    queue="q1",
    auto_ack=True,
    on_message_callback=callback,
)
channel.start_consuming()

相關參數

應答參數

在消費者中,有一條這樣的代碼:

# auto_ack=True

channel.basic_consume(
    queue="q1",
    auto_ack=True,
    on_message_callback=callback,
)

這條代碼的意思是一旦消費者從隊列中取出消息,不管是否消費該消息,都會當即向RabbitMQ服務發送一個我以接收,你能夠從隊列中將該消息抹除的信號。

以下圖所示:

image-20210409185148313

若是該參數設置爲False,則表明消費者向RabbitMQ的這條ack確認信號轉爲手動觸發,也就是說,咱們能夠在消費者成功的消費掉這條信息後再手動通知RabbitMQ從隊列中將該消息進行移除。

本質上,該參數若是爲False,消費者是不會取出隊列中的信息,而是徹底拷貝一份。

image-20210409185504961

在消費完成後,你能夠手動通知RabbitMQ刪除消息的代碼以下,固定寫法:

ch.basic_ack(delivery_tag=method.delivery_tag)

仍是上一個總體的消費者代碼吧...

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()
channel.queue_declare(queue="q1")


def callback(ch, method, properties, body):
    print("Processing...")
    time.sleep(3)
    # 通知RabbitMQ,你能夠刪除了
    ch.basic_ack(delivery_tag=method.delivery_tag)


# auto_ack:手動回覆ack確認
channel.basic_consume(
    queue="q1",
    auto_ack=False,
    on_message_callback=callback,
)

channel.start_consuming()

另外,若是此時你啓動3個消費者,你會發現隊列中的消息不是輪詢了,而是被第一個消費者獨佔:

image-20210409190036510

持久化參數

RabbitMQ中全部的消息都存儲在內存中,這意味着某些特殊狀況下,如RabbitMQ服務忽然宕掉以後,在隊列中的數據都會丟失。

咱們能夠對隊列進行持久化設置,讓其將數據保存在磁盤中。

有趣的是,RabbitMQ中對隊列的持久化分爲2個層次:

  • 你這個隊列要不要持久化?
  • 你這個隊列中的消息要不要持久化?

須要注意的是,在RabbitMQ的一次服務週期中,一個隊列若是已經聲明是非持久化隊列,則不能將其改變爲持久化隊列,你須要從新建立一個新的持久化隊列。

用代碼看一下實際效果吧,將下面這段生產者代碼嘗試運行:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()

# durable:若是爲True則表明着是持久化隊列,默認是False
channel.queue_declare(queue="q2", durable=True)

# delivery_mode:2是對該消息持久化,1是不持久化,默認爲1
channel.basic_publish(
    exchange="",
    routing_key="q2",
    body="持久化信息",
    properties=pika.BasicProperties(
        delivery_mode=2,
    )
)

channel.basic_publish(
    exchange="",
    routing_key="q2",
    body="非持久化信息",
)

print("The messages is sent to q2")

如今q2隊列中應該具備2條信息,咱們中止Docker容器的運行在對其從新進行啓動:

$ docker container stop rabbitmq
$ docker container start rabbitmq

而後啓動消費者,看能拿到幾條信息:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()

# durable:若是爲True則表明着是持久化隊列,默認是False
channel.queue_declare(queue="q2", durable=True)


def callback(ch, method, properties, body):
    print(body.decode("utf8"))


channel.basic_consume(
    queue="q2",
    auto_ack=True,
    on_message_callback=callback,
)

channel.start_consuming()

固然,結果只能拿到持久化信息,非持久化信息是拿不到的。

閒置消費

默認的隊列消息分發策略是輪詢分發,這會致使一個問題,如我有2個消費者:

  • 消費者A拿出消息,並處理
  • 消費者B拿出消息,並處理
  • 消費者A想拿出消息,可是消費者B尚未處理完,消費者A拿不出消息

因此咱們能夠將分發策略改成閒置消費,即誰處理的快,下一條消息就歸誰,而再也不使用輪詢分發,你只須要在消費者的下面加上這句代碼便可。

channel.basic_qos(prefetch_count=1)

仍是拿多個消費一節的例子來舉例,修改一下消費者的代碼,生產者依舊用上面的便可:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()
channel.queue_declare(queue="q1")


def callback(ch, method, properties, body):
    print(body.decode("utf8"))
    # 第二個消費者取消註釋
    time.sleep(50)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# 關閉輪詢策略,改成閒置優先,必須寫在監聽的上面
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue="q1",
    auto_ack=False,
    on_message_callback=callback,
)

channel.start_consuming()

交換機模式

普通發佈訂閱

RabbitMQ中的發佈訂閱與Kafka中的有所不一樣,它必須依賴一個被稱爲交換機的東西來進行消息的發佈,整個流程以下:

  • 生產者建立交換機
  • 消費者建立隊列連接至交換機
  • 生產者建立消息,放入交換機中
  • 消費者經過隊列拿出交換機中的消息

以下圖所示:

image-20210409195346859

不一樣於p2p模式,交換機模式下全部監聽該交換機的隊列都會獲取到信息,而且傳遞給消費者。

注意!必須先啓動消費者,再啓動生產者

生產者代碼以下:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))
channel = connection.channel()

# 建立交換機
# exchange:交換機的名字
# exchange_type:交換機的類型,普通的發佈訂閱模式
channel.exchange_declare(
    exchange="switch",
    exchange_type="fanout",
)

# exchange = "switch": 向交換機中發送消息
# routing_key:消息關鍵字
# body:消息主體
for i in range(5):
    channel.basic_publish(
        exchange="switch",
        routing_key="",
        body="this is a message{0}".format(i),
    )

    print("The message{0} is sent to switch".format(i))

消費者代碼以下:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import time
import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 監聽的交換機
# exchange:交換機的名字
# exchange_type:交換機的類型,普通類型(發佈訂閱)
channel.exchange_declare(
    exchange="switch",
    exchange_type="fanout",
)

# 建立一個用於去交換機中獲取消息的隊列
# exclusive:隊列名隨機
# result:建立結果
result = channel.queue_declare("", exclusive=True)

# 從建立結果中獲取隊列名
queue_name = result.method.queue

# 隊列綁定交換機
channel.queue_bind(
    exchange="switch",
    queue=queue_name
)

# 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式
def callback(ch, method, properties, body):
    print(body.decode("utf8"))


# queue:監聽的隊列
# auto_ack:自動回覆ack確認
channel.basic_consume(
    queue=queue_name,
    auto_ack=True,
    on_message_callback=callback,
)

# 開始監聽隊列
channel.start_consuming()

關鍵字訂閱

在上面的普通發佈訂閱模式中,只要生產者生產了數據,消費者就必須接收。

而在關鍵字訂閱中,消費者能夠篩選交換機中的數據,以下圖所示:

image-20210409200641433

咱們須要作的是改變交換機的類型爲關鍵字類型,而且指定消費者所關心的數據關鍵字。

注意!必須先啓動消費者,再啓動生產者

生產者代碼以下:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 建立交換機
# exchange:交換機的名字
# exchange_type:交換機的類型,關鍵字發佈訂閱模式
channel.exchange_declare(
    exchange="switch1",
    exchange_type="direct",
)

# exchange = "switch1": 向交換機中發送消息
# routing_key:消息關鍵字
# body:消息主題
for i in range(3):
    li1 = ["新聞", "天氣", "國家"]
    li2 = ["大新聞", "好天氣", "某國家成立了"]
    channel.basic_publish(
        exchange="switch1",
        routing_key=li1[i],
        body=li2[i],
    )

    print("The message{0} is sent to switch1".format(i))

消費者代碼以下,僅能接收到大新聞:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import time
import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 監聽的交換機
# exchange:交換機的名字
# exchange_type:交換機的類型,關鍵字發佈訂閱模式
channel.exchange_declare(
    exchange="switch1",
    exchange_type="direct",
)

# 建立一個用於去交換機中獲取消息的隊列
# exclusive:隊列名隨機
# result:建立結果
result = channel.queue_declare("", exclusive=True)

# 從建立結果中獲取隊列名
queue_name = result.method.queue

# 隊列綁定交換機,僅獲取新聞相關的
channel.queue_bind(
    exchange="switch1",
    queue=queue_name,
    routing_key="新聞",
)


# 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式
def callback(ch, method, properties, body):
    print(body.decode("utf8"))


# queue:監聽的隊列
# auto_ack:自動回覆ack確認
channel.basic_consume(
    queue=queue_name,
    auto_ack=True,
    on_message_callback=callback,
)

# 開始監聽隊列
channel.start_consuming()

模糊訂閱

模糊訂閱是關鍵字訂閱的一種升級版。

關鍵字訂閱的信息必須歸於某一類型,關鍵字一個不能多一個不能少,好比我綁定了國家這個關鍵字,那麼就只能匹配國家的信息。

而對於國家.天氣、國家.新聞這種信息一律不會匹配。

而模糊訂閱就能夠作到關鍵字訂閱作不到的,咱們可使用通配符*以及#來對關鍵字進行模糊匹配。

  • *是指僅匹配後面的任意的一個字符
  • #是指匹配後面的連續多個字符

如今,咱們可使用國家.#來匹配到任何關於國家的詞彙,如國家天氣、國家新聞等等信息。

以下圖所示:

image-20210409205919907

注意!必須先啓動消費者,再啓動生產者

生產者代碼以下:

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-

import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 建立交換機
# exchange:交換機的名字
# exchange_type:交換機的類型,模糊的訂閱模式
channel.exchange_declare(
    exchange="switch3",
    exchange_type="topic",
)

# exchange = "switch3": 向交換機中發送消息
# routing_key:消息關鍵字,必須嚴格按照.進行分割才能匹配
# body:消息主體
channel.basic_publish(
    exchange="switch3",
    routing_key="國家.新聞",
    body="xx國家的新聞",
)

channel.basic_publish(
    exchange="switch3",
    routing_key="國家.天氣",
    body="xx國家的天氣",
)

channel.basic_publish(
    exchange="switch3",
    routing_key="天氣.新聞",
    body="xx天氣的新聞",
)

print("The messages is sent to switch3")

消費者代碼以下,僅能接收到國家.新聞、國家.天氣,而對於天氣.新聞來講是接收不到的::

#!/usr/local/bin/python3
# -*- coding:utf-8 -*-
import time
import pika

# 創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672))

# 拿到操縱對象
channel = connection.channel()

# 監聽的交換機
# exchange:交換機的名字
# exchange_type:交換機的類型,模糊的訂閱模式
channel.exchange_declare(
    exchange="switch3",
    exchange_type="topic",
)

# 建立一個用於去交換機中獲取消息的隊列
# exclusive:隊列名隨機
# result:建立結果
result = channel.queue_declare("", exclusive=True)

# 從建立結果中獲取隊列名
queue_name = result.method.queue

# 隊列綁定交換機,僅獲取國家xx相關的
channel.queue_bind(
    exchange="switch3",
    queue=queue_name,
    routing_key="國家.#",
)


# 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式
def callback(ch, method, properties, body):
    print(body.decode("utf8"))


# queue:監聽的隊列
# auto_ack:自動回覆ack確認
channel.basic_consume(
    queue=queue_name,
    auto_ack=True,
    on_message_callback=callback,
)

# 開始監聽隊列
channel.start_consuming()
相關文章
相關標籤/搜索