原文地址: haifeiWu和他朋友們的博客
博客地址:www.hchstudio.cn
歡迎轉載,轉載請註明做者及出處,謝謝!java
服務端開發都會或多或少的涉及到 RPC 的使用,固然若是止步於會用,對本身的成長非常不利,因此樓主今天本着知其然,且知其因此然的精神來探討一下 RPC 這個東西。
git
child-rpc 採用 socket 直連的方式來實現服務的遠程調用,而後使用 jdk 動態代理的方式讓調用者感知不到遠程調用。
github
RPC 服務類要監聽指定IP端口,設置要發佈的服務的實現及其接口的引用,並指定序列化的方式,目前 child-rpc 支持 Hessian,JACKSON 兩種序列化方式。json
/** * @author wuhf * @Date 2018/9/1 18:30 **/ public class ServerTest { public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig(); serverConfig.setSerializer(Serializer.SerializeEnum.HESSIAN.serializer) .setPort(5201) .setInterfaceId(HelloService.class.getName()) .setRef(HelloServiceImpl.class.getName()); ServerProxy serverProxy = new ServerProxy(new NettyServer(),serverConfig); try { serverProxy.export(); while (true){ } } catch (Exception e) { e.printStackTrace(); } } }
RPC 客戶端要連接遠程 IP 端口,並註冊要引用的服務,而後調用 sayHi 方法,輸出結果服務器
/** * @author wuhf * @Date 2018/9/1 18:31 **/ public class ClientTest { public static void main(String[] args) { ClientConfig clientConfig = new ClientConfig(); clientConfig.setHost("127.0.0.1") .setPort(5201) .setTimeoutMillis(100000) .setSerializer(Serializer.SerializeEnum.HESSIAN.serializer); ClientProxy clientProxy = new ClientProxy(clientConfig,new NettyClient(),HelloService.class); for (int i = 0; i < 10; i++) { HelloService helloService = (HelloService) clientProxy.refer(); System.out.println(helloService.sayHi()); } } }
server 端輸出
網絡
client 端輸出
框架
定義消息請求響應格式,消息類型、消息惟一 ID 和消息的 json 序列化字符串內容。消息惟一 ID 是用來客戶端驗證服務器請求和響應是否匹配。socket
// rpc 請求 public class RpcRequest implements Serializable { private static final long serialVersionUID = -4364536436151723421L; private String requestId; private long createMillisTime; private String className; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; // set get 方法省略掉 } // rpc 響應 public class RpcResponse implements Serializable { private static final long serialVersionUID = 7329530374415722876L; private String requestId; private Throwable error; private Object result; // set get 方法省略掉 }
消息編碼解碼使用自定義的編解碼器,根據服務初始化是使用的序列化器來將數據序列化成字節流,拆包的策略是設定指定長度的數據包,對 socket 粘包,拆包感興趣的小夥伴請移步 Socket 中粘包問題淺析及其解決方案ide
下面是解碼器代碼實現 :this
public class NettyDecoder extends ByteToMessageDecoder { private Class<?> genericClass; private Serializer serializer; public NettyDecoder(Class<?> genericClass, Serializer serializer) { this.genericClass = genericClass; this.serializer = serializer; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (byteBuf.readableBytes() < 4) { return; } byteBuf.markReaderIndex(); // 讀取消息長度 int dataLength = byteBuf.readInt(); if (dataLength < 0) { channelHandlerContext.close(); } if (byteBuf.readableBytes() < dataLength) { byteBuf.resetReaderIndex(); return; } try { byte[] data = new byte[dataLength]; byteBuf.readBytes(data); Object object = serializer.deserialize(data,genericClass); list.add(object); } catch (Exception e) { e.printStackTrace(); } } }
下面是編碼器的實現:
public class NettyEncoder extends MessageToByteEncoder<Object> { private Class<?> genericClass; private Serializer serializer; public NettyEncoder(Class<?> genericClass,Serializer serializer) { this.serializer = serializer; this.genericClass = genericClass; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object object, ByteBuf byteBuf) throws Exception { if (genericClass.isInstance(object)) { byte[] data = serializer.serialize(object); byteBuf.writeInt(data.length); byteBuf.writeBytes(data); } } }
server 端業務處理 handler 實現 : 主要業務邏輯是 經過 java 的反射實現方法的調用。
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> { private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception { // invoke 經過調用反射方法獲取 rpcResponse RpcResponse response = RpcInvokerHandler.invokeService(rpcRequest); channelHandlerContext.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.error(">>>>>>>>>>> child-rpc provider netty server caught exception", cause); ctx.close(); } } public class RpcInvokerHandler { public static Map<String, Object> serviceMap = new HashMap<String, Object>(); public static RpcResponse invokeService(RpcRequest request) throws ClassNotFoundException, IllegalAccessException, InstantiationException { Object serviceBean = serviceMap.get(request.getClassName()); RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); Object result = method.invoke(serviceBean, parameters); response.setResult(result); } catch (Throwable t) { t.printStackTrace(); response.setError(t); } return response; } }
client 端主要業務實現是等待 server 響應返回。代碼比較簡單就不貼代碼了,詳情請看下面給出的 github 連接。
由於服務端與客戶端啓動都是 Netty 的模板代碼,由於篇幅緣由就不貼出來了,感興趣的夥伴請移步 造個輪子---RPC動手實現。
由於只是爲了理解 RPC 的本質,因此在實現細節上還有好多沒有仔細去雕琢的地方。不過 RPC 的目的就是容許像調用本地服務同樣調用遠程服務,對調用者透明,因而咱們使用了動態代理。並使用 Netty 的 handler 發送數據和響應數據,總的來講該框架實現了簡單的 RPC 調用。代碼比較簡單,主要是思路,以及瞭解 RPC 底層的實現。