前置文章:java
前一篇文章相對簡略地介紹了RPC
服務端的編寫,而這篇博文最要介紹服務端(Client
)的實現。RPC
調用通常是面向契約編程的,而Client
的核心功能就是:把契約接口方法的調用抽象爲使用Netty
向RPC
服務端經過私有協議發送一個請求。這裏最底層的實現依賴於動態代理,所以動態代理是動態實現接口的最簡單方式(若是字節碼研究得比較深刻,能夠經過字節碼編程實現接口)。須要的依賴以下:git
JDK1.8+
Netty:4.1.44.Final
SpringBoot:2.2.2.RELEASE
通常能夠經過JDK
動態代理或者Cglib
的字節碼加強來實現此功能,爲了簡單起見,不引入額外的依賴,這裏選用JDK
動態代理。這裏從新搬出前面提到的契約接口HelloService
:github
public interface HelloService { String sayHello(String name); }
接下來須要經過動態代理爲此接口添加一個實現:shell
public class TestDynamicProxy { public static void main(String[] args) throws Exception { Class<HelloService> interfaceKlass = HelloService.class; InvocationHandler handler = new HelloServiceImpl(interfaceKlass); HelloService helloService = (HelloService) Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler); System.out.println(helloService.sayHello("throwable")); } @RequiredArgsConstructor private static class HelloServiceImpl implements InvocationHandler { private final Class<?> interfaceKlass; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 這裏應該根據方法的返回值類型去決定返回結果 return String.format("[%s#%s]方法被調用,參數列表:%s", interfaceKlass.getName(), method.getName(), JSON.toJSONString(args)); } } } // 控制檯輸出結果 [club.throwable.contract.HelloService#sayHello]方法被調用,參數列表:["throwable"]
這裏能夠確認兩點:編程
InvocationHandler
實現後會對被代理接口生成一個動態實現類。InvocationHandler
對應實例的invoke()
方法,傳入的參數就是當前方法調用的元數據。Client
端須要經過動態代理爲契約接口生成一個動態實現類,而後提取契約接口調用方法時候所能提供的元數據,經過這些元數據和Netty
客戶端的支持(例如Netty
的Channel
)基於私有RPC
協議組裝請求信息而且發送請求。這裏先定義一個請求參數提取器接口RequestArgumentExtractor
:bootstrap
@Data public class RequestArgumentExtractInput { private Class<?> interfaceKlass; private Method method; } @Data public class RequestArgumentExtractOutput { private String interfaceName; private String methodName; private List<String> methodArgumentSignatures; } // 接口 public interface RequestArgumentExtractor { RequestArgumentExtractOutput extract(RequestArgumentExtractInput input); }
簡單實現一下,解析結果添加到緩存中,實現類DefaultRequestArgumentExtractor
代碼以下:緩存
public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor { private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap(); @Override public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) { Class<?> interfaceKlass = input.getInterfaceKlass(); Method method = input.getMethod(); String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName, Lists.newArrayList(parameterTypes)), x -> { RequestArgumentExtractOutput output = new RequestArgumentExtractOutput(); output.setInterfaceName(interfaceKlass.getName()); List<String> methodArgumentSignatures = Lists.newArrayList(); for (Class<?> klass : parameterTypes) { methodArgumentSignatures.add(klass.getName()); } output.setMethodArgumentSignatures(methodArgumentSignatures); output.setMethodName(methodName); return output; }); } @RequiredArgsConstructor private static class CacheKey { private final String interfaceName; private final String methodName; private final List<Class<?>> parameterTypes; @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; CacheKey cacheKey = (CacheKey) o; return Objects.equals(interfaceName, cacheKey.interfaceName) && Objects.equals(methodName, cacheKey.methodName) && Objects.equals(parameterTypes, cacheKey.parameterTypes); } @Override public int hashCode() { return Objects.hash(interfaceName, methodName, parameterTypes); } } }
在不考慮重連、斷連等狀況下,新增一個類ClientChannelHolder
用於保存Netty
客戶端的Channel
實例:網絡
public class ClientChannelHolder { public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>(); }
接着新增一個契約動態代理工廠(工具類)ContractProxyFactory
,用於爲契約接口生成代理類實例:框架
public class ContractProxyFactory { private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor(); private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap(); @SuppressWarnings("unchecked") public static <T> T ofProxy(Class<T> interfaceKlass) { // 緩存契約接口的代理類實例 return (T) CACHE.computeIfAbsent(interfaceKlass, x -> Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> { RequestArgumentExtractInput input = new RequestArgumentExtractInput(); input.setInterfaceKlass(interfaceKlass); input.setMethod(method); RequestArgumentExtractOutput output = EXTRACTOR.extract(input); // 封裝請求參數 RequestMessagePacket packet = new RequestMessagePacket(); packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER); packet.setVersion(ProtocolConstant.VERSION); packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber()); packet.setMessageType(MessageType.REQUEST); packet.setInterfaceName(output.getInterfaceName()); packet.setMethodName(output.getMethodName()); packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0])); packet.setMethodArguments(args); Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get(); // 發起請求 channel.writeAndFlush(packet); // 這裏方法返回值須要進行同步處理,相對複雜,後面專門開一篇文章講解,暫時統一返回字符串 // 若是契約接口的返回值類型不是字符串,這裏方法返回後會拋出異常 return String.format("[%s#%s]調用成功,發送了[%s]到NettyServer[%s]", output.getInterfaceName(), output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress()); })); } }
最後編寫客戶端ClientApplication
的代碼:異步
@Slf4j public class ClientApplication { public static void main(String[] args) throws Exception { int port = 9092; EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(workerGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE); bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(new ResponseMessagePacketDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception { Object targetPayload = packet.getPayload(); if (targetPayload instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) targetPayload; int readableByteLength = byteBuf.readableBytes(); byte[] bytes = new byte[readableByteLength]; byteBuf.readBytes(bytes); targetPayload = FastJsonSerializer.X.decode(bytes, String.class); byteBuf.release(); } packet.setPayload(targetPayload); log.info("接收到來自服務端的響應消息,消息內容:{}", JSON.toJSONString(packet)); } }); } }); ChannelFuture future = bootstrap.connect("localhost", port).sync(); // 保存Channel實例,暫時不考慮斷連重連 ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel()); // 構造契約接口代理類實例 HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class); String result = helloService.sayHello("throwable"); log.info(result); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
先啓動《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》一文中的ServerApplication
,再啓動ClientApplication
,控制檯輸出以下:
// 服務端日誌 2020-01-16 22:34:51 [main] INFO c.throwable.server.ServerApplication - 啓動NettyServer[9092]成功... 2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)]) 2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 查找目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello 2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1} // 客戶端日誌 2020-01-16 22:36:35 [main] INFO c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]調用成功,發送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092] 2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO c.throwable.client.ClientApplication - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
Client
端主要負責契約接口調用轉換爲發送RPC
協議請求這一步,核心技術就是動態代理,在不進行模塊封裝優化的前提下實現是相對簡單的。這裏其實Client
端還有一個比較大的技術難題沒有解決,上面例子中客戶端日誌輸出若是眼尖的夥伴會發現,Client
端發送RPC
請求的線程(main
線程)和Client
端接收Server
端RPC
響應處理的線程(nioEventLoopGroup-2-1
線程)並不相同,這一點是Netty
處理網絡請求之因此可以如此高效的根源(簡單來講就是請求和響應是異步的,兩個流程原本是互不感知的)。可是更多狀況下,咱們但願外部請求是同步的,但願發送RPC
請求的線程獲得響應結果再返回(這裏請求和響應有可能依然是異步流程)。下一篇文章會詳細分析一下若是對請求-響應作同步化處理。
Demo
項目地址:
(c-2-d e-a-20200116)