RPC的簡單實現

RPC(Remote Procedure Call)—遠程過程調用協議,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,爲通訊程序之間攜帶信息數據。在OSI網絡通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分佈式多程序在內的應用程序更加容易。git

RPC採用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,而後等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息的到達爲止。當一個調用信息到達,服務器得到進程參數,計算結果,發送答覆信息,而後等待下一個調用信息,最後,客戶端調用進程接收答覆信息,得到進程結果,而後調用執行繼續進行。數組

 

源碼託管在 Git@OSC緩存

 

原理

1. Client端獲取一個 RPC 代理對象 proxy安全

2. 調用 proxy 上的方法, 被 InvocationHandler 實現類 Invoker 的 invoke() 方法捕獲服務器

3. invoke() 方法內將 RPC 請求封裝成 Invocation 實例, 再向 Server 發送 RPC請求網絡

4. Server端循環接收 RPC請求, 對每個請求都建立一個 Handler線程處理併發

5. Handler線程從輸入流中反序列化出 Invocation實例, 再調用 Server端的實現方法分佈式

6. 調用結束, 向 Client端返回調用結果ide

 

一. Invoker 類

    InvocationHandler 的實現類高併發

/**
 * InvocationHandler 接口的實現類 <br>
 * Client端代理對象的方法調用都會被 Invoker 的 invoke() 方法捕獲
 */
public class Invoker implements InvocationHandler {
	/** RPC協議接口的 Class對象 */
	private Class<?> intface;
	/** Client 端 Socket */
	private Socket client;
	/** 用於向 Server端發送 RPC請求的輸出流 */
	private ObjectOutputStream oos;
	/** 用於接收 Server端返回的 RPC請求結果的輸入流 */
	private ObjectInputStream ois;

	/**
	 * 構造一個 Socket實例 client, 並鏈接到指定的 Server端地址, 端口
	 * 
	 * @param intface
	 *            RPC協議接口的 Class對象
	 * @param serverAdd
	 *            Server端地址
	 * @param serverPort
	 *            Server端監聽的端口
	 */
	public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException {
		this.intface = intface;
		client = new Socket(serverAdd, serverPort);
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

		try {
			// 封裝 RPC請求
			Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args);
			// 打開 client 的輸出流
			oos = new ObjectOutputStream(client.getOutputStream());
			// 序列化, 將 RPC請求寫入到 client 的輸出流中
			oos.writeObject(invocation);
			oos.flush();

			// 等待 Server端返回 RPC請求結果 //

			// 打開 client 的輸入流
			ois = new ObjectInputStream(client.getInputStream());
			// 反序列化, 從輸入流中讀取 RPC請求結果
			Object res = ois.readObject();
			// 向 client 返回 RPC請求結果
			return res;
		} finally { // 關閉資源
			CloseUtil.closeAll(ois, oos);
			CloseUtil.closeAll(client);
		}
	}
}

 

二. Invocation 類

    Serializable 的實現類, RPC請求的封裝

/**
 * RPC調用的封裝, 包括如下字段: <br>
 * methodName: 方法名 <br>
 * parameterTypes: 方法參數列表的 Class 對象數組 <br>
 * params: 方法參數列表
 */
@SuppressWarnings("rawtypes")
public class Invocation implements Serializable {
	private static final long serialVersionUID = -7311316339835834851L;
	/** RPC協議接口的 Class對象 */
	private Class<?> intface;
	/** 方法名 */
	private String methodName;
	/** 方法參數列表的 Class 對象數組 */
	private Class[] parameterTypes;
	/** 方法的參數列表 */
	private Object[] params;

	public Invocation() {
	}

	/**
	 * 構造一個 RPC請求的封裝
	 * 
	 * @param intface
	 *            RPC協議接口的 Class對象
	 * @param methodName
	 *            方法名
	 * @param parameterTypes
	 *            方法參數列表的 Class 對象數組
	 * @param params
	 *            方法的參數列表
	 */
	public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) {
		this.intface = intface;
		this.methodName = methodName;
		this.parameterTypes = parameterTypes;
		this.params = params;
	}

	public Class getIntface() {
		return intface;
	}

	public String getMethodName() {
		return methodName;
	}

	public Class[] getParameterTypes() {
		return parameterTypes;
	}

	public Object[] getParams() {
		return params;
	}
}

 

三.RPC 類

    構造 Client端代理對象, Server端實例

/**
 * 一個構造 Server 端實例與 Client 端代理對象的類
 */
public class RPC {

	/**
	 * 獲取一個 Client 端的代理對象
	 * 
	 * @param intface
	 *            RPC協議接口, Client 與 Server 端共同遵照
	 * @param serverAdd
	 *            Server 端地址
	 * @param serverPort
	 *            Server 端監聽的端口
	 * @return Client 端的代理對象
	 */
	public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort)
			throws UnknownHostException, IOException {

		Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface },
				new Invoker(intface, serverAdd, serverPort));
		return proxy;
	}

	/**
	 * 獲取 RPC 的 Server 端實例
	 * 
	 * @param intface
	 *            RPC協議接口
	 * @param intfaceImpl
	 *            Server 端 RPC協議接口的實現
	 * @param port
	 *            Server 端監聽的端口
	 * @return RPCServer 實例
	 */
	public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
		return new RPCServer(intface, intfaceImpl, port);
	}

}

 

四. RPCServer 類

    Server端接收 RPC請求, 處理請求

/**
 * RPC 的 Server端
 */
public class RPCServer {
	/** Server端的 ServerSocket實例 */
	private ServerSocket server;
	/** Server端 RPC協議接口的實現緩存, 一個接口對應一個實現類的實例 */
	private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>();

	/**
	 * 構造一個 RPC 的 Server端實例
	 * 
	 * @param intface
	 *            RPC協議接口的 Class對象
	 * @param intfaceImpl
	 *            Server端 RPC協議接口的實現
	 * @param port
	 *            Server端監聽的端口
	 */
	public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
		server = new ServerSocket(port);
		RPCServer.intfaceImpls.put(intface, intfaceImpl);
	}

	/**
	 * 循環監聽並接收 Client端鏈接, 處理 RPC請求, 向 Client端返回結果
	 */
	public void start() {
		try {
			while (true) {
				// 接收 Client端鏈接, 建立一個 Handler線程, 處理 RPC請求
				new Handler(server.accept()).start();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally { // 關閉資源
			CloseUtil.closeAll(server);
		}
	}

	/**
	 * 向 RPC協議接口的實現緩存中添加緩存
	 * 
	 * @param intface
	 *            RPC協議接口的 Class對象
	 * @param intfaceImpl
	 *            Server端 RPC協議接口的實現
	 */
	public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) {
		RPCServer.intfaceImpls.put(intface, intfaceImpl);
	}

	/**
	 * 處理 RPC請求的線程類
	 */
	private static class Handler extends Thread {
		/** Server端接收到的 Client端鏈接 */
		private Socket client;
		/** 用於接收 client 的 RPC請求的輸入流 */
		private ObjectInputStream ois;
		/** 用於向 client 返回 RPC請求結果的輸出流 */
		private ObjectOutputStream oos;
		/** RPC請求的封裝 */
		private Invocation invocation;

		/**
		 * 用 Client端鏈接構造 Handler線程
		 * 
		 * @param client
		 */
		public Handler(Socket client) {
			this.client = client;
		}

		@Override
		public void run() {
			try {
				// 打開 client 的輸入流
				ois = new ObjectInputStream(client.getInputStream());
				// 反序列化, 從輸入流中讀取 RPC請求的封裝
				invocation = (Invocation) ois.readObject();
				// 從 RPC協議接口的實現緩存中獲取實現
				Object intfaceImpl = intfaceImpls.get(invocation.getIntface());
				// 獲取 Server端 RPC協議接口的方法實現
				Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(),
						invocation.getParameterTypes());
				// 跳過安全檢查
				method.setAccessible(true);
				// 調用具體的實現方法, 用 res 接收方法返回結果
				Object res = method.invoke(intfaceImpl, invocation.getParams());
				// 打開 client 的輸出流
				oos = new ObjectOutputStream(client.getOutputStream());
				// 序列化, 向輸出流中寫入 RPC請求的結果
				oos.writeObject(res);
				oos.flush();
			} catch (Exception e) {
				e.printStackTrace();
			} finally { // 關閉資源
				CloseUtil.closeAll(ois, oos);
				CloseUtil.closeAll(client);
			}
		}
	}
}

 

五. 測試類

    Login類, RPC協議接口

/**
 * RPC協議接口, Client 與 Server端共同遵照
 */
public interface Login {
	/**
	 * 抽象方法 login(), 模擬用戶登陸傳入兩個String 類型的參數, 返回 String類型的結果
	 * 
	 * @param username
	 *            用戶名
	 * @param password
	 *            密碼
	 * @return 返回登陸結果
	 */
	public String login(String username, String password);
}

 

LoginImpl類, Server 端 RPC協議接口( Login )的實現類

/**
 * Server端 RPC協議接口( Login )的實現類
 */
public class LoginImpl implements Login {
	/**
	 * 實現 login()方法, 模擬用戶登陸
	 * 
	 * @param username
	 *            用戶名
	 * @param password
	 *            密碼
	 * @return hello 用戶名
	 */
	@Override
	public String login(String username, String password) {
		return "hello " + username;
	}
}

 

ClientTest類, Client端測試類

/**
 * Client端測試類
 */
public class ClientTest {
	public static void main(String[] args) throws UnknownHostException, IOException {
		// 獲取一個 Client端的代理對象 proxy
		Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888);
		// 調用 proxy 的 login() 方法, 返回值爲 res
		String res = proxy.login("rpc", "password");
		// 輸出 res
		System.out.println(res);
	}
}

 

ServerTest類, Server端測試類

/**
 * Server端測試類
 */
public class ServerTest {
	public static void main(String[] args) throws ClassNotFoundException, IOException {
		// 獲取 RPC 的 Server 端實例 server
		RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888);
		// 循環監聽並接收 Client 端鏈接, 處理 RPC 請求, 向 Client 端返回結果
		server.start();
	}
}

 

運行 ServerTest, 控制檯輸出: 

Starting Socket Handler for port 8888

 

運行 ClientTest, 控制檯輸出: 

hello rpc

 

至此, 實現了基於 Proxy, Socket, IO 的簡單版 RPC模型,

對於每個 RPC請求, Server端都開啓一個 Handler線程處理該請求,

在高併發狀況下, Server端是扛不住的, 改用 NIO應該表現更好

JDK動態代理的簡單實現

Hadoop中RPC機制簡介

源碼託管在 Git@OSC

相關文章
相關標籤/搜索