rabbitMQ學習筆記(七) RPC 遠程過程調用

關於RPC的介紹請參考百度百科裏的關於RPC的介紹:http://baike.baidu.com/view/32726.htm#sub32726java

   如今來看看Rabbitmq中RPC吧!RPC的工做示意圖以下:json


   上圖中的C表明客戶端,S表示服務器端;Rabbitmq中的RPC流程以下:服務器

一、首先客戶端發送一個reply_to和corrention_id的請求,發佈到RPC隊列中;app

二、服務器端處理這個請求,並把處理結果發佈到一個回調Queue,此Queue的名稱應當與reply_to的名稱一致dom

三、客戶端從回調Queue中獲得先前correlation_id設定的值的處理結果。若是碰到和先前不同的corrention_id的值,將會忽略而不是拋出異常。ui

 

  對於上面所提到的回調Queue中的消費處理使用的是BasicProperties類;而消息 屬性在AMQP的協議中規定有14個;而不少大部分咱們沒有用到。經常使用的幾個屬性有:this

English代碼   收藏代碼
1 Message properties
2 The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
3 
4 delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial. 
5 content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json. 
6 reply_to: Commonly used to name a callback queue. 
7 correlation_id: Useful to correlate RPC responses with requests. 

 delivery_mode : 標記消息是持久性消息仍是瞬態信息。在前面的「Work Queue」中咱們已經提到過;    spa

  content_type : 用來描述MIME的類型。如把其類型設定爲JSON;code

  reply_to : 用於命名一個回調Queue;htm

  correlation_id : 用於與相關聯的請求的RPC響應.

當客戶端想要調用服務器的某個方法來完成某項功能時,就能夠使用rabbitMQ支持的PRC服務。

其實RPC服務與普通的收發消息的區別不大, RPC的過程其實就是  

客戶端向服務端定義好的Queue發送消息,其中攜帶的消息就應該是服務端將要調用的方法的參數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。

示例:

 1 package com.zf.rabbitmq07;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.AMQP.BasicProperties;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.QueueingConsumer.Delivery;
12 import com.rabbitmq.client.ShutdownSignalException;
13 
14 public class RPCServer {
15     
16     public static final String RPC_QUEUE_NAME = "rpc_queue";
17     
18     public static String sayHello(String name){
19         return "hello " + name ;
20     }
21     
22     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
23         
24         ConnectionFactory connFac = new ConnectionFactory() ;
25         connFac.setHost("localhost");
26         
27         Connection conn = connFac.newConnection() ;
28         
29         Channel channel = conn.createChannel() ;
30         
31         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
32         
33         QueueingConsumer consumer = new QueueingConsumer(channel);
34         
35         channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
36         
37         while(true){
38             System.out.println("服務端等待接收消息..");  
39             Delivery deliver = consumer.nextDelivery() ;
40             System.out.println("服務端成功收到消息..");
41             BasicProperties props =  deliver.getProperties() ;
42             
43             String message = new String(deliver.getBody() , "UTF-8") ;
44             
45             String responseMessage = sayHello(message) ;
46             
47             BasicProperties responseProps = new BasicProperties.Builder()
48             .correlationId(props.getCorrelationId())  
49             .build() ;
50             
51             //將結果返回到客戶端Queue
52             channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
53              
54             //向客戶端確認消息
55             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
56             System.out.println("服務端返回消息完成..");
57         }
58         
59     }
60 
61 }
 1 package com.zf.rabbitmq07;
 2 
 3 import java.io.IOException;
 4 import java.util.UUID;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 import com.rabbitmq.client.QueueingConsumer.Delivery;
13 import com.rabbitmq.client.ShutdownSignalException;
14 
15 public class RPCClient {
16 
17     public static final String RPC_QUEUE_NAME = "rpc_queue";
18 
19     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
20 
21         ConnectionFactory connFac = new ConnectionFactory() ;
22         connFac.setHost("localhost");
23         Connection conn = connFac.newConnection() ;
24         Channel channel = conn.createChannel() ;
25 
26         //響應QueueName ,服務端將會把要返回的信息發送到該Queue
27         String responseQueue = channel.queueDeclare().getQueue() ;
28 
29         String correlationId = UUID.randomUUID().toString() ;
30 
31         BasicProperties props = new BasicProperties.Builder()
32         .replyTo(responseQueue)
33         .correlationId(correlationId)
34         .build();
35 
36         String message = "is_zhoufeng";
37         channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
38 
39         QueueingConsumer consumer = new QueueingConsumer(channel)    ;
40 
41         channel.basicConsume( responseQueue , consumer) ;
42 
43         while(true){
44             
45             Delivery delivery = consumer.nextDelivery() ;
46             
47             if(delivery.getProperties().getCorrelationId().equals(correlationId)){
48                 String result = new String(delivery.getBody()) ;  
49                 System.out.println(result);
50             }
51             
52         }
53     }
54 
55 }
相關文章
相關標籤/搜索