從0到1搭建RPC框架

Cool-Rpc

前言

此博客所述項目代碼已在github開源,歡迎你們一塊兒貢獻!
點此進入:Cool-RPCjava

最近一次寫博客仍是17年末,謝謝你們持久以來的關注
本篇博文將會教你們如何從0到1,搭建一個簡單、高效且拓展性強的rpc框架.node

什麼是RPC

相信你們都或多或少使用過RPC框架,好比阿里的Dubbo、谷歌的grpc、Facebook的Thrift等等git

那麼究竟什麼是rpc?
rpc翻譯成中文叫作遠程過程調用,通俗易懂點:將單應用架構成分佈式系統架構後,多個系統間數據怎麼交互,這就是rpc的職責.github

從服務的角度來看,rpc分爲服務提供者(provider)和服務消費者(consumer)兩大類,中間會有一些共用java接口,叫作開放api接口
也就是說,接口服務實現類所處的地方叫作provider,接口服務調用類所處的地方叫consumerredis

由於處於分佈式環境中,那consumer調用provider時,如何知道對方服務器的IP和開放端口呢?
這時須要一個組件叫作註冊中心,consumer經過服務名後,去註冊中心上查找該服務的IP+Port,拿到地址數據後,再去請求該地址的服務bootstrap

如圖:api

clipboard.png

Cool-Rpc技術簡介

此項目基於傳輸層(TCP/IP協議)進行通信,傳輸層框架使用netty編寫,github上會有mina版本
提供多套序列化框架,默認使用Protostuff序列化,可配置使用java序列化等
註冊中心默認zookeeper,可配置使用redis(只要有節點數據存儲和消息通知功能的組件便可)服務器

consumer經過java動態代理的方式使用執行遠程調用
將所要執行的類名,方法,參數等通知provider,以後provider拿着數據調用本地實現類,將處理後獲得的結果通知給consumer架構

註冊中心

廢話了那麼多,開始上乾貨,建議你們從github克隆完整代碼,本篇博文只講重點代碼框架

註冊中心以api接口名爲key,IP+Port爲value,將數據持久化,以供消費者查詢調用

以zookeeper爲例:

爲了更靈活地實現服務註冊者和發現者,這裏添加一個註冊中心適配器

public abstract class ServiceCenterAdapter implements ServiceCenter{

    String host;
    int port = 0;
    String passWord;

    ServiceCenterAdapter(){}

    ServiceCenterAdapter(String host){
        this.host = host;
    }

    ServiceCenterAdapter(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public String discover(String serviceName) {
        return null;
    }

    @Override
    public void register(String serviceName, String serviceAddress) {}

    @Override
    public void setHost(String host){
        this.host = host;
    };

    @Override
    public void setPort(int port){
        this.port = port;
    };

    @Override
    public void setPassWord(String passWord){
        this.passWord = passWord;
    };
    //獲取 IP:端口
    @Override
    public String getAddress(){
        if ("".equals(host) || host == null || port == 0){
            throw new RuntimeException("the zookeeper host or port error");
        }
        return host+":"+String.valueOf(port);
    };
}

zookeeper的服務註冊(provider使用):
在實際項目中,須要構造此類,並注入相應的IP和端口,最後以bean的形式注入到IOC容器中

public class ZooKeeperServiceRegistry extends ServiceCenterAdapter {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class);

    private ZkClient zkClient;

    {
        this.port = 2181;
        zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
        log.info("connect zookeeper");
    }

    public ZooKeeperServiceRegistry(String zkHost) {
        super(zkHost);
    }

    public ZooKeeperServiceRegistry(String zkHost, int zkPort) {
        super(zkHost, zkPort);
    }

    // 註冊服務 serviceName=接口名  serviceAddress=IP+Port
    @Override
    public void register(String serviceName, String serviceAddress) {
        // create cool node permanent
        String registryPath = CoolConstant.ZK_REGISTRY_PATH;
        if (!zkClient.exists(registryPath)) {
            zkClient.createPersistent(registryPath);
            log.info("create registry node: {}", registryPath);
        }
        // create service node permanent
        String servicePath = registryPath + "/" + serviceName;
        if (!zkClient.exists(servicePath)) {
            zkClient.createPersistent(servicePath);
            log.info("create service node: {}", servicePath);
        }
        // create service address node temp
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
        log.info("create address node: {}", addressNode);
    }

}

zookeeper的服務發現者(consumer使用):
同上,也須要配置相應的IP和端口,並以bean注入到項目ioc容器中

public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter {

    private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class);

    {
        super.port = 2181;
    }

    public ZooKeeperServiceDiscovery(){};

    public ZooKeeperServiceDiscovery(String zkHost){
        super(zkHost);
    }

    public ZooKeeperServiceDiscovery(String zkHost, int zkPort){
        super(zkHost, zkPort);
    }

    // 服務發現    name=api接口名
    @Override
    public String discover(String name) {

        ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
        log.debug("connect zookeeper");
        try {
            String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name;
            if (!zkClient.exists(servicePath)) {
                throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
            }
            List<String> addressList = zkClient.getChildren(servicePath);
            if (addressList.size() == 0) {
                throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
            }
            String address;
            int size = addressList.size();
            if (size == 1) {
                address = addressList.get(0);
                log.debug("get only address node: {}", address);
            } else {
                address = addressList.get(ThreadLocalRandom.current().nextInt(size));
                log.debug("get random address node: {}", address);
            }
            String addressPath = servicePath + "/" + address;
            return zkClient.readData(addressPath);
        } finally {
            zkClient.close();
        }
    }

}

服務端TCP處理器

此篇博文的TCP數據(包括編解碼器、處理器)所有以netty編寫

服務端的netty引導類:

public class CoolRpcServer implements ApplicationContextAware {

    private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class);
    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap bootstrap;
    private HandlerInitializer handlerInitializer;
    private ServiceCenter serviceRegistry;
    private String serviceIP;
    private int port;
    public static Map<String, Object> servicesMap ;

    {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        handlerInitializer = new HandlerInitializer();
        servicesMap = new HashMap<>(16);
    }

    public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){
        this.serviceRegistry = serviceRegistry;
        this.serviceIP = serviceIP;
        this.port = port;

    }

    /**
     * start and init tcp server if ioc contain is booting
     */
    @SuppressWarnings("unchecked")
    public void initServer() throws InterruptedException {

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(handlerInitializer);

        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        // the most send bytes ( 256KB )
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256);
        // the most receive bytes ( 2048KB )
        bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2);

        channel = bootstrap.bind(serviceIP,port).sync().channel();

        if (servicesMap != null && servicesMap.size() > 0){
            for (String beanName: servicesMap.keySet()){
                serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port));
                log.info("register service name = {}", beanName);
            }
        }
        log.info("TCP server started successfully, port:{}", port);

        channel.closeFuture().sync();
    }


    /**
     * close ioc contain and stop tcp server
     */
    public void stopServer(){

        if (channel != null && channel.isActive()) {
            channel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }

        log.info("TCP server stopped successfully, port: {}", port);
    }

    /**
     *  scan Annotation of CoolService
     */
    @Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> beans = ctx.getBeansWithAnnotation(CoolService.class);
        if (beans != null && beans.size()>0){
            for (Object bean : beans.values()){
                String name = bean.getClass().getAnnotation(CoolService.class).value().getName();
                servicesMap.put(name, bean);
            }
        }
    }

}

此項目的開放api接口實現類須要用@CoolService註解標識,服務端容器啓動時,會掃描全部帶有此註解的實現類,並注入到註冊中心

服務端處理器(netty handler):

@ChannelHandler.Sharable
public class CoolServerHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CoolResponse response = new CoolResponse();
        CoolRequest request = (CoolRequest) msg;

        try {
            Object result = invoke(request);
            response.setRequestID(request.getRequestID());
            response.setResult(result);
        } catch (Throwable error) {
            response.setError(error);
        }
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }


    private Object invoke(CoolRequest request) throws Throwable{
        if (request == null){
            throw new Throwable("cool rpc request not found");
        }

        String className = request.getClassName();
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        Object service = CoolRpcServer.servicesMap.get(className);
        if (service == null){
            throw new Throwable("cool rpc service not exist");
        }

        Class<?> serviceClass = service.getClass();
        Class<?>[] parameterTypes = request.getParameterTypes();

        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        return fastMethod.invoke(service, parameters);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("server caught exception", cause);
        ctx.close();
    }

}

將客戶端傳輸過來的請求數據(類名,方法,參數)在本地以cglib的方式反射調用
調用成功後,將處理完畢的結果編碼返回給客戶端,而且關閉TCP鏈接

客戶端TCP處理器

consumer只有api接口,並無其實現類,因此咱們能夠用java動態代理的方式去自定義方法實現,代理的方法實現即是創建TCP握手鍊接,有provider來執行方法,將獲得的結果返回給代理類,由此形成一種單憑接口就能調用實現類方法的假象

第一步: 使用java動態代理new出代理對象

public class CoolProxy {

    private static Logger log = LoggerFactory.getLogger(CoolProxy.class);

    private ServiceCenter serviceDiscovery;

    public CoolProxy(ServiceCenter serviceDiscovery){
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T getInstance(Class<T> cls){

        return (T)Proxy.newProxyInstance(cls.getClassLoader(),
                new Class<?>[]{cls},
                (proxy, method, args) -> {

                    CoolRequest request = new CoolRequest();
                    request.setRequestID(UUID.randomUUID().toString());
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameters(args);
                    request.setParameterTypes(method.getParameterTypes());

                    String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2);

                    CoolRpcClient client = new CoolRpcClient(addr[0],
                            Integer.parseInt(addr[1]));

                    CoolResponse response = client.send(request);
                    if (response.getError()!=null){
                        throw response.getError();
                    } else {
                        return response.getResult();
                    }

                });
    }

}

第二步: 在代理方法中,使用遠程過程調用(rpc)

客戶端引導類:

public class CoolRpcClient {

    private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class);

    private CountDownLatch countDownLatch;
    private EventLoopGroup group;
    private Bootstrap bootstrap;
    private CoolResponse response;
    private String serviceIP;
    private int port;

    {
        response = new CoolResponse();
        countDownLatch = new CountDownLatch(1);
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
    }


    public CoolRpcClient(String serviceIP, int port){
        this.serviceIP = serviceIP;
        this.port = port;
    }


    public CoolResponse send(CoolRequest request){
        try {
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                            .addLast(new CoolRpcDecoder(CoolResponse.class))
                            .addLast(new CoolRpcEncoder(CoolRequest.class))
                            .addLast(new CoolClientHandler(countDownLatch, response));
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);

            Channel channel = bootstrap.connect(serviceIP, port).sync().channel();
            channel.writeAndFlush(request).sync();
            countDownLatch.await();
            channel.closeFuture().sync();

            return response;
        } catch (Exception e){
            e.printStackTrace();
            return null;
        } finally {
            group.shutdownGracefully();
        }
    }


}

客戶端處理器(handler):

@ChannelHandler.Sharable
public class CoolClientHandler extends ChannelInboundHandlerAdapter {

    private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class);

    private CountDownLatch latch;
    private CoolResponse response;

    public CoolClientHandler(CountDownLatch latch, CoolResponse response){
        this.latch = latch;
        this.response = response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CoolResponse enResponse = (CoolResponse) msg;
        this.response.sync(enResponse);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        latch.countDown();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("api caught exception", cause);
        ctx.close();
    }

}

最後使用CountDownLatch同步通知調用者,rpc調用完畢

結束語

以上即是Cool-Rpc的簡單講解,若有更好的想法請聯繫我
熱烈歡迎你們一塊兒維護此項目Cool-RPC

相關文章
相關標籤/搜索