上一篇剖析了SimpleRpc的服務端,這裏來看看客戶端調用java
慣例,先看看時序圖:node
1:spring初始化zk服務發現類ZooKeeperServiceDiscovery和服務代理類,且後者依賴前者spring
2:獲取服務代理類bean,並建立請求代理類(代理的是HelloService接口的實現類)bootstrap
3:建立代理時,首先利用zk服務發現類去zk獲取服務類節點信息(包括服務類及其提供者地址)服務器
4:利用獲取到的地址發起調用(Netty發起調用)併發
5:服務端處理完畢-->獲取服務端的返回值dom
服務端已介紹過,這裏看看客戶端如何處理吧ide
啓動main方法:oop
public static void main(String[] args) throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("spring.xml"); RpcProxy rpcProxy = context.getBean(RpcProxy.class); HelloService helloService = rpcProxy.create(HelloService.class); String result = helloService.hello("World"); System.out.println(result); HelloService helloService2 = rpcProxy.create(HelloService.class, "sample.hello2"); String result2 = helloService2.hello("世界"); System.out.println(result2); System.exit(0); }
首先進行spring的初始化:this
<context:property-placeholder location="classpath:rpc.properties"/> <bean id="serviceDiscovery" class="com.xxx.rpc.registry.zookeeper.ZooKeeperServiceDiscovery"> <constructor-arg name="zkAddress" value="${rpc.registry_address}"/> </bean> <bean id="rpcProxy" class="com.xxx.rpc.client.RpcProxy"> <constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/> </bean>
初始化zk服務發現類及服務代理類,這裏的zk地址經過配置文件注入,和服務端的地址相同
初始化完畢後,獲得服務代理類Bean,建立代理,傳入接口參數(恰好這裏利用的是jdk的代理)
@SuppressWarnings("unchecked") public <T> T create(final Class<?> interfaceClass, final String serviceVersion) { // 建立動態代理對象 return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 建立 RPC 請求對象並設置請求屬性 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(method.getDeclaringClass().getName()); request.setServiceVersion(serviceVersion); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); // 獲取 RPC 服務地址 if (serviceDiscovery != null) { String serviceName = interfaceClass.getName(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } serviceAddress = serviceDiscovery.discover(serviceName); LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress); } if (StringUtil.isEmpty(serviceAddress)) { throw new RuntimeException("server address is empty"); } // 從 RPC 服務地址中解析主機名與端口號 String[] array = StringUtil.split(serviceAddress, ":"); String host = array[0]; int port = Integer.parseInt(array[1]); // 建立 RPC 客戶端對象併發送 RPC 請求 RpcClient client = new RpcClient(host, port); long time = System.currentTimeMillis(); RpcResponse response = client.send(request); LOGGER.debug("time: {}ms", System.currentTimeMillis() - time); if (response == null) { throw new RuntimeException("response is null"); } // 返回 RPC 響應結果 if (response.hasException()) { throw response.getException(); } else { return response.getResult(); } } } ); }
在建立代理的過程當中就發生了rpc調用
先是建立請求對象RpcRequest,接着利用注入的zk發現類去zk服務端獲取服務類地址(這裏利用多態,使用接口形式,實際使用注入bean)
@Override public String discover(String name) { // 建立 ZooKeeper 客戶端 ZkClient zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT); LOGGER.debug("connect zookeeper"); try { // 獲取 service 節點 String servicePath = Constant.ZK_REGISTRY_PATH + "/" + name; if (!zkClient.exists(servicePath)) { throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath)); } List<String> addressList = zkClient.getChildren(servicePath); if (CollectionUtil.isEmpty(addressList)) { throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath)); } // 獲取 address 節點 String address; int size = addressList.size(); if (size == 1) { // 若只有一個地址,則獲取該地址 address = addressList.get(0); LOGGER.debug("get only address node: {}", address); } else { // 若存在多個地址,則隨機獲取一個地址 address = addressList.get(ThreadLocalRandom.current().nextInt(size)); LOGGER.debug("get random address node: {}", address); } // 獲取 address 節點的值 String addressPath = servicePath + "/" + address; return zkClient.readData(addressPath); } finally { zkClient.close(); } }
拿到服務類信息,也就有了服務提供者的ip+port,這些信息就是服務端在啓動時註冊到zk中的
有了這些信息,就能夠利用netty發起請求調用了
public RpcResponse send(RpcRequest request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { // 建立並初始化 Netty 客戶端 Bootstrap 對象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcEncoder(RpcRequest.class)); // 編碼 RPC 請求 pipeline.addLast(new RpcDecoder(RpcResponse.class)); // 解碼 RPC 響應 pipeline.addLast(RpcClient.this); // 處理 RPC 響應 } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); // 鏈接 RPC 服務器 ChannelFuture future = bootstrap.connect(host, port).sync(); // 寫入 RPC 請求數據並關閉鏈接 Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); // 返回 RPC 響應對象 return response; } finally { group.shutdownGracefully(); } }
返回的信息由全局變量RpcResponse接收,實際的返回結果被其包裝着