基於Netty和SpringBoot實現一個輕量級RPC框架-Client篇

前提

前置文章:java

前一篇文章相對簡略地介紹了RPC服務端的編寫,而這篇博文最要介紹服務端(Client)的實現。RPC調用通常是面向契約編程的,而Client的核心功能就是:把契約接口方法的調用抽象爲使用NettyRPC服務端經過私有協議發送一個請求。這裏最底層的實現依賴於動態代理,所以動態代理是動態實現接口的最簡單方式(若是字節碼研究得比較深刻,能夠經過字節碼編程實現接口)。須要的依賴以下:git

  • JDK1.8+
  • Netty:4.1.44.Final
  • SpringBoot:2.2.2.RELEASE

動態代理的簡單使用

通常能夠經過JDK動態代理或者Cglib的字節碼加強來實現此功能,爲了簡單起見,不引入額外的依賴,這裏選用JDK動態代理。這裏從新搬出前面提到的契約接口HelloServicegithub

public interface HelloService {

    String sayHello(String name);
}

接下來須要經過動態代理爲此接口添加一個實現:shell

public class TestDynamicProxy {

    public static void main(String[] args) throws Exception {
        Class<HelloService> interfaceKlass = HelloService.class;
        InvocationHandler handler = new HelloServiceImpl(interfaceKlass);
        HelloService helloService = (HelloService)
                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler);
        System.out.println(helloService.sayHello("throwable"));
    }

    @RequiredArgsConstructor
    private static class HelloServiceImpl implements InvocationHandler {

        private final Class<?> interfaceKlass;

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 這裏應該根據方法的返回值類型去決定返回結果
            return String.format("[%s#%s]方法被調用,參數列表:%s", interfaceKlass.getName(), method.getName(),
                    JSON.toJSONString(args));
        }
    }
}
// 控制檯輸出結果
[club.throwable.contract.HelloService#sayHello]方法被調用,參數列表:["throwable"]

這裏能夠確認兩點:編程

  1. InvocationHandler實現後會對被代理接口生成一個動態實現類。
  2. 動態實現類(接口)方法被調用的時候,其實是調用InvocationHandler對應實例的invoke()方法,傳入的參數就是當前方法調用的元數據。

Client端代碼實現

Client端須要經過動態代理爲契約接口生成一個動態實現類,而後提取契約接口調用方法時候所能提供的元數據,經過這些元數據和Netty客戶端的支持(例如NettyChannel)基於私有RPC協議組裝請求信息而且發送請求。這裏先定義一個請求參數提取器接口RequestArgumentExtractorbootstrap

@Data
public class RequestArgumentExtractInput {

    private Class<?> interfaceKlass;

    private Method method;
}

@Data
public class RequestArgumentExtractOutput {

    private String interfaceName;

    private String methodName;

    private List<String> methodArgumentSignatures;
}

// 接口
public interface RequestArgumentExtractor {

    RequestArgumentExtractOutput extract(RequestArgumentExtractInput input);
}

簡單實現一下,解析結果添加到緩存中,實現類DefaultRequestArgumentExtractor代碼以下:緩存

public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor {

    private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap();

    @Override

    public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) {
        Class<?> interfaceKlass = input.getInterfaceKlass();
        Method method = input.getMethod();
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName,
                Lists.newArrayList(parameterTypes)), x -> {
            RequestArgumentExtractOutput output = new RequestArgumentExtractOutput();
            output.setInterfaceName(interfaceKlass.getName());
            List<String> methodArgumentSignatures = Lists.newArrayList();
            for (Class<?> klass : parameterTypes) {
                methodArgumentSignatures.add(klass.getName());
            }
            output.setMethodArgumentSignatures(methodArgumentSignatures);
            output.setMethodName(methodName);
            return output;
        });
    }

    @RequiredArgsConstructor
    private static class CacheKey {

        private final String interfaceName;
        private final String methodName;
        private final List<Class<?>> parameterTypes;

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            CacheKey cacheKey = (CacheKey) o;
            return Objects.equals(interfaceName, cacheKey.interfaceName) &&
                    Objects.equals(methodName, cacheKey.methodName) &&
                    Objects.equals(parameterTypes, cacheKey.parameterTypes);
        }

        @Override
        public int hashCode() {
            return Objects.hash(interfaceName, methodName, parameterTypes);
        }
    }
}

在不考慮重連、斷連等狀況下,新增一個類ClientChannelHolder用於保存Netty客戶端的Channel實例:網絡

public class ClientChannelHolder {

    public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>();
}

接着新增一個契約動態代理工廠(工具類)ContractProxyFactory,用於爲契約接口生成代理類實例:框架

public class ContractProxyFactory {

    private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
    private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();

    @SuppressWarnings("unchecked")
    public static <T> T ofProxy(Class<T> interfaceKlass) {
        // 緩存契約接口的代理類實例
        return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
                    RequestArgumentExtractInput input = new RequestArgumentExtractInput();
                    input.setInterfaceKlass(interfaceKlass);
                    input.setMethod(method);
                    RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
                    // 封裝請求參數
                    RequestMessagePacket packet = new RequestMessagePacket();
                    packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
                    packet.setVersion(ProtocolConstant.VERSION);
                    packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
                    packet.setMessageType(MessageType.REQUEST);
                    packet.setInterfaceName(output.getInterfaceName());
                    packet.setMethodName(output.getMethodName());
                    packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
                    packet.setMethodArguments(args);
                    Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
                    // 發起請求
                    channel.writeAndFlush(packet);
                    // 這裏方法返回值須要進行同步處理,相對複雜,後面專門開一篇文章講解,暫時統一返回字符串
                    // 若是契約接口的返回值類型不是字符串,這裏方法返回後會拋出異常
                    return String.format("[%s#%s]調用成功,發送了[%s]到NettyServer[%s]", output.getInterfaceName(),
                            output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress());
                }));
    }
}

最後編寫客戶端ClientApplication的代碼:異步

@Slf4j
public class ClientApplication {

    public static void main(String[] args) throws Exception {
        int port = 9092;
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                    ch.pipeline().addLast(new LengthFieldPrepender(4));
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
                    ch.pipeline().addLast(new ResponseMessagePacketDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
                            Object targetPayload = packet.getPayload();
                            if (targetPayload instanceof ByteBuf) {
                                ByteBuf byteBuf = (ByteBuf) targetPayload;
                                int readableByteLength = byteBuf.readableBytes();
                                byte[] bytes = new byte[readableByteLength];
                                byteBuf.readBytes(bytes);
                                targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
                                byteBuf.release();
                            }
                            packet.setPayload(targetPayload);
                            log.info("接收到來自服務端的響應消息,消息內容:{}", JSON.toJSONString(packet));
                        }
                    });
                }
            });
            ChannelFuture future = bootstrap.connect("localhost", port).sync();
            // 保存Channel實例,暫時不考慮斷連重連
            ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel());
            // 構造契約接口代理類實例
            HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class);
            String result = helloService.sayHello("throwable");
            log.info(result);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

先啓動《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》一文中的ServerApplication,再啓動ClientApplication,控制檯輸出以下:

// 服務端日誌
2020-01-16 22:34:51 [main] INFO  c.throwable.server.ServerApplication - 啓動NettyServer[9092]成功...
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 查找目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}

// 客戶端日誌
2020-01-16 22:36:35 [main] INFO  c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]調用成功,發送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092]
2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO  c.throwable.client.ClientApplication - 接收到來自服務端的響應消息,消息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}

小結

Client端主要負責契約接口調用轉換爲發送RPC協議請求這一步,核心技術就是動態代理,在不進行模塊封裝優化的前提下實現是相對簡單的。這裏其實Client端還有一個比較大的技術難題沒有解決,上面例子中客戶端日誌輸出若是眼尖的夥伴會發現,Client端發送RPC請求的線程(main線程)和Client端接收ServerRPC響應處理的線程(nioEventLoopGroup-2-1線程)並不相同,這一點是Netty處理網絡請求之因此可以如此高效的根源(簡單來講就是請求和響應是異步的,兩個流程原本是互不感知的)。可是更多狀況下,咱們但願外部請求是同步的,但願發送RPC請求的線程獲得響應結果再返回(這裏請求和響應有可能依然是異步流程)。下一篇文章會詳細分析一下若是對請求-響應作同步化處理。

Demo項目地址:

(c-2-d e-a-20200116)

相關文章
相關標籤/搜索