RabbitMQ消息隊列(七):適用於雲計算集羣的遠程調用(RPC)

在雲計算環境中,不少時候須要用它其餘機器的計算資源,咱們有可能會在接收到Message進行處理時,會把一部分計算任務分配到其餘節點來完成。那麼,RabbitMQ如何使用RPC呢?在本篇文章中,咱們將會經過其它節點求來斐波納契完成示例。html

1. 客戶端接口 Client interface

爲了展現一個RPC服務是如何使用的,咱們將建立一段很簡單的客戶端class。 它將會向外提供名字爲call的函數,這個call會發送RPC請求而且阻塞知道收到RPC運算的結果。代碼以下:python

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)複製代碼

2. 回調函數隊列 Callback queue

整體來講,在RabbitMQ進行RPC遠程調用是比較容易的。client發送請求的Message而後server返回響應結果。爲了收到響應client在publish message時須要提供一個」callback「(回調)的queue地址。code以下:json

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...複製代碼

2.1 Message properties

AMQP 預約義了14個屬性。它們中的絕大多不多會用到。如下幾個是平時用的比較多的:bash

  • delivery_mode: 持久化一個Message(經過設定值爲2)。其餘任意值都是非持久化。請移步RabbitMQ消息隊列(三):任務分發機制
  • content_type: 描述mime-type 的encoding。好比設置爲JSON編碼:設置該property爲application/json。
  • reply_to: 通常用來指明用於回調的queue(Commonly used to name a callback queue)。
  • correlation_id: 在請求中關聯處理RPC響應(correlate RPC responses with requests)。

3. 相關id Correlation id

在上個小節裏,實現方法是對每一個RPC請求都會建立一個callback queue。這是不高效的。幸運的是,在這裏有一個解決方法:爲每一個client建立惟一的callback queue。app

這又有其餘問題了:收到響應後它沒法肯定是不是它的,由於全部的響應都寫到同一個queue了。上一小節的correlation_id在這種狀況下就派上用場了:對於每一個request,都設置惟一的一個值,在收到響應後,經過這個值就能夠判斷是不是本身的響應。若是不是本身的響應,就不去處理。函數

4. 總結

RabbitMQ消息隊列(七):適用於雲計算集羣的遠程調用(RPC)

工做流程:oop

  • 當客戶端啓動時,它建立了匿名的exclusive callback queue.
  • 客戶端的RPC請求時將同時設置兩個properties: reply_to設置爲callback queue;correlation_id設置爲每一個request一個獨一無二的值.
  • 請求將被髮送到an rpc_queue queue.
  • RPC端或者說server一直在等待那個queue的請求。當請求到達時,它將經過在reply_to指定的queue回覆一個message給client。
  • client一直等待callback queue的數據。當message到達時,它將檢查correlation_id的值,若是值和它request發送時的一致那麼就將返回響應。

5. 最終實現

The code for rpc_server.py:fetch

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = /
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()
複製代碼

The server code is rather straightforward:ui

  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don’t expect this one to work for big numbers, it’s probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It’s executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

The code for rpc_client.py:this

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)複製代碼

The client code is slightly more involved:

  • (7) We establish a connection, channel and declare an exclusive ‘callback’ queue for replies.
  • (16) We subscribe to the ‘callback’ queue, so that we can receive RPC responses.
  • (18) The ‘on_response’ callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we’re looking for. If so, it saves the response inself.response
    and breaks the consuming loop.
  • (23) Next, we define our main call method – it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it – the ‘on_response’ callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties:
    reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

開始rpc_server.py:

$ python rpc_server.py
 [x] Awaiting RPC requests複製代碼

經過client來請求fibonacci數:

$ python rpc_client.py
 [x] Requesting fib(30)複製代碼

如今這個設計並非惟一的,可是這個實現有如下優點:

  • 如何RPC server太慢,你能夠擴展它:啓動另一個RPC server。
  • 在client端, 無所進行加鎖能同步操做,他所做的就是發送請求等待響應。

咱們的code仍是挺簡單的,並無嘗試去解決更復雜和重要的問題,好比:

  • 若是沒有server在運行,client須要怎麼作?
  • RPC應該設置超時機制嗎?
  • 若是server運行出錯而且拋出了異常,須要將這個問題轉發到client嗎?
  • 須要邊界檢查嗎?

參考資料:

1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html

2. http://blog.csdn.net/anzhsoft/article/details/19633107

相關文章
相關標籤/搜索