RabbitMQ之遠程過程調用(RPC)【譯】

目錄

客戶端接口(Client interface)
回調隊列(Callback queue)
關聯標識(Correlation Id)
總結
代碼整合

在第二個教程中,咱們學習瞭如何使用工做隊列在多個worker之間分配耗時的任務。
可是若是咱們須要在遠程計算機上運行功能並等待結果呢?嗯,這是另一件事情,這種模式一般被稱爲遠程過程調用(RPC)。
在本教程中咱們將使用RabbitMQ的創建一個RPC系統:一個客戶端和一個可伸縮的RPC服務器。因爲咱們沒有什麼耗時的任務,咱們要建立一個返回斐波那契數虛設RPC服務。html

客戶端接口(Client interface)

爲了說明RPC如何使用,咱們將建立一個簡單的客戶端類。它將建立一個名爲call的方法——發送RPC請求,而且處於阻塞狀態,直到收到應答。java

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

PRC筆記

儘管PRC是一個常見的模式,它常常受到批評。當程序員不知道他所調用的方法是本地的仍是一個緩慢的RPC,問題就出現了。這樣的混亂在系統中形成不可預料的結果,並增長了沒必要要的調試的複雜性,相比於簡單的軟件,PRC的濫用可能致使形成不可維護的麪條式的代碼。
考慮到這一點,請參考如下建議:git

確保能明確分辨出哪些函數是本地的,哪些是遠程的。
創建文檔,讓組件之間的依賴關係更清楚。
處理錯誤的case,若是RPC服務器掛了很長時間,客戶端應該怎麼處理?
若是對以上有疑問,請避免使用。若是沒有,你也應該使用異步管道,而不是阻塞式的RPC調用,結果被異步地推到下一個計算階段。程序員

回調隊列(Callback queue)

通常來講利用RabbitMQ來作RPC是很簡單的。客戶端發送請求消息,服務端回覆應答消息。爲了能收到回覆,咱們須要發送一個「callback」隊列地址在請求裏面。咱們可使用默認隊列(這是Java客戶端獨有的):github

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息屬性

AMQP協議預約義了14個屬性去發送消息。大部分的屬性都不多使用,可是下列除外:json

deliveryMode:標記的消息爲持久(值爲2)或暫時的(任何其餘值)。你可能還記得第二個教程中的此屬性。
contentType:用於描述MIME類型的編碼。例如,對於常用JSON編碼,是一個很好的作法,將此屬性設置爲:application/json。
eplyTo: 經常使用於命名一個回調隊列。
correlationId: 用於關聯的RPC響應。服務器

咱們須要import:網絡

import com.rabbitmq.client.AMQP.BasicProperties;

關聯標識(Correlation Id)

在上面介紹的方法中,咱們建議爲每個RPC請求創建一個回調隊列。這是至關低效的,幸虧有一個更好的辦法 - 讓咱們建立每一個客戶端一個回調隊列。
這樣產生了一個新的問題,在收到該回調隊列的響應的時候,咱們並不知道該響應是哪一個請求的響應,這就是correlationId屬性的用處,咱們將它設置爲每一個請求的惟一值。這樣,當咱們在回調隊列收到一條消息的時候,咱們將看看這個屬性,就能找到與這個響應相對應的請求。若是咱們看到一個未知的correlationId,咱們徹底能夠丟棄消息,由於他不併不屬於咱們系統。
你也許會問,爲何咱們選擇丟棄這個消息,而不是拋出一個錯誤。這是爲了解決服務器端有可能發生的競爭狀況。儘管可能性不大,但RPC服務器仍是有可能在已將應答發送給咱們但還未將確認消息發送給請求的狀況下死掉。若是這種狀況發生,RPC在重啓後會從新處理請求。這就是爲何咱們必須在客戶端優雅的處理重複響應,同時RPC也須要儘量保持冪等性。併發

總結

咱們的RPC這樣工做:app

 

  1. 當客戶端啓動的時候,建立一個匿名的獨享的回調隊列。
  2. 在RPC請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另外一個是設置惟一值的 correlation_id 屬性。
  3. 該請求被髮送到rpc_queue隊列。
  4. RPC工做者(又名:服務器)等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工做而且將帶有執行結果的消息發送給reply_to字段指定的隊列。
  5. 客戶端等待回調隊列裏的數據。當有消息出現的時候,它會檢查correlation_id屬性。若是此屬性的值與請求匹配,將它返回給應用。

代碼整合

斐波那契數列任務:

private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

咱們定義一個斐波那契的方法,假定只有有效的正整數輸入。(不要期望它爲大數據工做,這多是最慢的遞歸實現)
咱們的RPC服務器RPCServer.java的代碼以下:

private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

以上服務端的代碼很簡單:

  1. 和一般同樣,咱們從創建一個鏈接,一個通道和定義一個隊列開始。
  2. 咱們可能須要運行多個服務器進程。爲了在多個服務器上均勻分佈的負荷,咱們須要設置channel.basicQos中的prefetchCount。
  3. 咱們使用basicConsume訪問隊列。而後,進入while循環中,等待請求消息,完成工做併發送回響應。

RPCClient.java:

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}

客戶端的代碼稍微複雜:

  1. 咱們創建一個鏈接,一個通道和一個用於接收回復的回調隊列。
  2. 咱們訂閱「回調」的隊列,這樣咱們就能夠接收RPC響應。
  3. 咱們的call方法發出實際的RPC請求。
  4. 在這裏,咱們首先生成一個惟一的correlationID,並保存它 - while循環會使用這個值來捕捉適當的響應。
  5. 接下來,咱們發佈請求消息時,具備兩個屬性:的replyTo和的correlationID。
  6. 在這一點上,咱們能夠坐下來,等到適當的響應到達。
  7. while循環正在作一個很簡單的工做,對於每個響應消息它會檢查的correlationID是咱們要找的人。若是是,它將保存的響應。
  8. 最後,咱們返回響應給用戶。

客戶端請求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

以上的設計不是惟一可能的實現一個RPC服務的,但它有一些重要的優勢:

  • 若是RPC服務器速度太慢,則只需運行多個便可。嘗試在新的控制檯運行的第二RPCServer。
  • 在客戶端,RPC請求只發送或接收一條消息。不須要像 queue_declare 這樣的異步調用。因此RPC客戶端的單個請求只須要一個網絡往返。

咱們的代碼依舊很是簡單,並且沒有試圖去解決一些複雜(可是重要)的問題,如:

  • 當沒有服務器運行時,客戶端如何做出反映。
  • 客戶端是否須要實現相似RPC超時的東西。
  • 若是服務器發生故障,而且拋出異常,應該被轉發到客戶端嗎?
  • 在處理前,防止混入無效的信息(例如檢查邊界)

原文地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html

代碼地址:https://github.com/aheizi/hi-mq

相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之發佈訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠程過程調用(RPC)

相關文章
相關標籤/搜索