## 1. 使用zookeeper做註冊中心,把被調用方的信息註冊上去
服務的註冊前端
public void register(String data) { if (data != null) { byte[] bytes = data.getBytes(); try { if (zk.exists(dataPath, null) == null) { zk.create(dataPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.create(dataPath + "/data", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
服務的發現java
public String discover() { String data = null; int size = dataList.size(); // 存在新節點,使用便可 if (size > 0) { if (size == 1) { data = dataList.get(0); } else { data = dataList.get(ThreadLocalRandom.current().nextInt(size)); } } return data; }
註解RpcService標記被調用方的實現類,RpcClientService標記調用方的類須要生成代理類git
@Target({ ElementType.TYPE })//註解用在接口上 @Retention(RetentionPolicy.RUNTIME)//VM將在運行期也保留註釋,所以能夠經過反射機制讀取註解的信息 @Component public @interface RpcClientService { }
@Target({ ElementType.TYPE })//註解用在接口上 @Retention(RetentionPolicy.RUNTIME)//VM將在運行期也保留註釋,所以能夠經過反射機制讀取註解的信息 @Component public @interface RpcService { Class<?> value(); }
掃描包下的RpcClientService註解,並生成代理類github
/** * 用於Spring動態注入自定義接口 * * @author shuangyueliao */ @Component public class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor { @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { Set<Class<?>> typesAnnotatedWith = new Reflections("com.shuangyueliao.rpc.myinterface").getTypesAnnotatedWith(RpcClientService.class); for (Class beanClazz : typesAnnotatedWith) { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz); GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition(); //在這裏,咱們能夠給該對象的屬性注入對應的實例。 //好比mybatis,就在這裏注入了dataSource和sqlSessionFactory, // 注意,若是採用definition.getPropertyValues()方式的話, // 相似definition.getPropertyValues().add("interfaceType", beanClazz); // 則要求在FactoryBean(本應用中即ServiceFactory)提供setter方法,不然會注入失敗 // 若是採用definition.getConstructorArgumentValues(), // 則FactoryBean中須要提供包含該屬性的構造方法,不然會注入失敗 Properties properties = new Properties(); InputStream is=this.getClass().getResourceAsStream("/application.properties"); try { properties.load(is); } catch (IOException e) { e.printStackTrace(); } String registerAddress = properties.getProperty("zookeeper.url"); String dataPath = properties.getProperty("zookeeper.register.path.prefix"); ServiceDiscovery serviceDiscovery = new ServiceDiscovery(registerAddress, dataPath); definition.getPropertyValues().addPropertyValue("serviceDiscovery", serviceDiscovery); definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz); //注意,這裏的BeanClass是生成Bean實例的工廠,不是Bean自己。 // FactoryBean是一種特殊的Bean,其返回的對象不是指定類的一個實例, // 其返回的是該工廠Bean的getObject方法所返回的對象。 definition.setBeanClass(RpcProxy.class); //這裏採用的是byType方式注入,相似的還有byName等 definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); registry.registerBeanDefinition(beanClazz.getSimpleName(), definition); } } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { } }
獲取代理類sql
public class RpcProxy<T> implements FactoryBean<T> { private String serverAddress; private Class<T> interfaceType; private ServiceDiscovery serviceDiscovery; public RpcProxy(Class<T> interfaceType) { this.interfaceType = interfaceType; } public ServiceDiscovery getServiceDiscovery() { return serviceDiscovery; } public void setServiceDiscovery(ServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } private RpcClient rpcClient; @Override public T getObject() throws Exception { return (T) Proxy.newProxyInstance(interfaceType.getClassLoader(), new Class<?>[] { interfaceType }, (proxy, method, args) -> { //建立RpcRequest,封裝被代理類的屬性 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); //拿到聲明這個方法的業務接口名稱 request.setClassName(method.getDeclaringClass() .getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); synchronized (this) { if (rpcClient == null) { //查找服務 if (serviceDiscovery != null) { serverAddress = serviceDiscovery.discover(); } //隨機獲取服務的地址 String[] array = serverAddress.split(":"); String host = array[0]; int port = Integer.parseInt(array[1]); //建立Netty實現的RpcClient,連接服務端 rpcClient = new RpcClient(host, port); } } //經過netty向服務端發送請求 RpcResponse response = rpcClient.send(request); //返回信息 if (response.isError()) { throw response.getError(); } else { return response.getResult(); } }); } @Override public Class<?> getObjectType() { return interfaceType; } }
調用方底層基於netty的發送請求和接收響應bootstrap
public RpcClient(String host, int port) { this.host = host; this.port = port; EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) { // 向pipeline中添加編碼、解碼、業務處理的handler channel.pipeline() .addLast(new RpcEncoder(RpcRequest.class)) //OUT - 1 .addLast(new RpcDecoder(RpcResponse.class)) //IN - 1 .addLast(RpcClient.this); //IN - 2 } }).option(ChannelOption.SO_KEEPALIVE, true); // 連接服務器 future = bootstrap.connect(host, port).sync(); } catch (Exception e) { e.printStackTrace(); try { future.channel().closeFuture().sync(); } catch (InterruptedException e1) { e1.printStackTrace(); } group.shutdownGracefully(); } } /** * 連接服務端,發送消息 * * @param request * @return * @throws Exception */ public RpcResponse send(RpcRequest request) throws Exception { //將request對象寫入outbundle處理後發出(即RpcEncoder編碼器) // 用線程等待的方式決定是否關閉鏈接 // 其意義是:先在此阻塞,等待獲取到服務端的返回後,被喚醒,從而關閉網絡鏈接 Object o = new Object(); locks.put(request.getRequestId(), o); synchronized (o) { future.channel().writeAndFlush(request); o.wait(10000); } return response; } /** * 讀取服務端的返回結果 */ @Override public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { this.response = response; Object o = locks.remove(response.getRequestId()); synchronized (o) { o.notify(); } }
獲取接口與實現類的對應關係瀏覽器
public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceBeanMap = ctx .getBeansWithAnnotation(RpcService.class); if (MapUtils.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { //從業務實現類上的自定義註解中獲取到value,歷來獲取到業務接口的全名 String interfaceName = serviceBean.getClass() .getAnnotation(RpcService.class).value().getName(); handlerMap.put(interfaceName, serviceBean); } } }
讀取調用方傳遞過來的接口名和參數,利用反射調用相應類並返回結果給前端服務器
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception { RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { //根據request來處理具體的業務調用 Object result = handle(request); response.setResult(result); } catch (Throwable t) { response.setError(t); } //寫入 outbundle(即RpcEncoder)進行下一步處理(即編碼)後發送到channel中給客戶端 ctx.writeAndFlush(response); } /** * 根據request來處理具體的業務調用 * 調用是經過反射的方式來完成 * * @param request * @return * @throws Throwable */ private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); //拿到實現類對象 Object serviceBean = handlerMap.get(className); //拿到要調用的方法名、參數類型、參數值 String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); //拿到接口類 Class<?> forName = Class.forName(className); //調用實現類對象的指定方法並返回結果 Method method = forName.getMethod(methodName, parameterTypes); return method.invoke(serviceBean, parameters); }
<dependency> <groupId>com.shuangyueliao</groupId> <artifactId>rpc-server</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
<dependency> <groupId>com.shuangyueliao</groupId> <artifactId>rpc-client</artifactId> <version>1.0-SNAPSHOT</version> <scope>compile</scope> </dependency>
@RpcService(PayService.class) public class PayServiceImpl implements PayService { @Override public int calculate(int a, int b) { int result = a + b; return result; } }
@RpcClientService public interface PayService { int calculate(int a, int b); }
一、啓動zookeeper,如須要修改zookeeper鏈接地址,請修改rpc-sample-server和rpc-sample-client的配置文件application.properties中的配置項zookeeper.url
二、運行rpc-sample-server(被調用方)RpcBootstrap的main方法啓動被調用方
三、運行rpc-sample-client(調用方)的StartApp的main方法啓動調用方
四、瀏覽器輸入http://localhost:8090/order請求rpc-sample-client,rpc-sample-client會RPC調用rpc-sample-server 網絡
項目地址mybatis