帶你手寫基於 Spring 的可插拔式 RPC 框架(五)註冊中心

註冊中心代碼使用 zookeeper 實現,咱們經過圖片來看看咱們註冊中心的架構。
java

首先說明, zookeeper 的實現思路和代碼是參考架構探險這本書上的,另外在 github 和我前面配置文件中的 zookeeper 服務器是用的1個月免費適用的阿里雲,你們也能夠用它當測試用。git

很少說,一次性給出註冊中心所有代碼。github

客戶端對應的註冊中心接口編程

public interface RegisterCenter4Consumer {

    /**
     * 消費端初始化服務提供者信息本地緩存
     */
    public void initProviderMap();

    /**
     * 消費端獲取服務提供者信息
     * @return
     */
    public Map<String,List<ServiceProvider>> getServiceMetaDataMap4Consumer();

    /**
     * 消費端將消費者信息註冊到 zookeeper 對應的節點下
     * @param invokers
     */
    public void registerConsumer(final List<ServiceConsumer> invokers);
}

服務端對應的註冊中心接口緩存

public interface RegisterCenter4Provider {
    /**
     * 服務端將服務提供者信息註冊到 zookeeper 對應的節點下
     * @param serivceList
     */
    public void registerProvider(final List<ServiceProvider> serivceList);

    /**
     * 服務端獲取服務提供者信息
     * @return key:服務提供者接口 value:服務提供者服務方法列表
     */
    public Map<String, List<ServiceProvider>> getProviderService();
}

註冊中心實現類:服務器

public class ZookeeperRegisterCenter implements RegisterCenter4Provider, RegisterCenter4Consumer {

    private static ZookeeperRegisterCenter registerCenter = new ZookeeperRegisterCenter();

    private ZookeeperRegisterCenter(){};

    public static ZookeeperRegisterCenter getInstance(){
        return registerCenter;
    }
    //服務提供者列表,key:服務提供者接口,value:服務提供者服務方法列表
    private static final Map<String,List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>();

    //服務端 zookeeper 元信息,選擇服務(第一次從zookeeper 拉取,後續由zookeeper監聽機制主動更新)
    private static final Map<String,List<ServiceProvider>> serviceData4Consumer = new ConcurrentHashMap<>();

    //從配置文件中獲取 zookeeper 服務地址列表
    private static String  ZK_SERIVCE = Configuration.getInstance().getAddress();

    //從配置文件中獲取 zookeeper 會話超時時間配置
    private static int ZK_SESSION_TIME_OUT = 5000;

    //從配置文件中獲取 zookeeper 鏈接超時事件配置
    private static int  ZK_CONNECTION_TIME_OUT = 5000;

    private static String ROOT_PATH = "/rpc_register";
    public  static String PROVIDER_TYPE = "/provider";
    public  static String CONSUMER_TYPE = "/consumer";

    private static volatile ZkClient zkClient = null;

    @Override
    public void initProviderMap() {
        if(serviceData4Consumer.isEmpty()){
            serviceData4Consumer.putAll(fetchOrUpdateServiceMetaData());
        }

    }

    @Override
    public Map<String, List<ServiceProvider>> getServiceMetaDataMap4Consumer() {
        return serviceData4Consumer;
    }

    @Override
    public void registerConsumer(List<ServiceConsumer> consumers) {
        if(consumers == null || consumers.size() == 0){
            return;
        }

        //鏈接 zookeeper ,註冊服務
        synchronized (ZookeeperRegisterCenter.class){
            if(zkClient == null){
                zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
            }
            //建立  zookeeper 命名空間
            boolean exist = zkClient.exists(ROOT_PATH);
            if(!exist){
                zkClient.createPersistent(ROOT_PATH,true);
            }
            //建立服務提供者節點
            exist = zkClient.exists((ROOT_PATH));
            if(!exist){
                zkClient.createPersistent(ROOT_PATH);
            }

            for(int i = 0; i< consumers.size();i++) {
                ServiceConsumer consumer = consumers.get(i);
                //建立服務消費者節點
                String serviceNode = consumer.getConsumer().getName();
                String servicePath = ROOT_PATH + CONSUMER_TYPE + "/" + serviceNode;

                exist = zkClient.exists(servicePath);
                System.out.println("exist:" + exist);
                System.out.println("servicePath:" + servicePath);
                if (!exist) {
                    zkClient.createPersistent(servicePath, true);
                }

                //建立當前服務器節點
                InetAddress addr = null;
                try {
                    addr = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                String ip = addr.getHostAddress();
                String currentServiceIpNode = servicePath + "/" + ip;
                exist = zkClient.exists(currentServiceIpNode);
                if (!exist) {
                    zkClient.createEphemeral(currentServiceIpNode);
                }


            }


        }

    }

    @Override
    public void registerProvider(List<ServiceProvider> serivceList) {
        if(serivceList == null || serivceList.size() == 0){
            return;
        }
        
        //鏈接 zookeeper,註冊服務,加鎖,將全部須要註冊的服務放到providerServiceMap裏面
        synchronized (ZookeeperRegisterCenter.class){
            for(ServiceProvider provider:serivceList){
                //獲取接口名稱
                String serviceItfKey = provider.getProvider().getName();
                //先從當前服務提供者的集合裏面獲取
                List<ServiceProvider> providers = providerServiceMap.get(serviceItfKey);
                if(providers == null){
                    providers = new ArrayList<>();
                }
                providers.add(provider);
                providerServiceMap.put(serviceItfKey,providers);
            }

            if(zkClient == null){
                zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT,new SerializableSerializer());
            }

            //建立當前應用 zookeeper 命名空間
            boolean exist = zkClient.exists(ROOT_PATH);
            if(!exist){
                zkClient.createPersistent(ROOT_PATH,true);
            }

            //服務提供者節點
            exist = zkClient.exists((ROOT_PATH));
            if(!exist){
                zkClient.createPersistent(ROOT_PATH);
            }

            for(Map.Entry<String,List<ServiceProvider>> entry:providerServiceMap.entrySet()){
                //建立服務提供者節點
                String serviceNode = entry.getKey();
                String servicePath = ROOT_PATH +PROVIDER_TYPE +"/" + serviceNode;
                exist = zkClient.exists(servicePath);
                if(!exist){
                    zkClient.createPersistent(servicePath,true);
                }

                //建立當前服務器節點,這裏是註冊時使用,一個接口對應的ServiceProvider 只有一個 
                int serverPort = entry.getValue().get(0).getPort();
                InetAddress addr = null;
                try {
                    addr = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                String ip = addr.getHostAddress();
                String impl = (String)entry.getValue().get(0).getServiceObject();
                String serviceIpNode = servicePath +"/" + ip + "|" + serverPort + "|" + impl;
                System.out.println("serviceIpNode:" + serviceIpNode);
                exist = zkClient.exists(serviceIpNode);
                if(!exist){
                    //建立臨時節點
                    zkClient.createEphemeral(serviceIpNode);
                }
                //監聽註冊服務的變化,同時更新數據到本地緩存
                zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String s, List<String> list) throws Exception {
                        if(list  == null){
                            list = new ArrayList<>();
                        }
                        //存活的服務 IP 列表
                        List<String> activeServiceIpList = new ArrayList<>();
                        for(String input:list){
                            String ip = StringUtils.split(input, "|").get(0);
                            activeServiceIpList.add(ip);
                        }
                        refreshActivityService(activeServiceIpList);
                    }
                });

            }
        }

    }

    /**
     * 
     * 在某個服務端獲取本身暴露的服務
     */
    @Override
    public Map<String, List<ServiceProvider>> getProviderService() {
        return providerServiceMap;
    }
    
    
    //利用ZK自動刷新當前存活的服務提供者列表數據
    private void refreshActivityService(List<String> serviceIpList) {
        if (serviceIpList == null||serviceIpList.isEmpty()) {
            serviceIpList = new ArrayList<>();
        }

        Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>();
        for (Map.Entry<String, List<ServiceProvider>> entry : providerServiceMap.entrySet()) {
            String key = entry.getKey();
            List<ServiceProvider> providerServices = entry.getValue();

            List<ServiceProvider> serviceMetaDataModelList = currentServiceMetaDataMap.get(key);
            if (serviceMetaDataModelList == null) {
                serviceMetaDataModelList = new ArrayList<>();
            }

            for (ServiceProvider serviceMetaData : providerServices) {
                if (serviceIpList.contains(serviceMetaData.getIp())) {
                    serviceMetaDataModelList.add(serviceMetaData);
                }
            }
            currentServiceMetaDataMap.put(key, serviceMetaDataModelList);
        }
        providerServiceMap.clear();
        providerServiceMap.putAll(currentServiceMetaDataMap);
    }


    private void refreshServiceMetaDataMap(List<String> serviceIpList) {
        if (serviceIpList == null) {
            serviceIpList = new ArrayList<>();
        }

        Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>();
        for (Map.Entry<String, List<ServiceProvider>> entry : serviceData4Consumer.entrySet()) {
            String serviceItfKey = entry.getKey();
            List<ServiceProvider> serviceList = entry.getValue();

            List<ServiceProvider> providerServiceList = currentServiceMetaDataMap.get(serviceItfKey);
            if (providerServiceList == null) {
                providerServiceList = new ArrayList<>();
            }

            for (ServiceProvider serviceMetaData : serviceList) {
                if (serviceIpList.contains(serviceMetaData.getIp())) {
                    providerServiceList.add(serviceMetaData);
                }
            }
            currentServiceMetaDataMap.put(serviceItfKey, providerServiceList);
        }

        serviceData4Consumer.clear();
        serviceData4Consumer.putAll(currentServiceMetaDataMap);
    }


    private Map<String, List<ServiceProvider>> fetchOrUpdateServiceMetaData() {
        final Map<String, List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>();
        //鏈接zk
        synchronized (ZookeeperRegisterCenter.class) {
            if (zkClient == null) {
                zkClient = new ZkClient(ZK_SERIVCE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
            }
        }

        //從ZK獲取服務提供者列表
        String providePath = ROOT_PATH+PROVIDER_TYPE;
        System.out.println("111111:"+providePath);
        List<String> providerServices = zkClient.getChildren(providePath);
        System.out.println(providerServices.toString());
        for (String serviceName : providerServices) {
            String servicePath = providePath +"/"+ serviceName;
            System.out.println("1100:"+servicePath);
            List<String> ipPathList = zkClient.getChildren(servicePath);
            System.out.println("ipPathList:"+ipPathList.toString());
            for (String ipPath : ipPathList) {
                String serverIp = ipPath.split("\\|")[0];
                String serverPort = ipPath.split("\\|")[1];
                String impl = ipPath.split("\\|")[2];
                List<ServiceProvider> providerServiceList = providerServiceMap.get(serviceName);
                if (providerServiceList == null) {
                    providerServiceList = new ArrayList<>();
                }
                ServiceProvider providerService = new ServiceProvider();

                try {
                    Class clazz = Class.forName(serviceName);
                    providerService.setProvider(clazz);
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException(e);
                }

                providerService.setIp(serverIp);
                providerService.setPort(Integer.parseInt(serverPort));
                providerService.setServiceObject(impl);
                providerService.setGroupName("");
                providerServiceList.add(providerService);

                providerServiceMap.put(serviceName, providerServiceList);
            }

            //監聽註冊服務的變化,同時更新數據到本地緩存
            zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    if (currentChilds == null) {
                        currentChilds = new ArrayList<>();
                    }
                    List<String> activeServiceIpList = new ArrayList<>();
                    for(String input:currentChilds){
                        String ip = StringUtils.split(input, "|").get(0);
                        activeServiceIpList.add(ip);
                    }
                    refreshServiceMetaDataMap(activeServiceIpList);
                }
            });
        }
        return providerServiceMap;
    }

}

寫完這部分整個 rpc 框架也就實現了,測試的客戶端和服務端在代碼裏也有,這裏就不貼出來了。平時時間有限,只能下班和週末的時間來寫,整個框架確定有不足和錯誤的地方,也有能夠改進的地方。但願你們可以不吝指教,互相進步。架構

我只是想將本身思考的過程給你們展現出來,但願你們一塊兒討論這些問題,看看還有哪些可以改進的地方。併發

須要改進的地方:框架

  1. 服務端的啓動方式。
  2. 更高併發的改進。
  3. 服務治理。
  4. 監測中心。

這篇文章沒有收費,若是您以爲對你的編程或多或少有點啓發就點個贊。ide

最後給出源碼 github 地址,源碼 編碼和碼字不易,若是您以爲學到了東西就請在 github 上加個 star, 固然在 github 上提出問題一塊兒改進是最好的。

相關文章
相關標籤/搜索