RabbitMQ消息隊列

RabbitMQ消息隊列

rabbitmq官方文檔:http://www.rabbitmq.com/tutorials/tutorial-two-python.htmlhtml

rabbitmq中文文檔:http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.htmlpython

介紹

  RabbitMQ是一款基於AMQP協議的消息中間件,它可以在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。並且使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。並且兩端可使用不一樣的語言編寫,大大提供了靈活性。簡單來講,RabbitMQ就是一個消息代理 - 一個消息系統的媒介。它能夠爲你的應用提供一個通用的消息發送和接收平臺,而且保證消息在傳輸過程當中的安全。git

應用場景

  在項目中,將一些無需即時返回且耗時的操做提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提升了系統的吞吐量。程序員

RabbitMQ結構圖

消息隊列的使用過程大概:

  (1)客戶端鏈接到消息隊列服務器,打開一個channel。
  (2)客戶端聲明一個exchange,並設置相關屬性。
  (3)客戶端聲明一個queue,並設置相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  (5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。對key進行模式匹配後進行投遞的叫作Topic交換機,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還有一種不須要key的,叫作Fanout交換機,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。

一、RabbitMQ實現一個簡單的通訊

RabbitMQ庫github

  RabbitMQ使用的是AMQP協議。要使用她你就必須須要一個使用一樣協議的庫。幾乎全部的編程語言都有可選擇的庫。python也是同樣,能夠從如下幾個庫中選擇:算法

py-amqplib
txAMQP
pika編程

pip install pika

  大體的設計是這樣的:json

  生產者(producer)把消息發送到一個名爲「hello」的隊列中。消費者(consumer)從這個隊列中獲取消息。windows

# #!/usr/bin/env/ python
# # -*-coding:utf-8 -*-
import pika

#創建一個到RabbitMQ服務器的鏈接。
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

#建立一個通道
channel=connection.channel()

# 聲明一個名爲hello的隊列
channel.queue_declare(queue='hello')

#n RabbitMQ a message can never sent directly to the  queue,it always need to
#go through exchange

channel.basic_publish(exchange='',
                      routing_key='hello',#要發送到的queue名字
                      body='Hello world')
print("[x] Sent 'Hell World'")
#退出程序前,經過關閉鏈接保證消息已經投遞到RabbitMq   
connection.close()
send端
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 聲明一個管道
channel = connection.channel()

#you may ask why declare the queue again - we have already declared in our previous code.We cloud avoid that if we were sure that the queue already exists .For example if send.py program was run before .But we're not yet sure which program to run first(send or receive are possible) .In such cases it's a good practice to repeat declaring the queue in both programs
# channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(ch, method, properties)
    print(" [x] Received %r" %body)

channel.basic_consume(
                      callback,#若是收到消息就調用callback函數處理消息
                      queue='hello',#queue名
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()
receive端

二、Work Queues

  Work Queues(工做隊列)(又稱:任務隊列——Task Queues)是爲了不等待一些佔用大量資源、時間的操做。當咱們把任務(Task)看成消息發送到隊列中,一個運行在後臺的工做者(worker)進程就會取出任務而後處理。當你運行多個工做者(workers),任務就會在它們之間共享。緩存

  這個概念在網絡應用中是很是有用的,它能夠在短暫的HTTP請求中處理一些複雜的任務。

一、Round-robin dispatching(循環調度)

  默認來講,RabbitMQ會按順序得把消息發送給每一個消費者(consumer)。平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)  

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少,使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。

 

# #!/usr/bin/env/ python
# # -*-coding:utf-8 -*-
import pika
import sys
#創建一個到RabbitMQ服務器的鏈接。
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

#建立一個通道
channel=connection.channel()

# 聲明一個名爲hello的隊列
channel.queue_declare(queue='hello')

#n RabbitMQ a message can never sent directly to the  queue,it always need to
#go through exchange

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % (message,))

connection.close()
task
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 聲明一個管道
channel = connection.channel()
#聲明一個隊列
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print (" [x] Received %r" % (body,))
    time.sleep( body.count('.'.encode('utf-8')) )
    print(" [x] Done")

channel.basic_consume(
                      callback,#若是收到消息就調用callback函數處理消息
                      queue='hello',#queue名
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()
worker

 二、Message acknowledgment(消息確認)

  當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。

  咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。

  爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。

  若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。

  消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。

  消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True(True表示不管這條消息發完與否都不會給服務端發消息)標識把它關閉。是時候移除這個標識了。當工做者(worker)完成了任務,就發送一個響應。

# #!/usr/bin/env/ python
# # -*-coding:utf-8 -*-
import pika
import sys
#創建一個到RabbitMQ服務器的鏈接。
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

#建立一個通道
channel=connection.channel()

# 聲明一個名爲hello的隊列
channel.queue_declare(queue='hello')

#n RabbitMQ a message can never sent directly to the  queue,it always need to
#go through exchange

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % (message,))

connection.close()
task
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 聲明一個管道
channel = connection.channel()
#聲明一個隊列
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print (" [x] Received %r" % (body,))
    time.sleep( body.count('.'.encode('utf-8')) )
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)#當工做者(worker)完成了任務,就發送一個響應。

channel.basic_consume(
                      callback,#若是收到消息就調用callback函數處理消息
                      queue='hello',#queue名
                    )
print(' [*] Waiting for messages. To exit press CTRL+C')
workers

忘記確認

  一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。消息在你的程序退出以後就會從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。

爲了排除這種錯誤,你可使用rabbitmqctl命令,輸出messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

若是windows環境下的話,去掉sudo

三、Message durability(消息持久化)

  若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失全部隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。

  首先,爲了避免讓隊列消失,須要把隊列聲明爲持久化(durable):

channel.queue_declare(queue='hello', durable=True)#durable=True隊列持久化,注意:task和workers端均要寫上

  這時候,咱們只是確保在RabbitMq重啓以後queue_declare隊列不會丟失。另外,咱們須要把咱們的消息也要設爲持久化——將delivery_mode的屬性設爲2。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

  注意:消息持久化將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。

四、Fair dispatch(公平調度)

  你應該已經發現,它仍舊沒有按照咱們指望的那樣進行分發。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。

  這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者。那麼問題就來了,Rabbit這樣只按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆

  咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。

channel.basic_qos(prefetch_count=1)

關於隊列大小

  不過,這樣有可能會出現這樣的問題:若是全部的工做者都處理繁忙狀態,你的隊列就會被填滿。你須要留意這個問題,要麼添加更多的工做者(workers),要麼使用其餘策略。

帶消息持久化+公平調度的完整代碼:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
task
#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()
work

五、Publish/Subscribe(發佈/訂閱)

  前面,咱們搭建了一個工做隊列,每一個任務只分發給一個工做者(worker)。但如今咱們要作的跟以前徹底不同 —— 分發一個消息給多個消費者(consumers)。這種模式被稱爲「發佈/訂閱」。

  發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

  爲了描述這種模式,咱們將會構建一個簡單的日誌系統。它包括兩個程序——第一個程序負責發送日誌消息,第二個程序負責獲取消息並輸出內容。

  在咱們的這個日誌系統中,全部正在運行的接收方程序都會接受消息。咱們用其中一個接收者(receiver)把日誌寫入硬盤中,另一個接受者(receiver)把日誌輸出到屏幕上。

  最終,日誌消息被廣播給全部的接受者(receivers)。

  要到達這種效果,這時候exchanges就派上用場了

Exchanges(交換機)

  RabbitMQ消息模型的核心理念是:發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。

  發佈者(producer)只須要把消息發送給一個交換機(exchange)。交換機很是簡單,它一邊從發佈者方接收消息,一邊把消息推送到隊列。交換機必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是是多個隊列,或者是直接忽略消息。這些規則是經過交換機類型(exchange type)來定義的。

  有幾個可供選擇的交換機類型:

direct(直連交換機):經過routingKey和exchange決定的那個惟一的queue能夠接收消息

topic(主題交換機):全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

   表達式符號說明:#表明一個或多個字符,*表明任何字符
        例:#.a會匹配a.a,aa.a,aaa.a等
            *.a會匹配a.a,b.a,c.a等
       注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout

headers(頭交換機):經過headers 來決定把消息發給哪些queue

fanout(扇形交換機):全部bind到此exchange的queue均可以接收消息

  咱們在這裏主要說明最後一個 —— 扇型交換機(fanout)。先建立一個fanout類型的交換機,命名爲logs:

channel.exchange_declare(exchange='logs',
                         type='fanout')

  扇型交換機(fanout)很簡單,你可能從名字上就能猜想出來,它把消息發送給它所知道的全部隊列。這正是咱們的日誌系統所須要的。

交換器列表:

#rabbitmqctl可以列出服務器上全部的交換器:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

#這個列表中有一些叫作amq.*的交換器。這些都是默認建立的,不過這時候你還不須要使用他們。

匿名的交換器:

  前面,咱們對交換機一無所知,但仍然可以發送消息到隊列中。由於咱們使用了命名爲空字符串("")默認的交換機。

回想咱們以前是這樣發佈一則消息:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

#exchange參數就是交換機的名稱。空字符串表明默認或者匿名交換機:消息將會根據指定的routing_key分發到指定的隊列。

如今,咱們就能夠發送消息到一個具名交換機了:

channel.basic_publish(exchange='logs',
                      routing_key='',#廣播,故不用指定queue名
                      body=message)

 

臨時隊列

  你還記得以前咱們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——咱們須要把工做者(workers)指向正確的隊列。若是你打算在發佈者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。

  可是這並不適用於咱們的日誌系統。咱們打算接收全部的日誌消息,而不只僅是一小部分。咱們關心的是最新的消息而不是舊的。爲了解決這個問題,咱們須要作兩件事情。

首先,當咱們鏈接上RabbitMQ的時候,咱們須要一個全新的、空的隊列。咱們能夠手動建立一個隨機的隊列名,或者讓服務器爲咱們選擇一個隨機的隊列名(推薦)。咱們只須要在調用queue_declare方法的時候,不提供queue參數就能夠了:

result = channel.queue_declare()

  這時候咱們能夠經過result.method.queue得到已經生成的隨機隊列名。它多是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,當與消費者(consumer)斷開鏈接的時候,這個隊列應當被當即刪除。exclusive標識符便可達到此目的。

注意:由於廣播是即時連通的,消費者是沒法接收在其開啓前生產者所發送的消息的,這就跟收音機的原理相似。

result = channel.queue_declare(exclusive=True)#exclusive排他的,惟一的,不指定queue名,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除

queue_name=result.method.queue #得到已經生成的隨機隊列名

綁定(Bindings)

  咱們已經建立了一個扇型交換機(fanout)和一個隊列。這時候咱們須要告訴交換機如何發送消息給咱們的隊列。交換器和隊列之間的聯繫咱們稱之爲綁定(binding)。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

  如今,logs交換機將會把消息添加到咱們的隊列中

rabbitmqctl list_bindings #列出全部現存的綁定。

代碼整合:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()
publisher
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
subscriber

 

六、Routing路由

  RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

  在前面中,咱們使用的扇型交換機(fanout exchange)沒有足夠的靈活性 —— 它能作的僅僅是廣播。這時候咱們將會使用直連交換機(direct exchange)來代替。路由的算法很簡單 —— 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而肯定消息該分發到哪一個隊列。

direct exchange

  在這個場景中,咱們能夠看到直連交換機 X和兩個隊列進行了綁定。第一個隊列使用error做爲綁定鍵,第二個隊列有三個綁定,一個使用info做爲綁定鍵,第二個使用error,第三個使用warning。

這樣一來,當路由鍵爲error的消息發佈到交換機,就會被路由到隊列Q1和Q2。路由鍵爲info或者warning的消息就會路由到Q2。其餘的全部消息都將會被丟棄。

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
publisher
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
subscriber

 七、topic exchange主題交換機

  儘管直連交換機可以改善咱們的系統,可是它也有它的限制 —— 沒辦法基於多個標準執行路由操做。

發送到主題交換機(topic exchange)的消息不能夠攜帶隨意什麼樣子的路由鍵(routing_key),它的路由鍵必須是一個由.分隔開的詞語列表。這些單詞隨即是什麼均可以,可是最好是跟攜帶它們的消息有關係的詞彙。如下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數能夠隨意,可是不要超過255字節。

綁定鍵也必須擁有一樣的格式。主題交換機背後的邏輯跟直連交換機很類似 —— 一個攜帶着特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列。可是它的綁定鍵和路由鍵有兩個特殊應用方式:

  • * (星號) 用來表示一個單詞.
  • # (井號) 用來表示任意數量(零個或多個)單詞。

下邊用圖說明:
None

這個例子裏,咱們發送的全部消息都是用來描述小動物的。發送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.分割開。路由鍵裏的第一個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。因此它看起來是這樣的: <celerity>.<colour>.<species>

咱們建立了三個綁定:Q1的綁定鍵爲 *.orange.*,Q2的綁定鍵爲 *.*.rabbit 和 lazy.# 。

這三個綁定鍵被能夠總結爲:

  • Q1 對全部的桔黃色動物都感興趣。
  • Q2 則是對全部的兔子全部懶惰的動物感興趣。

一個攜帶有 quick.orange.rabbit 的消息將會被分別投遞給這兩個隊列。攜帶着 lazy.orange.elephant的消息一樣也會給兩個隊列都投遞過去。另外一方面攜帶有 quick.orange.fox 的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox 的消息會投遞給第二個隊列。攜帶有 lazy.pink.rabbit 的消息只會被投遞給第二個隊列一次,即便它同時匹配第二個隊列的兩個綁定。攜帶着 quick.brown.fox 的消息不會投遞給任何一個隊列。

若是咱們違反約定,發送了一個攜帶有一個單詞或者四個單詞("orange" or"quick.orange.male.rabbit")的消息時,發送的消息不會投遞給任何一個隊列,並且會丟失掉。

可是另外一方面,即便 "lazy.orange.male.rabbit" 有四個單詞,他仍是會匹配最後一個綁定,而且被投遞到第二個隊列中。

主題交換機

主題交換機是很強大的,它能夠表現出跟其餘交換機相似的行爲

當一個隊列的綁定鍵爲 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收全部的消息。

當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲。

組合在一塊兒

接下來咱們會將主題交換機應用到咱們的日誌系統中。在開始工做前,咱們假設日誌的路由鍵由兩個單詞組成,路由鍵看起來是這樣的:<facility>.<severity>

整合:

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
publisher
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()
subscriber
#執行下邊命令 接收全部日誌:
python receive_logs_topic.py "#"

#執行下邊命令 接收來自」kern「設備的日誌:
python receive_logs_topic.py "kern.*"

#執行下邊命令 只接收嚴重程度爲」critical「的日誌:
python receive_logs_topic.py "*.critical"

#執行下邊命令 創建多個綁定:
python receive_logs_topic.py "kern.*" "*.critical"

#執行下邊命令 發送路由鍵爲 "kern.critical" 的日誌:
python emit_log_topic.py "kern.critical" "A critical kernel error"

#另外,上邊代碼不會對路由鍵和綁定鍵作任何假設,因此你能夠在命令中使用超過兩個路由鍵參數。

八、RPC遠程過程調用

  當咱們須要將一個函數運行在遠程計算機上而且等待從那兒獲取結果時,這時候咱們就須要用到RPC(Remote Procedure Call)了。

構建一個RPC系統:

  RPC系統:包含一個客戶端和一個RPC服務器

客戶端接口

  爲了展現RPC服務如何使用,咱們建立了一個簡單的客戶端類。它會暴露出一個名爲「call」的方法用來發送一個RPC請求,而且在收到迴應前保持阻塞。

 
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

關於RPC的注意事項:

  儘管RPC在計算領域是一個經常使用模式,但它也常常被詬病。當一個問題被拋出的時候,程序員每每意識不到這究竟是由本地調用仍是由較慢的RPC調用引發的。一樣的困惑還來自於系統的不可預測性和給調試工做帶來的沒必要要的複雜性。跟軟件精簡不一樣的是,濫用RPC會致使不可維護的麪條代碼.

考慮到這一點,牢記如下建議:

  確保可以明確的搞清楚哪一個函數是本地調用的,哪一個函數是遠程調用的。給你的系統編寫文檔。保持各個組件間的依賴明確。處理錯誤案例。明瞭客戶端改如何處理RPC服務器的宕機和長時間無響應狀況。

  當對避免使用RPC有疑問的時候。若是能夠的話,你應該儘可能使用異步管道來代替RPC類的阻塞。結果被異步地推送到下一個計算場景。

回調隊列

通常來講經過RabbitMQ來實現RPC是很容易的。一個客戶端發送請求信息,服務器端將其應用到一個回覆信息中。爲了接收到回覆信息,客戶端須要在發送請求的時候同時發送一個回調隊列(callback queue)的地址:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

消息屬性:

AMQP協議給消息預約義了一系列的14個屬性。大多數屬性不多會用到,除了如下幾個:

  • delivery_mode(投遞模式):將消息標記爲持久的(值爲2)或暫存的(除了2以外的其餘任何值)。第二篇教程裏接觸過這個屬性,記得吧?
  • content_type(內容類型):用來描述編碼的mime-type。例如在實際使用中經常使用application/json來描述JOSN編碼類型。
  • reply_to(回覆目標):一般用來命名回調隊列。
  • correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。

關聯標識

  在上面咱們給每個RPC請求新建一個回調隊列。但這不是一個高效的作法,而更好的辦法 —— 咱們能夠爲每一個客戶端只創建一個獨立的回調隊列。可是這樣的話,問題就來了,隊列接收到一個響應的時候它沒法辨別出這個響應是屬於哪一個請求的。這時候correlation_id 就能夠爲咱們解決這個問題了。咱們給每一個請求設置一個獨一無二的值。稍後,當咱們從回調隊列中接收到一個消息的時候,咱們就能夠查看這條屬性從而將響應和請求匹配起來。若是咱們接手到的消息的correlation_id是未知的,那就直接銷燬掉它,由於它不屬於咱們的任何一條請求。

  你也許會問,爲何咱們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是爲了解決服務器端有可能發生的競爭狀況。儘管可能性不大,但RPC服務器仍是有可能在已將應答發送給咱們但還未將確認消息發送給請求的狀況下死掉。若是這種狀況發生,RPC在重啓後會從新處理請求。這就是爲何咱們必須在客戶端優雅的處理重複響應,同時RPC也須要儘量保持冪等性。

RPC的工做流程:

咱們的RPC如此工做:

  • 當客戶端啓動的時候,它建立一個匿名獨享的回調隊列。
  • 在RPC請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另外一個是設置惟一值的 correlation_id 屬性。
  • 將請求發送到一個 rpc_queue 隊列中。
  • RPC工做者(又名:服務器)等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工做而且將帶有執行結果的消息發送給reply_to字段指定的隊列。
  • 客戶端等待回調隊列裏的數據。當有消息出現的時候,它會檢查correlation_id屬性。若是此屬性的值與請求匹配,將它返回給應用。

建立一個模擬RPC服務來返回斐波那契數列:

服務器端:

  • (4)像往常同樣,咱們創建鏈接,聲明隊列
  • (11)咱們聲明咱們的fibonacci函數,它假設只有合法的正整數看成輸入。(別期望這個函數能處理很大的數值,函數遞歸大家都懂得...)
  • (19)咱們爲 basic_consume 聲明瞭一個回調函數,這是RPC服務器端的核心。它執行實際的操做而且做出響應。
  • (32)或許咱們但願能在服務器上多開幾個線程。爲了能將負載平均地分攤到多個服務器,咱們須要將 prefetch_count 設置好。
#!/usr/bin/env/ python
# -*-coding:utf-8 -*-
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,#客戶端指定返回消息的queue
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
rpc_server

客戶端:

  • (7)創建鏈接、通道而且爲回覆(replies)聲明獨享的回調隊列。
  • (16)咱們訂閱這個回調隊列,以便接收RPC的響應。
  • (18)「on_response」回調函數對每個響應執行一個很是簡單的操做,檢查每個響應消息的correlation_id屬性是否與咱們期待的一致,若是一致,將響應結果賦給self.response,而後跳出consuming循環。
  • (23)接下來,咱們定義咱們的主要方法 call 方法。它執行真正的RPC請求。
  • (24)在這個方法中,首先咱們生成一個惟一的 correlation_id 值而且保存起來,'on_response'回調函數會用它來獲取符合要求的響應。
  • (25)接下來,咱們將帶有 reply_to 和 correlation_id 屬性的消息發佈出去。
  • (32)如今咱們能夠坐下來,等待正確的響應到來。
  • (33)最後,咱們將響應返回給用戶。
#!/usr/bin/env/ python
# -*-coding:utf-8 -*-

import pika
import uuid
#uuid 通用惟一標識符 ( Universally Unique Identifier ),經過MAC地址、時間戳、命名空間、隨機數、僞隨機數來保證生成ID的惟一性
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

#聲明一個consumer,用於處理請求
        self.channel.basic_consume(self.on_response, #只要一收到消息,就調用on_response
                                    no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())#uuid4()——基於隨機數
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,#讓服務器端執行完命令後,把消息返回到這個queue裏
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        # 接收返回的數據
        while self.response is None:
            self.connection.process_data_events() #非阻塞版的start_consuming(),沒有消息不阻塞
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
rpc_client
相關文章
相關標籤/搜索