在 譯:2. RabbitMQ 之Work Queues (工做隊列) 咱們學習瞭如何使用工做隊列在多個工做人員之間分配耗時的任務。html
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); 字符串結果= fibonacciRpc.call(「4」); System.out.println(「fib(4)is」 + result);
- 確保明顯哪一個函數調用是本地的,哪一個是遠程的。
- 記錄您的系統。使組件之間的依賴關係變得清晰。
- 處理錯誤案例。當RPC服務器長時間停機時,客戶端應該如何反應?
若有疑問,請避免使用RPC。若是能夠,您應該使用異步管道 - 而不是相似RPC的阻塞,將結果異步推送到下一個計算階段。服務器
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:併發
- deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
- replyTo: Commonly used to name a callback queue.
- correlationId: Useful to correlate RPC responses with requests.
import com.rabbitmq.client.AMQP.BasicProperties;
在上面介紹的方法中,咱們建議爲每一個RPC請求建立一個回調隊列。這是很是低效的,但幸運的是有更好的方法 - 讓咱們爲每一個客戶端建立一個回調隊列。
這引起了一個新問題,在該隊列中收到響應後,不清楚響應屬於哪一個請求。那是在使用correlationId屬性的時候 。咱們將爲每一個請求將其設置爲惟一值。稍後,當咱們在回調隊列中收到一條消息時,咱們將查看此屬性,並根據該屬性,咱們將可以將響應與請求進行匹配。若是咱們看到未知的correlationId值,咱們能夠安全地丟棄該消息 - 它不屬於咱們的請求。
- 對於RPC請求,客戶端發送帶有兩個屬性的消息: replyTo,設置爲僅爲請求建立的匿名獨佔隊列;以及correlationId,設置爲每一個請求的惟一值。
- 請求被髮送到rpc_queue隊列。
- RPC worker(aka:server)正在等待該隊列上的請求。當出現請求時,它會執行該做業,並使用來自replyTo字段的隊列將帶有結果的消息發送回客戶端。
- 客戶端等待回覆隊列上的數據。出現消息時,它會檢查correlationId屬性。若是它與請求中的值匹配,則返回對應用程序的響應。
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }咱們宣佈咱們的斐波那契函數。它假定只有有效的正整數輸入。(不要期望這個適用於大數字,它多是最慢的遞歸實現)。
- 像往常同樣,咱們首先創建鏈接,通道和聲明隊列。
- 咱們可能但願運行多個服務器進程。爲了在多個服務器上平均分配負載,咱們須要在channel.basicQos中設置 prefetchCount設置。
- 咱們使用basicConsume來訪問隊列,咱們以對象(DefaultConsumer)的形式提供回調,它將完成工做併發迴響應。
- 咱們創建了一個鏈接和渠道。
- 咱們的調用方法生成實際的RPC請求。
- 在這裏,咱們首先生成一個惟一的correlationId 數並保存它 - 咱們 在RpcConsumer中的handleDelivery實現將使用該值來捕獲適當的響應。
- 而後,咱們爲回覆建立一個專用的獨佔隊列並訂閱它。
- 接下來,咱們發佈請求消息,其中包含兩個屬性: replyTo和correlationId。
- 在這一點上,咱們能夠坐下來等待正確的響應到來。
- 因爲咱們的消費者交付處理是在一個單獨的線程中進行的,所以咱們須要在響應到來以前暫停主線程。使用BlockingQueue是一種可能的解決方案。這裏咱們建立了ArrayBlockingQueue ,容量設置爲1,由於咱們只須要等待一個響應。
- 該handleDelivery方法是作一個很簡單的工做,對每一位消費響應消息它會檢查的correlationID 是咱們要找的人。若是是這樣,它會將響應置於BlockingQueue。
- 同時主線程正在等待響應從BlockingQueue獲取它。
- 最後,咱們將響應返回給用戶。
RPCClient fibonacciRpc = new RPCClient(); System.out.println(「[x] Requesting fib(30)」); 字符串響應= fibonacciRpc.call(「30」); System.out.println(「[。] Got'」 + response + 「'」); fibonacciRpc.close();
import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); String ctag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (IOException _ignore) {} } } } }RPCServer.java
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
- 若是RPC服務器太慢,您能夠經過運行另外一個服務器來擴展。嘗試在新控制檯中運行第二個RPCServer。
- 在客戶端,RPC只須要發送和接收一條消息。不須要像queueDeclare這樣的同步調用 。所以,對於單個RPC請求,RPC客戶端只須要一次網絡往返。
- 若是沒有運行服務器,客戶應該如何反應?
- 客戶端是否應該爲RPC設置某種超時?
- 若是服務器出現故障並引起異常,是否應將其轉發給客戶端?
- 在處理以前防止無效的傳入消息(例如檢查邊界,鍵入)。