源碼:https://github.com/ltoddy/rabbitmq-tutorialpython
(using the Pika Python client)git
在第二篇教程中,咱們學習瞭如何使用工做隊列在多個工做人員之間分配耗時的任務。程序員
可是若是咱們須要在遠程計算機上運行某個功能並等待結果呢?那麼,這是一個不一樣的事情。
這種模式一般稱爲遠程過程調用(RPC)。github
在本教程中,咱們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。
因爲咱們沒有任何值得分發的耗時任務,咱們將建立一個返回斐波那契數字的虛擬RPC服務。json
爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶端類。它將公開一個名爲call的方法 ,
它發送一個RPC請求並阻塞,直到收到答案:服務器
fibonacci_rpc = FibonacciRpcClient() result = fibonacci_rpc.call(4) print("fib(4) is %r" % result)
*有關RPC的說明* 雖然RPC是計算中很常見的模式,但它常常被吹毛求疵。當程序員不知道函數調用是本地的仍是 慢速的RPC時會出現這些問題。像這樣的混亂致使不可預知的問題,並增長了調試的沒必要要的複雜性, 而不是咱們想要的簡化軟件。 銘記這一點,請考慮如下建議: * 確保顯而易見哪一個函數調用是本地的,哪一個是遠程的。 * 記錄您的系統。清楚組件之間的依賴關係。 * 處理錯誤狀況。當RPC服務器長時間關閉時,客戶端應該如何反應? 有疑問時避免RPC。若是能夠的話,你應該使用異步管道 - 而不是相似於RPC的阻塞, 其結果被異步推送到下一個計算階段。
通常來講,經過RabbitMQ來執行RPC是很容易的。客戶端發送請求消息,服務器回覆響應消息。
爲了接收響應,客戶端須要發送一個「回調」隊列地址和請求。讓咱們試試看:app
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request)
消息屬性 AMQP 0-9-1協議預約義了一組包含14個屬性的消息。大多數屬性不多使用,但如下狀況除外: delivery_mode:將消息標記爲持久(值爲2)或瞬態(任何其餘值)。你可能會記得第二篇教程中的這個屬性。 content_type:用於描述編碼的MIME類型。例如,對於常用的JSON編碼,將此屬性設置爲application/json是一種很好的作法。 reply_to:一般用於命名回調隊列。 correlation_id:用於將RPC響應與請求關聯起來。
在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,
但幸運的是有一個更好的方法 - 讓咱們爲每一個客戶端建立一個回調隊列。異步
這引起了一個新問題,在該隊列中收到回覆後,不清楚回覆屬於哪一個請求。那是何時使用correlation_id屬性。
咱們將把它設置爲每一個請求的惟一值。稍後,當咱們在回調隊列中收到消息時,咱們將查看此屬性,
並基於此屬性,咱們將可以將響應與請求進行匹配。若是咱們看到未知的correlation_id值,
咱們能夠放心地丟棄該消息 - 它不屬於咱們的請求。函數
您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是拋出錯誤?
這是因爲服務器端可能出現競爭情況。雖然不太可能,但在發送給咱們答案以後,但在發送請求的確認消息以前,
RPC服務器可能會死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理該請求。
這就是爲何在客戶端,咱們必須優雅地處理重複的響應,理想狀況下RPC應該是等冪的。學習
咱們的RPC會像這樣工做:
rpc_server.py的代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_size=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
服務器代碼很是簡單:
rpc_client.py的代碼:
#!/usr/bin/env python import pika import uuid class FibonacciRpcClient: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.corrrelation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
客戶端代碼稍有涉及: