在雲計算環境中,不少時候須要用它其餘機器的計算資源,咱們有可能會在接收到Message進行處理時,會把一部分計算任務分配到其餘節點來完成。那麼,RabbitMQ如何使用RPC呢?在本篇文章中,咱們將會經過其它節點求來斐波納契完成示例。html
爲了展現一個RPC服務是如何使用的,咱們將建立一段很簡單的客戶端class。 它將會向外提供名字爲call的函數,這個call會發送RPC請求而且阻塞知道收到RPC運算的結果。代碼以下:python
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)複製代碼
整體來講,在RabbitMQ進行RPC遠程調用是比較容易的。client發送請求的Message而後server返回響應結果。爲了收到響應client在publish message時須要提供一個」callback「(回調)的queue地址。code以下:json
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...複製代碼
AMQP 預約義了14個屬性。它們中的絕大多不多會用到。如下幾個是平時用的比較多的:bash
- delivery_mode: 持久化一個Message(經過設定值爲2)。其餘任意值都是非持久化。請移步RabbitMQ消息隊列(三):任務分發機制
- content_type: 描述mime-type 的encoding。好比設置爲JSON編碼:設置該property爲application/json。
- reply_to: 通常用來指明用於回調的queue(Commonly used to name a callback queue)。
- correlation_id: 在請求中關聯處理RPC響應(correlate RPC responses with requests)。
在上個小節裏,實現方法是對每一個RPC請求都會建立一個callback queue。這是不高效的。幸運的是,在這裏有一個解決方法:爲每一個client建立惟一的callback queue。app
這又有其餘問題了:收到響應後它沒法肯定是不是它的,由於全部的響應都寫到同一個queue了。上一小節的correlation_id在這種狀況下就派上用場了:對於每一個request,都設置惟一的一個值,在收到響應後,經過這個值就能夠判斷是不是本身的響應。若是不是本身的響應,就不去處理。函數
工做流程:oop
The code for rpc_server.py:fetch
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print " [.] fib(%s)" % (n,)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = /
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()
複製代碼
The server code is rather straightforward:ui
The code for rpc_client.py:this
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)複製代碼
The client code is slightly more involved:
開始rpc_server.py:
$ python rpc_server.py
[x] Awaiting RPC requests複製代碼
經過client來請求fibonacci數:
$ python rpc_client.py
[x] Requesting fib(30)複製代碼
如今這個設計並非惟一的,可是這個實現有如下優點:
咱們的code仍是挺簡單的,並無嘗試去解決更復雜和重要的問題,好比:
參考資料:
1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html
2. http://blog.csdn.net/anzhsoft/article/details/19633107