官網英文版學習——RabbitMQ學習筆記(八)Remote procedure call (RPC)

在第四篇學習筆記中,咱們學習瞭如何使用工做隊列在多個工做者之間分配耗時的任務。java

 

可是,若是咱們須要在遠程計算機上運行一個函數並等待結果呢?這是另外一回事。這種模式一般稱爲遠程過程調用或RPC。安全

 

在本篇學習筆記中,咱們將使用RabbitMQ構建一個RPC系統:客戶機和可伸縮的RPC服務器。因爲咱們沒有任何值得分發的耗時任務,因此咱們將建立一個返回斐波那契數的虛擬RPC服務。服務器

爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶端類。它將公開一個名爲call的方法,該方法發送一個RPC請求並阻塞,直到收到答案:app

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

 

一般,在RabbitMQ上執行RPC是很容易的。客戶端發送請求消息,服務器用響應消息進行響應。爲了接收響應,咱們須要向請求發送一個「回調」隊列地址。咱們可使用默認隊列(在Java客戶機中是獨佔的)。讓咱們試一試:dom

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

 

消息屬性ide

 

AMQP 0-9-1協議預先定義了一組包含消息的14個屬性。大多數屬性不多被使用,除了如下狀況:函數

 

deliveryMode:將消息標記爲持久化(值爲2)或瞬變(任何其餘值)。您可能還記得第二個教程中的這個屬性。學習

 

contentType:用於描述編碼的mime類型。例如,對於一般使用的JSON編碼,最好將這個屬性設置爲:application/ JSON。fetch

 

replyTo:一般用於命名回調隊列。ui

correlationid:有助於將RPC響應與請求關聯起來。

correlationId做用

咱們爲每一個請求設置一個惟一值correctionid。用於當隊列接收到響應後區分是哪一個請求的響應。稍後,當咱們在回調隊列中接收到消息時,咱們將查看此屬性,並基於此,咱們將可以將響應與請求匹配。若是咱們看到一個未知的correlationId值,咱們能夠安全地丟棄消息—它不屬於咱們的請求。

 

咱們的RPC將這樣工做:

當客戶端啓動時,它將建立一個匿名獨佔回調隊列(官方教程建立的是匿名的)。

 

對於RPC請求,客戶端發送一條消息,該消息具備兩個屬性:replyTo,它被設置爲回調隊列和correlationId,它被設置爲每一個請求的惟一值。

 

請求被髮送到rpc_queue隊列。

 

RPC工做程序(即:server)正在等待該隊列上的請求。當出現請求時,它會執行該任務並使用replyTo字段中的隊列將結果發送回客戶機。

客戶端等待回調隊列上的數據。當消息出現時,它會檢查相關屬性。若是它匹配來自請求的值,它將嚮應用程序返回響應。

服務端代碼:

package com.rabbitmq.cn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RPCServer {
//	定義一個遠程隊列名稱
	private static final String RPCQUEUENAME = "RPCqueue"; 
//	斐波那契數函數
	private static int fib(int n) {
	    if (n ==0) return 0;
	    if (n == 1) return 1;
	    return fib(n-1) + fib(n-2);
	  }
	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
//		建立工廠獲取鏈接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setPort(5672);
		factory.setPassword("123456");
		factory.setUsername("admin");
		Connection connection = null;
		try{
//		得到鏈接
		connection = factory.newConnection();
//		建立隊列
		Channel channel = connection.createChannel();
//		聲明一個遠程的消息隊列
		channel.queueDeclare(RPCQUEUENAME, false, false, false, null);
//		爲了減輕服務器負擔,當多個服務一同工做時,能夠設置以下參數
		channel.basicQos(1);
		System.out.println(" [x] Awaiting RPC requests");
//		執行客戶端發送的請求任務
		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
//				設置返回的消息屬性
				AMQP.BasicProperties replyPros = new BasicProperties().builder()
						.correlationId(properties.getCorrelationId()).build();
				String response = "";
				try {
//				對傳遞過來的文字版數字解析,解析後調用函數處理,處理之後的結果做爲返回值準備返回給客戶端
					String message = new String(body, "utf-8");
					int n = Integer.parseInt(message);
					System.out.println(" [.] fib(" + message + ")");
		            response += fib(n);
				} catch (Exception e) {
					// TODO: handle exception
				}finally{
//				返回處理後的結果給客戶端
					channel.basicPublish("", properties.getReplyTo(), replyPros, response.getBytes());
					channel.basicAck(envelope.getDeliveryTag(), false);
//			    RabbitMq consumer worker thread notifies the RPC server owner thread 
		            synchronized(this) {
		            	this.notify();
				}
			}
		}
	};
	/*The RPC worker (aka: server) is waiting for requests on that queue. 
	When a request appears, it does the job and sends a message with the result back to the Client, 
	using the queue from the replyTo field.*/
// 		Wait and be prepared to consume the message from RPC client.
	  channel.basicConsume(RPCQUEUENAME, false, consumer);
     
      while (true) {
      	synchronized(consumer) {
      		try {
      			consumer.wait();
      	    } catch (InterruptedException e) {
      	    	e.printStackTrace();	    	
      	    }
      	}
      }}catch (IOException | TimeoutException e) {
          e.printStackTrace();
      }
      finally {
        if (connection != null)
          try {
            connection.close();
          } catch (IOException _ignore) {}
    	  
      }
}}

服務端代碼過程顯示,一般咱們從創建鏈接、通道和聲明隊列開始。

 

咱們可能但願運行多個服務器進程。爲了將負載均勻地分佈到多個服務器上,咱們須要在channel.basicQos中設置prefetchCount設置。

 

咱們使用basicconsumption訪問隊列,在隊列中咱們以對象(DefaultConsumer)的形式提供回調,該對象將執行該工做並將響應發送回。

客戶端代碼:

package com.rabbitmq.cn;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

  private Connection connection;
  private Channel channel;
  private String requestQueueName = "RPCqueue";
  private String replyQueueName;

  public RPCClient() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.10.185");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("123456");
    
    connection = factory.newConnection();
    channel = connection.createChannel();
//  建立臨時隊列
    replyQueueName = channel.queueDeclare().getQueue();
  }

  public String call(String message) throws IOException, InterruptedException {
//	 經過uuid生成請求段的correctionId
    final String corrId = UUID.randomUUID().toString();
//	設置correctionId和replyTo屬性
    AMQP.BasicProperties props = new AMQP.BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
//	發送請求到請求隊列中
    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//	定義一個阻塞的消息隊列,來掛起線程等待相應
    final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
//	消費消息
    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getCorrelationId().equals(corrId)) {
          response.offer(new String(body, "UTF-8"));
        }
      }
    });

    return response.take();
  }

  public void close() throws IOException {
    connection.close();
  }

  public static void main(String[] argv) {
    RPCClient fibonacciRpc = null;
    String response = null;
    try {
//    經過構造函數獲取鏈接,並建立一個臨時匿名的隊列
      fibonacciRpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = fibonacciRpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
    }
    catch  (IOException | TimeoutException | InterruptedException e) {
      e.printStackTrace();
    }
    finally {
      if (fibonacciRpc!= null) {
        try {
          fibonacciRpc.close();
        }
        catch (IOException _ignore) {}
      }
    }
  }
}

 

客戶端代碼稍微複雜一些:

 

咱們創建一個鏈接和通道,併爲回覆聲明一個獨佔的「回調」隊列。

 

咱們訂閱「回調」隊列,以便接收RPC響應。

 

咱們的調用方法生成實際的RPC請求。

 

在這裏,咱們首先生成一個惟一的correlationId號並保存它——咱們在DefaultConsumer中實現的handleDelivery將使用這個值來捕獲適當的響應。

 

接下來,咱們發佈請求消息,有兩個屬性:replyTo和correlationId。

 

此時,咱們能夠坐下來等待合適的答覆。

因爲咱們的消費者交付處理是在一個單獨的線程中進行的,因此在響應到達以前,咱們須要一些東西來掛起主線程。使用BlockingQueue是一種可能的解決方案。這裏咱們建立了ArrayBlockingQueue,它的容量設置爲1,由於咱們須要等待一個響應。

 

handleDelivery方法作的是一項很是簡單的工做,對於每一個消耗的響應消息,它檢查correlationId是不是咱們要查找的那個。若是是,它將響應放置到BlockingQueue。

 

與此同時,主線程正在等待響應從BlockingQueue接收。

最後,咱們將響應返回給用戶。

運行後,咱們獲得結果

相關文章
相關標籤/搜索