RabbitMQ能夠說是目前較爲火熱的一款消息中間件,其自己由Erlang語言進行編寫,部署簡單操做方便,是必備的一門技術棧。python
RabbitMQ官網docker
它支持各類主流語言的驅動,以下所示:函數
那麼如今本章將用Python來探究一下RabbitMQ的使用。fetch
RabbitMQ官方提供多種安裝方式,具體可參照官網,這裏將採用Docker部署,版本爲3.8.14:this
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
同時咱們還須要爲Python安裝對應操縱RabbitMQ的驅動模塊,名爲pika,可直接經過pip進行安裝:code
pip3 install pika
基礎的p2p在RabbitMQ中被稱爲簡單模式,即一個生產者的信息僅能被一個消費者所接收,整個流程步驟以下:orm
生產者代碼以下:中間件
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 建立/獲取隊列 channel.queue_declare(queue="q1") # exchange = "": 普通的p2p模式 # routing_key:放進那個隊列 # body:消息主體 channel.basic_publish( exchange="", routing_key="q1", body="this is a message", ) print("The message is sent to q1!")
消費者代碼以下:對象
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 建立/獲取隊列 channel.queue_declare(queue="q1") # 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式 def callback(ch, method, properties, body): print(body.decode("utf8")) print("The consumer successfully gets the message from the q1 queue!") # queue:監聽的隊列 # auto_ack:自動回覆ack確認 channel.basic_consume( queue="q1", auto_ack=True, on_message_callback=callback, ) # 開始監聽隊列,會一直進行監聽 channel.start_consuming()
若是僅有一個生產者,而有多個消費者想要獲取數據,那這些消費者則會輪詢的依次的從隊列中得到數據,以下代碼可對其進行驗證,你只須要並行的多開幾個消費者便可:blog
生產者代碼 以下:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() channel.queue_declare(queue="q1") for i in range(5): channel.basic_publish( exchange="", routing_key="q1", body="this is a message{0}".format(i), ) print("The message{0} is sent to q1".format(i))
消費者代碼以下:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() channel.queue_declare(queue="q1") def callback(ch, method, properties, body): print(body.decode("utf8")) print("The consumer successfully gets the message from the q1 queue!") channel.basic_consume( queue="q1", auto_ack=True, on_message_callback=callback, ) channel.start_consuming()
在消費者中,有一條這樣的代碼:
# auto_ack=True channel.basic_consume( queue="q1", auto_ack=True, on_message_callback=callback, )
這條代碼的意思是一旦消費者從隊列中取出消息,不管是否消費該消息,都會當即向RabbitMQ服務發送一個我以接收,你能夠從隊列中將該消息抹除的信號。
以下圖所示:
若是該參數設置爲False,則表明消費者向RabbitMQ的這條ack確認信號轉爲手動觸發,也就是說,咱們能夠在消費者成功的消費掉這條信息後再手動通知RabbitMQ從隊列中將該消息進行移除。
本質上,該參數若是爲False,消費者是不會取出隊列中的信息,而是徹底拷貝一份。
在消費完成後,你能夠手動通知RabbitMQ刪除消息的代碼以下,固定寫法:
ch.basic_ack(delivery_tag=method.delivery_tag)
仍是上一個總體的消費者代碼吧...
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() channel.queue_declare(queue="q1") def callback(ch, method, properties, body): print("Processing...") time.sleep(3) # 通知RabbitMQ,你能夠刪除了 ch.basic_ack(delivery_tag=method.delivery_tag) # auto_ack:手動回覆ack確認 channel.basic_consume( queue="q1", auto_ack=False, on_message_callback=callback, ) channel.start_consuming()
另外,若是此時你啓動3個消費者,你會發現隊列中的消息不是輪詢了,而是被第一個消費者獨佔:
RabbitMQ中全部的消息都存儲在內存中,這意味着某些特殊狀況下,如RabbitMQ服務忽然宕掉以後,在隊列中的數據都會丟失。
咱們能夠對隊列進行持久化設置,讓其將數據保存在磁盤中。
有趣的是,RabbitMQ中對隊列的持久化分爲2個層次:
須要注意的是,在RabbitMQ的一次服務週期中,一個隊列若是已經聲明是非持久化隊列,則不能將其改變爲持久化隊列,你須要從新建立一個新的持久化隊列。
用代碼看一下實際效果吧,將下面這段生產者代碼嘗試運行:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() # durable:若是爲True則表明着是持久化隊列,默認是False channel.queue_declare(queue="q2", durable=True) # delivery_mode:2是對該消息持久化,1是不持久化,默認爲1 channel.basic_publish( exchange="", routing_key="q2", body="持久化信息", properties=pika.BasicProperties( delivery_mode=2, ) ) channel.basic_publish( exchange="", routing_key="q2", body="非持久化信息", ) print("The messages is sent to q2")
如今q2隊列中應該具備2條信息,咱們中止Docker容器的運行在對其從新進行啓動:
$ docker container stop rabbitmq $ docker container start rabbitmq
而後啓動消費者,看能拿到幾條信息:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() # durable:若是爲True則表明着是持久化隊列,默認是False channel.queue_declare(queue="q2", durable=True) def callback(ch, method, properties, body): print(body.decode("utf8")) channel.basic_consume( queue="q2", auto_ack=True, on_message_callback=callback, ) channel.start_consuming()
固然,結果只能拿到持久化信息,非持久化信息是拿不到的。
默認的隊列消息分發策略是輪詢分發,這會致使一個問題,如我有2個消費者:
因此咱們能夠將分發策略改成閒置消費,即誰處理的快,下一條消息就歸誰,而再也不使用輪詢分發,你只須要在消費者的下面加上這句代碼便可。
channel.basic_qos(prefetch_count=1)
仍是拿多個消費一節的例子來舉例,修改一下消費者的代碼,生產者依舊用上面的便可:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() channel.queue_declare(queue="q1") def callback(ch, method, properties, body): print(body.decode("utf8")) # 第二個消費者取消註釋 time.sleep(50) ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉輪詢策略,改成閒置優先,必須寫在監聽的上面 channel.basic_qos(prefetch_count=1) channel.basic_consume( queue="q1", auto_ack=False, on_message_callback=callback, ) channel.start_consuming()
RabbitMQ中的發佈訂閱與Kafka中的有所不一樣,它必須依賴一個被稱爲交換機的東西來進行消息的發佈,整個流程以下:
以下圖所示:
不一樣於p2p模式,交換機模式下全部監聽該交換機的隊列都會獲取到信息,而且傳遞給消費者。
注意!必須先啓動消費者,再啓動生產者
生產者代碼以下:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) channel = connection.channel() # 建立交換機 # exchange:交換機的名字 # exchange_type:交換機的類型,普通的發佈訂閱模式 channel.exchange_declare( exchange="switch", exchange_type="fanout", ) # exchange = "switch": 向交換機中發送消息 # routing_key:消息關鍵字 # body:消息主體 for i in range(5): channel.basic_publish( exchange="switch", routing_key="", body="this is a message{0}".format(i), ) print("The message{0} is sent to switch".format(i))
消費者代碼以下:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import time import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 監聽的交換機 # exchange:交換機的名字 # exchange_type:交換機的類型,普通類型(發佈訂閱) channel.exchange_declare( exchange="switch", exchange_type="fanout", ) # 建立一個用於去交換機中獲取消息的隊列 # exclusive:隊列名隨機 # result:建立結果 result = channel.queue_declare("", exclusive=True) # 從建立結果中獲取隊列名 queue_name = result.method.queue # 隊列綁定交換機 channel.queue_bind( exchange="switch", queue=queue_name ) # 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式 def callback(ch, method, properties, body): print(body.decode("utf8")) # queue:監聽的隊列 # auto_ack:自動回覆ack確認 channel.basic_consume( queue=queue_name, auto_ack=True, on_message_callback=callback, ) # 開始監聽隊列 channel.start_consuming()
在上面的普通發佈訂閱模式中,只要生產者生產了數據,消費者就必須接收。
而在關鍵字訂閱中,消費者能夠篩選交換機中的數據,以下圖所示:
咱們須要作的是改變交換機的類型爲關鍵字類型,而且指定消費者所關心的數據關鍵字。
注意!必須先啓動消費者,再啓動生產者
生產者代碼以下:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 建立交換機 # exchange:交換機的名字 # exchange_type:交換機的類型,關鍵字發佈訂閱模式 channel.exchange_declare( exchange="switch1", exchange_type="direct", ) # exchange = "switch1": 向交換機中發送消息 # routing_key:消息關鍵字 # body:消息主題 for i in range(3): li1 = ["新聞", "天氣", "國家"] li2 = ["大新聞", "好天氣", "某國家成立了"] channel.basic_publish( exchange="switch1", routing_key=li1[i], body=li2[i], ) print("The message{0} is sent to switch1".format(i))
消費者代碼以下,僅能接收到大新聞:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import time import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 監聽的交換機 # exchange:交換機的名字 # exchange_type:交換機的類型,關鍵字發佈訂閱模式 channel.exchange_declare( exchange="switch1", exchange_type="direct", ) # 建立一個用於去交換機中獲取消息的隊列 # exclusive:隊列名隨機 # result:建立結果 result = channel.queue_declare("", exclusive=True) # 從建立結果中獲取隊列名 queue_name = result.method.queue # 隊列綁定交換機,僅獲取新聞相關的 channel.queue_bind( exchange="switch1", queue=queue_name, routing_key="新聞", ) # 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式 def callback(ch, method, properties, body): print(body.decode("utf8")) # queue:監聽的隊列 # auto_ack:自動回覆ack確認 channel.basic_consume( queue=queue_name, auto_ack=True, on_message_callback=callback, ) # 開始監聽隊列 channel.start_consuming()
模糊訂閱是關鍵字訂閱的一種升級版。
關鍵字訂閱的信息必須歸於某一類型,關鍵字一個不能多一個不能少,好比我綁定了國家這個關鍵字,那麼就只能匹配國家的信息。
而對於國家.天氣、國家.新聞這種信息一律不會匹配。
而模糊訂閱就能夠作到關鍵字訂閱作不到的,咱們可使用通配符*以及#來對關鍵字進行模糊匹配。
如今,咱們可使用國家.#來匹配到任何關於國家的詞彙,如國家天氣、國家新聞等等信息。
以下圖所示:
注意!必須先啓動消費者,再啓動生產者
生產者代碼以下:
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 建立交換機 # exchange:交換機的名字 # exchange_type:交換機的類型,模糊的訂閱模式 channel.exchange_declare( exchange="switch3", exchange_type="topic", ) # exchange = "switch3": 向交換機中發送消息 # routing_key:消息關鍵字,必須嚴格按照.進行分割才能匹配 # body:消息主體 channel.basic_publish( exchange="switch3", routing_key="國家.新聞", body="xx國家的新聞", ) channel.basic_publish( exchange="switch3", routing_key="國家.天氣", body="xx國家的天氣", ) channel.basic_publish( exchange="switch3", routing_key="天氣.新聞", body="xx天氣的新聞", ) print("The messages is sent to switch3")
消費者代碼以下,僅能接收到國家.新聞、國家.天氣,而對於天氣.新聞來講是接收不到的::
#!/usr/local/bin/python3 # -*- coding:utf-8 -*- import time import pika # 創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", port=5672)) # 拿到操縱對象 channel = connection.channel() # 監聽的交換機 # exchange:交換機的名字 # exchange_type:交換機的類型,模糊的訂閱模式 channel.exchange_declare( exchange="switch3", exchange_type="topic", ) # 建立一個用於去交換機中獲取消息的隊列 # exclusive:隊列名隨機 # result:建立結果 result = channel.queue_declare("", exclusive=True) # 從建立結果中獲取隊列名 queue_name = result.method.queue # 隊列綁定交換機,僅獲取國家xx相關的 channel.queue_bind( exchange="switch3", queue=queue_name, routing_key="國家.#", ) # 回調函數:ch,method,properties都是固定寫法,body參數是消息體,bytes格式 def callback(ch, method, properties, body): print(body.decode("utf8")) # queue:監聽的隊列 # auto_ack:自動回覆ack確認 channel.basic_consume( queue=queue_name, auto_ack=True, on_message_callback=callback, ) # 開始監聽隊列 channel.start_consuming()