在第四篇學習筆記中,咱們學習瞭如何使用工做隊列在多個工做者之間分配耗時的任務。java
可是,若是咱們須要在遠程計算機上運行一個函數並等待結果呢?這是另外一回事。這種模式一般稱爲遠程過程調用或RPC。安全
在本篇學習筆記中,咱們將使用RabbitMQ構建一個RPC系統:客戶機和可伸縮的RPC服務器。因爲咱們沒有任何值得分發的耗時任務,因此咱們將建立一個返回斐波那契數的虛擬RPC服務。服務器
爲了說明如何使用RPC服務,咱們將建立一個簡單的客戶端類。它將公開一個名爲call的方法,該方法發送一個RPC請求並阻塞,直到收到答案:app
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
一般,在RabbitMQ上執行RPC是很容易的。客戶端發送請求消息,服務器用響應消息進行響應。爲了接收響應,咱們須要向請求發送一個「回調」隊列地址。咱們可使用默認隊列(在Java客戶機中是獨佔的)。讓咱們試一試:dom
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息屬性ide
AMQP 0-9-1協議預先定義了一組包含消息的14個屬性。大多數屬性不多被使用,除了如下狀況:函數
deliveryMode:將消息標記爲持久化(值爲2)或瞬變(任何其餘值)。您可能還記得第二個教程中的這個屬性。學習
contentType:用於描述編碼的mime類型。例如,對於一般使用的JSON編碼,最好將這個屬性設置爲:application/ JSON。fetch
replyTo:一般用於命名回調隊列。ui
correlationid:有助於將RPC響應與請求關聯起來。
correlationId做用
咱們爲每一個請求設置一個惟一值correctionid。用於當隊列接收到響應後區分是哪一個請求的響應。稍後,當咱們在回調隊列中接收到消息時,咱們將查看此屬性,並基於此,咱們將可以將響應與請求匹配。若是咱們看到一個未知的correlationId值,咱們能夠安全地丟棄消息—它不屬於咱們的請求。
咱們的RPC將這樣工做:
當客戶端啓動時,它將建立一個匿名獨佔回調隊列(官方教程建立的是匿名的)。
對於RPC請求,客戶端發送一條消息,該消息具備兩個屬性:replyTo,它被設置爲回調隊列和correlationId,它被設置爲每一個請求的惟一值。
請求被髮送到rpc_queue隊列。
RPC工做程序(即:server)正在等待該隊列上的請求。當出現請求時,它會執行該任務並使用replyTo字段中的隊列將結果發送回客戶機。
客戶端等待回調隊列上的數據。當消息出現時,它會檢查相關屬性。若是它匹配來自請求的值,它將嚮應用程序返回響應。
服務端代碼:
package com.rabbitmq.cn; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RPCServer { // 定義一個遠程隊列名稱 private static final String RPCQUEUENAME = "RPCqueue"; // 斐波那契數函數 private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub // 建立工廠獲取鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setPort(5672); factory.setPassword("123456"); factory.setUsername("admin"); Connection connection = null; try{ // 得到鏈接 connection = factory.newConnection(); // 建立隊列 Channel channel = connection.createChannel(); // 聲明一個遠程的消息隊列 channel.queueDeclare(RPCQUEUENAME, false, false, false, null); // 爲了減輕服務器負擔,當多個服務一同工做時,能夠設置以下參數 channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); // 執行客戶端發送的請求任務 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // 設置返回的消息屬性 AMQP.BasicProperties replyPros = new BasicProperties().builder() .correlationId(properties.getCorrelationId()).build(); String response = ""; try { // 對傳遞過來的文字版數字解析,解析後調用函數處理,處理之後的結果做爲返回值準備返回給客戶端 String message = new String(body, "utf-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (Exception e) { // TODO: handle exception }finally{ // 返回處理後的結果給客戶端 channel.basicPublish("", properties.getReplyTo(), replyPros, response.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; /*The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.*/ // Wait and be prepared to consume the message from RPC client. channel.basicConsume(RPCQUEUENAME, false, consumer); while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } }}catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } }}
服務端代碼過程顯示,一般咱們從創建鏈接、通道和聲明隊列開始。
咱們可能但願運行多個服務器進程。爲了將負載均勻地分佈到多個服務器上,咱們須要在channel.basicQos中設置prefetchCount設置。
咱們使用basicconsumption訪問隊列,在隊列中咱們以對象(DefaultConsumer)的形式提供回調,該對象將執行該工做並將響應發送回。
客戶端代碼:
package com.rabbitmq.cn; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "RPCqueue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); connection = factory.newConnection(); channel = connection.createChannel(); // 建立臨時隊列 replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { // 經過uuid生成請求段的correctionId final String corrId = UUID.randomUUID().toString(); // 設置correctionId和replyTo屬性 AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); // 發送請求到請求隊列中 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); // 定義一個阻塞的消息隊列,來掛起線程等待相應 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); // 消費消息 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { // 經過構造函數獲取鏈接,並建立一個臨時匿名的隊列 fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (IOException _ignore) {} } } } }
客戶端代碼稍微複雜一些:
咱們創建一個鏈接和通道,併爲回覆聲明一個獨佔的「回調」隊列。
咱們訂閱「回調」隊列,以便接收RPC響應。
咱們的調用方法生成實際的RPC請求。
在這裏,咱們首先生成一個惟一的correlationId號並保存它——咱們在DefaultConsumer中實現的handleDelivery將使用這個值來捕獲適當的響應。
接下來,咱們發佈請求消息,有兩個屬性:replyTo和correlationId。
此時,咱們能夠坐下來等待合適的答覆。
因爲咱們的消費者交付處理是在一個單獨的線程中進行的,因此在響應到達以前,咱們須要一些東西來掛起主線程。使用BlockingQueue是一種可能的解決方案。這裏咱們建立了ArrayBlockingQueue,它的容量設置爲1,由於咱們須要等待一個響應。
handleDelivery方法作的是一項很是簡單的工做,對於每一個消耗的響應消息,它檢查correlationId是不是咱們要查找的那個。若是是,它將響應放置到BlockingQueue。
與此同時,主線程正在等待響應從BlockingQueue接收。
最後,咱們將響應返回給用戶。
運行後,咱們獲得結果