譯:6.RabbitMQ Java Client 之 Remote procedure call (RPC,遠程過程調用)

在  譯:2. RabbitMQ 之Work Queues (工做隊列)  咱們學習瞭如何使用工做隊列在多個工做人員之間分配耗時的任務。html

可是若是咱們須要在遠程計算機上運行一個函數並等待結果呢?嗯,這是一個不一樣的故事。此模式一般稱爲遠程過程調用RPCjava

在本教程中,咱們將使用RabbitMQ構建RPC系統:客戶端和可伸縮的RPC服務器。因爲咱們沒有任何值得分發的耗時任務,咱們將建立一個返回Fibonacci數字的虛擬RPC服務。git

客戶端界面

爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶端類。程序員

它將公開一個名爲call的方法,該方法發送一個RPC請求並阻塞,直到收到答案爲止:github

 

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
字符串結果= fibonacciRpc.call(「4」);
System.out.println(「fib(4)is」 + result);

 

關於RPC的說明

儘管RPC在計算中是一種很是常見的模式,但它常常受到批評。當程序員不知道函數調用是本地的仍是慢的RPC時,會出現問題。這樣的混淆致使系統不可預測,並增長了調試的沒必要要的複雜性。錯誤使用RPC能夠致使不可維護的意大利麪條代碼,而不是簡化軟件。json

考慮到這一點,請考慮如下建議:安全

  • 確保明顯哪一個函數調用是本地的,哪一個是遠程的。
  • 記錄您的系統。使組件之間的依賴關係變得清晰。
  • 處理錯誤案例。當RPC服務器長時間停機時,客戶端應該如何反應?

若有疑問,請避免使用RPC。若是能夠,您應該使用異步管道 - 而不是相似RPC的阻塞,將結果異步推送到下一個計算階段。服務器

回調隊列

通常來講,經過RabbitMQ進行RPC很容易。客戶端發送請求消息,服務器回覆響應消息。爲了接收響應,咱們須要發送帶有請求的「回調」隊列地址。咱們可使用默認隊列(在Java客戶端中是獨佔的)。咱們來試試吧:網絡

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

Message properties

The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:併發

  • deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
  • contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • replyTo: Commonly used to name a callback queue.
  • correlationId: Useful to correlate RPC responses with requests.

咱們須要一個新的導入:

import com.rabbitmq.client.AMQP.BasicProperties;

相關ID

在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,但幸運的是有更好的方法 - 讓咱們爲每一個客戶端建立一個回調隊列。

這引起了一個新問題,在該隊列中收到響應後,不清楚響應屬於哪一個請求。那是在使用correlationId屬性的時候 咱們將爲每一個請求將其設置爲惟一值。稍後,當咱們在回調隊列中收到一條消息時,咱們將查看此屬性,並根據該屬性,咱們將可以將響應與請求進行匹配。若是咱們看到未知的correlationId值,咱們能夠安全地丟棄該消息 - 它不屬於咱們的請求。

您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是失敗並出現錯誤?這是因爲服務器端可能存在競爭條件。雖然不太可能,但RPC服務器可能會在向咱們發送答案以後,但在發送請求的確認消息以前死亡。若是發生這種狀況,從新啓動的RPC服務器將再次處理請求。這就是爲何在客戶端上咱們必須優雅地處理重複的響應,理想狀況下RPC應該是冪等的。

概要

咱們的RPC將這樣工做:

  • 對於RPC請求,客戶端發送帶有兩個屬性的消息: replyTo,設置爲僅爲請求建立的匿名獨佔隊列;以及correlationId,設置爲每一個請求的惟一值。
  • 請求被髮送到rpc_queue隊列。
  • RPC worker(aka:server)正在等待該隊列上的請求。當出現請求時,它會執行該做業,並使用來自replyTo字段的隊列將帶有結果的消息發送回客戶端
  • 客戶端等待回覆隊列上的數據。出現消息時,它會檢查correlationId屬性。若是它與請求中的值匹配,則返回對應用程序的響應。

把它們放在一塊兒

斐波納契任務:

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

咱們宣佈咱們的斐波那契函數。它假定只有有效的正整數輸入。(不要期望這個適用於大數字,它多是最慢的遞歸實現)。

服務器代碼很是簡單:

  • 像往常同樣,咱們首先創建鏈接,通道和聲明隊列。
  • 咱們可能但願運行多個服務器進程。爲了在多個服務器上平均分配負載,咱們須要在channel.basicQos中設置 prefetchCount設置。
  • 咱們使用basicConsume來訪問隊列,咱們​​以對象(DefaultConsumer的形式提供回調,它將完成工做併發迴響應。

咱們的RPC客戶端的代碼能夠在這裏找到:RPCClient.java

客戶端代碼稍微複雜一些:

  • 咱們創建了一個鏈接和渠道。
  • 咱們的調用方法生成實際的RPC請求。
  • 在這裏,咱們首先生成一個惟一的correlationId 數並保存它 - 咱們 在RpcConsumer中handleDelivery實現將使用該值來捕獲適當的響應。
  • 而後,咱們爲回覆建立一個專用的獨佔隊列並訂閱它。
  • 接下來,咱們發佈請求消息,其中包含兩個屬性: replyTocorrelationId
  • 在這一點上,咱們能夠坐下來等待正確的響應到來。
  • 因爲咱們的消費者交付處理是在一個單獨的線程中進行的,所以咱們須要在響應到來以前暫停線程。使用BlockingQueue是一種可能的解決方案。這裏咱們建立了ArrayBlockingQueue ,容量設置爲1,由於咱們只須要等待一個響應。
  • handleDelivery方法是作一個很簡單的工做,對每一位消費響應消息它會檢查的correlationID 是咱們要找的人。若是是這樣,它會將響應置於BlockingQueue
  • 同時線程正在等待響應從BlockingQueue獲取它
  • 最後,咱們將響應返回給用戶。

發出客戶請求:

RPCClient fibonacciRpc = new RPCClient(); 
 System.out.println(「[x] Requesting fib(30)」);  字符串響應= fibonacciRpc.call(「30」);  System.out.println(「[。] Got'」 + response + 「'」);   fibonacciRpc.close(); 

RPCClient.java

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RPCClient {
    private Connection connection;
      private Channel channel;
      private String requestQueueName = "rpc_queue";

      public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
      }

      public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        String ctag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (properties.getCorrelationId().equals(corrId)) {
              response.offer(new String(body, "UTF-8"));
            }
          }
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
      }

      public void close() throws IOException {
        connection.close();
      }

      public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
          fibonacciRpc = new RPCClient();

          for (int i = 0; i < 32; i++) {
            String i_str = Integer.toString(i);
            System.out.println(" [x] Requesting fib(" + i_str + ")");
            response = fibonacciRpc.call(i_str);
            System.out.println(" [.] Got '" + response + "'");
          }
        }
        catch  (IOException | TimeoutException | InterruptedException e) {
          e.printStackTrace();
        }
        finally {
          if (fibonacciRpc!= null) {
            try {
              fibonacciRpc.close();
            }
            catch (IOException _ignore) {}
          }
        }
      }
}

RPCServer.java

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

      private static int fib(int n) {
        if (n ==0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
      }

      public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
          connection      = factory.newConnection();
          final Channel channel = connection.createChannel();

          channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
          channel.queuePurge(RPC_QUEUE_NAME);

          channel.basicQos(1);

          System.out.println(" [x] Awaiting RPC requests");

          Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                      .Builder()
                      .correlationId(properties.getCorrelationId())
                      .build();

              String response = "";

              try {
                String message = new String(body,"UTF-8");
                int n = Integer.parseInt(message);

                System.out.println(" [.] fib(" + message + ")");
                response += fib(n);
              }
              catch (RuntimeException e){
                System.out.println(" [.] " + e.toString());
              }
              finally {
                channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
                // RabbitMq consumer worker thread notifies the RPC server owner thread 
                synchronized(this) {
                    this.notify();
                }
              }
            }
          };

          channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
          // Wait and be prepared to consume the message from RPC client.
          while (true) {
              synchronized(consumer) {
                  try {
                      consumer.wait();
                  } catch (InterruptedException e) {
                      e.printStackTrace();            
                  }
              }
          }
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
        finally {
          if (connection != null)
            try {
              connection.close();
            } catch (IOException _ignore) {}
        }
      }
}

 

 此處介紹的設計並非RPC服務的惟一可能實現,但它具備一些重要優點:

  • 若是RPC服務器太慢,您能夠經過運行另外一個服務器來擴展。嘗試在新控制檯中運行第二個RPCServer
  • 在客戶端,RPC只須要發送和接收一條消息。不須要像queueDeclare這樣的同步調用 。所以,對於單個RPC請求,RPC客戶端只須要一次網絡往返。

咱們的代碼仍然至關簡單,並不試圖解決更復雜(但重要)的問題,例如:

  • 若是沒有運行服務器,客戶應該如何反應?
  • 客戶端是否應該爲RPC設置某種超時?
  • 若是服務器出現故障並引起異常,是否應將其轉發給客戶端?
  • 在處理以前防止無效的傳入消息(例如檢查邊界,鍵入)。
相關文章
相關標籤/搜索