MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較爲成熟的MQ產品有IBM WEBSPHERE MQ等等。python
與如下二者不一樣的是,rabbitMq 能夠跨進程。socket
線程queue:同一進程下不一樣線程之間的交互。函數
進程queue:父進程與子進程進行交互,或者同屬於同一父進程下多個子進程進行交互。不一樣進程之間不能交互。spa
RabbitMQ能夠同時維護不少的隊列,生產者能夠把消息放到不一樣的隊列發送給消費者。線程
send端blog
import pika #至關於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = conn.channel() #聲明queue channel.queue_declare(queue='hello') channel.basic_publish(exchange='',routing_key='hello',body='Hello World!') # routing_key 消息的key body 消息的內容 print('Sent "hello world"') conn.close()
運行結果隊列
Sent "hello world"
receive端進程
import pika #至關於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = conn.channel() #聲明queue 這裏能夠不用聲明,可是若是消費者先運行,又不但願出錯,就要消費者先運行 channel.queue_declare(queue='hello') def callback(ch,method,properties,body): print('[x] Received %r' % body ) channel.basic_consume(callback, queue='hello',no_ack=True ) #消費消息 若是收到消息就調用CALLBACK函數處理 print('[*] Waiting for message.To exit press CTRL+C') channel.start_consuming() #開始收消息
運行結果get
[*] Waiting for message.To exit press CTRL+C [x] Received b'Hello World!'
代碼同上,啓動三個消費者和一個生產者,第一個消費者會接收到生產者的第一個消息,第二個消費者會接收到生產者的第二個消息,這樣依次輪訓。消息隊列
若是某一個消費者在處理消息的過程當中,斷電或者當機了,那麼這個消息狀態須要確認。消費者處理完成後髮狀態給生產者,生產者把消息從隊列裏刪除。
no_ack=True (消費者端參數)
表示不確認消息狀態,生產者不關心消費者狀態。 若是把這個參數去掉,那麼生產者須要獲得消費者的迴應狀態,好比我啓動了3個消費者和一個生產者,生產者須要獲得消費者的迴應。若是第一個消費者獲得消息,中斷了鏈接,那麼消息會發送到第二個消費者,依次類推,直到消費者給生產者一個狀態,生產者纔會把消息從隊列裏刪除。
這個狀態要手動發送給生產者:
ch.basic_ack(delivery_tag=method.delivery_tag)
生產者接收到狀態就會把消息從隊列裏面刪除。
從上面咱們知道,消費者在發生當機或者其它狀況,只要沒有把狀態返回給生產者,那麼這個消息一直都在。若是生產者當機了怎麼辦?消息還存不存在了呢?
若是生產者當機,那麼以前存的消息都會丟失。爲了不這種狀況,那麼就要要求數據持久化。
durable=True
在聲明隊列的時候 同時聲明隊列持久化。
channel.queue_declare(queue='hello',durable=True)
這裏是隊列持久化了,可是消息尚未了。
在生產者發送消息的一端加上以上參數。
properties=pika.BasicProperties( delivery_mode=2, )
查看hello2的消息。