RPC(Remote Procedure Call)即遠程過程調用,簡單的說就是在A機器上去調用B機器上的某個方法,在分佈式系統中極其經常使用。html
rpc原理其實很簡單,比較容易理解,在rpc中有兩個角色,rpc server和rpc client,先從client開始討論,由於client要像調用普通類的方法那樣調用server端的方法,好比client要調用server端的Foo#bar()方法,因此它必須首先獲取到Foo的實例,可是又不能直接new,由於直接new的話仍是至關於在本地調用,因此這個時候就必須有個什麼機制可以把Foo包一下,使得表面上看起來和Foo徹底同樣可是調用它的bar方法時底層替換爲去調用server的bar,這個機制就是代理,代理提供了一種相似於攔截的機制,能夠把調用bar方法替換成爲本身的實現,好比本地調用bar方法大體執行過程(粗糙歸納):java
execute bar() return result
代理替換以後的bar方法(粗糙歸納):json
call rpc server execute bar() get result from server return result
第一步的call rpc server execute bar(),client如何告訴server本身要調用哪一個方法呢,這個方法就比較多了,比較常見的是約定一種協議,好比第幾個字節是表示的嘛意思,而後server接收後解析按照指令執行就能夠了,這樣網絡傳輸的數據比較少,或者不太講究的直接將現成的協議拿過來用,好比經過socket直接傳json、傳xml、傳對象流等等,再或者甚至用http請求的,反正可以把本身要調用哪一個方法告訴server,同時還有調用方法時須要傳遞的參數,而後等待server執行完獲取到其結果就能夠了。網絡
而後就是server端的處理,若是是使用socket傳輸數據的話,server應該啓動一個服務監聽在約定的端口(不約定好的話客戶端不知道去連誰啊),一個while循環不斷地等待客戶端的鏈接,每來一個客戶端就啓動一個新的線程去處理(此處沒有考慮高併發狀況下的負載和優化,只是基本的實現),在新線程中讀取socket流看客戶端要調用哪一個方法,而後調用本地的此方法,調用的時候將client傳過來的參數傳入進去,待方法執行完再傳回給client,傳回的方法和client傳數據過來相同,無非是走socket自定義協議、xml、json、http等等,再而後client讀取到結果返回,一次rpc調用就完成了。至此,一個簡單的rpc框架的雛形已經完成。併發
這一章節基於上面討論的rpc調用的過程,實現一個簡單的rpc框架,其中代理使用JDK提供的代理實現,傳輸層使用Java的ObjectInputStream和ObjectOutputStream實現。app
定義一個工具類,提供兩個方法,分別用於服務端啓動rpc server和客戶端獲取相關serviceProvider的代理對象。框架
RpcServiceProviderUtil.java:dom
package cc11001100.diySimpleRpcFramework.util; import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.ServerSocket; import java.net.Socket; import static cc11001100.diySimpleRpcFramework.util.RpcLogUtil.info; /** * @author CC11001100 */ public class RpcServiceProviderUtil { /** * 在指定的端口上啓動rpc服務,用於server端啓動rpc服務 * * @param port * @throws IOException */ public static <T> void startService(T object, int port) throws IOException { ServerSocket serverSocket = new ServerSocket(port); while (true) { final Socket socket = serverSocket.accept(); info(socket, "start"); new Thread(() -> { ObjectInputStream ois = null; ObjectOutputStream oos = null; try { // 從輸入流中讀取要調用的方法名和調用時傳入的參數 ois = new ObjectInputStream(socket.getInputStream()); String methodName = ois.readUTF(); Class[] parameterTypes = (Class[]) ois.readObject(); Object[] parameterValues = (Object[]) ois.readObject(); Method method = object.getClass().getMethod(methodName, parameterTypes); // 調用方法執行 info(socket, "begin invoke method ", methodName); long start = System.currentTimeMillis(); Object invoke = method.invoke(object, parameterValues); long cost = System.currentTimeMillis() - start; info(socket, "exec method ", methodName, " done, cost=", cost, "ms"); // 將執行結果傳回調用端 oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeObject(invoke); } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { // 若是發生了異常,反饋給調用端 try { info(socket, "exec exception, e=", e.getClass(), ", cause=", e.getCause()); oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeObject(e); } catch (IOException e1) { e1.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } finally { close(ois); close(oos); close(socket); } info(socket, "end"); }).start(); } } private static void close(Closeable closeable) { if (closeable != null) { try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 用於客戶端獲取rpc的endpoint * * @param clazz * @param remoteHost * @param remotePort * @param <T> * @return */ public static <T> T wrap(Class<T> clazz, String remoteHost, int remotePort) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), clazz.getInterfaces(), (proxy, method, args) -> { // 獲取要調用的方法的名字 String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); // 調用rpc server端去執行 Socket socket = new Socket(remoteHost, remotePort); ObjectOutputStream oos = null; ObjectInputStream ois = null; Object result = null; try { oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeUTF(methodName); oos.writeObject(parameterTypes); oos.writeObject(args); // 讀取rpc server端執行結果 ois = new ObjectInputStream(socket.getInputStream()); result = ois.readObject(); // 此處不catch,執行時出了異常儘管拋出 } finally { close(ois); close(oos); close(socket); } // 檢測server端執行是否拋了異常 if (result != null && result instanceof Throwable) { throw (Throwable) result; } return result; }); } }
RpcLogUtil.java:socket
package cc11001100.diySimpleRpcFramework.util; import java.net.Socket; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** * @author CC11001100 */ public class RpcLogUtil { private static String now() { return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")); } public static void info(Socket socket, Object... messages) { String remoteAddress = socket.getRemoteSocketAddress().toString(); StringBuilder sb = new StringBuilder(); sb.append("[").append(now()).append("]") .append(" - ").append(remoteAddress).append(":").append(" - "); for (Object msg : messages) { sb.append(msg.toString()); } System.out.println(sb.toString()); } }
由於代理是使用JDK提供的代理機制實現的,這種代理方式要求必需要定義一個接口而後實現它,因此首先定義一個接口:分佈式
package cc11001100.diySimpleRpcFramework.rpcServiceProvider; public interface FooRpcServiceProvider { int add(int a, int b); }
而後實現它:
package cc11001100.diySimpleRpcFramework.rpcServiceProvider; /** * @author CC11001100 */ public class FooRpcServiceProviderImpl implements FooRpcServiceProvider { @Override public int add(int a, int b) { return a + b; } }
測試一下此RPC server是否可用,先啓動一個rpc server:
package cc11001100.diySimpleRpcFramework.test; import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProviderImpl; import cc11001100.diySimpleRpcFramework.util.RpcServiceProviderUtil; import java.io.IOException; /** * 啓動rpc server端 * * @author CC11001100 */ public class FooRpcServiceProviderServerTest { public static void main(String[] args) throws IOException { RpcServiceProviderUtil.startService(new FooRpcServiceProviderImpl(), 10086); } }
而後啓動client去調用server:
package cc11001100.diySimpleRpcFramework.test; import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProvider; import cc11001100.diySimpleRpcFramework.rpcServiceProvider.FooRpcServiceProviderImpl; import cc11001100.diySimpleRpcFramework.util.RpcServiceProviderUtil; import java.util.Random; import java.util.concurrent.TimeUnit; /** * rpc客戶端調用 * * @author CC11001100 */ public class FooRpcServiceProviderClientTest { public static void main(String[] args) throws InterruptedException { FooRpcServiceProvider foo = RpcServiceProviderUtil.wrap(FooRpcServiceProviderImpl.class, "localhost", 10086); Random random = new Random(); while (true) { int a = random.nextInt(10); int b = random.nextInt(10); int result = foo.add(a, b); System.out.printf("%d + %d = %d\n", a, b, result); TimeUnit.MILLISECONDS.sleep(random.nextInt(900) + 100); } } }
控制檯輸出:
我寫的rpc server精通10之內加法,這點client能夠做證。
.