RPC(Remote Procedure Call)—遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。git
RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息的到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。數組
1. Client端獲取一個 RPC 代理對象 proxy安全
2. 調用 proxy 上的方法, 被 InvocationHandler 實現類 Invoker 的 invoke() 方法捕獲服務器
3. invoke() 方法內將 RPC 請求封裝成 Invocation 實例, 再向 Server 發送 RPC請求網絡
4. Server端循環接收 RPC請求, 對每個請求都建立一個 Handler線程處理併發
5. Handler線程從輸入流中反序列化出 Invocation實例, 再調用 Server端的實現方法分佈式
6. 調用結束, 向 Client端返回調用結果ide
InvocationHandler 的實現類高併發
/** * InvocationHandler 接口的實現類 <br> * Client端代理對象的方法調用都會被 Invoker 的 invoke() 方法捕獲 */ public class Invoker implements InvocationHandler { /** RPC協議接口的 Class對象 */ private Class<?> intface; /** Client 端 Socket */ private Socket client; /** 用於向 Server端發送 RPC請求的輸出流 */ private ObjectOutputStream oos; /** 用於接收 Server端返回的 RPC請求結果的輸入流 */ private ObjectInputStream ois; /** * 構造一個 Socket實例 client, 並鏈接到指定的 Server端地址, 端口 * * @param intface * RPC協議接口的 Class對象 * @param serverAdd * Server端地址 * @param serverPort * Server端監聽的端口 */ public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException { this.intface = intface; client = new Socket(serverAdd, serverPort); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { // 封裝 RPC請求 Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args); // 打開 client 的輸出流 oos = new ObjectOutputStream(client.getOutputStream()); // 序列化, 將 RPC請求寫入到 client 的輸出流中 oos.writeObject(invocation); oos.flush(); // 等待 Server端返回 RPC請求結果 // // 打開 client 的輸入流 ois = new ObjectInputStream(client.getInputStream()); // 反序列化, 從輸入流中讀取 RPC請求結果 Object res = ois.readObject(); // 向 client 返回 RPC請求結果 return res; } finally { // 關閉資源 CloseUtil.closeAll(ois, oos); CloseUtil.closeAll(client); } } }
Serializable 的實現類, RPC請求的封裝
/** * RPC調用的封裝, 包括如下字段: <br> * methodName: 方法名 <br> * parameterTypes: 方法參數列表的 Class 對象數組 <br> * params: 方法參數列表 */ @SuppressWarnings("rawtypes") public class Invocation implements Serializable { private static final long serialVersionUID = -7311316339835834851L; /** RPC協議接口的 Class對象 */ private Class<?> intface; /** 方法名 */ private String methodName; /** 方法參數列表的 Class 對象數組 */ private Class[] parameterTypes; /** 方法的參數列表 */ private Object[] params; public Invocation() { } /** * 構造一個 RPC請求的封裝 * * @param intface * RPC協議接口的 Class對象 * @param methodName * 方法名 * @param parameterTypes * 方法參數列表的 Class 對象數組 * @param params * 方法的參數列表 */ public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) { this.intface = intface; this.methodName = methodName; this.parameterTypes = parameterTypes; this.params = params; } public Class getIntface() { return intface; } public String getMethodName() { return methodName; } public Class[] getParameterTypes() { return parameterTypes; } public Object[] getParams() { return params; } }
構造 Client端代理對象, Server端實例
/** * 一個構造 Server 端實例與 Client 端代理對象的類 */ public class RPC { /** * 獲取一個 Client 端的代理對象 * * @param intface * RPC協議接口, Client 與 Server 端共同遵照 * @param serverAdd * Server 端地址 * @param serverPort * Server 端監聽的端口 * @return Client 端的代理對象 */ public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException { Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface }, new Invoker(intface, serverAdd, serverPort)); return proxy; } /** * 獲取 RPC 的 Server 端實例 * * @param intface * RPC協議接口 * @param intfaceImpl * Server 端 RPC協議接口的實現 * @param port * Server 端監聽的端口 * @return RPCServer 實例 */ public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException { return new RPCServer(intface, intfaceImpl, port); } }
Server端接收 RPC請求, 處理請求
/** * RPC 的 Server端 */ public class RPCServer { /** Server端的 ServerSocket實例 */ private ServerSocket server; /** Server端 RPC協議接口的實現緩存, 一個接口對應一個實現類的實例 */ private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>(); /** * 構造一個 RPC 的 Server端實例 * * @param intface * RPC協議接口的 Class對象 * @param intfaceImpl * Server端 RPC協議接口的實現 * @param port * Server端監聽的端口 */ public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException { server = new ServerSocket(port); RPCServer.intfaceImpls.put(intface, intfaceImpl); } /** * 循環監聽並接收 Client端鏈接, 處理 RPC請求, 向 Client端返回結果 */ public void start() { try { while (true) { // 接收 Client端鏈接, 建立一個 Handler線程, 處理 RPC請求 new Handler(server.accept()).start(); } } catch (IOException e) { e.printStackTrace(); } finally { // 關閉資源 CloseUtil.closeAll(server); } } /** * 向 RPC協議接口的實現緩存中添加緩存 * * @param intface * RPC協議接口的 Class對象 * @param intfaceImpl * Server端 RPC協議接口的實現 */ public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) { RPCServer.intfaceImpls.put(intface, intfaceImpl); } /** * 處理 RPC請求的線程類 */ private static class Handler extends Thread { /** Server端接收到的 Client端鏈接 */ private Socket client; /** 用於接收 client 的 RPC請求的輸入流 */ private ObjectInputStream ois; /** 用於向 client 返回 RPC請求結果的輸出流 */ private ObjectOutputStream oos; /** RPC請求的封裝 */ private Invocation invocation; /** * 用 Client端鏈接構造 Handler線程 * * @param client */ public Handler(Socket client) { this.client = client; } @Override public void run() { try { // 打開 client 的輸入流 ois = new ObjectInputStream(client.getInputStream()); // 反序列化, 從輸入流中讀取 RPC請求的封裝 invocation = (Invocation) ois.readObject(); // 從 RPC協議接口的實現緩存中獲取實現 Object intfaceImpl = intfaceImpls.get(invocation.getIntface()); // 獲取 Server端 RPC協議接口的方法實現 Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(), invocation.getParameterTypes()); // 跳過安全檢查 method.setAccessible(true); // 調用具體的實現方法, 用 res 接收方法返回結果 Object res = method.invoke(intfaceImpl, invocation.getParams()); // 打開 client 的輸出流 oos = new ObjectOutputStream(client.getOutputStream()); // 序列化, 向輸出流中寫入 RPC請求的結果 oos.writeObject(res); oos.flush(); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉資源 CloseUtil.closeAll(ois, oos); CloseUtil.closeAll(client); } } } }
Login類, RPC協議接口
/** * RPC協議接口, Client 與 Server端共同遵照 */ public interface Login { /** * 抽象方法 login(), 模擬用戶登陸傳入兩個String 類型的參數, 返回 String類型的結果 * * @param username * 用戶名 * @param password * 密碼 * @return 返回登陸結果 */ public String login(String username, String password); }
LoginImpl類, Server 端 RPC協議接口( Login )的實現類
/** * Server端 RPC協議接口( Login )的實現類 */ public class LoginImpl implements Login { /** * 實現 login()方法, 模擬用戶登陸 * * @param username * 用戶名 * @param password * 密碼 * @return hello 用戶名 */ @Override public String login(String username, String password) { return "hello " + username; } }
ClientTest類, Client端測試類
/** * Client端測試類 */ public class ClientTest { public static void main(String[] args) throws UnknownHostException, IOException { // 獲取一個 Client端的代理對象 proxy Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888); // 調用 proxy 的 login() 方法, 返回值爲 res String res = proxy.login("rpc", "password"); // 輸出 res System.out.println(res); } }
ServerTest類, Server端測試類
/** * Server端測試類 */ public class ServerTest { public static void main(String[] args) throws ClassNotFoundException, IOException { // 獲取 RPC 的 Server 端實例 server RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888); // 循環監聽並接收 Client 端鏈接, 處理 RPC 請求, 向 Client 端返回結果 server.start(); } }
運行 ServerTest, 控制檯輸出:
Starting Socket Handler for port 8888
運行 ClientTest, 控制檯輸出:
hello rpc
至此, 實現了基於 Proxy, Socket, IO 的簡單版 RPC模型,
對於每個 RPC請求, Server端都開啓一個 Handler線程處理該請求,
在高併發狀況下, Server端是扛不住的, 改用 NIO應該表現更好