實例列表的獲取主要靠 HostReactor#getServiceInfo 方法。《Nacos - 啓動》一文中 namingService.subscribe 註冊監聽的時候,也會調用這個方法。
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { // 若是發生故障轉移,就從文件緩存裏取 NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 從serviceInfoMap裏取 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // serviceInfoMap沒有 if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); // 內存沒有,從服務器取 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { // 若是正在更新,則wait,避免多線程同時調用服務器 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 開啓定時任務更新服務列表 scheduleUpdateIfAbsent(serviceName, clusters); // 從內存裏取 return serviceInfoMap.get(serviceObj.getKey()); }
從服務器獲取時,NamingProxy會調用NamingProxy#reqApi,它會隨機取一個server,再調用NamingProxy#callServer。NamingProxy的代碼就略了。
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } } public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 從服務器取 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 解析返回的字符串 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } }
processServiceJson 主要是判斷實例列表是否有更新;有更新就通知 eventDispatcher.serviceChanged,並寫入文件緩存。
public ServiceInfo processServiceJson(String json) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() == null || !serviceInfo.validate()) { return oldService; } boolean changed = false; if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set<Instance> modHosts = new HashSet<Instance>(); Set<Instance> newHosts = new HashSet<Instance>(); Set<Instance> remvHosts = new HashSet<Instance>(); // 下面是修改、新增、移除的篩選。 List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>( newHostMap.entrySet()); for (Map.Entry<String, Instance> entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils .equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue; } if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(newHosts)); } if 
(remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(remvHosts)); } if (modHosts.size() > 0) { changed = true; updateBeatInfo(modHosts); NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(modHosts)); } serviceInfo.setJsonFromServer(json); // 有更新發送給serviceChanged,並寫入文件 if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, cacheDir); } } else { changed = true; NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 有更新發送給serviceChanged,並寫入文件 eventDispatcher.serviceChanged(serviceInfo); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JacksonUtils.toJson(serviceInfo.getHosts())); } return serviceInfo; }
把任務加入線程池。
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { // 任務已經有了就不加了 if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { // // 任務已經有了就不加了 if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } // 把任務加入線程池 ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); // 加入任務 futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } }
定時任務更新,默認每10秒更新;如果失敗了,延遲就每次乘以2,比如第一次1秒,第二次2秒,第三次4秒,最多是2的6次方,最大等待60秒。
public void run() { long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); // serviceInfoMap沒有就直接更新 if (serviceObj == null) { updateService(serviceName, clusters); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } // 失敗了就增長失敗次數 if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); // 成功就重置爲1 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { // 從新到線程池 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
可結合《Nacos - HostReactor的建立》一文一起閱讀。