簡介
RabbitMQ:接受消息再傳遞消息,能夠視爲一個「郵局」。發送者和接受者經過隊列來進行交互,隊列的大小能夠視爲無限的,多個發送者能夠發生給一個隊列,多個接收者也能夠從一個隊列中接受消息。python
code
rabbitmq使用的協議是amqp,用於python的推薦客戶端是pikaweb
pip install pika -i https://pypi.douban.com/simple/
生產者:send.py服務器
import pika # 創建一個鏈接 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # 鏈接本地的RabbitMQ服務器 channel = connection.channel() # 得到channel
這裏連接的是本機的,若是想要鏈接其餘機器上的服務器,只要填入地址或主機名便可。函數
接下來咱們開始發送消息了,注意要確保接受消息的隊列是存在的,不然rabbitmq就丟棄掉該消息post
channel.queue_declare(queue='hello') # 在RabbitMQ中建立hello這個隊列 channel.basic_publish(exchange='', # 使用默認的exchange來發送消息到隊列 routing_key='hello', # 發送到該隊列 hello 中 body='Hello World!') # 消息內容 connection.close() # 關閉 同時flush
RabbitMQ默認須要1GB的空閒磁盤空間,不然發送會失敗。fetch
這時已在本地隊列hello中存放了一個消息,若是使用 rabbitmqctl list_queues 可看到ui
hello 1
說明有一個hello隊列 裏面存放了一個消息spa
消費者:receive.pycode
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
仍是先連接到服務器,和以前發送時相同blog
channel.queue_declare(queue='hello') # 此處就是聲明瞭 來確保該隊列 hello 存在 能夠屢次聲明 這裏主要是爲了防止接受程序先運行時出錯 def callback(ch, method, properties, body): # 用於接收到消息後的回調 print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', # 收指定隊列hello的消息 no_ack=True) #在處理完消息後不發送ack給服務器 channel.start_consuming() # 啓動消息接受 這會進入一個死循環
工做隊列(任務隊列)
工做隊列是用於分發耗時任務給多個工做進程的。不當即作那些耗費資源的任務(須要等待這些任務完成),而是安排這些任務以後執行。例如咱們把task做爲message發送到隊列裏,啓動工做進程來接受並最終執行,且可啓動多個工做進程來工做。這適用於web應用,即不該在一個http請求的處理窗口內完成複雜任務。
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 使得消息持久化 ))
分配消息的方式爲 輪詢 即每一個工做進程得到相同的消息數。
消息ack
若是消息分配給某個工做進程,可是該工做進程未處理完成就崩潰了,可能該消息就丟失了,由於rabbitmq一旦把一個消息分發給工做進程,它就把該消息刪掉了。
爲了預防消息丟失,rabbitmq提供了ack,即工做進程在收到消息並處理後,發送ack給rabbitmq,告知rabbitmq這時候能夠把該消息從隊列中刪除了。若是工做進程掛掉 了,rabbitmq沒有收到ack,那麼會把該消息 從新分發給其餘工做進程。不須要設置timeout,即便該任務須要很長時間也能夠處理。
ack默認是開啓的,以前咱們的工做進程顯示指定了no_ack=True
channel.basic_consume(callback, queue='hello') # 會啓用ack
帶ack的callback:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # 發送ack
消息持久化
可是,有時RabbitMQ重啓了,消息也會丟失。可在建立隊列時設置持久化:
(隊列的性質一旦肯定沒法改變)
channel.queue_declare(queue='task_queue', durable=True)
同時在發送消息時也得設置該消息的持久化屬性:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
可是,若是在RabbitMQ剛接收到消息還沒來得及存儲,消息仍是會丟失。同時,RabbitMQ也不是在接受到每一個消息都進行存盤操做。若是還須要更完善的保證,須要使用publisher confirm。
公平的消息分發
輪詢模式的消息分發可能並不公平,例如奇數的消息都是繁重任務的話,某些進程則會一直運行繁 重任務。即便某工做進程上有積壓的消息未處理,如不少都沒發ack,可是RabbitMQ仍是會按照順序發消息給它。能夠在接受進程中加設置:
channel.basic_qos(prefetch_count=1)
告知RabbitMQ,這樣在一個工做進程沒回發ack狀況下是不會再分配消息給它。
羣發
通常狀況下,一條消息是發送給一個工做進程,而後完成,有時想把一條消息同時發送給多個進程:
exchange
發送者是否是直接發送消息到隊列中的,事實上發生者根本不知道消息會發送到那個隊列,發送者只能把消息發送到exchange裏。exchange一方面收生產者的消息,另外一方面把他們推送到隊列中。因此做爲exchange,它須要知道當收到消息時它須要作什麼,是應該把它加到一個特殊的隊列中仍是放到不少的隊列中,或者丟棄。exchange有direct、topic、headers、fanout等種類,而羣發使用的即fanout。以前在發佈消息時,exchange的值爲 '' 即便用default exchange。
channel.exchange_declare(exchange='logs', type='fanout') # 該exchange會把消息發送給全部它知道的隊列中
臨時隊列
result = channel.queue_declare() # 建立一個隨機隊列 result = channel.queue_declare(exclusive=True) # 建立一個隨機隊列,同時在沒有接收者鏈接該隊列後則銷燬它 queue_name = result.method.queue
這樣result.method.queue便是隊列名稱,在發送或接受時便可使用。
綁定exchange 和 隊列
channel.queue_bind(exchange='logs', queue='hello')
logs在發送消息時給hello也發一份。
在發送消息是使用剛剛建立的 logs exchange
channel.basic_publish(exchange='logs', routing_key='', body=message)
路由
以前已經使用過bind,即創建exchange和queue的關係(該隊列對來自該exchange的消息有興趣),bind時可另外指定routing_key選項。
使用direct exchange
將對應routing key的消息發送到綁定相同routing key的隊列中
channel.exchange_declare(exchange='direct_logs', type='direct')
發送函數,發佈不一樣severity的消息:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
接受函數中綁定對應severity的:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
使用topic exchange
以前使用的direct exchange 只能綁定一個routing key,可使用這種能夠拿.隔開routing key的topic exchange,例如:
"stock.usd.nyse" "nyse.vmw"
和direct exchange同樣,在接受者那邊綁定的key與發送時指定的routing key相同便可,另外有些特殊的值:
* 表明1個單詞 # 表明0個或多個單詞
若是發送者發出的routing key都是3個部分的,如:celerity.colour.species。
Q1: *.orange.* 對應的是中間的colour都爲orange的 Q2: *.*.rabbit 對應的是最後部分的species爲rabbit的 lazy.# 對應的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2均可接收到,quick.orange.fox 只有Q1能接受到,對於lazy.pink.rabbit雖然匹配到了Q2兩次,可是隻會發送一次。若是綁定時直接綁定#,則會收到全部的。
RPC
在遠程機器上運行一個函數而後得到結果。
1、客戶端啓動 同時設置一個臨時隊列用於接受回調,綁定該隊列
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
2、客戶端發送rpc請求,同時附帶reply_to對應回調隊列,correlation_id設置爲每一個請求的惟一id(雖說能夠爲每一次RPC請求都建立一個回調隊列,可是這樣效率不高,若是一個客戶端只使用一個隊列,則須要使用correlation_id來匹配是哪一個請求),以後阻塞在回調隊列直到收到回覆
注意:若是收到了非法的correlation_id直接丟棄便可,由於有這種狀況--服務器已經發了響應可是還沒發ack就掛了,等一會服務器重啓了又會從新處理該任務,又發了一遍相應,可是這時那個請求已經被處理掉了
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # 發出調用 while self.response is None: # 這邊就至關於阻塞了 self.connection.process_data_events() # 查看回調隊列 return int(self.response)
三、請求會發送到rpc_queue隊列
四、RPC服務器從rpc_queue中取出,執行,發送回覆
channel.basic_consume(on_request, queue='rpc_queue') # 綁定 等待請求 # 處理以後: ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) # 發送回覆到回調隊列 ch.basic_ack(delivery_tag = method.delivery_tag) # 發送ack
五、客戶端從回調隊列中取出數據,檢查correlation_id,執行相應操做
if self.corr_id == props.correlation_id: self.response = body