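In the previous chapter we implemented the low-level protocol: the client sends data to the server, the server invokes the implementation class through reflection, and the result is returned to the client.
In this chapter we implement the injection of the client-side proxy classes.
Continuing from the previous chapter: we implemented several low-level protocols. Procotol has three implementations (Netty, HTTP, and socket), and each one provides two methods, one that starts the server and one that sends data from the client.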
The problem
Custom Spring tags
First, the overall structure:
Write an xsd file that defines our custom tags and their attributes. Note the xmlns and targetNamespace attributes of the schema element: both are set to http://paul.com/schema.
```xml
<xsd:schema xmlns="http://paul.com/schema"
            xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            targetNamespace="http://paul.com/schema">

    <xsd:complexType name="procotol-type">
        <xsd:attribute name="procotol" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Protocol type: Dubbo, Http or Socket. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="port" type="xsd:int">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Port of the communication server. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="serialize" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Serialization protocol, e.g. ProtoStuff. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="stragety" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Load-balancing strategy. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="role" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Role; "provider" marks the server side. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="address" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Address of the registry. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>
    <xsd:element name="procotol" type="procotol-type">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ Protocol configuration. ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

    <xsd:complexType name="application-type">
        <xsd:attribute name="name" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Application name. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>
    <xsd:element name="application" type="application-type">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ Application configuration. ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

    <xsd:complexType name="service-type">
        <xsd:attribute name="interfaces" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Fully qualified name of the service interface to call. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="ref" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Bean name under which the proxy is registered. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="timeout" type="xsd:int">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Call timeout. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>
    <xsd:element name="service" type="service-type">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ A remote service consumed by the client. ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

    <xsd:complexType name="provider-type">
        <xsd:attribute name="interf" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Fully qualified name of the service interface to publish. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
        <xsd:attribute name="impl" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[ Fully qualified name of the implementation class. ]]></xsd:documentation>
            </xsd:annotation>
        </xsd:attribute>
    </xsd:complexType>
    <xsd:element name="provider" type="provider-type">
        <xsd:annotation>
            <xsd:documentation><![CDATA[ A service published by the server. ]]></xsd:documentation>
        </xsd:annotation>
    </xsd:element>

</xsd:schema>
```
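A custom BeanDefinitionParser parses the attributes of our custom tags.
Inside a BeanDefinitionParser we can use Spring's own components, or simply read out the attributes we defined. The parse method receives two arguments: element gives access to the attributes written in the XML, and parserContext gives access to the BeanDefinitionRegistry. Readers familiar with the Spring source code will recognize this class; through it we can register our own classes in the Spring container.
The beanClass passed to the constructor can be a class of our own, whose fields are then populated with the parsed attribute values.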
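One detail worth keeping in mind when reading attributes from element: the DOM method getAttribute returns an empty string, not null, when the attribute is absent, so a numeric attribute such as port needs a fallback before it is parsed. The following is a minimal JDK-only sketch of that idea. AttributeReader and its two methods are hypothetical names that do not exist in the project, and 32115 is simply the fallback port that appears in this article's code.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;

/** Hypothetical helper for reading tag attributes; not part of the article's project. */
class AttributeReader {

    /** Parses an XML snippet with the JDK DOM parser and returns its root element. */
    static Element parseRoot(String xml) {
        try {
            return DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("invalid XML", e);
        }
    }

    /** Reads an int attribute; an absent attribute comes back as "" and yields the default. */
    static int intAttribute(Element element, String name, int defaultValue) {
        String raw = element.getAttribute(name).trim();
        return raw.isEmpty() ? defaultValue : Integer.parseInt(raw);
    }
}
```

With a tag that sets port="3230", intAttribute(element, "port", 32115) yields 3230; with no port attribute it yields 32115 instead of failing inside Integer.parseInt.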
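The rpc:procotol tag
This tag carries the protocol type, the port, the serialization protocol, the registry address, and the role (server or client). Its parser copies these attributes into the Configuration class and selects the protocol implementation from them. On the client side it initializes the channels in advance and keeps them in a blocking queue to improve concurrency; on the server side it starts the communication server.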
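The idea of creating channels ahead of time and parking them in a blocking queue can be sketched with the JDK alone. SimpleChannelPool below is a hypothetical stand-in for the project's NettyChannelPoolFactory, and the connector argument stands in for whatever actually opens a Netty channel; neither reflects the project's real API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool that is filled eagerly at construction time. */
class SimpleChannelPool<C> {

    private final BlockingQueue<C> idle;

    /** Creates all connections up front so the first calls do not pay the connect cost. */
    SimpleChannelPool(int size, Supplier<C> connector) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(connector.get());
        }
    }

    /** Waits up to timeoutMillis for an idle connection; returns null on timeout or interrupt. */
    C acquire(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Hands a connection back; returns false if the pool is already full. */
    boolean release(C connection) {
        return idle.offer(connection);
    }

    /** Number of connections currently waiting in the queue. */
    int idleCount() {
        return idle.size();
    }
}
```

Callers take a channel with acquire, send their request, and give it back with release. The bounded queue caps the number of concurrent requests per server at the pool size, and callers beyond that wait instead of opening new connections.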
Client-side procotol tag configuration:
```xml
<rpc:procotol procotol="Dubbo" port="3230" serialize="ProtoStuff" address="47.107.56.23:2181"/>
```
Server-side procotol tag configuration:
```xml
<rpc:procotol procotol="Dubbo" port="3230" serialize="ProtoStuff" role="provider" address="47.107.56.23:2181"/>
```
The corresponding parser:
```java
import java.net.InetAddress;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.w3c.dom.Element;
// Imports for the project's own classes and for MapUtils are omitted.
// URL is the project's own class (constructed from ip and port), not java.net.URL.

public class ProcotolBeanDefinitionParser implements BeanDefinitionParser {

    private final Class<?> beanClass;

    public ProcotolBeanDefinitionParser(Class<?> beanClass) {
        this.beanClass = beanClass;
    }

    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        String pro = element.getAttribute("procotol");
        int port = Integer.parseInt(element.getAttribute("port"));

        // Copy the tag attributes into the shared Configuration singleton.
        Configuration.getInstance().setProcotol(pro);
        Configuration.getInstance().setPort(port);
        Configuration.getInstance().setSerialize(element.getAttribute("serialize"));
        Configuration.getInstance().setStragety(element.getAttribute("stragety"));
        Configuration.getInstance().setRole(element.getAttribute("role"));
        Configuration.getInstance().setAddress(element.getAttribute("address"));

        if ("provider".equals(element.getAttribute("role"))) {
            // Server side: pick the protocol implementation (Dubbo by default) and start it.
            Procotol procotol;
            if ("Dubbo".equalsIgnoreCase(pro)) {
                procotol = new DubboProcotol();
            } else if ("Http".equalsIgnoreCase(pro)) {
                procotol = new HttpProcotol();
            } else if ("Socket".equalsIgnoreCase(pro)) {
                procotol = new SocketProcotol();
            } else {
                procotol = new DubboProcotol();
            }

            try {
                InetAddress addr = InetAddress.getLocalHost();
                String ip = addr.getHostAddress();
                if (port == 0) {
                    port = 32115;
                }
                URL url = new URL(ip, port);
                procotol.start(url);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // Client side: get the service registry.
            ZookeeperRegisterCenter registerCenter4Consumer = ZookeeperRegisterCenter.getInstance();
            // Load the provider list into the local cache.
            registerCenter4Consumer.initProviderMap();
            // Initialize the Netty channels.
            Map<String, List<ServiceProvider>> providerMap = registerCenter4Consumer.getServiceMetaDataMap4Consumer();
            if (MapUtils.isEmpty(providerMap)) {
                throw new RuntimeException("service provider list is empty.");
            }
            NettyChannelPoolFactory.getInstance().initNettyChannelPoolFactory(providerMap);
        }
        // This tag only configures the framework; it does not register a bean of its own.
        return null;
    }
}
```
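The parser picks the implementation with an if/else chain, and the proxy handler later in this chapter repeats the same chain. One possible way to keep that mapping in a single place is a small registry keyed by the configured name. This is only a sketch: DemoProcotol, the three Demo classes and ProcotolRegistry are hypothetical stand-ins, not the project's Procotol, DubboProcotol, HttpProcotol and SocketProcotol.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the article's Procotol interface, reduced to one method. */
interface DemoProcotol {
    String name();
}

class DemoDubboProcotol implements DemoProcotol {
    @Override public String name() { return "dubbo"; }
}

class DemoHttpProcotol implements DemoProcotol {
    @Override public String name() { return "http"; }
}

class DemoSocketProcotol implements DemoProcotol {
    @Override public String name() { return "socket"; }
}

/** Hypothetical helper: maps the configured protocol name to an implementation. */
final class ProcotolRegistry {

    private static final Map<String, Supplier<DemoProcotol>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put("dubbo", DemoDubboProcotol::new);
        FACTORIES.put("http", DemoHttpProcotol::new);
        FACTORIES.put("socket", DemoSocketProcotol::new);
    }

    private ProcotolRegistry() {
    }

    /** Case-insensitive lookup; unknown or missing names fall back to Dubbo, like the if/else chain. */
    static DemoProcotol create(String configuredName) {
        String key = configuredName == null ? "" : configuredName.toLowerCase(Locale.ROOT);
        Supplier<DemoProcotol> factory = FACTORIES.get(key);
        return (factory != null ? factory : FACTORIES.get("dubbo")).get();
    }
}
```

With such a registry, both call sites would reduce to a single create call, and adding a fourth protocol would touch only the static block.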
The rpc:provider tag is the server-side tag for publishing services. It declares which services the server wants to expose.
```xml
<rpc:provider interf="com.paul.service.HelloService" impl="com.paul.service.HelloServiceImpl" />
<rpc:provider interf="com.paul.service.UserService" impl="com.paul.service.UserServiceImpl" />
```
The corresponding parser:
It registers the services to be exposed in ZooKeeper.
```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.w3c.dom.Element;
// Imports for the project's own classes are omitted.

public class ProviderBeanDefinitionParser implements BeanDefinitionParser {

    private final Class<?> beanClass;

    public ProviderBeanDefinitionParser(Class<?> beanClass) {
        this.beanClass = beanClass;
    }

    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        String interfaces = element.getAttribute("interf");
        String impl = element.getAttribute("impl");

        // The port was stored by the rpc:procotol parser; fall back to the same default.
        int port = Configuration.getInstance().getPort();
        try {
            InetAddress addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
            if (port == 0) {
                port = 32115;
            }

            // Describe the published service: interface, implementation and location.
            List<ServiceProvider> providerList = new ArrayList<>();
            ServiceProvider providerService = new ServiceProvider();
            providerService.setProvider(Class.forName(interfaces));
            providerService.setServiceObject(impl);
            providerService.setIp(ip);
            providerService.setPort(port);
            providerService.setTimeout(5000);
            providerService.setServiceMethod(null);
            providerService.setApplicationName("");
            providerService.setGroupName("nettyrpc");
            providerList.add(providerService);

            // Register with ZooKeeper, the metadata registry.
            RegisterCenter4Provider registerCenter4Provider = ZookeeperRegisterCenter.getInstance();
            registerCenter4Provider.registerProvider(providerList);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}
```
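The rpc:service tag declares which server-side interfaces the client needs to call. The corresponding proxy class is injected into Spring, so the program can obtain it with @Autowired and call the remote service as if it were a local one.
```xml
<rpc:service interfaces="com.paul.service.HelloService" ref="helloService" timeout="5000"/>
```
The corresponding parser:
It registers the proxy for the interface in Spring and also registers the consumer, that is, the client, with the registry.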
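```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.w3c.dom.Element;
// Imports for the project's own classes are omitted.

public class ServiceBeanDefinitionParser implements BeanDefinitionParser {

    private final Class<?> beanClass;

    public ServiceBeanDefinitionParser(Class<?> beanClass) {
        this.beanClass = beanClass;
    }

    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        String interfaces = element.getAttribute("interfaces");
        String ref = element.getAttribute("ref");

        Class<?> clazz;
        try {
            clazz = Class.forName(interfaces);
        } catch (ClassNotFoundException e) {
            // Fail fast: without the interface class no proxy can be registered.
            throw new IllegalStateException("service interface not found: " + interfaces, e);
        }

        // Register ProxyFactory as the bean class and pass the interface as its constructor argument.
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
        GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
        definition.getConstructorArgumentValues().addGenericArgumentValue(clazz);
        definition.setBeanClass(ProxyFactory.class);
        definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);

        BeanDefinitionRegistry beanDefinitionRegistry = parserContext.getRegistry();
        beanDefinitionRegistry.registerBeanDefinition(ref, definition);

        // Get the service registry.
        ZookeeperRegisterCenter registerCenter4Consumer = ZookeeperRegisterCenter.getInstance();

        // Register the consumer with the registry.
        ServiceConsumer invoker = new ServiceConsumer();
        invoker.setConsumer(clazz);
        invoker.setServiceObject(interfaces);
        invoker.setGroupName("");
        List<ServiceConsumer> consumers = new ArrayList<>();
        consumers.add(invoker);
        registerCenter4Consumer.registerConsumer(consumers);

        return definition;
    }
}
```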
Define a NamespaceHandler that registers each tag together with its BeanDefinitionParser.
```java
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public class RpcNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        registerBeanDefinitionParser("procotol", new ProcotolBeanDefinitionParser(Configuration.class));
        // registerBeanDefinitionParser("register", new RegisterBeanDefinitionParser(Configuration.class));
        registerBeanDefinitionParser("application", new ApplicationBeanDefinitionParser(Configuration.class));
        registerBeanDefinitionParser("provider", new ProviderBeanDefinitionParser(Configuration.class));
        // registerBeanDefinitionParser("role", new ServerBeanDefinitionParser(Configuration.class));
        registerBeanDefinitionParser("service", new ServiceBeanDefinitionParser(Configuration.class));
    }
}
```
Register the schema and the handler above with Spring.
spring.handlers maps the schema namespace to our custom handler class:
```properties
http\://paul.com/schema=com.paul.spring.RpcNamespaceHandler
```
spring.schemas tells Spring where the xsd file is located:
```properties
http\://paul.com/schema/rpc.xsd=META-INF/rpc.xsd
```
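Both files use the java.util.Properties format, which is why the colon after http is escaped with a backslash: an unescaped colon is a key/value separator in that format. The small sketch below shows the difference using only the JDK; PropertiesEscapeDemo is a hypothetical name used for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical demo of how java.util.Properties reads an escaped and an unescaped colon. */
class PropertiesEscapeDemo {

    /** Loads properties from an in-memory string, as if it were the content of a file. */
    static Properties load(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // File content: http\://paul.com/schema=com.paul.spring.RpcNamespaceHandler
        Properties escaped = load("http\\://paul.com/schema=com.paul.spring.RpcNamespaceHandler\n");
        System.out.println(escaped.getProperty("http://paul.com/schema"));

        // Without the backslash the key ends at the first colon.
        Properties unescaped = load("http://paul.com/schema=com.paul.spring.RpcNamespaceHandler\n");
        System.out.println(unescaped.getProperty("http"));
    }
}
```

In the escaped form the whole namespace URI becomes the key. In the unescaped form the key is just http and the rest of the line ends up in the value, so the namespace would never be found.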
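With the configuration above, the communication protocol and the serialization protocol are chosen from configuration, the client-side proxy classes are injected into Spring for later use, and the server is started and its services are registered with the registry.
Obtaining the proxy for an interface
We use JDK dynamic proxies.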
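```java
import java.lang.reflect.Proxy;

import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;

public class ProxyFactory<T> implements FactoryBean<T> {

    private Class<T> interfaceClass;

    private ApplicationContext ctx;

    public ProxyFactory(Class<T> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    // Spring calls getObject() to obtain the bean; we hand back a JDK dynamic proxy.
    @Override
    @SuppressWarnings("unchecked")
    public T getObject() throws Exception {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new Handler<>(interfaceClass));
    }

    // Reporting the interface type lets @Autowired match the proxy by type.
    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }
}
```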
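For readers who have not used JDK dynamic proxies, here is a self-contained sketch of what Proxy.newProxyInstance does, without Spring or networking. Greeter and ProxyDemo are hypothetical names; the handler here only echoes the call it receives, which is the point where an RPC handler would send a request instead.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

/** Hypothetical service interface used only for this demo. */
interface Greeter {
    String hello(String name);
}

class ProxyDemo {

    /** Creates a proxy whose every interface method call is routed to the given handler. */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> interfaceClass, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                handler);
    }

    /** A Greeter with no implementation class: the handler describes the intercepted call. */
    static Greeter echoGreeter() {
        return createProxy(Greeter.class, (proxy, method, args) ->
                method.getDeclaringClass().getSimpleName()
                        + "." + method.getName()
                        + Arrays.toString(args));
    }

    public static void main(String[] args) {
        System.out.println(echoGreeter().hello("paul"));
    }
}
```

The handler receives the Method and the arguments of every call on the proxy, which is exactly the information needed to build a remote request: interface name, method name, parameter types and arguments.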
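Handler, our InvocationHandler implementation, is where the invoke method of the dynamic proxy ends up. invoke calls the send method of the configured protocol to send the data. Before sending, it picks a server address using the load-balancing strategy, assembles an RpcRequest, and calls send on the Procotol implementation.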
```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.UUID;
// Imports for the project's own classes are omitted.
// URL is the project's own class (constructed from ip and port), not java.net.URL.

public class Handler<T> implements InvocationHandler {

    private Class<T> interfaceClass;

    public Handler(Class<T> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Configuration configuration = Configuration.getInstance();

        // Pick the protocol implementation (Dubbo by default).
        Procotol procotol;
        if ("Dubbo".equalsIgnoreCase(configuration.getProcotol())) {
            procotol = new DubboProcotol();
        } else if ("Http".equalsIgnoreCase(configuration.getProcotol())) {
            procotol = new HttpProcotol();
        } else if ("Socket".equalsIgnoreCase(configuration.getProcotol())) {
            procotol = new SocketProcotol();
        } else {
            procotol = new DubboProcotol();
        }

        // Name of the service interface.
        String serviceKey = interfaceClass.getName();
        // Get the provider list for this interface.
        RegisterCenter4Consumer registerCenter4Consumer = ZookeeperRegisterCenter.getInstance();
        List<ServiceProvider> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consumer().get(serviceKey);
        if (providerServices == null || providerServices.isEmpty()) {
            throw new RuntimeException("no service provider for " + serviceKey);
        }

        // Select the provider for this call according to the load-balancing strategy.
        String stragety = configuration.getStragety();
        if (null == stragety || stragety.isEmpty()) {
            stragety = "random";
        }
        LoadStrategy loadStrategyService = LoadBalanceEngine.queryLoadStrategy(stragety);
        ServiceProvider serviceProvider = loadStrategyService.select(providerServices);

        // Assemble the request and send it through the chosen protocol.
        URL url = new URL(serviceProvider.getIp(), serviceProvider.getPort());
        String impl = serviceProvider.getServiceObject().toString();
        int timeout = 20000;
        RpcRequest invocation = new RpcRequest(UUID.randomUUID().toString(), interfaceClass.getName(),
                method.getName(), args, method.getParameterTypes(), impl, timeout);
        Object res = procotol.send(url, invocation);
        return res;
    }
}
```
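The handler falls back to the random strategy when none is configured. The project's LoadStrategy implementations are not shown in this chapter, so the following is only a sketch of what a random selection might look like. RandomSelect is a hypothetical class, and the Random instance is passed in so that the behaviour can be tested.

```java
import java.util.List;
import java.util.Random;

/** Hypothetical random load-balancing strategy; not the project's LoadStrategy. */
class RandomSelect {

    private final Random random;

    RandomSelect(Random random) {
        this.random = random;
    }

    /** Picks one provider uniformly at random; rejects a missing or empty list. */
    <T> T select(List<T> providers) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return providers.get(random.nextInt(providers.size()));
    }
}
```

Random selection needs no shared state between calls, which makes it a reasonable default; strategies such as round-robin would need a counter per service.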
This completes the code of the rpc-spring module.