關於RPC的介紹請參考百度百科裏的關於RPC的介紹:http://baike.baidu.com/view/32726.htm#sub32726java
如今來看看Rabbitmq中RPC吧!RPC的工做示意圖以下:json
上圖中的C表明客戶端,S表示服務器端;Rabbitmq中的RPC流程以下:服務器
一、首先客戶端發送一個reply_to和corrention_id的請求,發佈到RPC隊列中;app
二、服務器端處理這個請求,並把處理結果發佈到一個回調Queue,此Queue的名稱應當與reply_to的名稱一致dom
三、客戶端從回調Queue中獲得先前correlation_id設定的值的處理結果。若是碰到和先前不同的corrention_id的值,將會忽略而不是拋出異常。ui
對於上面所提到的回調Queue中的消費處理使用的是BasicProperties類;而消息 屬性在AMQP的協議中規定有14個;而不少大部分咱們沒有用到。經常使用的幾個屬性有:this
1 Message properties 2 The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following: 3 4 delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial. 5 content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json. 6 reply_to: Commonly used to name a callback queue. 7 correlation_id: Useful to correlate RPC responses with requests.
delivery_mode : 標記消息是持久性消息仍是瞬態信息。在前面的「Work Queue」中咱們已經提到過; spa
content_type : 用來描述MIME的類型。如把其類型設定爲JSON;code
reply_to : 用於命名一個回調Queue;htm
correlation_id : 用於與相關聯的請求的RPC響應.
當客戶端想要調用服務器的某個方法來完成某項功能時,就能夠使用rabbitMQ支持的PRC服務。
其實RPC服務與普通的收發消息的區別不大, RPC的過程其實就是
客戶端向服務端定義好的Queue發送消息,其中攜帶的消息就應該是服務端將要調用的方法的參數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。
示例:
1 package com.zf.rabbitmq07; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.ConsumerCancelledException; 10 import com.rabbitmq.client.QueueingConsumer; 11 import com.rabbitmq.client.QueueingConsumer.Delivery; 12 import com.rabbitmq.client.ShutdownSignalException; 13 14 public class RPCServer { 15 16 public static final String RPC_QUEUE_NAME = "rpc_queue"; 17 18 public static String sayHello(String name){ 19 return "hello " + name ; 20 } 21 22 public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 23 24 ConnectionFactory connFac = new ConnectionFactory() ; 25 connFac.setHost("localhost"); 26 27 Connection conn = connFac.newConnection() ; 28 29 Channel channel = conn.createChannel() ; 30 31 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ; 32 33 QueueingConsumer consumer = new QueueingConsumer(channel); 34 35 channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ; 36 37 while(true){ 38 System.out.println("服務端等待接收消息.."); 39 Delivery deliver = consumer.nextDelivery() ; 40 System.out.println("服務端成功收到消息.."); 41 BasicProperties props = deliver.getProperties() ; 42 43 String message = new String(deliver.getBody() , "UTF-8") ; 44 45 String responseMessage = sayHello(message) ; 46 47 BasicProperties responseProps = new BasicProperties.Builder() 48 .correlationId(props.getCorrelationId()) 49 .build() ; 50 51 //將結果返回到客戶端Queue 52 channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ; 53 54 //向客戶端確認消息 55 channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false); 56 System.out.println("服務端返回消息完成.."); 57 } 58 59 } 60 61 }
1 package com.zf.rabbitmq07; 2 3 import java.io.IOException; 4 import java.util.UUID; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.ConsumerCancelledException; 10 import com.rabbitmq.client.QueueingConsumer; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 import com.rabbitmq.client.QueueingConsumer.Delivery; 13 import com.rabbitmq.client.ShutdownSignalException; 14 15 public class RPCClient { 16 17 public static final String RPC_QUEUE_NAME = "rpc_queue"; 18 19 public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 20 21 ConnectionFactory connFac = new ConnectionFactory() ; 22 connFac.setHost("localhost"); 23 Connection conn = connFac.newConnection() ; 24 Channel channel = conn.createChannel() ; 25 26 //響應QueueName ,服務端將會把要返回的信息發送到該Queue 27 String responseQueue = channel.queueDeclare().getQueue() ; 28 29 String correlationId = UUID.randomUUID().toString() ; 30 31 BasicProperties props = new BasicProperties.Builder() 32 .replyTo(responseQueue) 33 .correlationId(correlationId) 34 .build(); 35 36 String message = "is_zhoufeng"; 37 channel.basicPublish( "" , RPC_QUEUE_NAME , props , message.getBytes("UTF-8")); 38 39 QueueingConsumer consumer = new QueueingConsumer(channel) ; 40 41 channel.basicConsume( responseQueue , consumer) ; 42 43 while(true){ 44 45 Delivery delivery = consumer.nextDelivery() ; 46 47 if(delivery.getProperties().getCorrelationId().equals(correlationId)){ 48 String result = new String(delivery.getBody()) ; 49 System.out.println(result); 50 } 51 52 } 53 } 54 55 }