註冊中心代碼使用 zookeeper 實現,咱們經過圖片來看看咱們註冊中心的架構。
java
首先說明, zookeeper 的實現思路和代碼是參考架構探險這本書上的,另外在 github 和我前面配置文件中的 zookeeper 服務器是用的1個月免費適用的阿里雲,你們也能夠用它當測試用。git
很少說,一次性給出註冊中心所有代碼。github
客戶端對應的註冊中心接口編程
public interface RegisterCenter4Consumer { /** * 消費端初始化服務提供者信息本地緩存 */ public void initProviderMap(); /** * 消費端獲取服務提供者信息 * @return */ public Map<String,List<ServiceProvider>> getServiceMetaDataMap4Consumer(); /** * 消費端將消費者信息註冊到 zookeeper 對應的節點下 * @param invokers */ public void registerConsumer(final List<ServiceConsumer> invokers); }
服務端對應的註冊中心接口緩存
public interface RegisterCenter4Provider { /** * 服務端將服務提供者信息註冊到 zookeeper 對應的節點下 * @param serivceList */ public void registerProvider(final List<ServiceProvider> serivceList); /** * 服務端獲取服務提供者信息 * @return key:服務提供者接口 value:服務提供者服務方法列表 */ public Map<String, List<ServiceProvider>> getProviderService(); }
註冊中心實現類:服務器
public class ZookeeperRegisterCenter implements RegisterCenter4Provider, RegisterCenter4Consumer { private static ZookeeperRegisterCenter registerCenter = new ZookeeperRegisterCenter(); private ZookeeperRegisterCenter(){}; public static ZookeeperRegisterCenter getInstance(){ return registerCenter; } //服務提供者列表,key:服務提供者接口,value:服務提供者服務方法列表 private static final Map<String,List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>(); //服務端 zookeeper 元信息,選擇服務(第一次從zookeeper 拉取,後續由zookeeper監聽機制主動更新) private static final Map<String,List<ServiceProvider>> serviceData4Consumer = new ConcurrentHashMap<>(); //從配置文件中獲取 zookeeper 服務地址列表 private static String ZK_SERIVCE = Configuration.getInstance().getAddress(); //從配置文件中獲取 zookeeper 會話超時時間配置 private static int ZK_SESSION_TIME_OUT = 5000; //從配置文件中獲取 zookeeper 鏈接超時事件配置 private static int ZK_CONNECTION_TIME_OUT = 5000; private static String ROOT_PATH = "/rpc_register"; public static String PROVIDER_TYPE = "/provider"; public static String CONSUMER_TYPE = "/consumer"; private static volatile ZkClient zkClient = null; @Override public void initProviderMap() { if(serviceData4Consumer.isEmpty()){ serviceData4Consumer.putAll(fetchOrUpdateServiceMetaData()); } } @Override public Map<String, List<ServiceProvider>> getServiceMetaDataMap4Consumer() { return serviceData4Consumer; } @Override public void registerConsumer(List<ServiceConsumer> consumers) { if(consumers == null || consumers.size() == 0){ return; } //鏈接 zookeeper ,註冊服務 synchronized (ZookeeperRegisterCenter.class){ if(zkClient == null){ zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT, new SerializableSerializer()); } //建立 zookeeper 命名空間 boolean exist = zkClient.exists(ROOT_PATH); if(!exist){ zkClient.createPersistent(ROOT_PATH,true); } //建立服務提供者節點 exist = zkClient.exists((ROOT_PATH)); if(!exist){ zkClient.createPersistent(ROOT_PATH); } for(int i = 0; i< consumers.size();i++) { ServiceConsumer consumer = consumers.get(i); //建立服務消費者節點 String serviceNode = consumer.getConsumer().getName(); String servicePath = ROOT_PATH + CONSUMER_TYPE + "/" + serviceNode; exist = zkClient.exists(servicePath); System.out.println("exist:" + exist); System.out.println("servicePath:" + servicePath); if (!exist) { zkClient.createPersistent(servicePath, true); } //建立當前服務器節點 InetAddress addr = null; try { addr = InetAddress.getLocalHost(); } catch (UnknownHostException e) { e.printStackTrace(); } String ip = addr.getHostAddress(); String currentServiceIpNode = servicePath + "/" + ip; exist = zkClient.exists(currentServiceIpNode); if (!exist) { zkClient.createEphemeral(currentServiceIpNode); } } } } @Override public void registerProvider(List<ServiceProvider> serivceList) { if(serivceList == null || serivceList.size() == 0){ return; } //鏈接 zookeeper,註冊服務,加鎖,將全部須要註冊的服務放到providerServiceMap裏面 synchronized (ZookeeperRegisterCenter.class){ for(ServiceProvider provider:serivceList){ //獲取接口名稱 String serviceItfKey = provider.getProvider().getName(); //先從當前服務提供者的集合裏面獲取 List<ServiceProvider> providers = providerServiceMap.get(serviceItfKey); if(providers == null){ providers = new ArrayList<>(); } providers.add(provider); providerServiceMap.put(serviceItfKey,providers); } if(zkClient == null){ zkClient = new ZkClient(ZK_SERIVCE,ZK_SESSION_TIME_OUT,ZK_CONNECTION_TIME_OUT,new SerializableSerializer()); } //建立當前應用 zookeeper 命名空間 boolean exist = zkClient.exists(ROOT_PATH); if(!exist){ zkClient.createPersistent(ROOT_PATH,true); } //服務提供者節點 exist = zkClient.exists((ROOT_PATH)); if(!exist){ zkClient.createPersistent(ROOT_PATH); } for(Map.Entry<String,List<ServiceProvider>> entry:providerServiceMap.entrySet()){ //建立服務提供者節點 String serviceNode = entry.getKey(); String servicePath = ROOT_PATH +PROVIDER_TYPE +"/" + serviceNode; exist = zkClient.exists(servicePath); if(!exist){ zkClient.createPersistent(servicePath,true); } //建立當前服務器節點,這裏是註冊時使用,一個接口對應的ServiceProvider 只有一個 int serverPort = entry.getValue().get(0).getPort(); InetAddress addr = null; try { addr = InetAddress.getLocalHost(); } catch (UnknownHostException e) { e.printStackTrace(); } String ip = addr.getHostAddress(); String impl = (String)entry.getValue().get(0).getServiceObject(); String serviceIpNode = servicePath +"/" + ip + "|" + serverPort + "|" + impl; System.out.println("serviceIpNode:" + serviceIpNode); exist = zkClient.exists(serviceIpNode); if(!exist){ //建立臨時節點 zkClient.createEphemeral(serviceIpNode); } //監聽註冊服務的變化,同時更新數據到本地緩存 zkClient.subscribeChildChanges(servicePath, new IZkChildListener() { @Override public void handleChildChange(String s, List<String> list) throws Exception { if(list == null){ list = new ArrayList<>(); } //存活的服務 IP 列表 List<String> activeServiceIpList = new ArrayList<>(); for(String input:list){ String ip = StringUtils.split(input, "|").get(0); activeServiceIpList.add(ip); } refreshActivityService(activeServiceIpList); } }); } } } /** * * 在某個服務端獲取本身暴露的服務 */ @Override public Map<String, List<ServiceProvider>> getProviderService() { return providerServiceMap; } //利用ZK自動刷新當前存活的服務提供者列表數據 private void refreshActivityService(List<String> serviceIpList) { if (serviceIpList == null||serviceIpList.isEmpty()) { serviceIpList = new ArrayList<>(); } Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>(); for (Map.Entry<String, List<ServiceProvider>> entry : providerServiceMap.entrySet()) { String key = entry.getKey(); List<ServiceProvider> providerServices = entry.getValue(); List<ServiceProvider> serviceMetaDataModelList = currentServiceMetaDataMap.get(key); if (serviceMetaDataModelList == null) { serviceMetaDataModelList = new ArrayList<>(); } for (ServiceProvider serviceMetaData : providerServices) { if (serviceIpList.contains(serviceMetaData.getIp())) { serviceMetaDataModelList.add(serviceMetaData); } } currentServiceMetaDataMap.put(key, serviceMetaDataModelList); } providerServiceMap.clear(); providerServiceMap.putAll(currentServiceMetaDataMap); } private void refreshServiceMetaDataMap(List<String> serviceIpList) { if (serviceIpList == null) { serviceIpList = new ArrayList<>(); } Map<String, List<ServiceProvider>> currentServiceMetaDataMap = new HashMap<>(); for (Map.Entry<String, List<ServiceProvider>> entry : serviceData4Consumer.entrySet()) { String serviceItfKey = entry.getKey(); List<ServiceProvider> serviceList = entry.getValue(); List<ServiceProvider> providerServiceList = currentServiceMetaDataMap.get(serviceItfKey); if (providerServiceList == null) { providerServiceList = new ArrayList<>(); } for (ServiceProvider serviceMetaData : serviceList) { if (serviceIpList.contains(serviceMetaData.getIp())) { providerServiceList.add(serviceMetaData); } } currentServiceMetaDataMap.put(serviceItfKey, providerServiceList); } serviceData4Consumer.clear(); serviceData4Consumer.putAll(currentServiceMetaDataMap); } private Map<String, List<ServiceProvider>> fetchOrUpdateServiceMetaData() { final Map<String, List<ServiceProvider>> providerServiceMap = new ConcurrentHashMap<>(); //鏈接zk synchronized (ZookeeperRegisterCenter.class) { if (zkClient == null) { zkClient = new ZkClient(ZK_SERIVCE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer()); } } //從ZK獲取服務提供者列表 String providePath = ROOT_PATH+PROVIDER_TYPE; System.out.println("111111:"+providePath); List<String> providerServices = zkClient.getChildren(providePath); System.out.println(providerServices.toString()); for (String serviceName : providerServices) { String servicePath = providePath +"/"+ serviceName; System.out.println("1100:"+servicePath); List<String> ipPathList = zkClient.getChildren(servicePath); System.out.println("ipPathList:"+ipPathList.toString()); for (String ipPath : ipPathList) { String serverIp = ipPath.split("\\|")[0]; String serverPort = ipPath.split("\\|")[1]; String impl = ipPath.split("\\|")[2]; List<ServiceProvider> providerServiceList = providerServiceMap.get(serviceName); if (providerServiceList == null) { providerServiceList = new ArrayList<>(); } ServiceProvider providerService = new ServiceProvider(); try { Class clazz = Class.forName(serviceName); providerService.setProvider(clazz); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } providerService.setIp(serverIp); providerService.setPort(Integer.parseInt(serverPort)); providerService.setServiceObject(impl); providerService.setGroupName(""); providerServiceList.add(providerService); providerServiceMap.put(serviceName, providerServiceList); } //監聽註冊服務的變化,同時更新數據到本地緩存 zkClient.subscribeChildChanges(servicePath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if (currentChilds == null) { currentChilds = new ArrayList<>(); } List<String> activeServiceIpList = new ArrayList<>(); for(String input:currentChilds){ String ip = StringUtils.split(input, "|").get(0); activeServiceIpList.add(ip); } refreshServiceMetaDataMap(activeServiceIpList); } }); } return providerServiceMap; } }
寫完這部分整個 rpc 框架也就實現了,測試的客戶端和服務端在代碼裏也有,這裏就不貼出來了。平時時間有限,只能下班和週末的時間來寫,整個框架確定有不足和錯誤的地方,也有能夠改進的地方。但願你們可以不吝指教,互相進步。架構
我只是想將本身思考的過程給你們展現出來,但願你們一塊兒討論這些問題,看看還有哪些可以改進的地方。併發
須要改進的地方:框架
這篇文章沒有收費,若是您以爲對你的編程或多或少有點啓發就點個贊。ide
最後給出源碼 github 地址,源碼 編碼和碼字不易,若是您以爲學到了東西就請在 github 上加個 star, 固然在 github 上提出問題一塊兒改進是最好的。