在第二個教程中,咱們學習瞭如何使用工做隊列在多個worker之間分配耗時的任務。
可是若是咱們須要在遠程計算機上運行功能並等待結果呢?嗯,這是另一件事情,這種模式一般被稱爲遠程過程調用(RPC)。
在本教程中咱們將使用RabbitMQ的創建一個RPC系統:一個客戶端和一個可伸縮的RPC服務器。因爲咱們沒有什麼耗時的任務,咱們要建立一個返回斐波那契數虛設RPC服務。html
爲了說明RPC如何使用,咱們將建立一個簡單的客戶端類。它將建立一個名爲call的方法——發送RPC請求,而且處於阻塞狀態,直到收到應答。java
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
PRC筆記
儘管PRC是一個常見的模式,它常常受到批評。當程序員不知道他所調用的方法是本地的仍是一個緩慢的RPC,問題就出現了。這樣的混亂在系統中形成不可預料的結果,並增長了沒必要要的調試的複雜性,相比於簡單的軟件,PRC的濫用可能致使形成不可維護的麪條式的代碼。
考慮到這一點,請參考如下建議:git確保能明確分辨出哪些函數是本地的,哪些是遠程的。
創建文檔,讓組件之間的依賴關係更清楚。
處理錯誤的case,若是RPC服務器掛了很長時間,客戶端應該怎麼處理?
若是對以上有疑問,請避免使用。若是沒有,你也應該使用異步管道,而不是阻塞式的RPC調用,結果被異步地推到下一個計算階段。程序員
通常來講利用RabbitMQ來作RPC是很簡單的。客戶端發送請求消息,服務端回覆應答消息。爲了能收到回覆,咱們須要發送一個「callback」隊列地址在請求裏面。咱們可使用默認隊列(這是Java客戶端獨有的):github
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協議預約義了14個屬性去發送消息。大部分的屬性都不多使用,可是下列除外:json
deliveryMode:標記的消息爲持久(值爲2)或暫時的(任何其餘值)。你可能還記得第二個教程中的此屬性。
contentType:用於描述MIME類型的編碼。例如,對於常用JSON編碼,是一個很好的作法,將此屬性設置爲:application/json。
eplyTo: 經常使用於命名一個回調隊列。
correlationId: 用於關聯的RPC響應。服務器
咱們須要import:網絡
import com.rabbitmq.client.AMQP.BasicProperties;
在上面介紹的方法中,咱們建議爲每個RPC請求創建一個回調隊列。這是至關低效的,幸虧有一個更好的辦法 - 讓咱們建立每一個客戶端一個回調隊列。
這樣產生了一個新的問題,在收到該回調隊列的響應的時候,咱們並不知道該響應是哪一個請求的響應,這就是correlationId屬性的用處,咱們將它設置爲每一個請求的惟一值。這樣,當咱們在回調隊列收到一條消息的時候,咱們將看看這個屬性,就能找到與這個響應相對應的請求。若是咱們看到一個未知的correlationId,咱們徹底能夠丟棄消息,由於他不併不屬於咱們系統。
你也許會問,爲何咱們選擇丟棄這個消息,而不是拋出一個錯誤。這是爲了解決服務器端有可能發生的競爭狀況。儘管可能性不大,但RPC服務器仍是有可能在已將應答發送給咱們但還未將確認消息發送給請求的狀況下死掉。若是這種狀況發生,RPC在重啓後會從新處理請求。這就是爲何咱們必須在客戶端優雅的處理重複響應,同時RPC也須要儘量保持冪等性。併發
咱們的RPC這樣工做:
app
斐波那契數列任務:
private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
咱們定義一個斐波那契的方法,假定只有有效的正整數輸入。(不要期望它爲大數據工做,這多是最慢的遞歸實現)
咱們的RPC服務器RPCServer.java的代碼以下:
private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
以上服務端的代碼很簡單:
RPCClient.java:
private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }
客戶端的代碼稍微複雜:
客戶端請求:
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
以上的設計不是惟一可能的實現一個RPC服務的,但它有一些重要的優勢:
咱們的代碼依舊很是簡單,並且沒有試圖去解決一些複雜(可是重要)的問題,如:
原文地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
代碼地址:https://github.com/aheizi/hi-mq
相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之發佈訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠程過程調用(RPC)