Python之RabbitMQ的使用

今天總結一下Python關於Rabbitmq的使用html

  RabbitMQ官網說明,其實也是一種隊列,那和前面說的線程queue和進程queue有什麼區別呢?python

    線程queue只能在同一個進程下進行數據交互shell

    進程queue只能在父進程和子進程之間,或者同一父進程下的子進程之間作數據交互windows

    若是須要對不一樣進程(eg:微信和qq)兩個獨立的程序間通訊瀏覽器

方法1就是直接把數據寫在硬盤(disk)上而後各自的進程讀取數據就能夠,可是因爲硬盤的讀寫速度太慢,效率過低安全

方法2本身寫個socket,直接作數據交互,問題是若是改變程序,或者再加一個程序,須要對寫好的socket進行修改,還要處理黏包什麼的複雜的鏈接關係,維護成本過高。微信

方法3,利用已有的中間商(代理)。這個broker其實就是封裝好的socket,咱們拿來直接用就行了。socket

   這裏的broker,就有RabbitMQ,ZeroMQ,ActiveMQ等等等等。ide

一.安裝及環境配置函數

    windows的安裝和配置方法較爲簡單,直接安裝就行了

    Rabbit支持多種語言: Java, .NET, PHP, Python, JavaScript, Ruby, Go這些經常使用語言都支持


如圖所示,python操做RabbitMQ須要的模塊有上述幾種選擇,咱們用最簡單的pika,用pip直接安裝

pip install pika

二.RabbitMQ的使用

這裏全部的用法都是基於RabbitMQ是工做在‘localhost’上,而且端口號爲15672,能在瀏覽器裏訪問http://localhost:15672這個地址。

1.消息分發(基礎版)

這就是RabbitMQ最簡單的工做模式,p爲生產者(Producer),生產者發送message給queue,queue再把消息發送至消費者c(Customer)

先看看生產者至隊列(send)這個過程

import pika connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel()

咱們先創建了一個連接,而後就須要定義一個隊列,隊列的名字就暫時定位‘hello'

channel.queue_declare(queue='hello')

在RabbitMQ裏消息並不能直接發送給隊列,全部的信息發送都要經過一個exchange,可是這裏咱們先把這個exchange定義成一個空的字符串,後面在將他的具體用法

channel.basic_publish(exchange='', routing_key='hello', body='123')

在發送確認完成後,能夠將鏈接關閉

connect.close()

這就是send端的代碼

import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='123')
print("[x] Sent 'hello world!'")
connect.close()
RabbitMQ_basic_producer

 運行了send代碼後咱們能夠在terminal裏RabbitMQ安裝目錄下sbin文件夾裏查看一下消息隊列

rabbitmqctl.bat list_queues 

若是是Linux命令爲

sudo rabbitmqctl list_queues

這裏就說明了隊列信息和消息狀態。

 

而後再看一下消費者這一端的代碼是什麼樣的,一樣,先要創建鏈接並定義好隊列名

connect = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connect.channel() channel.queue_declare(queue='hello')

這裏可能有個疑問:咱們不是在生產者裏已經定義了隊列名嗎?爲何在消費者裏還要定義呢?

由於在實際工做中,咱們並不能肯定是生產者仍是消費者先一步運行,若是隊列名沒有定義的話運行時候是會報錯的。下面就是對消息的處理

def callback(ch,method,properties,body): print("[x] Received %r"%body) channel.basic_consume(callback, queue='hello', no_ack=True)

當消息來臨時,消費者會執行回調函數callback。這裏的callback就是直接打印消息內容(body)。

回調函數另外的幾個參數:chconne.channel的內存對象地址,

<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=('::1', 62145, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>>

method是包含了發送的信息

<Basic.Deliver(['consumer_tag=ctag1.9ae48c906b014a83a512413c0e6f9ef8', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>

 properties咱們之後再瞭解。

2.公平分發(workqueue)

  在這種結構裏,咱們要考慮到這樣一種狀況:有多個消費者,消費者在獲得消息時須要對消息進行處理,而且有可能處理消息所消耗的時間是不一樣的

。這裏咱們用的queue叫作workqueue

  爲了模擬消費者對消息進行處理的過程,咱們用time.sleep()作一個消耗時間的過程。消息的產生和接收是這樣的

message = ' '.join(sys.argv[1:]) or "Hello World!"
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done")

這裏插播一條''.join(sys.argv[1:])的做用:就是把在shell裏輸入的指令後跟的代碼加在message裏。消費者獲得消息後,數消息裏有幾個「.」,sleep相應的秒數。

 放出初版的代碼

import pika,sys
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')
message = ' '.join(sys.argv[1:]) or "Hello World."
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print('send %s'%message)
RabbitMQ生產者
import time,pika

connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello')

print(' [*] Waiting for messages. To exit press CTRL+C')



def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')
channel.start_consuming()
RabbbitMQ消費者

 

 這時候,咱們能夠多啓動幾個消費者,再用生產者發送消息,看看效果!

 

能夠發現,消息是被公平的依次被分發給各個消費者的(Fair dispatch),這種分發的方式叫輪詢。

消息確認message acknowledgments  

如今考慮下這種情形:消費者在處理消息時須要較長的時間,在這時把這個消費者kill掉,正在處理的消息和已經接收但未被處理的消息就丟失了。這應該是不容許的,咱們可不但願有數據丟失,就須要將這些任務從新發送給其餘正常工做的消費者。

爲了保證任務不丟失,RabbitMQ支持使用message acknowledgments,消費者在完成任務後會給RabbitMQ發送個消息,告訴他活已經幹完了,RabbitMQ就會把這個任務給釋放掉。而當出現消費者宕機、掉線等狀況時,RabbitMQ會從新把這個任務發送給其餘的消費者。

往回看看上文說到的no_ack,這個值默認的是False,RabbitMQ是不主動銷燬消息的因此咱們一看看在這裏把值置爲True。

channel.basic_consume(callback, queue='hello', no_ack=True)

這樣只要消費者接收到消息,RabbitMQ就直接銷燬掉這個消息,就成了手動確認。咱們要想實現剛纔說的消息不丟失,就要這樣定義

def callback(ch, method, properties, body): print(" [x] Received %r" % body) # time.sleep(body.count(b'.'))
    time.sleep(10) #修改了一下,在延時的10s把消費者斷掉 print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')#這裏的no_ack默認爲False

這樣,當一個消費者宕機了,RabbitMQ就會直接把任務拍個下一個消費者。

 消息持久化

剛纔經過了消息確認,咱們保證了消費者在掉線的時候任務不丟失,但是還有一個問題,若是RabbitMQ若是斷掉(或者服務重啓)了,裏面的任務(包括全部queue和exchange依舊會丟失)這時候咱們能夠用到——消息持久化Message durability

channel.queue_declare(queue='hello',durable=True)#將隊列持久化(只保存了隊列)
channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties(delivery_mode=2))#保持消息持久化

必須同時將隊列和消息持久化,能夠保證RabbitMQ服務在重啓後任務還存在。

注意幾點:

1.若是隻持久化了消息,服務重啓後消息丟失

2.若是隻持久化了隊列,服務重啓後隊列還在,但消息丟失

3.在持久化隊列的時候要保持生產者和消費者的一致性

最後一點,由於有可能每一個消費者處理信息的能力不同,若是按公平分發的化有可能致使負載不平衡,旱的旱死、澇的澇死。爲避免這種狀況發生還有一個知識點

channel.basic_qos(prefetch_count=1)

用這個語句限制了消費者待處理信息的個數

workQueue的終極代碼

import pika,sys
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello',durable=True)#隊列持久化
message = ' '.join(sys.argv[1:]) or "Hello World2."
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
print('send %s'%message)
RabbitMQ_workqueue_procucer
import time,pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.queue_declare(queue='hello',durable=True)#隊列持久化
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)  #消息回執
channel.basic_qos(prefetch_count=1)  #限制消費者待處理任務個數
channel.basic_consume(callback,
                      queue='hello'
                      )
channel.start_consuming()
RabbitMQ_workqueue_customer

 3.發佈/訂閱(publish/subscribe)

咱們在前面兩部分將的都是將消息由生產者到消費者之間經過queue傳遞,如今將引入一個新的成員:exchange。

其實生產者在發送的時候是不知道消息要發送給那個queue的,甚至他都不知道消息是由queue接收的。實際上生產者只是把message發送給了exchange。至於message後續的處理都是由exchange決定的。

就像圖上標示的,exchange在sender和queue之間起到了轉呈的做用。

按照工做方式,咱們將exchange分紅了fanoutdirecttopicheaders四種類型。

fanout:全部綁定到這個exchange的隊列都接收消息

direct:經過routingKey和exchange決定的那個惟一的queue能夠接收消息

topic:全部符合routingKey(能夠是表達式)的queue能夠接收消息

  表達式說明:#表示一個或多個字符

                            *表示任何字符

        使用RoutingKey爲#時至關於fanout

headers:經過headers來決定把消息發送給哪些queue。

在這個part咱們來看fanout的做用。

channel.exchange_declare(exchange='logs', exchange_type='fanout') 

咱們定義一個exchange,名字隨便起一個‘logs’,類型就聲明爲fanout。

(在前面兩節咱們尚未引入exchange這個概念,就用了默認的exchange設置

channel.basic_publish(exchange='', routing_key='hello', body='123')

,exchange=''空的字符串表示了默認的exchange或名字是空的,那exchange就把消息發送給routing_key指定的queue裏(前提是這個queue是存在的),在聲明瞭exchange之後,咱們就能夠用這個exchange發送消息了

channel.basic_publish(exchange='logs',                #使用的exchange名稱
                      routing_key='',                 #使用的隊列名稱
                      body='123')                     #消息內容

注意到了一點沒有?這裏並無定義隊列的名稱?爲何?在廣播的時候是不用固定具體的哪一個queue的,咱們

result = channel.queue_declare() #生成隨機queue

咱們在消費端聲明queue的時候能夠生成一個隨機的queue,這裏還要加個命令

result = channel.queue_declare(exclusive=True) 

這個exclusive表示在鏈接在關閉之後這個queue直接被銷燬掉。

而後把這個queue綁定在轉發器上。全部進入這個exchange的消息被髮送給全部和他綁定的隊列裏。

隨機的queue已經聲明瞭,如今就把他跟exchange綁定

channel.queue_bind(exchange='logs', queue=result.method.queue)#注意queue名的獲取方法

這就是最終的代碼:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')   #logs 是隨便起的名字,聲明瞭exchange
message = ' '.join(sys.argv[1:]) or 'info: Hello World!'
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
fanout_publish
import pika
connect = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connect.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
result = channel.queue_declare(exclusive=True)   #exclusive 惟一的,爲True時不指定queue名的化隨機生成一個queue,
                                                    # 在斷開鏈接後把queue刪除,至關於生成一個隨機queue
channel.queue_bind(exchange='logs',
                   queue=result.method.queue) #綁定的是exchange對應的queue
print('waiting for logs.')
def callback(ch,method,preproteries,body):
    print('get data:%r'%body)
channel.basic_consume(callback,
                      queue=result.method.queue,
                      no_ack=True)
channel.start_consuming()
fanout_customer

 整體看一下,發送端的代碼跟前面的差不太多,最重要的差異就是把routingKey給忽略掉了,可是明確了exchange的對象。

而接收方是在創建鏈接後要聲明exchange,而且要和隊列綁定。若是沒有隊列和exchange綁定,消息就被銷燬了。這就是整個發送的過程

還有一點,這個訂閱——發佈的模型就像電臺和收音機同樣,若是customer下線了是收不到信息的,消息也是在線發送的,並不會保存。

4.routing(exchange type:direct)

在這個過程當中,咱們大體瞭解了發佈——訂閱模型。其實就是在發送端定義了一個exchange,在接收端定義了一個隊列,而後把這二者綁定,就OK了。但是咱們如今只想訂閱一部分有用的信息,好比只獲取錯誤信息寫到日誌文件裏,但同時又能將全部的信息都顯示在控制檯(或者terminal上)。

上一節所講述的bind,也能夠簡單的理解爲這個queue對這個exchange的內容「感興趣」。

在binding的時候,還能夠加一個routingKey的參數,這個參數就描述了queue對哪些exchange感興趣。

channel.queue_bind(exchange='logs',         #被綁定的exchange名
                   queue='queue_name',      #被綁定的queue名
                   routing_key='black')     #queue的‘興趣愛好’

對queue和exchange進行bind時,bind的參數主要取決於exchange的類型,好比在fanout模式下是不能有這個routingKey的,運行時候會報錯。

咱們使用了fanout的發佈訂閱模式,在這個模式下接收端不能對信息進行必定原則的過濾,一股腦的照單全收,已經不能知足咱們的要求了,如今就要用direct模式。

 

在上面的圖裏,有兩個queue分別和exchange綁定,Q1的routingKey是orange,Q2則有兩個分別是black和green。在這個模型中,發佈的消息關鍵字是orange則被分發到Q1內,而包含有black或green的則發給Q2.剩餘的消息就被discard了。

而在上圖中,一樣的key同多個隊列進行綁定的方法也是合法的。全部包含關鍵字black的消息會被同時發送 給Q1和Q2。

瞭解了上面所說的方法,咱們來按照本節一開始的目標來修改下代碼

首先要聲明exchange

channel.exchange_declare(exchange='logs', exchange_type='direct')#聲明exchange,注意接收端的代碼是同樣的

在發送的時候對消息進行分類

serverity = sys.argv[1] if len(sys.argv)>1 else 'info'

而後發送消息

channel.basic_publish(exchange='logs', routing_key=serverity,  #消息的分類
                      body=message)

在接收端,咱們用一個循環把全部的routingKey和queue綁定(有可能出現多個關鍵字和一個queue同時綁定的狀況)

servrities = sys.argv[1:]    #獲取全部的關鍵字
if not servrities: sys.stderr.write('Usage: %s [info] [warning] [error]\n'%sys.argv[0]) sys.exit(1)              #關鍵字不存在打印提示後退出
print('recived:%s'%servrities) for servrity in servrities:  #循環綁定
    channel.queue_bind(exchange='logs', queue=queue_name, routing_key=servrity)

整個方案就是這樣的

咱們啓動兩個terminal,按這樣的方式啓動

python direct_consumer.py info error warning
python direct_consumer.py error

 在分別發送

python direct_publisher.py info 123 python direct_publisher.py error 456 python direct_publisher.py warning 789

看看是什麼效果

是否是達到了訂閱的效果!

4.更加細緻的消息過濾(topic模式)

在上一節咱們利用了direct的模式實現了初步的消息過濾,在這一節裏,咱們要看看如何實現如何實現更加細緻的消息過濾,好比我在獲取info的前提下還要知道哪些message是RabbitMQ發來的,哪些是Redis發來的,那怎麼區分呢?

就想這個圖裏的同樣,咱們在定義RoutingKey的時候利用了表達式,就像模糊查詢同樣其中

*表示任意一個字符

#表示0個或多個字符

topic模式的代碼和上一節的基本一致,只是改變了exchange的模式

channel.exchange_declare(exchange='logs', exchange_type='topic')#聲明exchange

 啓動terminal,輸入指令

python topic_customer kern.*                 能夠接收以kern.開頭的全部消息 kern.123 abc  接收到abc
python topic_customer.py *.kern.*            中間包含.kern.的消息 123.kern.345 abc  接收到abc

 同時綁定多個關鍵字

接收端
d:\python\week11>python topic_customer.py kern.* pip.* ['kern.*', 'pip.*'] [*] Waiting for logs. To exit press CTRL+C [x] 'pip.11':b'duziele' [x] 'kern.11':b'duziele'
發送端 d:\python\week11>python topic_publisher.py pip.11 duziele [x] Sent 'pip.11':'duziele' d:\python\week11>python topic_publisher.py kern.11 duziele [x] Sent 'kern.11':'duziele'

 還能夠用#獲取全部消息

d:\python\week11>python topic_customer.py #

 ps:#的做用我一直不大明白,我試過了

d:\python\week11>python topic_customer.py kern.#

效果和kern.*是同樣的。

6.Remote procedure call(RPC)

       咱們在前面的章節將到了在多個消費者之間分發耗時任務的方法,但是如今要實現這樣的功能:調用遠程的設備上的一個函數,而後等執行完畢返回結果。這樣的工做模式就叫遠程過程調用——Remote Procedure Call(RPC)。

  利用RabbitMQ也能夠實現RPC的功能,爲了能模擬這個過程,咱們在server端設立一個fun:給定一個整數n,而後返回n對應的斐波那契數列。

callback queue

  經過RabbitMQ實現RPC的方法很簡單——客戶端發送請求,服務端對請求響應而後把消息發送至叫callback的queue,過程相似這樣

result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)

Correlation id

  咱們剛纔爲每一個請求對應的響應都聲明瞭一個隊列,可是在這裏等待着結果的返回效率是否是過低了?還好有另外的一種方法:爲每一個客戶端建立一個callback的隊列。然而又引起了一個新問題:在這個隊列裏我不知道哪一個響應是對應我這個請求的!這時候就到大神出馬了——Correlation ID。對每一個請求都設置一個惟一的ID,在callback的隊列裏經過查看屬性來判斷他對應哪一個請求。若是出現沒有對應的ID,安全起見咱們仍是把他忽略掉。

 

總之咱們的RPC的工做流程就是這樣的:

1.client啓動,聲明一個匿名的callback queue

2.創建RPC請求,請求裏除了消息還包含兩個參數:a.replay_to(告訴server響應的結論callback的隊列裏)

                       b.correlation_id:每一個請求都被賦予一個獨一無二的值

3.請求被髮送給RPC_queue

4.server等待queue裏的消息,一旦出現請求,server響應請求並把結論發送給經過replay_to要求的queue裏

5.client在callback_queue裏等待數據,一旦消息出現,他將correlation進行比對,若是相同就獲取請求結果。

import pika

connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:return 1
    else:return fib(n-1)+fib(n-2)


def on_request(ch,method,props,body):
    n = int(body)
    print('[.]fib(%s)'%n)
    response = fib(n)
    print(response)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body = str(response))
    print('send over')
    ch.basic_ack(delivery_tag=method.delivery_tag)   #確認client接收到消息

channel.basic_qos(prefetch_count=1)   #限制消息處理個數
channel.basic_consume(on_request,queue='rpc_queue')

print('[x]Awaitiong RPC requests')
channel.start_consuming()
RPC_server
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue     #聲明請求響應回執的queue

        self.channel.basic_consume(consumer_callback=self.on_response,
                                   queue=self.callback_queue,no_ack=True) #監聽回執queue

    def on_response(self,ch,method,props,body):  #callback_queue的回調函數
        print(body)
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties   #發送回執消息的參數
                                   (reply_to=self.callback_queue,
                                    correlation_id=self.corr_id),
                                   body=str(n)
                                   )
        while self.response is None:
            self.connection.process_data_events()   #事件驅動,非阻塞版的start_consuming
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()
n = input('>>>')
print('[x] Requesting fib(%s)'%n)
response = fibonacci_rpc.call(n)
print(response)
RPC_client

 以上就是RabbitMQ的常規用法。

相關文章
相關標籤/搜索