simpleRpc解析-客戶端

上一篇剖析了SimpleRpc的服務端,這裏來看看客戶端調用java

慣例,先看看時序圖:node

1:spring初始化zk服務發現類ZooKeeperServiceDiscovery和服務代理類,且後者依賴前者spring

2:獲取服務代理類bean,並建立請求代理類(代理的是HelloService接口的實現類)bootstrap

3:建立代理時,首先利用zk服務發現類去zk獲取服務類節點信息(包括服務類及其提供者地址)服務器

4:利用獲取到的地址發起調用(Netty發起調用)併發

5:服務端處理完畢-->獲取服務端的返回值dom

服務端已介紹過,這裏看看客戶端如何處理吧ide

啓動main方法:oop

public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
        RpcProxy rpcProxy = context.getBean(RpcProxy.class);

        HelloService helloService = rpcProxy.create(HelloService.class);
        String result = helloService.hello("World");
        System.out.println(result);

        HelloService helloService2 = rpcProxy.create(HelloService.class, "sample.hello2");
        String result2 = helloService2.hello("世界");
        System.out.println(result2);

        System.exit(0);
    }

首先進行spring的初始化:this

<context:property-placeholder location="classpath:rpc.properties"/>

    <bean id="serviceDiscovery" class="com.xxx.rpc.registry.zookeeper.ZooKeeperServiceDiscovery">
        <constructor-arg name="zkAddress" value="${rpc.registry_address}"/>
    </bean>

    <bean id="rpcProxy" class="com.xxx.rpc.client.RpcProxy">
        <constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/>
    </bean>

初始化zk服務發現類及服務代理類,這裏的zk地址經過配置文件注入,和服務端的地址相同

初始化完畢後,獲得服務代理類Bean,建立代理,傳入接口參數(恰好這裏利用的是jdk的代理)

@SuppressWarnings("unchecked")
    public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
        // 建立動態代理對象
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 建立 RPC 請求對象並設置請求屬性
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(method.getDeclaringClass().getName());
                        request.setServiceVersion(serviceVersion);
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        // 獲取 RPC 服務地址
                        if (serviceDiscovery != null) {
                            String serviceName = interfaceClass.getName();
                            if (StringUtil.isNotEmpty(serviceVersion)) {
                                serviceName += "-" + serviceVersion;
                            }
                            serviceAddress = serviceDiscovery.discover(serviceName);
                            LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
                        }
                        if (StringUtil.isEmpty(serviceAddress)) {
                            throw new RuntimeException("server address is empty");
                        }
                        // 從 RPC 服務地址中解析主機名與端口號
                        String[] array = StringUtil.split(serviceAddress, ":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        // 建立 RPC 客戶端對象併發送 RPC 請求
                        RpcClient client = new RpcClient(host, port);
                        long time = System.currentTimeMillis();
                        RpcResponse response = client.send(request);
                        LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
                        if (response == null) {
                            throw new RuntimeException("response is null");
                        }
                        // 返回 RPC 響應結果
                        if (response.hasException()) {
                            throw response.getException();
                        } else {
                            return response.getResult();
                        }
                    }
                }
        );
    }

在建立代理的過程當中就發生了rpc調用

 先是建立請求對象RpcRequest,接着利用注入的zk發現類去zk服務端獲取服務類地址(這裏利用多態,使用接口形式,實際使用注入bean)

@Override
    public String discover(String name) {
        // 建立 ZooKeeper 客戶端
        ZkClient zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT);
        LOGGER.debug("connect zookeeper");
        try {
            // 獲取 service 節點
            String servicePath = Constant.ZK_REGISTRY_PATH + "/" + name;
            if (!zkClient.exists(servicePath)) {
                throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
            }
            List<String> addressList = zkClient.getChildren(servicePath);
            if (CollectionUtil.isEmpty(addressList)) {
                throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
            }
            // 獲取 address 節點
            String address;
            int size = addressList.size();
            if (size == 1) {
                // 若只有一個地址,則獲取該地址
                address = addressList.get(0);
                LOGGER.debug("get only address node: {}", address);
            } else {
                // 若存在多個地址,則隨機獲取一個地址
                address = addressList.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("get random address node: {}", address);
            }
            // 獲取 address 節點的值
            String addressPath = servicePath + "/" + address;
            return zkClient.readData(addressPath);
        } finally {
            zkClient.close();
        }
    }

拿到服務類信息,也就有了服務提供者的ip+port,這些信息就是服務端在啓動時註冊到zk中的

有了這些信息,就能夠利用netty發起請求調用了

public RpcResponse send(RpcRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 建立並初始化 Netty 客戶端 Bootstrap 對象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new RpcEncoder(RpcRequest.class)); // 編碼 RPC 請求
                    pipeline.addLast(new RpcDecoder(RpcResponse.class)); // 解碼 RPC 響應
                    pipeline.addLast(RpcClient.this); // 處理 RPC 響應
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            // 鏈接 RPC 服務器
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // 寫入 RPC 請求數據並關閉鏈接
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();
            // 返回 RPC 響應對象
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }

返回的信息由全局變量RpcResponse接收,實際的返回結果被其包裝着

相關文章
相關標籤/搜索