前置文章:java
Github Page
:《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》Coding Page
:《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》在前置的《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》一文中已經定義了一個相對簡單的RPC
私有協議,而且實現了對應的編碼和解碼模塊。這篇文章基於協議篇,完成Server
端代碼調用的編寫。考慮到目前相對主流的IOC
容器是Spring
,這裏選用了spring-boot-starter
(非MVC
容器,只是單純管理Bean
),依賴JDK1.8+
。git
首先RPC
私有協議定義了Client
端會傳過來四個和服務調用息息相關的字符:接口全類名interfaceName
、方法名methodName
、方法參數簽名字符串數組methodArgumentSignatures
(可選,這個參數不是必須傳入的)以及方法參數數組methodArguments
(可選,空方法列表的時候不須要傳入參數)。主要流程以下:github
Server
端的全部服務端(實現)類交由IOC
容器託管。Client
端發起RPC
請求。Server
服務實例的IOC
容器中匹配出吻合度最高的一個方法java.lang.reflect.Method
實例、該方法實例的宿主類以及宿主類對應的Bean
實例,若是這一步匹配的目標方法超過1個或者爲0個,能夠直接返回異常信息。Method
實例、宿主類Bean
實例,結合方法參數數組methodArguments
進行反射調用,獲得調用結果。Server
端把響應結果封裝到payload
經過私有協議發送回Client
端。爲了暫時方便起見,部分數組入參被從新封裝爲ArrayList
,實際上編寫RPC
框架的時候應該優先考慮性能問題,像JDK
提供的集合類庫等等應該儘量少用(以ArrayList
爲例,擴容的時候存在底層Object[]
拷貝,形成性能損失和額外的內存消耗),極盡量使用基本類型和數組。spring
先定義方法匹配器MethodMatcher
相關的類:shell
public interface MethodMatcher { /** * 查找一個匹配度最高的方法信息 * * @param input input * @return output */ MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input); } // 輸入值 @EqualsAndHashCode @Data public class MethodMatchInput { private String interfaceName; private String methodName; private List<String> methodArgumentSignatures; private int methodArgumentArraySize; } // 輸出值 @Data public class MethodMatchOutput { /** * 目標方法實例 */ private Method targetMethod; /** * 目標實現類 - 這個有多是被Cglib加強過的類型,是宿主類的子類,若是沒有被Cglib加強過,那麼它就是宿主類 */ private Class<?> targetClass; /** * 宿主類 */ private Class<?> targetUserClass; /** * 宿主類Bean實例 */ private Object target; /** * 方法參數類型列表 */ private List<Class<?>> parameterTypes; }
目標方法匹配的邏輯大體以下:bootstrap
分析至此,能夠基於反射,編寫一個抽象的方法匹配器BaseMethodMatcher
,而後把獲取宿主類信息的功能委託到子類:數組
public class MethodMatchException extends RuntimeException { public MethodMatchException(String message) { super(message); } public MethodMatchException(String message, Throwable cause) { super(message, cause); } public MethodMatchException(Throwable cause) { super(cause); } } @Data public class HostClassMethodInfo { private Class<?> hostClass; private Class<?> hostUserClass; private Object hostTarget; } @Slf4j abstract class BaseMethodMatcher implements MethodMatcher { private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap(); @Override public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) { return cache.computeIfAbsent(input, in -> { try { MethodMatchOutput output = new MethodMatchOutput(); Class<?> interfaceClass = Class.forName(in.getInterfaceName()); // 獲取宿主類信息 HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass); List<Method> targetMethods = Lists.newArrayList(); ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> { String methodName = method.getName(); Class<?> declaringClass = method.getDeclaringClass(); List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures()) .map(mas -> { List<Class<?>> list = Lists.newArrayList(); mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null))); return list; }).orElse(Lists.newArrayList()); output.setParameterTypes(inputParameterTypes); // 若是傳入了參數簽名列表,優先使用參數簽名列表類型進行匹配 if (!inputParameterTypes.isEmpty()) { List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes()); return Objects.equals(methodName, in.getMethodName()) && Objects.equals(info.getHostUserClass(), declaringClass) && Objects.equals(parameterTypes, inputParameterTypes); } // 若是沒有傳入參數簽名列表,那麼使用參數的數量進行匹配 if (in.getMethodArgumentArraySize() > 0) { List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes()); return Objects.equals(methodName, in.getMethodName()) && Objects.equals(info.getHostUserClass(), declaringClass) && in.getMethodArgumentArraySize() == parameterTypes.size(); } // 若是參數簽名列表和參數列表都沒有傳入,那麼只能經過方法名稱和方法實例的宿主類型匹配 return Objects.equals(methodName, in.getMethodName()) && Objects.equals(info.getHostUserClass(), declaringClass); }); if (targetMethods.size() != 1) { throw new MethodMatchException(String.format("查找到目標方法數量不等於1,interface:%s,method:%s", in.getInterfaceName(), in.getMethodName())); } Method targetMethod = targetMethods.get(0); output.setTargetClass(info.getHostClass()); output.setTargetMethod(targetMethod); output.setTargetUserClass(info.getHostUserClass()); output.setTarget(info.getHostTarget()); return output; } catch (Exception e) { log.error("查找匹配度最高的方法失敗,輸入參數:{}", JSON.toJSONString(in), e); if (e instanceof MethodMatchException) { throw (MethodMatchException) e; } else { throw new MethodMatchException(e); } } }); } /** * 獲取宿主類的信息 * * @param interfaceClass interfaceClass * @return HostClassMethodInfo */ abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass); }
接着,經過接口類型獲取宿主類的功能就委託給Spring
實現,從IOC
容器中獲取,定義SpringMethodMatcher
:緩存
@Component public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware { private DefaultListableBeanFactory beanFactory; @Override public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException { this.beanFactory = (DefaultListableBeanFactory) beanFactory; } @Override HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) { HostClassMethodInfo info = new HostClassMethodInfo(); // 從容器中經過接口類型獲取對應的實現,實現必須只有一個 Object bean = beanFactory.getBean(interfaceClass); info.setHostTarget(bean); info.setHostClass(bean.getClass()); info.setHostUserClass(ClassUtils.getUserClass(bean.getClass())); return info; } }
至此,目標方法匹配的模塊已經編寫完畢,接下來須要處理方法參數列表的反序列化。編寫協議的時候,筆者把方法參數列表methodArguments
存放在Object
數組中,傳輸的時候序列化爲byte
數組,通過協議解析以後,方法參數列表的實際類型爲ByteBuf
數組(這是由於Netty
中的字節容器就是ByteBuf
),那麼須要考慮把ByteBuf
數組轉換爲目標方法的參數類型實例。主要步驟以下:性能優化
Method#getParameterTypes()
獲得的方法參數列表類型進行轉換。定義一個方法參數轉換器接口MethodArgumentConverter
:框架
public interface MethodArgumentConverter { ArgumentConvertOutput convert(ArgumentConvertInput input); } @Data public class ArgumentConvertInput { /** * 目標方法 */ private Method method; /** * 方法參數類型列表 */ private List<Class<?>> parameterTypes; /** * 方法參數列表 */ private List<Object> arguments; } @Data public class ArgumentConvertOutput { private Object[] arguments; }
方法參數轉換器的默認實現以下:
@Slf4j @Component public class DefaultMethodArgumentConverter implements MethodArgumentConverter { private final Serializer serializer = FastJsonSerializer.X; @Override public ArgumentConvertOutput convert(ArgumentConvertInput input) { ArgumentConvertOutput output = new ArgumentConvertOutput(); try { if (null == input.getArguments() || input.getArguments().isEmpty()) { output.setArguments(new Object[0]); return output; } List<Class<?>> inputParameterTypes = input.getParameterTypes(); int size = inputParameterTypes.size(); if (size > 0) { Object[] arguments = new Object[size]; for (int i = 0; i < size; i++) { ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i); int readableBytes = byteBuf.readableBytes(); byte[] bytes = new byte[readableBytes]; byteBuf.readBytes(bytes); arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i)); byteBuf.release(); } output.setArguments(arguments); return output; } Class<?>[] parameterTypes = input.getMethod().getParameterTypes(); int len = parameterTypes.length; Object[] arguments = new Object[len]; for (int i = 0; i < len; i++) { ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i); int readableBytes = byteBuf.readableBytes(); byte[] bytes = new byte[readableBytes]; byteBuf.readBytes(bytes); arguments[i] = serializer.decode(bytes, parameterTypes[i]); byteBuf.release(); } output.setArguments(arguments); return output; } catch (Exception e) { throw new ArgumentConvertException(e); } } }
全部前置工做都完成了,如今編寫一個Server
端的入站處理器ServerHandler
,暫時不作代碼邏輯優化,只作實現,把反射調用的模塊直接在此類中編寫:
@Component @Slf4j public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> { @Autowired private MethodMatcher methodMatcher; @Autowired private MethodArgumentConverter methodArgumentConverter; @Override protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception { log.info("服務端接收到:{}", packet); MethodMatchInput input = new MethodMatchInput(); input.setInterfaceName(packet.getInterfaceName()); input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures()) .map(Lists::newArrayList).orElse(Lists.newArrayList())); input.setMethodName(packet.getMethodName()); Object[] methodArguments = packet.getMethodArguments(); input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0); MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input); log.info("查找目標實現方法成功,目標類:{},宿主類:{},宿主方法:{}", output.getTargetClass().getCanonicalName(), output.getTargetUserClass().getCanonicalName(), output.getTargetMethod().getName() ); Method targetMethod = output.getTargetMethod(); ArgumentConvertInput convertInput = new ArgumentConvertInput(); convertInput.setArguments(input.getMethodArgumentArraySize() > 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList()); convertInput.setMethod(output.getTargetMethod()); convertInput.setParameterTypes(output.getParameterTypes()); ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput); ReflectionUtils.makeAccessible(targetMethod); // 反射調用 Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments()); ResponseMessagePacket response = new ResponseMessagePacket(); response.setMagicNumber(packet.getMagicNumber()); response.setVersion(packet.getVersion()); response.setSerialNumber(packet.getSerialNumber()); response.setAttachments(packet.getAttachments()); response.setMessageType(MessageType.RESPONSE); response.setErrorCode(200L); response.setMessage("Success"); response.setPayload(JSON.toJSONString(result)); log.info("服務端輸出:{}", JSON.toJSONString(response)); ctx.writeAndFlush(response); } }
編寫一個Server
的啓動類ServerApplication
,在Spring
容器啓動以後,啓動Netty
服務:
@SpringBootApplication(scanBasePackages = "club.throwable.server") @Slf4j public class ServerApplication implements CommandLineRunner { @Value("${netty.port:9092}") private Integer nettyPort; @Autowired private ServerHandler serverHandler; public static void main(String[] args) throws Exception { SpringApplication.run(ServerApplication.class, args); } @Override public void run(String... args) throws Exception { int port = nettyPort; ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new RequestMessagePacketDecoder()); ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(serverHandler); } }); ChannelFuture future = bootstrap.bind(port).sync(); log.info("啓動NettyServer[{}]成功...", port); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
最後,編寫契約包和契約實現:
- ch0-custom-rpc-protocol 項目根目錄 - club.throwable - utils 工具類 - protocol 協議 - exception 異常 - contract 契約 - HelloService 契約接口 - server 服務端 - contract - DefaultHelloService 契約接口實現
public interface HelloService { String sayHello(String name); } // 實現 @Service public class DefaultHelloService implements HelloService { @Override public String sayHello(String name) { return String.format("%s say hello!", name); } }
先啓動服務端ServerApplication
,再啓動上一節提到的TestProtocolClient
,輸出結果:
// 服務端日誌 2020-01-15 00:05:57.898 INFO 14420 --- [ main] club.throwable.server.ServerApplication : 啓動NettyServer[9092]成功... 2020-01-15 00:06:05.980 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)]) 2020-01-15 00:06:07.448 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 查找目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello 2020-01-15 00:06:07.521 INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler : 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1} // 客戶端日誌 00:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 啓動NettyClient[9092]成功... ...省略... 00:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}
可見RPC
調用成功。
編寫RPC
的Server
端技巧在於處理目標方法和宿主類的查找,在轉換方法參數的時候,須要考慮簡化處理和提升效率,剩下的就是作好異常處理和模塊封裝。限於篇幅,後面會先分析Client
端的處理,再分析心跳處理、服務端優化、甚至是對接註冊中心等等,在Netty
、SpringBoot
等優秀框架的加持下編寫一個RPC
框架其實並不困難,困難的是性能優化和生態圈的支持。
Demo
項目地址:
(本文完 c-1-d e-a-20200115)