今天總結一下Python關於Rabbitmq的使用html
RabbitMQ官網說明,其實也是一種隊列,那和前面說的線程queue和進程queue有什麼區別呢?python
線程queue只能在同一個進程下進行數據交互shell
進程queue只能在父進程和子進程之間,或者同一父進程下的子進程之間作數據交互windows
若是須要對不一樣進程(eg:微信和qq)兩個獨立的程序間通訊瀏覽器
方法1就是直接把數據寫在硬盤(disk)上而後各自的進程讀取數據就能夠,可是因爲硬盤的讀寫速度太慢,效率過低安全
方法2本身寫個socket,直接作數據交互,問題是若是改變程序,或者再加一個程序,須要對寫好的socket進行修改,還要處理黏包什麼的複雜的鏈接關係,維護成本過高。微信
方法3,利用已有的中間商(代理)。這個broker其實就是封裝好的socket,咱們拿來直接用就行了。socket
這裏的broker,就有RabbitMQ,ZeroMQ,ActiveMQ等等等等。ide
一.安裝及環境配置函數
windows的安裝和配置方法較爲簡單,直接安裝就行了
Rabbit支持多種語言: Java, .NET, PHP, Python, JavaScript, Ruby, Go這些經常使用語言都支持
如圖所示,python操做RabbitMQ須要的模塊有上述幾種選擇,咱們用最簡單的pika,用pip直接安裝
pip install pika
二.RabbitMQ的使用
這裏全部的用法都是基於RabbitMQ是工做在‘localhost’上,而且端口號爲15672,能在瀏覽器裏訪問http://localhost:15672這個地址。
1.消息分發(基礎版)
這就是RabbitMQ最簡單的工做模式,p爲生產者(Producer),生產者發送message給queue,queue再把消息發送至消費者c(Customer)
先看看生產者至隊列(send)這個過程
import pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel()
咱們先創建了一個連接,而後就須要定義一個隊列,隊列的名字就暫時定位‘hello'
channel.queue_declare(queue='hello')
在RabbitMQ裏消息並不能直接發送給隊列,全部的信息發送都要經過一個exchange,可是這裏咱們先把這個exchange定義成一個空的字符串,後面在將他的具體用法
channel.basic_publish(exchange='', routing_key='hello', body='123')
在發送確認完成後,能夠將鏈接關閉
connect.close()
這就是send端的代碼
import pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='123') print("[x] Sent 'hello world!'") connect.close()
運行了send代碼後咱們能夠在terminal裏RabbitMQ安裝目錄下sbin文件夾裏查看一下消息隊列
rabbitmqctl.bat list_queues
若是是Linux命令爲
sudo rabbitmqctl list_queues
這裏就說明了隊列信息和消息狀態。
而後再看一下消費者這一端的代碼是什麼樣的,一樣,先要創建鏈接並定義好隊列名
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello')
這裏可能有個疑問:咱們不是在生產者裏已經定義了隊列名嗎?爲何在消費者裏還要定義呢?
由於在實際工做中,咱們並不能肯定是生產者仍是消費者先一步運行,若是隊列名沒有定義的話運行時候是會報錯的。下面就是對消息的處理
def callback(ch,method,properties,body): print("[x] Received %r"%body) channel.basic_consume(callback, queue='hello', no_ack=True)
當消息來臨時,消費者會執行回調函數callback。這裏的callback就是直接打印消息內容(body)。
回調函數另外的幾個參數:ch是conne.channel的內存對象地址,
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 62145, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>
method是包含了發送的信息
<Basic.Deliver(['consumer_tag=ctag1.9ae48c906b014a83a512413c0e6f9ef8', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>
properties咱們之後再瞭解。
2.公平分發(workqueue)
在這種結構裏,咱們要考慮到這樣一種狀況:有多個消費者,消費者在獲得消息時須要對消息進行處理,而且有可能處理消息所消耗的時間是不一樣的
。這裏咱們用的queue叫作workqueue。
爲了模擬消費者對消息進行處理的過程,咱們用time.sleep()作一個消耗時間的過程。消息的產生和接收是這樣的
message = ' '.join(sys.argv[1:]) or "Hello World!"
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done")
這裏插播一條''.join(sys.argv[1:])的做用:就是把在shell裏輸入的指令後跟的代碼加在message裏。消費者獲得消息後,數消息裏有幾個「.」,sleep相應的秒數。
放出初版的代碼
import pika,sys connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello') message = ' '.join(sys.argv[1:]) or "Hello World." channel.basic_publish(exchange='', routing_key='hello', body=message) print('send %s'%message)
import time,pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello') print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello') channel.start_consuming()
這時候,咱們能夠多啓動幾個消費者,再用生產者發送消息,看看效果!
能夠發現,消息是被公平的依次被分發給各個消費者的(Fair dispatch),這種分發的方式叫輪詢。
消息確認message acknowledgments
如今考慮下這種情形:消費者在處理消息時須要較長的時間,在這時把這個消費者kill掉,正在處理的消息和已經接收但未被處理的消息就丟失了。這應該是不容許的,咱們可不但願有數據丟失,就須要將這些任務從新發送給其餘正常工做的消費者。
爲了保證任務不丟失,RabbitMQ支持使用message acknowledgments,消費者在完成任務後會給RabbitMQ發送個消息,告訴他活已經幹完了,RabbitMQ就會把這個任務給釋放掉。而當出現消費者宕機、掉線等狀況時,RabbitMQ會從新把這個任務發送給其餘的消費者。
往回看看上文說到的no_ack,這個值默認的是False,RabbitMQ是不主動銷燬消息的因此咱們一看看在這裏把值置爲True。
channel.basic_consume(callback, queue='hello', no_ack=True)
這樣只要消費者接收到消息,RabbitMQ就直接銷燬掉這個消息,就成了手動確認。咱們要想實現剛纔說的消息不丟失,就要這樣定義
def callback(ch, method, properties, body): print(" [x] Received %r" % body) # time.sleep(body.count(b'.'))
time.sleep(10) #修改了一下,在延時的10s把消費者斷掉 print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')#這裏的no_ack默認爲False
這樣,當一個消費者宕機了,RabbitMQ就會直接把任務拍個下一個消費者。
消息持久化
剛纔經過了消息確認,咱們保證了消費者在掉線的時候任務不丟失,但是還有一個問題,若是RabbitMQ若是斷掉(或者服務重啓)了,裏面的任務(包括全部queue和exchange依舊會丟失)這時候咱們能夠用到——消息持久化Message durability
channel.queue_declare(queue='hello',durable=True)#將隊列持久化(只保存了隊列)
channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties(delivery_mode=2))#保持消息持久化
必須同時將隊列和消息持久化,能夠保證RabbitMQ服務在重啓後任務還存在。
注意幾點:
1.若是隻持久化了消息,服務重啓後消息丟失
2.若是隻持久化了隊列,服務重啓後隊列還在,但消息丟失
3.在持久化隊列的時候要保持生產者和消費者的一致性
最後一點,由於有可能每一個消費者處理信息的能力不同,若是按公平分發的化有可能致使負載不平衡,旱的旱死、澇的澇死。爲避免這種狀況發生還有一個知識點
channel.basic_qos(prefetch_count=1)
用這個語句限制了消費者待處理信息的個數
workQueue的終極代碼
import pika,sys connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello',durable=True)#隊列持久化 message = ' '.join(sys.argv[1:]) or "Hello World2." channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties(delivery_mode=2)) #消息持久化 print('send %s'%message)
import time,pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello',durable=True)#隊列持久化 print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) #消息回執 channel.basic_qos(prefetch_count=1) #限制消費者待處理任務個數 channel.basic_consume(callback, queue='hello' ) channel.start_consuming()
3.發佈/訂閱(publish/subscribe)
咱們在前面兩部分將的都是將消息由生產者到消費者之間經過queue傳遞,如今將引入一個新的成員:exchange。
其實生產者在發送的時候是不知道消息要發送給那個queue的,甚至他都不知道消息是由queue接收的。實際上生產者只是把message發送給了exchange。至於message後續的處理都是由exchange決定的。
就像圖上標示的,exchange在sender和queue之間起到了轉呈的做用。
按照工做方式,咱們將exchange分紅了fanout、direct、topic和headers四種類型。
fanout:全部綁定到這個exchange的隊列都接收消息
direct:經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(能夠是表達式)的queue能夠接收消息
表達式說明:#表示一個或多個字符
*表示任何字符
使用RoutingKey爲#時至關於fanout
headers:經過headers來決定把消息發送給哪些queue。
在這個part咱們來看fanout的做用。
channel.exchange_declare(exchange='logs', exchange_type='fanout')
咱們定義一個exchange,名字隨便起一個‘logs’,類型就聲明爲fanout。
(在前面兩節咱們尚未引入exchange這個概念,就用了默認的exchange設置
channel.basic_publish(exchange='', routing_key='hello', body='123')
,exchange=''空的字符串表示了默認的exchange或名字是空的,那exchange就把消息發送給routing_key指定的queue裏(前提是這個queue是存在的),在聲明瞭exchange之後,咱們就能夠用這個exchange發送消息了
channel.basic_publish(exchange='logs', #使用的exchange名稱
routing_key='', #使用的隊列名稱
body='123') #消息內容
注意到了一點沒有?這裏並無定義隊列的名稱?爲何?在廣播的時候是不用固定具體的哪一個queue的,咱們
result = channel.queue_declare() #生成隨機queue
咱們在消費端聲明queue的時候能夠生成一個隨機的queue,這裏還要加個命令
result = channel.queue_declare(exclusive=True)
這個exclusive表示在鏈接在關閉之後這個queue直接被銷燬掉。
而後把這個queue綁定在轉發器上。全部進入這個exchange的消息被髮送給全部和他綁定的隊列裏。
隨機的queue已經聲明瞭,如今就把他跟exchange綁定
channel.queue_bind(exchange='logs', queue=result.method.queue)#注意queue名的獲取方法
這就是最終的代碼:
import pika import sys connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connect.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') #logs 是隨便起的名字,聲明瞭exchange message = ' '.join(sys.argv[1:]) or 'info: Hello World!' channel.basic_publish(exchange='logs', routing_key='', body=message)
import pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') result = channel.queue_declare(exclusive=True) #exclusive 惟一的,爲True時不指定queue名的化隨機生成一個queue, # 在斷開鏈接後把queue刪除,至關於生成一個隨機queue channel.queue_bind(exchange='logs', queue=result.method.queue) #綁定的是exchange對應的queue print('waiting for logs.') def callback(ch,method,preproteries,body): print('get data:%r'%body) channel.basic_consume(callback, queue=result.method.queue, no_ack=True) channel.start_consuming()
整體看一下,發送端的代碼跟前面的差不太多,最重要的差異就是把routingKey給忽略掉了,可是明確了exchange的對象。
而接收方是在創建鏈接後要聲明exchange,而且要和隊列綁定。若是沒有隊列和exchange綁定,消息就被銷燬了。這就是整個發送的過程
還有一點,這個訂閱——發佈的模型就像電臺和收音機同樣,若是customer下線了是收不到信息的,消息也是在線發送的,並不會保存。
4.routing(exchange type:direct)
在這個過程當中,咱們大體瞭解了發佈——訂閱模型。其實就是在發送端定義了一個exchange,在接收端定義了一個隊列,而後把這二者綁定,就OK了。但是咱們如今只想訂閱一部分有用的信息,好比只獲取錯誤信息寫到日誌文件裏,但同時又能將全部的信息都顯示在控制檯(或者terminal上)。
上一節所講述的bind,也能夠簡單的理解爲這個queue對這個exchange的內容「感興趣」。
在binding的時候,還能夠加一個routingKey的參數,這個參數就描述了queue對哪些exchange感興趣。
channel.queue_bind(exchange='logs', #被綁定的exchange名
queue='queue_name', #被綁定的queue名
routing_key='black') #queue的‘興趣愛好’
對queue和exchange進行bind時,bind的參數主要取決於exchange的類型,好比在fanout模式下是不能有這個routingKey的,運行時候會報錯。
咱們使用了fanout的發佈訂閱模式,在這個模式下接收端不能對信息進行必定原則的過濾,一股腦的照單全收,已經不能知足咱們的要求了,如今就要用direct模式。
在上面的圖裏,有兩個queue分別和exchange綁定,Q1的routingKey是orange,Q2則有兩個分別是black和green。在這個模型中,發佈的消息關鍵字是orange則被分發到Q1內,而包含有black或green的則發給Q2.剩餘的消息就被discard了。
而在上圖中,一樣的key同多個隊列進行綁定的方法也是合法的。全部包含關鍵字black的消息會被同時發送 給Q1和Q2。
瞭解了上面所說的方法,咱們來按照本節一開始的目標來修改下代碼
首先要聲明exchange
channel.exchange_declare(exchange='logs', exchange_type='direct')#聲明exchange,注意接收端的代碼是同樣的
在發送的時候對消息進行分類
serverity = sys.argv[1] if len(sys.argv)>1 else 'info'
而後發送消息
channel.basic_publish(exchange='logs', routing_key=serverity, #消息的分類
body=message)
在接收端,咱們用一個循環把全部的routingKey和queue綁定(有可能出現多個關鍵字和一個queue同時綁定的狀況)
servrities = sys.argv[1:] #獲取全部的關鍵字
if not servrities: sys.stderr.write('Usage: %s [info] [warning] [error]\n'%sys.argv[0]) sys.exit(1) #關鍵字不存在打印提示後退出
print('recived:%s'%servrities) for servrity in servrities: #循環綁定
channel.queue_bind(exchange='logs', queue=queue_name, routing_key=servrity)
整個方案就是這樣的
咱們啓動兩個terminal,按這樣的方式啓動
python direct_consumer.py info error warning
python direct_consumer.py error
在分別發送
python direct_publisher.py info 123 python direct_publisher.py error 456 python direct_publisher.py warning 789
看看是什麼效果
是否是達到了訂閱的效果!
4.更加細緻的消息過濾(topic模式)
在上一節咱們利用了direct的模式實現了初步的消息過濾,在這一節裏,咱們要看看如何實現如何實現更加細緻的消息過濾,好比我在獲取info的前提下還要知道哪些message是RabbitMQ發來的,哪些是Redis發來的,那怎麼區分呢?
就想這個圖裏的同樣,咱們在定義RoutingKey的時候利用了表達式,就像模糊查詢同樣其中
*表示任意一個字符
#表示0個或多個字符
topic模式的代碼和上一節的基本一致,只是改變了exchange的模式
channel.exchange_declare(exchange='logs', exchange_type='topic')#聲明exchange
啓動terminal,輸入指令
python topic_customer kern.* 能夠接收以kern.開頭的全部消息 kern.123 abc 接收到abc
python topic_customer.py *.kern.* 中間包含.kern.的消息 123.kern.345 abc 接收到abc
同時綁定多個關鍵字
接收端
d:\python\week11>python topic_customer.py kern.* pip.* ['kern.*', 'pip.*'] [*] Waiting for logs. To exit press CTRL+C [x] 'pip.11':b'duziele' [x] 'kern.11':b'duziele'
發送端 d:\python\week11>python topic_publisher.py pip.11 duziele [x] Sent 'pip.11':'duziele' d:\python\week11>python topic_publisher.py kern.11 duziele [x] Sent 'kern.11':'duziele'
還能夠用#獲取全部消息
d:\python\week11>python topic_customer.py #
ps:#的做用我一直不大明白,我試過了
d:\python\week11>python topic_customer.py kern.#
效果和kern.*是同樣的。
6.Remote procedure call(RPC)
咱們在前面的章節將到了在多個消費者之間分發耗時任務的方法,但是如今要實現這樣的功能:調用遠程的設備上的一個函數,而後等執行完畢返回結果。這樣的工做模式就叫遠程過程調用——Remote Procedure Call(RPC)。
利用RabbitMQ也能夠實現RPC的功能,爲了能模擬這個過程,咱們在server端設立一個fun:給定一個整數n,而後返回n對應的斐波那契數列。
callback queue
經過RabbitMQ實現RPC的方法很簡單——客戶端發送請求,服務端對請求響應而後把消息發送至叫callback的queue,過程相似這樣
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
咱們剛纔爲每一個請求對應的響應都聲明瞭一個隊列,可是在這裏等待着結果的返回效率是否是過低了?還好有另外的一種方法:爲每一個客戶端建立一個callback的隊列。然而又引起了一個新問題:在這個隊列裏我不知道哪一個響應是對應我這個請求的!這時候就到大神出馬了——Correlation ID。對每一個請求都設置一個惟一的ID,在callback的隊列裏經過查看屬性來判斷他對應哪一個請求。若是出現沒有對應的ID,安全起見咱們仍是把他忽略掉。
總之咱們的RPC的工做流程就是這樣的:
1.client啓動,聲明一個匿名的callback queue
2.創建RPC請求,請求裏除了消息還包含兩個參數:a.replay_to(告訴server響應的結論callback的隊列裏)
b.correlation_id:每一個請求都被賦予一個獨一無二的值
3.請求被髮送給RPC_queue
4.server等待queue裏的消息,一旦出現請求,server響應請求並把結論發送給經過replay_to要求的queue裏
5.client在callback_queue裏等待數據,一旦消息出現,他將correlation進行比對,若是相同就獲取請求結果。
import pika connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connect.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1:return 1 else:return fib(n-1)+fib(n-2) def on_request(ch,method,props,body): n = int(body) print('[.]fib(%s)'%n) response = fib(n) print(response) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body = str(response)) print('send over') ch.basic_ack(delivery_tag=method.delivery_tag) #確認client接收到消息 channel.basic_qos(prefetch_count=1) #限制消息處理個數 channel.basic_consume(on_request,queue='rpc_queue') print('[x]Awaitiong RPC requests') channel.start_consuming()
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue #聲明請求響應回執的queue self.channel.basic_consume(consumer_callback=self.on_response, queue=self.callback_queue,no_ack=True) #監聽回執queue def on_response(self,ch,method,props,body): #callback_queue的回調函數 print(body) if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties #發送回執消息的參數 (reply_to=self.callback_queue, correlation_id=self.corr_id), body=str(n) ) while self.response is None: self.connection.process_data_events() #事件驅動,非阻塞版的start_consuming return int(self.response) fibonacci_rpc = FibonacciRpcClient() n = input('>>>') print('[x] Requesting fib(%s)'%n) response = fibonacci_rpc.call(n) print(response)
以上就是RabbitMQ的常規用法。