rabbitmq中文教程python版 - 遠程過程調用

源碼:https://github.com/ltoddy/rabbitmq-tutorialpython

遠程過程調用(RPC)

(using the Pika Python client)git

本章節教程重點介紹的內容

在第二篇教程中,咱們學習瞭如何使用工做隊列在多個工做人員之間分配耗時的任務。程序員

可是若是咱們須要在遠程計算機上運行某個功能並等待結果呢?那麼,這是一個不一樣的事情。
這種模式一般稱爲遠程過程調用(RPC)。github

在本教程中,咱們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。
因爲咱們沒有任何值得分發的耗時任務,咱們將建立一個返回斐波那契數字的虛擬RPC服務。json

客戶端界面

爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶端類。它將公開一個名爲call的方法 ,
它發送一個RPC請求並阻塞,直到收到答案:服務器

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
*有關RPC的說明*

雖然RPC是計算中很常見的模式,但它常常被吹毛求疵。當程序員不知道函數調用是本地的仍是
慢速的RPC時會出現這些問題。像這樣的混亂致使不可預知的問題,並增長了調試的沒必要要的複雜性,
而不是咱們想要的簡化軟件。

銘記這一點,請考慮如下建議:

  * 確保顯而易見哪一個函數調用是本地的,哪一個是遠程的。
  * 記錄您的系統。清楚組件之間的依賴關係。
  * 處理錯誤狀況。當RPC服務器長時間關閉時,客戶端應該如何反應?

有疑問時避免RPC。若是能夠的話,你應該使用異步管道 - 而不是相似於RPC的阻塞,
其結果被異步推送到下一個計算階段。

回調隊列

通常來講,經過RabbitMQ來執行RPC是很容易的。客戶端發送請求消息,服務器回覆響應消息。
爲了接收響應,客戶端須要發送一個「回調」隊列地址和請求。讓咱們試試看:app

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
消息屬性

AMQP 0-9-1協議預約義了一組包含14個屬性的消息。大多數屬性不多使用,但如下狀況除外:

delivery_mode:將消息標記爲持久(值爲2)或瞬態(任何其餘值)。你可能會記得第二篇教程中的這個屬性。
content_type:用於描述編碼的MIME類型。例如,對於常用的JSON編碼,將此屬性設置爲application/json是一種很好的作法。
reply_to:一般用於命名回調隊列。
correlation_id:用於將RPC響應與請求關聯起來。

相關ID

在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,
但幸運的是有一個更好的方法 - 讓咱們爲每一個客戶端建立一個回調隊列。異步

這引起了一個新問題,在該隊列中收到回覆後,不清楚回覆屬於哪一個請求。那是何時使用correlation_id屬性。
咱們將把它設置爲每一個請求的惟一值。稍後,當咱們在回調隊列中收到消息時,咱們將查看此屬性,
並基於此屬性,咱們將可以將響應與請求進行匹配。若是咱們看到未知的correlation_id值,
咱們能夠放心地丟棄該消息 - 它不屬於咱們的請求。函數

您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是拋出錯誤?
這是因爲服務器端可能出現競爭情況。雖然不太可能,但在發送給咱們答案以後,但在發送請求的確認消息以前,
RPC服務器可能會死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理該請求。
這就是爲何在客戶端,咱們必須優雅地處理重複的響應,理想狀況下RPC應該是等冪的。學習

總結

image

咱們的RPC會像這樣工做:

  • 當客戶端啓動時,它建立一個匿名獨佔回調隊列。
  • 對於RPC請求,客戶端將發送具備兩個屬性的消息:reply_to,該消息設置爲回調隊列和correlation_id,該值設置爲每一個請求的惟一值。
  • 該請求被髮送到rpc_queue隊列。
  • RPC worker(又名:服務器)正在等待該隊列上的請求。當出現請求時,它執行該做業,並使用reply_to字段中的隊列將結果發送回客戶端。
  • 客戶端在回調隊列中等待數據。當出現消息時,它會檢查correlation_id屬性。若是它匹配來自請求的值,則返回對應用程序的響應。

把它放在一塊兒

rpc_server.py的代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_size=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服務器代碼很是簡單:

  • (4)像往常同樣,咱們首先創建鏈接並聲明隊列。
  • (11)咱們聲明咱們的斐波那契函數。它只假定有效的正整數輸入。(不要期望這個版本適用於大數字,它多是最慢的遞歸實現)。
  • (20)咱們聲明瞭basic_consume的回調,它是RPC服務器的核心。它在收到請求時執行。它完成工做並將響應發回。
  • (34)咱們可能想運行多個服務器進程。爲了在多臺服務器上平均分配負載,咱們須要設置prefetch_count設置。

rpc_client.py的代碼:

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.corrrelation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客戶端代碼稍有涉及:

  • (8)咱們創建鏈接,通道併爲回覆聲明獨佔的「回調」隊列。
  • (17)咱們訂閱'回調'隊列,以便咱們能夠接收RPC響應。
  • (19)對每一個響應執行的'on_response'回調函數作了一個很是簡單的工做,對於每一個響應消息它檢查correlation_id是不是咱們正在尋找的。若是是這樣,它將保存self.response中的響應並打破消費循環。
  • (23)接下來,咱們定義咱們的主要調用方法 - 它執行實際的RPC請求。
  • (25)在這個方法中,首先咱們生成一個惟一的correlation_id數並保存 - 'on_response'回調函數將使用這個值來捕獲適當的響應。
  • (29)接下來,咱們發佈具備兩個屬性的請求消息:reply_tocorrelation_id
  • (32)在這一點上,咱們能夠坐下來等待,直到適當的迴應到達。
  • (41)最後,咱們將回復返回給用戶。
相關文章
相關標籤/搜索