RPC框架原理剖析(含實例)(轉)

轉自:http://blog.csdn.net/rulon147/article/details/53814589java

1、什麼是RPC

 

 

RPC(Remote Procedure Call Protocol)—— 遠程過程調用協議,它是一種經過 網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。 RPC協議假定某些 傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI 網絡通訊模型中,RPC跨越了 傳輸層應用層。RPC使得開發包括網絡 分佈式多程序在內的應用程序更加容易。
RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆 信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。
有多種 RPC模式和執行。最初由 Sun 公司提出。IETF ONC 憲章從新修訂了 Sun 版本,使得 ONC RPC 協議成爲 IETF 標準協議。如今使用最廣泛的模式和執行是開放式軟件基礎的分佈式計算 環境(DCE)。

 

 

2、圖例說明

 


3、java 實例演示

 

一、實現技術方案

 

     下面使用比較原始的方案實現RPC框架,採用Socket通訊、動態代理與反射與Java原生的序列化。bash

 

二、RPC框架架構

     RPC架構分爲三部分:服務器

 

  1. 服務提供者,運行在服務器端,提供服務接口定義與服務實現類。
  2. 服務中心,運行在服務器端,負責將本地服務發佈成遠程服務,管理遠程服務,提供給服務消費者使用。
  3. 服務消費者,運行在客戶端,經過遠程代理對象調用遠程服務。

 

 

 

 

 

三、 具體實現

 

 

 

1)服務提供者接口定義與實現,代碼以下:網絡

[java]  view plain  copy
 
  1. package services;  
  2.   
  3.   
  4. public interface HelloService {  
  5.   
  6.   
  7.     String sayHi(String name);  
  8.   
  9.   
  10. }  


2)HelloServices接口實現類:架構

 

[java]  view plain  copy
 
  1. package services.impl;  
  2.   
  3.   
  4. import services.HelloService;  
  5.   
  6.   
  7. public class HelloServiceImpl implements HelloService {  
  8.   
  9.   
  10.     public String sayHi(String name) {  
  11.         return "Hi, " + name;  
  12.     }  
  13.   
  14.   
  15. }  



 

3)服務中心代碼實現,代碼以下:框架

[java]  view plain  copy
 
  1. package services;  
  2.   
  3.   
  4. import java.io.IOException;  
  5.   
  6.   
  7. public interface Server {  
  8.     public void stop();  
  9.   
  10.   
  11.     public void start() throws IOException;  
  12.   
  13.   
  14.     public void register(Class serviceInterface, Class impl);  
  15.   
  16.   
  17.     public boolean isRunning();  
  18.   
  19.   
  20.     public int getPort();  
  21. }  


4)服務中心實現類:socket

 

[java]  view plain  copy
 
  1. package services.impl;  
  2.   
  3.   
  4.   
  5.   
  6.   
  7.   
  8. import java.io.IOException;  
  9. import java.io.ObjectInputStream;  
  10. import java.io.ObjectOutputStream;  
  11. import java.lang.reflect.Method;  
  12. import java.net.InetSocketAddress;  
  13. import java.net.ServerSocket;  
  14. import java.net.Socket;  
  15. import java.util.HashMap;  
  16. import java.util.concurrent.ExecutorService;  
  17. import java.util.concurrent.Executors;  
  18.   
  19.   
  20. import services.Server;  
  21.   
  22.   
  23. public class ServiceCenter implements Server {  
  24.     private static ExecutorService              executor        = Executors.newFixedThreadPool(Runtime.getRuntime()  
  25.                                                                         .availableProcessors());  
  26.   
  27.   
  28.     private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();  
  29.   
  30.   
  31.     private static boolean                      isRunning       = false;  
  32.   
  33.   
  34.     private static int                          port;  
  35.   
  36.   
  37.     public ServiceCenter(int port) {  
  38.         this.port = port;  
  39.     }  
  40.   
  41.   
  42.     public void stop() {  
  43.         isRunning = false;  
  44.         executor.shutdown();  
  45.     }  
  46.   
  47.   
  48.     public void start() throws IOException {  
  49.         ServerSocket server = new ServerSocket();  
  50.         server.bind(new InetSocketAddress(port));  
  51.         System.out.println("start server");  
  52.         try {  
  53.             while (true) {  
  54.                 // 1.監聽客戶端的TCP鏈接,接到TCP鏈接後將其封裝成task,由線程池執行  
  55.                 executor.execute(new ServiceTask(server.accept()));  
  56.             }  
  57.         } finally {  
  58.             server.close();  
  59.         }  
  60.     }  
  61.   
  62.   
  63.     public void register(Class serviceInterface, Class impl) {  
  64.         serviceRegistry.put(serviceInterface.getName(), impl);  
  65.     }  
  66.   
  67.   
  68.     public boolean isRunning() {  
  69.         return isRunning;  
  70.     }  
  71.   
  72.   
  73.     public int getPort() {  
  74.         return port;  
  75.     }  
  76.   
  77.   
  78.     private static class ServiceTask implements Runnable {  
  79.         Socket clent = null;  
  80.   
  81.   
  82.         public ServiceTask(Socket client) {  
  83.             this.clent = client;  
  84.         }  
  85.   
  86.   
  87.         public void run() {  
  88.             ObjectInputStream input = null;  
  89.             ObjectOutputStream output = null;  
  90.             try {  
  91.                 // 2.將客戶端發送的碼流反序列化成對象,反射調用服務實現者,獲取執行結果  
  92.                 input = new ObjectInputStream(clent.getInputStream());  
  93.                 String serviceName = input.readUTF();  
  94.                 String methodName = input.readUTF();  
  95.                 Class<?>[] parameterTypes = (Class<?>[]) input.readObject();  
  96.                 Object[] arguments = (Object[]) input.readObject();  
  97.                 Class serviceClass = serviceRegistry.get(serviceName);  
  98.                 if (serviceClass == null) {  
  99.                     throw new ClassNotFoundException(serviceName + " not found");  
  100.                 }  
  101.                 Method method = serviceClass.getMethod(methodName, parameterTypes);  
  102.                 Object result = method.invoke(serviceClass.newInstance(), arguments);  
  103.   
  104.   
  105.                 // 3.將執行結果反序列化,經過socket發送給客戶端  
  106.                 output = new ObjectOutputStream(clent.getOutputStream());  
  107.                 output.writeObject(result);  
  108.             } catch (Exception e) {  
  109.                 e.printStackTrace();  
  110.             } finally {  
  111.                 if (output != null) {  
  112.                     try {  
  113.                         output.close();  
  114.                     } catch (IOException e) {  
  115.                         e.printStackTrace();  
  116.                     }  
  117.                 }  
  118.                 if (input != null) {  
  119.                     try {  
  120.                         input.close();  
  121.                     } catch (IOException e) {  
  122.                         e.printStackTrace();  
  123.                     }  
  124.                 }  
  125.                 if (clent != null) {  
  126.                     try {  
  127.                         clent.close();  
  128.                     } catch (IOException e) {  
  129.                         e.printStackTrace();  
  130.                     }  
  131.                 }  
  132.             }  
  133.   
  134.   
  135.         }  
  136.     }  
  137.   
  138.   
  139.   
  140.   
  141.   
  142.   
  143. }  



 

5)客戶端的遠程代理對象:分佈式

 

[java]  view plain  copy
 
  1. package client;  
  2.   
  3.   
  4. import java.io.ObjectInputStream;  
  5. import java.io.ObjectOutputStream;  
  6. import java.lang.reflect.InvocationHandler;  
  7. import java.lang.reflect.Proxy;  
  8. import java.net.InetSocketAddress;  
  9. import java.net.Socket;  
  10. import java.lang.reflect.Method;  
  11.   
  12.   
  13. public class RPCClient<T> {  
  14.     @SuppressWarnings("unchecked")  
  15.     public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {  
  16.         // 1.將本地的接口調用轉換成JDK的動態代理,在動態代理中實現接口的遠程調用  
  17.         return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] { serviceInterface },  
  18.                 new InvocationHandler() {  
  19.                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
  20.                         Socket socket = null;  
  21.                         ObjectOutputStream output = null;  
  22.                         ObjectInputStream input = null;  
  23.                         try {  
  24.                             // 2.建立Socket客戶端,根據指定地址鏈接遠程服務提供者  
  25.                             socket = new Socket();  
  26.                             socket.connect(addr);  
  27.   
  28.   
  29.                             // 3.將遠程服務調用所需的接口類、方法名、參數列表等編碼後發送給服務提供者  
  30.                             output = new ObjectOutputStream(socket.getOutputStream());  
  31.                             output.writeUTF(serviceInterface.getName());  
  32.                             output.writeUTF(method.getName());  
  33.                             output.writeObject(method.getParameterTypes());  
  34.                             output.writeObject(args);  
  35.   
  36.   
  37.                             // 4.同步阻塞等待服務器返回應答,獲取應答後返回  
  38.                             input = new ObjectInputStream(socket.getInputStream());  
  39.                             return input.readObject();  
  40.                         } finally {  
  41.                             if (socket != null)  
  42.                                 socket.close();  
  43.                             if (output != null)  
  44.                                 output.close();  
  45.                             if (input != null)  
  46.                                 input.close();  
  47.                         }  
  48.                     }  
  49.                 });  
  50.     }  
  51. }  



 

6)最後爲測試類:測試

 

[java]  view plain  copy
 
  1. package client;  
  2.   
  3.   
  4. import java.io.IOException;  
  5. import java.net.InetSocketAddress;  
  6.   
  7.   
  8. import services.HelloService;  
  9. import services.Server;  
  10. import services.impl.HelloServiceImpl;  
  11. import services.impl.ServiceCenter;  
  12.   
  13.   
  14. public class RPCTest {  
  15.     public static void main(String[] args) throws IOException {  
  16.         new Thread(new Runnable() {  
  17.             public void run() {  
  18.                 try {  
  19.                     Server serviceServer = new ServiceCenter(8088);  
  20.                     serviceServer.register(HelloService.class, HelloServiceImpl.class);  
  21.                     serviceServer.start();  
  22.                 } catch (IOException e) {  
  23.                     e.printStackTrace();  
  24.                 }  
  25.             }  
  26.         }).start();  
  27.         HelloService service = RPCClient  
  28.                 .getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));  
  29.         System.out.println(service.sayHi("test"));  
  30.     }  
  31.   
  32.   
  33. }  



 

 

運行結果:this

[java]  view plain  copy
 
  1. regeist service HelloService  
  2. start server  
  3. Hi, test  
相關文章
相關標籤/搜索