假如咱們想要調用遠程的一個方法或函數並等待執行結果,也就是咱們一般說的遠程過程調用(Remote Procedure Call)。怎麼辦?dom
今天咱們就用RabbitMQ來實現一個簡單的RPC系統:客戶端發送一個請求消息,服務端以一個響應消息迴應。爲了可以接收到響應,客戶端在發送消息的同時發送一個回調隊列用來告訴服務端響應消息發送到哪一個隊列裏面。也就是說每一個消息一個回調隊列,在此基礎上咱們變下,將回調隊列定義成類的屬性,這個每一個客戶端一個隊列,同一個客戶端的請求共用一個隊列。那麼接下來有個問題,怎麼知道這個隊列裏面的響應消息是屬於哪一個隊列的呢?ide
咱們會用到關聯標識(correlationId),每一個請求咱們都會生成一個惟一的值做爲correlationId,這樣每次有響應消息來的時候,咱們就去看correlationId來肯定究竟是哪一個請求的響應消息,將請求和響應關聯起來。若是收到一個不知道的correlationId,就能夠肯定不是這個客戶端的請求的響應,能夠直接丟棄掉。函數
1、工做模型ui
rpc_queue
隊列的請求。當有請求到來時,它就會開始幹活並將結果經過發送消息來返回,該返回消息發送到replyTo
指定的隊列。correlation id
屬性。若是該屬性值和請求匹配,就將響應返回給程序。2、代碼實現spa
接下來看代碼實現:線程
public class RpcClient { Connection connection = null; Channel channel = null; //回調隊列:用來接收服務端的響應消息 String queueName = ""; // 定義RpcClient public RpcClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); queueName = channel.queueDeclare().getQueue(); } // 真正的處理邏輯 public String call(String msg) throws IOException, InterruptedException { final String uuid = UUID.randomUUID().toString(); //後續,服務端根據"replyTo"來指定將返回信息寫入到哪一個隊列 //後續,服務端根據關聯標識"correlationId"來指定返回的響應是哪一個請求的 AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().replyTo(queueName).correlationId(uuid).build(); channel.basicPublish("", RpcServer.QUEUE_NAME, prop, msg.getBytes()); final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(uuid)) { String msg = new String(body, "UTF-8"); blockQueue.offer(msg); System.out.println("**** rpc client reciver response :[" + msg + "]"); } } }); return blockQueue.take(); } //關閉鏈接 public void close() throws IOException { connection.close(); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { RpcClient client = new RpcClient(); client.call("4"); client.close(); } }
發送請求的時候,它是生產者;接受響應的時候,它是消費者。3d
public class RpcServer { //RPC隊列名 public static final String QUEUE_NAME = "rpc_queue"; //斐波那契數列,用來模擬工做任務 public static int fib(int num) { if (num == 0) return 0; if (num == 1) return 1; return fib(num - 1) + fib(num - 2); } public static void main(String[] args) throws InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { // 1.connection & channel connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 2.declare queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("****** rpc server waiting for client request ......"); // 3.每次只接收一個消息(任務) channel.basicQos(1); //4.獲取消費實例 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { BasicProperties prop = new BasicProperties().builder().correlationId(properties.getCorrelationId()) .build(); String resp = ""; try { String msg = new String(body, "UTF-8"); resp = fib(Integer.valueOf(msg)) + ""; System.out.println("*** will response to rpc client :" + resp); } catch (Exception ex) { ex.printStackTrace(); } finally { channel.basicPublish("", properties.getReplyTo(), prop, resp.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 5.消費消息(處理任務) channel.basicConsume(QUEUE_NAME, false, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
接受請求的時候,它是消費者;發送響應的時候,它是生產者。code
服務端(多了一條打印): ****** rpc server waiting for client request ...... *** will response to rpc client :3 客戶端: **** rpc client reciver response :[3]
3、小插曲server
剛開始我在寫demo的時候,client中沒有用到阻塞隊列final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1);,而是直接這樣寫:blog
@Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(uuid)) { String msg = new String(body, "UTF-8"); //blockQueue.offer(msg); System.out.println("**** rpc client reciver response :[" + msg + "]"); } }
指望能打印出結果來,可是運行後發現並無打印:**** rpc client reciver response :[" + msg + "]的值。
緣由是handleDelivery()這個方法是在子線程中運行的,這個子線程運行的時候,主線程會繼續日後執行直到執行了client.close();方法而結束了。
因爲主線程終止了,致使沒有打印出結果。加了阻塞隊列以後將主線程阻塞不執行close()方法,問題就解決了。