你真的明白RPC 嗎?一塊兒來探究 RPC 的實質

​ 不論你是科班出身仍是半路轉行,這麼優秀的你必定上太小學語文,那麼對擴句和縮句你必定不陌生。縮句就是去除各類修飾提煉出一句話的核心,而不失基本的語義。下面來實現一個簡易的 rpc 程序探究其實質,進而去理解複雜的 rpc 框架。所謂複雜的框架就是在簡單的過程當中加入了一些設計裝飾將rpc的功能豐富起來,如 dubbo 的 filter、router、loadblance、集羣容錯、多種 Invoker 、通信協議等等,這就是一個擴句的過程。 文中福利,附一張劉祕美照java

RPC是指遠程過程調用,也就是說兩臺服務器A、B,一個應用部署在A服務器上,想要調用B服務器上應用提供的函數/方法,因爲不在一個內存空間,不能直接調用,須要經過網絡去發起一次調用請求獲取結果。node

​ 不管是市面上主流的 rpc 框架仍是小衆的 rpc 框架都實現了上述 rpc的語義。【服務治理型:dubbo、dubbox、motan;多語言型:grpc、thrift、avro、protocol buffers】git

打一波廣告:【博主最近在寫一個 java 實現的 rpc 框架 bridge 歡迎關注,考慮Mesh 化】github

1、原理

首先用一幅圖來簡單描述一下 rpc 的調用過程,從 dubbo 官網拿來的,不算是最簡單的圖,可是也很是簡單了,去掉上面的 Registry 和下面的 Monitor 剩下的就是最簡單的 rpc 調用,說白了就是一個網絡請求。 spring

過程描述:bootstrap

  1. 啓動服務端provider,並向註冊中心登記一下本身暴露服務的地址和服務詳情
  2. 而後啓動消費端consumer, 訂閱註冊中心的內容,也就是訂閱服務,獲取服務的詳情
  3. 若是服務有變更,註冊中心會通知消費端去更新訂閱內容,更新服務詳情。
  4. 客戶端拿到了服務詳情,經過網絡對服務端發起網絡請求,獲取結果
  5. 監視器能夠獲取到服務調用詳情和消費詳情,但不限於此

OK,原理就是這麼簡單,接下來根據上面的描述逐步實現。api

2、動手實踐

下面基於 springboot 來實現上述的過程。數組

2.1 構建模塊

搭建工程和子模塊,工程結構以下: springboot

2.2 實現服務端

看下服務端的內容,貼圖服務器

把接口定義在 api 模塊,consumer 和 provider 模塊都要引用到,接口HelloService代碼以下

package com.glmapper.simple.api;

/** * service interface * * @author: Jerry */
public interface HelloService {

    /** * service function * * @param name * @return */
    String hello(String name);
}
複製代碼

而後在 provider 模塊實現接口,用自定註解 @SimpleProvider 標識,先看下註解內容

package com.glmapper.simple.provider.annotation;

/** * 自定義服務註解 * * @author Jerry */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// 標明可被 Spring 掃描
@Component 
public @interface SimpleProvider {

    Class<?> value();
}
複製代碼

註解使用了@Component標識,因此可被 spring 掃描到,接下來看實現類HelloServiceImpl

package com.glmapper.simple.provider.service;

/** * service implement class * * @author: Jerry */
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {

    /** * service function * * @param name * @return */
    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }
}
複製代碼

在定義一個服務配置的類SimpleProviderProperties,方便經過 application.yml 文件配置,

package com.glmapper.simple.provider.property;

/** * provider properties * * @author: Jerry */
public class SimpleProviderProperties {

    /** * 暴露服務的端口 */
    private Integer port;

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }
}
複製代碼

到這裏基礎的類文件就已經結束了,下面開始服務初始化,入口 ProviderInitializer

package com.glmapper.simple.provider;

/** * 啓動並註冊服務 * * @author Jerry */
public class ProviderInitializer implements ApplicationContextAware, InitializingBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);

    private SimpleProviderProperties providerProperties;

    /** * service registry */
    private ServiceRegistry serviceRegistry;

    /** * store interface and service implement mapping */
    private Map<String, Object> handlerMap = new HashMap<>();

    public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {
        this.providerProperties = providerProperties;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        // 獲取被 SimpleProvider 註解的 Bean
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new SimpleDecoder(SimpleRequest.class))
                            .addLast(new SimpleEncoder(SimpleResponse.class))
                            .addLast(new SimpleHandler(handlerMap));
                }
            };
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(channelHandler)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String host = getLocalHost();
            if (null == host) {
                LOGGER.error("can't get service address,because address is null");
                throw new SimpleException("can't get service address,because address is null");
            }
            int port = providerProperties.getPort();
            ChannelFuture future = bootstrap.bind(host, port).sync();
            LOGGER.debug("server started on port {}", port);

            if (serviceRegistry != null) {
                String serverAddress = host + ":" + port;
                serviceRegistry.register(serverAddress);
            }
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /** * get service host * * @return */
    private String getLocalHost() {
        Enumeration<NetworkInterface> allNetInterfaces;
        try {
            allNetInterfaces = NetworkInterface.getNetworkInterfaces();
        } catch (SocketException e) {
            LOGGER.error("get local address error,cause:", e);
            return null;
        }
        while (allNetInterfaces.hasMoreElements()) {
            NetworkInterface netInterface = allNetInterfaces.nextElement();
            Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress ip = addresses.nextElement();
                if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {
                    return ip.getHostAddress();
                }
            }
        }
        return null;
    }
}
複製代碼

描述一下這個類作了什麼工做:

  • 首先他實現了ApplicationContextAware, InitializingBean這兩個 spring 中接口,根據IOC容器初始化的順序,會依次回調用接口中的setApplicationContextafterPropertiesSet 方法。
    • setApplicationContext方法中獲取了容器中被@SimpleProvider標註的類,並將服務接口名和服務實現類綁定,存放到handlerMap中,在@SimpleProvider中有一個 value 屬性,是考慮到一個類能夠實現多個接口,經過 value 能夠指定哪一個服務接口,固然也能夠定義爲數組,處理多個接口
    • afterPropertiesSet 方法中作了兩件事:
      • 在服務端開啓了一個處理socket請求的線程池,監聽和處理服務暴露端口上接受到的請求,指定了一個處理器SimpleHandler
      • 調用ServiceRegistry類的registry方法向 zookeeper 註冊服務的地址和端口,這裏沒有用到協議,只註冊了 ip:port

SimpleHandler是一個實現了 nettySimpleChannelInboundHandler的請求處理器類

package com.glmapper.simple.provider.handler;

/** * request handler * * @author Jerry */
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);

    private final Map<String, Object> handlerMap;

    public SimpleHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {
        SimpleResponse response = new SimpleResponse();
        response.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t);
        }
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(SimpleRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);

        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("server caught exception", cause);
        ctx.close();
    }
}
複製代碼

SimpleHandler基於 netty 的事件驅動模型觸發對應的方法,當收到請求事件會調用channelRead0方法,這個方法的做用就是,根據請求參數中的接口名找到對應的實現類調用指定的方法,而後把結果返回。

再瞅瞅ServiceRegistry,入口是ProviderInitializer調用了ServiceRegistryregistry方法

package com.glmapper.simple.provider.registry;

/** * connect zookeeper to registry service * * @author Jerry */
public class ServiceRegistry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private ZookeeperProperties zookeeperProperties;

    public ServiceRegistry(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
    }

    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());
            if (zk != null) {
                addRootNode(zk);
                createNode(zk, data);
            }
        }
    }

    /** * add one zookeeper root node * * @param zk */
    private void addRootNode(ZooKeeper zk) {
        try {
            String registryPath = zookeeperProperties.getRootPath();
            Stat s = zk.exists(registryPath, false);
            if (s == null) {
                zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("zookeeper add root node error,cause:", e);
        }
    }

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
            String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();
            String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("create zookeeper node error,cause:", e);
        }
    }
}
複製代碼

ServiceRegistry類作的工做比較簡單,就是把 服務ip:port註冊到 zk 的指定目錄下

  • 建立根節點,根節點是個永久節點
  • 在根節點下建立臨時的子節點,子節點存儲了服務的 ip:port,服務被掛掉對應的子節點就會被幹掉

2.3 消費端

消費端內容:

消費端的內容比較少,核心就三個類:ServiceDiscoveryConsumerHandlerConsumerProxy

先看下ServiceDiscovery內容:

package com.glmapper.simple.consumer.discovery;

/** * 服務發現:鏈接ZK,添加watch事件 * * @author Jerry */
public class ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    private volatile List<String> nodes = new ArrayList<>();

    private ZookeeperProperties zookeeperProperties;

    public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
        String address = zookeeperProperties.getAddress();
        int timeout = zookeeperProperties.getTimeout();
        ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);
        if (zk != null) {
            watchNode(zk);
        }
    }

    public String discover() {
        String data = null;
        int size = nodes.size();
        if (size > 0) {
            if (size == 1) {
                data = nodes.get(0);
                LOGGER.debug("using only node: {}", data);
            } else {
                data = nodes.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random node: {}", data);
            }
        }
        return data;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            Watcher childrenNodeChangeWatcher = event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    watchNode(zk);
                }
            };
            String rootPath = zookeeperProperties.getRootPath();
            List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);
            List<String> nodes = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(rootPath + "/" + node, false, null);
                nodes.add(new String(bytes, Charset.forName("UTF-8")));
            }
            LOGGER.info("node data: {}", nodes);
            this.nodes = nodes;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("節點監控出錯,緣由:", e);
        }
    }
}
複製代碼

這個類的入口是構造器,做用是獲取 zk 的地址,而後獲取 zk 上的節點信息,這裏沒有實現服務訂閱,也就是說若是 zk 上本來有兩個服務,掛掉一個,客戶端不會剔除掛掉的服務信息,致使調用失敗。

而後是ConsumerProxy,它是一個代理工廠:

package com.glmapper.simple.consumer.proxy;

/** * ConsumerProxy * * @author Jerry */
public class ConsumerProxy {

    private ServiceDiscovery serviceDiscovery;

    public ConsumerProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new SimpleInvocationHandler());
    }

    private class SimpleInvocationHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            SimpleRequest request = buildRequest(method, args);
            String serverAddress = getServerAddress();
            String[] array = serverAddress.split(":");
            String host = array[0];
            int port = Integer.parseInt(array[1]);
            ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
            SimpleResponse response = consumerHandler.send(request);
            if (response.getError() != null) {
                throw new SimpleException("service invoker error,cause:", response.getError());
            } else {
                return response.getResult();
            }
        }

        private SimpleRequest buildRequest(Method method, Object[] args) {
            SimpleRequest request = new SimpleRequest();
            request.setRequestId(UUID.randomUUID().toString());
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            return request;
        }

        private String getServerAddress() {
            String serverAddress = null;
            if (serviceDiscovery != null) {
                serverAddress = serviceDiscovery.discover();
            }
            if (null == serverAddress) {
                throw new SimpleException("no server address available");
            }
            return serverAddress;
        }
    }
}
複製代碼

這裏有個內部類SimpleInvocationHandler是生產代理的核心,方法的核心是在 SimpleInvocationHandler.invoke()中是調用這兩行代碼

ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
複製代碼

發起網絡請求,下面看下ConsumerHandler

package com.glmapper.simple.consumer.handler;

/** * RPC真正調用客戶端 * * @author Jerry */
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);

    private int port;

    private String host;

    private SimpleResponse response;

    private CountDownLatch latch = new CountDownLatch(1);

    public ConsumerHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {
        this.response = response;
        latch.countDown();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("client caught exception", cause);
        ctx.close();
    }

    public SimpleResponse send(SimpleRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            // 將 RPC 請求進行編碼(爲了發送請求)
                            .addLast(new SimpleEncoder(SimpleRequest.class))
                            // 將 RPC 響應進行解碼(爲了處理響應)
                            .addLast(new SimpleDecoder(SimpleResponse.class))
                            // 使用 RpcClient 發送 RPC 請求
                            .addLast(ConsumerHandler.this);
                }
            };
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(channelHandler)
                    .option(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().writeAndFlush(request).sync();
            latch.await();
            if (response != null) {
                future.channel().closeFuture().sync();
            }
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}
複製代碼

這個類和服務端的 ProviderHandler 的代碼差很少,也是netty通信類

附一下 GitHub 地址 simple-rpc

相關文章
相關標籤/搜索