simpleRpc解析-服務端

本文主要是對勇哥的simpleRpc進行了簡單的剖析,用來學習rpc,加深對rpc的理解!java

源碼地址:http://git.oschina.net/huangyong/rpcnode

勇哥博客:https://my.oschina.net/huangyong/blog/361751git

rpc(Remote Procedure Call)主要是將遠程服務調用包裝成爲本地服務調用,使用起來比較直觀。之前,項目中有使用過將定時任務模塊剝離出來,單獨部署,定時任務模塊使用rmi調用主系統的服務,使用起來和rpc應該是相似。spring

首先是down下源碼,部署在本地IDE,具體過程參見gitee介紹吧!bootstrap

下面是eclipse的工程截圖:api

先試用下:服務器

1:啓動本地zk,使用默認配置(工程中的默認配置就是ZK的默認配置,基本不用改)app

2:啓動rpc-sample-server工程的RpcBootstrap.javaeclipse

    啓動日誌:ide

start server
connect zookeeper
create address node: /registry/com.xxx.rpc.sample.api.HelloService-sample.hello2/address-0000000003
register service: com.xxx.rpc.sample.api.HelloService-sample.hello2 => 127.0.0.1:8000
create address node: /registry/com.xxx.rpc.sample.api.HelloService/address-0000000003
register service: com.xxx.rpc.sample.api.HelloService => 127.0.0.1:8000
server started on port 8000

3:啓動rpc-sample-client工程的HelloClient.java

connect zookeeper
get only address node: address-0000000003
discover service: com.xxx.rpc.sample.api.HelloService => 127.0.0.1:8000
time: 364ms
Hello! World
connect zookeeper
get only address node: address-0000000003
discover service: com.xxx.rpc.sample.api.HelloService-sample.hello2 => 127.0.0.1:8000
time: 19ms
你好! 世界

試用成功!

下面來進行分析

服務端啓動時序圖:

1:啓動main方法,spring開始初始化

2:初始化zk註冊類,鏈接zk,返回zkClient

3:初始化server,掃描application,將被@RpcService標註的bean提取並存儲

4:啓動netty服務端,並將rpc服務類及對應的服務地址註冊到zk中

5:獲取請求並利用反射調用請求處理類,返回結果

流程很清晰,反推一下:

要提供服務,必需要啓動一個服務端,這裏使用netty實現,有了服務端就要考慮對外提供什麼服務,這裏的服務被統一註冊到了zk,因此只須要去zk裏面查詢便可,也就意味着必需要有一個zk連接的過程,有了連接,就須要註冊服務了,那麼就須要辨識哪些服務是須要被註冊的,這裏是經過@RpcService標註的,同時也是spring的bean,那麼當獲取到請求後,利用反射調用真正的服務提供類,處理完畢以後返回結果,這樣就基本實現了上述的流程。

簡單看下源碼:

啓動spring

new ClassPathXmlApplicationContext("spring.xml");

spring.xml配置

<context:component-scan base-package="com.xxx.rpc.sample.server"/>

<context:property-placeholder location="classpath:rpc.properties"/>

<bean id="serviceRegistry" class="com.xxx.rpc.registry.zookeeper.ZooKeeperServiceRegistry">
    <constructor-arg name="zkAddress" value="${rpc.registry_address}"/>
</bean>
<bean id="rpcServer" class="com.xxx.rpc.server.RpcServer">
    <constructor-arg name="serviceAddress" value="${rpc.service_address}"/>
    <constructor-arg name="serviceRegistry" ref="serviceRegistry"/>
</bean>

看到首先是初始化zk註冊類ZooKeeperServiceRegistry

基本就是建立zk連接:

public ZooKeeperServiceRegistry(String zkAddress) {
        // 建立 ZooKeeper 客戶端
        zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT);
        LOGGER.debug("connect zookeeper");
    }

緊接着就初始化RpcSserver類,它依賴zk註冊類以及提供rpc服務的服務器address

這裏的rpc.properties:

rpc.service_address=127.0.0.1:8000
rpc.registry_address=127.0.0.1:2181

RpcServer類實現了ApplicationContextAware, InitializingBean接口

在重寫setApplicationContext時,實現了服務類辨識及存儲:

// 掃描帶有 RpcService 註解的類並初始化 handlerMap 對象
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
                String serviceName = rpcService.value().getName();
                String serviceVersion = rpcService.version();
                if (StringUtil.isNotEmpty(serviceVersion)) {
                    serviceName += "-" + serviceVersion;
                }
                handlerMap.put(serviceName, serviceBean);
            }
        }

在重寫afterPropertiesSet時實現了Netty服務的啓動:

@Override
    public void afterPropertiesSet() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 建立並初始化 Netty 服務端 Bootstrap 對象
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解碼 RPC 請求
                    pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 編碼 RPC 響應
                    pipeline.addLast(new RpcServerHandler(handlerMap)); // 處理 RPC 請求
                }
            });
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            // 獲取 RPC 服務器的 IP 地址與端口號
            String[] addressArray = StringUtil.split(serviceAddress, ":");
            String ip = addressArray[0];
            int port = Integer.parseInt(addressArray[1]);
            // 啓動 RPC 服務器
            ChannelFuture future = bootstrap.bind(ip, port).sync();
            // 註冊 RPC 服務地址
            if (serviceRegistry != null) {
                for (String interfaceName : handlerMap.keySet()) {
                    serviceRegistry.register(interfaceName, serviceAddress);
                    LOGGER.debug("register service: {} => {}", interfaceName, serviceAddress);
                }
            }
            LOGGER.debug("server started on port {}", port);
            // 關閉 RPC 服務器
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

標準的啓動Netty服務端,同時使用依賴的zk註冊類去zk中註冊服務,上一步已經獲得了rpc服務類,提供服務類的地址也配置在文件中,那麼就能夠開始註冊了

@Override
    public void register(String serviceName, String serviceAddress) {
        // 建立 registry 節點(持久)
        String registryPath = Constant.ZK_REGISTRY_PATH;
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath);
            LOGGER.debug("create registry node: {}", registryPath);
        }
        // 建立 service 節點(持久)
        String servicePath = registryPath + "/" + serviceName;
        if (!zkClient.exists(servicePath)) {
            zkClient.createPersistent(servicePath);
            LOGGER.debug("create service node: {}", servicePath);
        }
        // 建立 address 節點(臨時)
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
        LOGGER.debug("create address node: {}", addressNode);
    }

註冊完成後,zk信息以下:

能夠看到已經註冊了兩個類了,這兩個類就位於rpc-sample-server工程中

那麼如何處理請求呢?因爲啓動的是Netty服務端,那麼請求到來時,確定是netty獲取到,在啓動服務端時,配置了一個RpcServerHandler,請求的反射處理就是在這裏進行的:

@Override
    public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        // 建立並初始化 RPC 響應對象
        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            response.setResult(result);
        } catch (Exception e) {
            LOGGER.error("handle result failure", e);
            response.setException(e);
        }
        // 寫入 RPC 響應對象並自動關閉鏈接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(RpcRequest request) throws Exception {
        // 獲取服務對象
        String serviceName = request.getInterfaceName();
        String serviceVersion = request.getServiceVersion();
        if (StringUtil.isNotEmpty(serviceVersion)) {
            serviceName += "-" + serviceVersion;
        }
        Object serviceBean = handlerMap.get(serviceName);
        if (serviceBean == null) {
            throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
        }
        // 獲取反射調用所需的參數
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();
        // 執行反射調用
//        Method method = serviceClass.getMethod(methodName, parameterTypes);
//        method.setAccessible(true);
//        return method.invoke(serviceBean, parameters);
        // 使用 CGLib 執行反射調用
        FastClass serviceFastClass = FastClass.create(serviceClass);
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
        return serviceFastMethod.invoke(serviceBean, parameters);
    }

這裏的請求流程基本是Netty的標準處理流程,在處理時,從請求中獲取到指定的請求參數,再去handlerMap中匹配對應的服務類bean,而後利用反射調用此bean對應的處理方法,並獲得返回結果

這樣一來,服務端的基本功能就完畢了!

看看主要的涉及面有哪些:

利用zk作服務註冊

利用@RpcService標識遠程服務類

利用Netty實現服務請求接收

利用反射實現真實服務調用

利用Protostuff實現序列化

這些應該是rpc基礎的涉及點吧,代碼量很少,理解起來相對容易,看來,完成rpc功能相對簡單,可是考慮其餘的因素就複雜了,慢慢學習理解吧!

相關文章
相關標籤/搜索