RabbitMq 是實現了高級消息隊列協議(AMQP)的開源消息代理中間件。消息隊列是一種應用程序對應用程序的通行方式,應用程序經過寫消息,將消息傳遞於隊列,由另外一應用程序讀取 完成通訊。而做爲中間件的 RabbitMq 無疑是目前最流行的消息隊列之一。python
RabbitMq 應用場景普遍:正則表達式
生產者(producter):隊列消息的產生者,負責生產消息,並將消息傳入隊列json
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼 # 虛擬隊列須要指定參數 virtual_host,若是是默認的能夠不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則建立 result = channel.queue_declare(queue = 'python-test') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向隊列插入數值 routing_key是隊列名 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message) print(message) connection.close()
消費者(consumer):隊列消息的接收者,負責 接收並處理 消息隊列中的消息服務器
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 申明消息隊列,消息在這個隊列傳遞,若是不存在,則建立隊列 channel.queue_declare(queue = 'python-test', durable = False) # 定義一個回調函數來處理消息隊列中的消息,這裏是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) # 告訴rabbitmq,用callback來接收消息 channel.basic_consume(callback, queue = 'python-test') # 開始接收信息,並進入阻塞狀態,隊列裏有信息纔會調用callback進行處理 channel.start_consuming()
MQ默認創建的是臨時 queue 和 exchange,若是不聲明持久化,一旦 rabbitmq 掛掉,queue、exchange 將會所有丟失。因此咱們通常在建立 queue 或者 exchange 的時候會聲明 持久化。架構
1.queue 聲明持久化併發
# 聲明消息隊列,消息將在這個隊列傳遞,如不存在,則建立。durable = True 表明消息隊列持久化存儲,False 非持久化存儲 result = channel.queue_declare(queue = 'python-test',durable = True)
2.exchange 聲明持久化分佈式
# 聲明exchange,由exchange指定消息在哪一個隊列傳遞,如不存在,則建立.durable = True 表明exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test', durable = True)
注意:若是已存在一個非持久化的 queue 或 exchange ,執行上述代碼會報錯,由於當前狀態不能更改 queue 或 exchange 存儲屬性,須要刪除重建。若是 queue 和 exchange 中一個聲明瞭持久化,另外一個沒有聲明持久化,則不容許綁定。函數
3.消息持久化高併發
雖然 exchange 和 queue 都申明瞭持久化,但若是消息只存在內存裏,rabbitmq 重啓後,內存裏的東西仍是會丟失。因此必須聲明消息也是持久化,從內存轉存到硬盤。fetch
# 向隊列插入數值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
4.acknowledgement 消息不丟失
消費者(consumer)調用callback函數時,會存在處理消息失敗的風險,若是處理失敗,則消息丟失。可是也能夠選擇消費者處理失敗時,將消息回退給 rabbitmq ,從新再被消費者消費,這個時候須要設置確認標識。
channel.basic_consume(callback,queue = 'python-test', # no_ack 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,不管調用callback成功與否,消息都被消費掉 no_ack = False)
rabbitmq 的發佈與訂閱要藉助交換機(Exchange)的原理實現:
Exchange 一共有三種工做模式:fanout, direct, topicd
這種模式下,傳遞到 exchange 的消息將會轉發到全部與其綁定的 queue 上。
發佈者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼 # 虛擬隊列須要指定參數 virtual_host,若是是默認的能夠不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 聲明exchange,由exchange指定消息在哪一個隊列傳遞,如不存在,則建立。durable = True 表明exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout')
for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 向隊列插入數值 routing_key是隊列名。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化。routing_key 不須要配置 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
訂閱者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 建立臨時隊列,consumer關閉後,隊列自動刪除 result = channel.queue_declare(exclusive=True) # 聲明exchange,由exchange指定消息在哪一個隊列傳遞,如不存在,則建立。durable = True 表明exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') # 綁定exchange和隊列 exchange 使咱們可以確切地指定消息應該到哪一個隊列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue) # 定義一個回調函數來處理消息隊列中的消息,這裏是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) channel.basic_consume(callback, queue = result.method.queue, # 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,不管調用callback成功與否,消息都被消費掉 no_ack = False) channel.start_consuming()
這種工做模式的原理是 消息發送至 exchange,exchange 根據 路由鍵(routing_key)轉發到相對應的 queue 上。
發佈者:
import pika import json credentials = pika.PlainCredentials('shampoo', '123456') # mq用戶名和密碼 # 虛擬隊列須要指定參數 virtual_host,若是是默認的能夠不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel=connection.channel() # 聲明exchange,由exchange指定消息在哪一個隊列傳遞,如不存在,則建立。durable = True 表明exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') for i in range(10): message=json.dumps({'OrderId':"1000%s"%i}) # 指定 routing_key。delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = 'python-test',routing_key = 'OrderId',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
消費者:
import pika credentials = pika.PlainCredentials('shampoo', '123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.1.62.170',port = 5672,virtual_host = '/',credentials = credentials)) channel = connection.channel() # 建立臨時隊列,consumer關閉後,隊列自動刪除 result = channel.queue_declare(exclusive=True) # 聲明exchange,由exchange指定消息在哪一個隊列傳遞,如不存在,則建立。durable = True 表明exchange持久化存儲,False 非持久化存儲 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='direct') # 綁定exchange和隊列 exchange 使咱們可以確切地指定消息應該到哪一個隊列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue,routing_key='OrderId') # 定義一個回調函數來處理消息隊列中的消息,這裏是打印出來 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) #channel.basic_qos(prefetch_count=1) # 告訴rabbitmq,用callback來接受消息 channel.basic_consume(callback, queue = result.method.queue, # 設置成 False,在調用callback函數時,未收到確認標識,消息會重回隊列。True,不管調用callback成功與否,消息都被消費掉 no_ack = False) channel.start_consuming()
這種模式和第二種模式差很少,exchange 也是經過 路由鍵 routing_key 來轉發消息到指定的 queue 。 不一樣點是 routing_key 使用正則表達式支持模糊匹配,但匹配規則又與常規的正則表達式不一樣,好比‘’#‘’是匹配所有,「*」是匹配一個詞。
舉例:routing_key =「#orderid#」,意思是將消息轉發至全部 routing_key 包含 「orderid」 字符的隊列中。代碼和模式二 相似,就不貼出來了。