RPC,全稱爲 Remote Procedure Call,即遠程過程調用,它是一個計算機通訊協議。它容許像調用本地服務同樣調用遠程服務。它能夠有不一樣的實現方式,而不須要了解底層網絡技術的協議。 RPC 協議假定某些傳輸協議的存在,如 TCP 或 UDP,爲通訊程序之間攜帶信息數據。如 RMI(遠程方法調用)、Hessian、Http invoker 等。java
RPC 可以讓本地應用簡單、高效地調用服務器中的過程。它主要應用在分佈式系統。如 Hadoop 中的 IPC 組件。但怎樣實現一個 RPC 框架呢?
能夠從下面幾個方面思考:bash
RPC 架構分爲三部分:服務器
這裏我只介紹服務提供者和客戶端的實現方式。網絡
服務提供者 IHello 接口定義:架構
public interface IHello { String sayHello(String string); }
服務提供者 IHello 接口實現:框架
public class HelloImpl implements IHello { @Override public String sayHello(String string) { return "Hello:" + string; } }
服務端 RpcProxyServer 類:socket
public class RpcProxyServer { ExecutorService executorService = Executors.newCachedThreadPool(); public void publisher(Object service, int port) { ServerSocket serverSocket = null; try { // 啓動 socket 服務 serverSocket = new ServerSocket(port); while (true) { Socket socket = serverSocket.accept(); executorService.execute(new ProcessorHandler(service, socket)); } } catch (IOException e) { e.printStackTrace(); } } }
服務端 RpcRequest 類:分佈式
public class RpcRequest implements Serializable { private static final long serialVersionUID = 383378368319625542L; private String className; private String methodName; private Object[] params; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } @Override public String toString() { return "RpcRequest{" + "className='" + className + '\'' + ", methodName='" + methodName + '\'' + ", params=" + Arrays.toString(params) + '}'; } }
服務端 ProcessorHandler 類:ide
import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; public class ProcessorHandler implements Runnable { Socket socket; Object service; public ProcessorHandler(Object service, Socket socket) { this.socket = socket; this.service = service; } @Override public void run() { System.out.println("begin processor handler!"); ObjectInputStream objectInputStream = null; try { objectInputStream = new ObjectInputStream(socket.getInputStream()); RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Object restlt = invoke(rpcRequest); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(restlt); objectOutputStream.flush(); objectInputStream.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (objectInputStream != null) { try { objectInputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } private Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { Object[] args = request.getParams(); Class<?>[] types = new Class[args.length]; for (int i = 0; i < args.length; i++) { types[i] = args[i].getClass(); } Method method = service.getClass().getMethod(request.getMethodName(), types); return method.invoke(service, args); } }
服務端主類 RpcServerMain:oop
public class RpcServerMain { public static void main(String[] args) { IHello hello = new HelloImpl(); RpcProxyServer rpcProxyServer = new RpcProxyServer(); rpcProxyServer.publisher(hello, 8080); System.out.println(hello.sayHello("charles")); } }
客戶端 IHello 類:
public interface IHello { String sayHello(String string); }
客戶端 RpcClientProxy 類:
public class RpcClientProxy { public <T> T clientProxy(Class<T> interfaceCls, String host, int port) { return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port)); } }
客戶端 RpcRequest 類:
public class RpcRequest implements Serializable { private static final long serialVersionUID = 383378368319625542L; private String className; private String methodName; private Object[] params; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } @Override public String toString() { return "RpcRequest{" + "className='" + className + '\'' + ", methodName='" + methodName + '\'' + ", params=" + Arrays.toString(params) + '}'; } }
客戶端 RpcNetTransport 類:
import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutput; import java.io.ObjectOutputStream; import java.net.Socket; public class RpcNetTransport { String host; int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } private Socket createSocket() { System.out.println("Begin create socket connect!"); Socket socket = null; try { socket = new Socket(host, port); } catch (Exception e) { throw new RuntimeException("build connect failed."); } return socket; } public Object send(RpcRequest rpcRequest) { Socket socket = null; try { socket = createSocket(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(rpcRequest); objectOutputStream.flush(); // 返回結果接收 ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); Object resultObject = objectInputStream.readObject();// 反序列化 對象 objectInputStream.close(); objectOutputStream.close(); return resultObject; } catch (Exception e) { throw new RuntimeException("send request exception:" + e); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
客戶端 RemoteInvocationHandler 類:
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; public class RemoteInvocationHandler implements InvocationHandler { String host; int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParams(args); RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port); return rpcNetTransport.send(rpcRequest); } }
客戶端主類 RpcClientMain:
public class RpcClientMain { public static void main(String[] args) { RpcClientProxy rpcClientProxy = new RpcClientProxy(); IHello hello = rpcClientProxy.clientProxy(IHello.class, "localhost", 8080); System.out.println(hello.sayHello("charles")); } }
項目啓動後客戶端向服務端發送了一條消息,分別運行兩個項目後輸出結果以下
服務端:
begin processor handler!
客戶端:
Begin create socket connect! Hello:charles
RPC 本質爲消息處理模型,RPC 屏蔽了底層不一樣主機間的通訊細節,讓進程調用遠程的服務就像是本地的服務同樣。