前言java
RPC 的全稱是 Remote Procedure Call,它是一種進程間通訊方式。容許像調用本地服務同樣調用遠程服務。架構
學習來源:《分佈式系統架構:原理與實踐》 - 李林鋒框架
1.RPC 框架原理socket
RPC 框架的目標就是讓遠程過程(服務)調用更加簡單、透明,RPC框架負責屏蔽底層的傳輸方式(TCP 或者 UDP)、序列化方式(XML、JSON、二進制)和通訊細節。分佈式
框架使用者只須要了解誰在什麼位置,提供了什麼樣的遠程服務接口便可,開發者不須要關心底層通訊細節和調用過程。ide
2.最簡單的 RPC 框架實現post
· 下面經過 JAVA 原生的序列化、TCP Socket通訊、動態代理和反射機制,實現最簡單的 RPC 框架。它由三部分組成:學習
- 服務提供者:它運行在服務端,負責提供服務接口定義和服務實現類。(EchoService 和 EchoServiceImpl)
- 服務發佈者,它運行在 RPC 服務端,負責將本地服務發佈成遠程服務,供其餘消費者調用。(RPCExporter)
- 本地服務代理,它運行在 RPC 客戶端,經過代理調用遠程服務提供者,而後將結果進行封裝返回給本地消費者。(RPCImporter)
下面看具體代碼,首先是服務端接口定義和服務實現類。測試
代碼清單 :EchoService this
package com.rpc.test; /** * @Description - 調用接口 * @Author zww * @Date 2018/12/10 17:29 */ public interface EchoService { String echo(String ping); }
代碼清單:EchoServiceImpl
package com.rpc.test; /** * @Description - 調用接口實現 * @Author zww * @Date 2018/12/10 17:30 */ public class EchoServiceImpl implements EchoService { @Override public String echo(String ping) { return ping != null ? ping + "挺不錯的。" : "挺不錯的。"; } }
RPC 服務端發佈者代碼實現以下:
代碼清單:RPCExporter
package com.rpc.test; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.Executors; /** * @Description - 服務端發佈者(提供服務) * @Author zww * @Date 2018/12/10 17:33 */ public class RPCExporter { //線程池 static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public static void exporter(String hostName, int port) throws Exception { ServerSocket server = new ServerSocket(); //店家 server.bind(new InetSocketAddress(hostName, port)); //開店地址 try { while (true) { //開啓營業模式 executor.execute(new ExporterTask(server.accept())); //accept : 來客人了 } } finally { server.close(); } } //根據約定規則解析請求,返回結果 private static class ExporterTask implements Runnable { Socket client = null; //客戶 public ExporterTask(Socket client) { this.client = client; } @Override public void run() { ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try { System.out.println("老闆娘:誒,來咯!您要點什麼!"); inputStream = new ObjectInputStream(client.getInputStream()); //接收請求 System.out.println("老闆娘:要個回鍋肉!"); String interfaceName = inputStream.readUTF(); Class<?> service = Class.forName(interfaceName); System.out.println("老闆娘:微辣!"); String methodName = inputStream.readUTF(); System.out.println("老闆娘:少油!"); Class<?>[] parameterType = (Class<?>[])inputStream.readObject(); System.out.println("老闆娘:別放香菜!"); Object[] arguments = (Object[])inputStream.readObject(); System.out.println("老闆娘:作菜快點!"); Method method = service.getMethod(methodName, parameterType); Object result = method.invoke(service.newInstance(), arguments); System.out.println("老闆娘:老頭子,聽清沒!"); System.out.println("老闆悶頭作菜中!!!!"); System.out.println("老闆娘:帥哥,你的菜好了!"); outputStream = new ObjectOutputStream(client.getOutputStream()); //返回結果 outputStream.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if (outputStream != null) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (client != null) { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
服務發佈者的主要職責:
- 監聽客戶端的 TCP 鏈接,接收到新的客戶端鏈接以後,將其封裝成 Task,由線程池執行。
- 將客戶端發送的碼流反序列化成對象,反射調用服務實現者,獲取執行結果。
- 將執行結果反序列化,經過 Socket 發送給客戶端。
- 遠程服務調用完成後,釋放 Socket 等鏈接資源,防止句柄泄露。
RPC 客戶端本地服務代理代碼:
代碼清單:RPCImporter
package com.rpc.test; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; /** * @Description - 路人(請求服務) * @Author zww * @Date 2018/12/11 10:31 */ public class RPCImporter<S> { public S importer(final Class<?> serviceClass, final InetSocketAddress address) { //啓用遠端代理 return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{ serviceClass.getInterfaces()[0] }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { socket = new Socket(); socket.connect(address); //使用 TCP 方式請求遠端方法,如下爲約定的傳輸方式 System.out.println("路人:老闆娘,點菜咯!"); outputStream = new ObjectOutputStream(socket.getOutputStream()); //發送請求 System.out.println("路人:要個回鍋肉"); outputStream.writeUTF(serviceClass.getName()); System.out.println("路人:微辣!"); outputStream.writeUTF(method.getName()); System.out.println("路人:少油!"); outputStream.writeObject(method.getParameterTypes()); System.out.println("路人:別放香菜!"); outputStream.writeObject(args); System.out.println("路人:上菜快點!"); inputStream = new ObjectInputStream(socket.getInputStream()); //獲取結果 System.out.println("路人:吧唧吧唧!"); return inputStream.readObject(); } finally { if (socket != null) socket.close(); } } }); } }
本地服務代理的主要功能以下:
- 將本地的接口調用轉換成 JDK 的動態代理,在動態代理中實現接口的遠程調用。
- 建立 Socket 客戶端,根據指定地址連接遠程服務提供者。
- 將遠程服務調用所需的接口類、方法名、參數列表、返回參數 等編碼後發送給服務提供者。
- 同步阻塞服務端返回應答,獲取應答以後返回。
最後:編寫測試代碼,並看看執行結果
package com.rpc.test; import java.net.InetSocketAddress; /** * 測試類 */ public class TestApplication { public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { //啓用服務提供端(設置 地址端口) RPCExporter.exporter("localhost", 8080); } catch (Exception e) { e.printStackTrace(); } } }).start(); //發起服務請求 RPCImporter<EchoService> importer = new RPCImporter<>(); //使用遠端代理(訪問 地址端口) EchoService echo = importer.importer(EchoServiceImpl.class, new InetSocketAddress("localhost", 8080)); System.out.println(echo.echo("這家店味道咋樣? \n")); } }
執行測試結果:
Connected to the target VM, address: '127.0.0.1:57656', transport: 'socket' 路人:老闆娘,點菜咯! 老闆娘:誒,來咯!您要點什麼! 路人:要個回鍋肉 路人:微辣! 路人:少油! 路人:別放香菜! 路人:上菜快點! 老闆娘:要個回鍋肉! 老闆娘:微辣! 老闆娘:少油! 老闆娘:別放香菜! 老闆娘:作菜快點! 老闆娘:老頭子,聽清沒! 老闆悶頭作菜中!!!! 老闆娘:帥哥,你的菜好了! 路人:吧唧吧唧!味道不錯 這家店味道咋樣? 挺不錯的。