一 Eurekajava
1. 須要掌握的一些基礎知識web
「Applications」:註冊在Eureka Server上的應用集合。-- 對應多個**Application**spring
「Application」:具體的一個應用(eureka-provider)。-- 對應多個"InstanceInfo"(localhost:8070,json
localhost:8071, localhost:8072)緩存
"InstanceInfo":應用實例。 IP + Port服務器
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package com.netflix.discovery.shared; 7 8 import com.fasterxml.jackson.annotation.JsonCreator; 9 import com.fasterxml.jackson.annotation.JsonIgnore; 10 import com.fasterxml.jackson.annotation.JsonProperty; 11 import com.fasterxml.jackson.annotation.JsonRootName; 12 import com.netflix.appinfo.InstanceInfo; 13 import com.netflix.appinfo.InstanceInfo.ActionType; 14 import com.netflix.appinfo.InstanceInfo.InstanceStatus; 15 import com.netflix.discovery.EurekaClientConfig; 16 import com.netflix.discovery.InstanceRegionChecker; 17 import com.netflix.discovery.provider.Serializer; 18 import com.thoughtworks.xstream.annotations.XStreamAlias; 19 import com.thoughtworks.xstream.annotations.XStreamImplicit; 20 import java.util.AbstractQueue; 21 import java.util.ArrayList; 22 import java.util.Collections; 23 import java.util.HashSet; 24 import java.util.Iterator; 25 import java.util.List; 26 import java.util.Locale; 27 import java.util.Map; 28 import java.util.Set; 29 import java.util.TreeMap; 30 import java.util.Map.Entry; 31 import java.util.concurrent.ConcurrentHashMap; 32 import java.util.concurrent.ConcurrentLinkedQueue; 33 import java.util.concurrent.atomic.AtomicInteger; 34 import java.util.concurrent.atomic.AtomicLong; 35 import java.util.concurrent.atomic.AtomicReference; 36 import javax.annotation.Nullable; 37 import org.slf4j.Logger; 38 import org.slf4j.LoggerFactory; 39 40 @Serializer("com.netflix.discovery.converters.EntityBodyConverter") 41 @XStreamAlias("applications") 42 @JsonRootName("applications") 43 public class Applications { 44 private static final String APP_INSTANCEID_DELIMITER = "$$"; 45 private static final Logger logger = LoggerFactory.getLogger(Applications.class); 46 private static final String STATUS_DELIMITER = "_"; 47 private Long versionDelta = -1L; 48 @XStreamImplicit 49 private AbstractQueue<Application> applications = new ConcurrentLinkedQueue(); 50 private Map<String, Application> appNameApplicationMap = new ConcurrentHashMap(); 51 private Map<String, AbstractQueue<InstanceInfo>> virtualHostNameAppMap = new ConcurrentHashMap(); 52 private Map<String, AbstractQueue<InstanceInfo>> secureVirtualHostNameAppMap = new ConcurrentHashMap(); 53 private Map<String, AtomicLong> virtualHostNameIndexMap = new ConcurrentHashMap(); 54 private Map<String, AtomicLong> secureVirtualHostNameIndexMap = new ConcurrentHashMap(); 55 private Map<String, AtomicReference<List<InstanceInfo>>> shuffleVirtualHostNameMap = new ConcurrentHashMap(); 56 private Map<String, AtomicReference<List<InstanceInfo>>> shuffledSecureVirtualHostNameMap = new ConcurrentHashMap(); 57 private String appsHashCode; 58 59 public Applications() { 60 } 61 62 @JsonCreator 63 public Applications(@JsonProperty("appsHashCode") String appsHashCode, @JsonProperty("versionDelta") Long versionDelta, @JsonProperty("application") List<Application> registeredApplications) { 64 Iterator var4 = registeredApplications.iterator(); 65 66 while(var4.hasNext()) { 67 Application app = (Application)var4.next(); 68 this.addApplication(app); 69 } 70 71 this.appsHashCode = appsHashCode; 72 this.versionDelta = versionDelta; 73 } 74 75 public Applications(List<Application> apps) { 76 this.applications.addAll(apps); 77 } 78 79 public void addApplication(Application app) { 80 this.appNameApplicationMap.put(app.getName().toUpperCase(Locale.ROOT), app); 81 this.addInstancesToVIPMaps(app); 82 this.applications.add(app); 83 } 84 85 @JsonProperty("application") 86 public List<Application> getRegisteredApplications() { 87 List<Application> list = new ArrayList(); 88 list.addAll(this.applications); 89 return list; 90 } 91 92 public Application getRegisteredApplications(String appName) { 93 return (Application)this.appNameApplicationMap.get(appName.toUpperCase(Locale.ROOT)); 94 } 95 96 public List<InstanceInfo> getInstancesByVirtualHostName(String virtualHostName) { 97 AtomicReference<List<InstanceInfo>> ref = (AtomicReference)this.shuffleVirtualHostNameMap.get(virtualHostName.toUpperCase(Locale.ROOT)); 98 return (List)(ref != null && ref.get() != null ? (List)ref.get() : new ArrayList()); 99 } 100 101 public List<InstanceInfo> getInstancesBySecureVirtualHostName(String secureVirtualHostName) { 102 AtomicReference<List<InstanceInfo>> ref = (AtomicReference)this.shuffledSecureVirtualHostNameMap.get(secureVirtualHostName.toUpperCase(Locale.ROOT)); 103 return (List)(ref != null && ref.get() != null ? (List)ref.get() : new ArrayList()); 104 } 105 106 public int size() { 107 int result = 0; 108 109 Application application; 110 for(Iterator var2 = this.applications.iterator(); var2.hasNext(); result += application.size()) { 111 application = (Application)var2.next(); 112 } 113 114 return result; 115 } 116 117 /** @deprecated */ 118 @Deprecated 119 public void setVersion(Long version) { 120 this.versionDelta = version; 121 } 122 123 /** @deprecated */ 124 @Deprecated 125 @JsonIgnore 126 public Long getVersion() { 127 return this.versionDelta; 128 } 129 130 public void setAppsHashCode(String hashCode) { 131 this.appsHashCode = hashCode; 132 } 133 134 @JsonIgnore 135 public String getAppsHashCode() { 136 return this.appsHashCode; 137 } 138 139 @JsonIgnore 140 public String getReconcileHashCode() { 141 TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap(); 142 this.populateInstanceCountMap(instanceCountMap); 143 return getReconcileHashCode(instanceCountMap); 144 } 145 146 public void populateInstanceCountMap(TreeMap<String, AtomicInteger> instanceCountMap) { 147 Iterator var2 = this.getRegisteredApplications().iterator(); 148 149 while(var2.hasNext()) { 150 Application app = (Application)var2.next(); 151 152 AtomicInteger instanceCount; 153 for(Iterator var4 = app.getInstancesAsIsFromEureka().iterator(); var4.hasNext(); instanceCount.incrementAndGet()) { 154 InstanceInfo info = (InstanceInfo)var4.next(); 155 instanceCount = (AtomicInteger)instanceCountMap.get(info.getStatus().name()); 156 if (instanceCount == null) { 157 instanceCount = new AtomicInteger(0); 158 instanceCountMap.put(info.getStatus().name(), instanceCount); 159 } 160 } 161 } 162 163 } 164 165 public static String getReconcileHashCode(TreeMap<String, AtomicInteger> instanceCountMap) { 166 String reconcileHashCode = ""; 167 168 Entry mapEntry; 169 for(Iterator var2 = instanceCountMap.entrySet().iterator(); var2.hasNext(); reconcileHashCode = reconcileHashCode + (String)mapEntry.getKey() + "_" + ((AtomicInteger)mapEntry.getValue()).get() + "_") { 170 mapEntry = (Entry)var2.next(); 171 } 172 173 return reconcileHashCode; 174 } 175 176 public Map<String, List<String>> getReconcileMapDiff(Applications apps) { 177 Map<String, List<String>> diffMap = new TreeMap(); 178 Set<Applications.Pair> allInstanceAppInstanceIds = new HashSet(); 179 Iterator var4 = apps.getRegisteredApplications().iterator(); 180 181 while(true) { 182 Application thisApp; 183 while(var4.hasNext()) { 184 Application otherApp = (Application)var4.next(); 185 thisApp = this.getRegisteredApplications(otherApp.getName()); 186 if (thisApp == null) { 187 logger.warn("Application not found in local cache : {}", otherApp.getName()); 188 } else { 189 Iterator var7 = thisApp.getInstancesAsIsFromEureka().iterator(); 190 191 InstanceInfo otherInstanceInfo; 192 while(var7.hasNext()) { 193 otherInstanceInfo = (InstanceInfo)var7.next(); 194 allInstanceAppInstanceIds.add(new Applications.Pair(thisApp.getName(), otherInstanceInfo.getId())); 195 } 196 197 for(var7 = otherApp.getInstancesAsIsFromEureka().iterator(); var7.hasNext(); allInstanceAppInstanceIds.remove(new Applications.Pair(otherApp.getName(), otherInstanceInfo.getId()))) { 198 otherInstanceInfo = (InstanceInfo)var7.next(); 199 InstanceInfo thisInstanceInfo = thisApp.getByInstanceId(otherInstanceInfo.getId()); 200 Object diffList; 201 if (thisInstanceInfo == null) { 202 diffList = (List)diffMap.get(ActionType.DELETED.name()); 203 if (diffList == null) { 204 diffList = new ArrayList(); 205 diffMap.put(ActionType.DELETED.name(), diffList); 206 } 207 208 ((List)diffList).add(otherInstanceInfo.getId()); 209 } else if (!thisInstanceInfo.getStatus().name().equalsIgnoreCase(otherInstanceInfo.getStatus().name())) { 210 diffList = (List)diffMap.get(ActionType.MODIFIED.name()); 211 if (diffList == null) { 212 diffList = new ArrayList(); 213 diffMap.put(ActionType.MODIFIED.name(), diffList); 214 } 215 216 ((List)diffList).add(thisInstanceInfo.getId() + "$$" + thisInstanceInfo.getStatus().name() + "$$" + otherInstanceInfo.getStatus().name()); 217 } 218 } 219 } 220 } 221 222 var4 = allInstanceAppInstanceIds.iterator(); 223 224 while(var4.hasNext()) { 225 Applications.Pair pair = (Applications.Pair)var4.next(); 226 thisApp = new Application(pair.getItem1()); 227 InstanceInfo thisInstanceInfo = thisApp.getByInstanceId(pair.getItem2()); 228 if (thisInstanceInfo != null) { 229 List<String> diffList = (List)diffMap.get(ActionType.ADDED.name()); 230 if (diffList == null) { 231 diffList = new ArrayList(); 232 diffMap.put(ActionType.ADDED.name(), diffList); 233 } 234 235 ((List)diffList).add(thisInstanceInfo.getId()); 236 } 237 } 238 239 return diffMap; 240 } 241 } 242 243 public void shuffleInstances(boolean filterUpInstances) { 244 this.shuffleInstances(filterUpInstances, false, (Map)null, (EurekaClientConfig)null, (InstanceRegionChecker)null); 245 } 246 247 public void shuffleAndIndexInstances(Map<String, Applications> remoteRegionsRegistry, EurekaClientConfig clientConfig, InstanceRegionChecker instanceRegionChecker) { 248 this.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances(), true, remoteRegionsRegistry, clientConfig, instanceRegionChecker); 249 } 250 251 private void shuffleInstances(boolean filterUpInstances, boolean indexByRemoteRegions, @Nullable Map<String, Applications> remoteRegionsRegistry, @Nullable EurekaClientConfig clientConfig, @Nullable InstanceRegionChecker instanceRegionChecker) { 252 this.virtualHostNameAppMap.clear(); 253 this.secureVirtualHostNameAppMap.clear(); 254 255 Application application; 256 for(Iterator var6 = this.appNameApplicationMap.values().iterator(); var6.hasNext(); this.addInstancesToVIPMaps(application)) { 257 application = (Application)var6.next(); 258 if (indexByRemoteRegions) { 259 application.shuffleAndStoreInstances(remoteRegionsRegistry, clientConfig, instanceRegionChecker); 260 } else { 261 application.shuffleAndStoreInstances(filterUpInstances); 262 } 263 } 264 265 this.shuffleAndFilterInstances(this.virtualHostNameAppMap, this.shuffleVirtualHostNameMap, this.virtualHostNameIndexMap, filterUpInstances); 266 this.shuffleAndFilterInstances(this.secureVirtualHostNameAppMap, this.shuffledSecureVirtualHostNameMap, this.secureVirtualHostNameIndexMap, filterUpInstances); 267 } 268 269 public AtomicLong getNextIndex(String virtualHostname, boolean secure) { 270 return secure ? (AtomicLong)this.secureVirtualHostNameIndexMap.get(virtualHostname) : (AtomicLong)this.virtualHostNameIndexMap.get(virtualHostname); 271 } 272 273 private void shuffleAndFilterInstances(Map<String, AbstractQueue<InstanceInfo>> srcMap, Map<String, AtomicReference<List<InstanceInfo>>> destMap, Map<String, AtomicLong> vipIndexMap, boolean filterUpInstances) { 274 Iterator var5 = srcMap.entrySet().iterator(); 275 276 while(var5.hasNext()) { 277 Entry<String, AbstractQueue<InstanceInfo>> entries = (Entry)var5.next(); 278 AbstractQueue<InstanceInfo> instanceInfoQueue = (AbstractQueue)entries.getValue(); 279 List<InstanceInfo> l = new ArrayList(instanceInfoQueue); 280 if (filterUpInstances) { 281 Iterator it = l.iterator(); 282 283 while(it.hasNext()) { 284 InstanceInfo instanceInfo = (InstanceInfo)it.next(); 285 if (!InstanceStatus.UP.equals(instanceInfo.getStatus())) { 286 it.remove(); 287 } 288 } 289 } 290 291 Collections.shuffle(l); 292 AtomicReference<List<InstanceInfo>> instanceInfoList = (AtomicReference)destMap.get(entries.getKey()); 293 if (instanceInfoList == null) { 294 instanceInfoList = new AtomicReference(l); 295 destMap.put(entries.getKey(), instanceInfoList); 296 } 297 298 instanceInfoList.set(l); 299 vipIndexMap.put(entries.getKey(), new AtomicLong(0L)); 300 } 301 302 Set<String> srcVips = srcMap.keySet(); 303 Set<String> destVips = destMap.keySet(); 304 destVips.retainAll(srcVips); 305 } 306 307 private void addInstanceToMap(InstanceInfo info, String vipAddresses, Map<String, AbstractQueue<InstanceInfo>> vipMap) { 308 if (vipAddresses != null) { 309 String[] vipAddressArray = vipAddresses.split(","); 310 String[] var5 = vipAddressArray; 311 int var6 = vipAddressArray.length; 312 313 for(int var7 = 0; var7 < var6; ++var7) { 314 String vipAddress = var5[var7]; 315 String vipName = vipAddress.toUpperCase(Locale.ROOT); 316 AbstractQueue<InstanceInfo> instanceInfoList = (AbstractQueue)vipMap.get(vipName); 317 if (instanceInfoList == null) { 318 instanceInfoList = new ConcurrentLinkedQueue(); 319 vipMap.put(vipName, instanceInfoList); 320 } 321 322 ((AbstractQueue)instanceInfoList).add(info); 323 } 324 } 325 326 } 327 328 private void addInstancesToVIPMaps(Application app) { 329 Iterator var2 = app.getInstances().iterator(); 330 331 while(true) { 332 InstanceInfo info; 333 String vipAddresses; 334 String secureVipAddresses; 335 do { 336 if (!var2.hasNext()) { 337 return; 338 } 339 340 info = (InstanceInfo)var2.next(); 341 vipAddresses = info.getVIPAddress(); 342 secureVipAddresses = info.getSecureVipAddress(); 343 } while(vipAddresses == null && secureVipAddresses == null); 344 345 this.addInstanceToMap(info, vipAddresses, this.virtualHostNameAppMap); 346 this.addInstanceToMap(info, secureVipAddresses, this.secureVirtualHostNameAppMap); 347 } 348 } 349 350 private static final class Pair { 351 private final String item1; 352 private final String item2; 353 354 public Pair(String item1, String item2) { 355 this.item1 = item1; 356 this.item2 = item2; 357 } 358 359 public int hashCode() { 360 int prime = true; 361 int result = 1; 362 int result = 31 * result + (this.item1 == null ? 0 : this.item1.hashCode()); 363 result = 31 * result + (this.item2 == null ? 0 : this.item2.hashCode()); 364 return result; 365 } 366 367 public boolean equals(Object obj) { 368 if (this == obj) { 369 return true; 370 } else if (obj == null) { 371 return false; 372 } else if (this.getClass() != obj.getClass()) { 373 return false; 374 } else { 375 Applications.Pair other = (Applications.Pair)obj; 376 if (this.item1 == null) { 377 if (other.item1 != null) { 378 return false; 379 } 380 } else if (!this.item1.equals(other.item1)) { 381 return false; 382 } 383 384 if (this.item2 == null) { 385 if (other.item2 != null) { 386 return false; 387 } 388 } else if (!this.item2.equals(other.item2)) { 389 return false; 390 } 391 392 return true; 393 } 394 } 395 396 public String getItem1() { 397 return this.item1; 398 } 399 400 public String getItem2() { 401 return this.item2; 402 } 403 } 404 }
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package com.netflix.discovery.shared; 7 8 import com.fasterxml.jackson.annotation.JsonCreator; 9 import com.fasterxml.jackson.annotation.JsonIgnore; 10 import com.fasterxml.jackson.annotation.JsonProperty; 11 import com.fasterxml.jackson.annotation.JsonRootName; 12 import com.netflix.appinfo.InstanceInfo; 13 import com.netflix.appinfo.InstanceInfo.InstanceStatus; 14 import com.netflix.discovery.EurekaClientConfig; 15 import com.netflix.discovery.InstanceRegionChecker; 16 import com.netflix.discovery.provider.Serializer; 17 import com.netflix.discovery.util.StringCache; 18 import com.thoughtworks.xstream.annotations.XStreamAlias; 19 import com.thoughtworks.xstream.annotations.XStreamImplicit; 20 import com.thoughtworks.xstream.annotations.XStreamOmitField; 21 import java.util.ArrayList; 22 import java.util.Collections; 23 import java.util.Iterator; 24 import java.util.LinkedHashSet; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.Set; 28 import java.util.concurrent.ConcurrentHashMap; 29 import java.util.concurrent.atomic.AtomicReference; 30 import javax.annotation.Nullable; 31 32 @Serializer("com.netflix.discovery.converters.EntityBodyConverter") 33 @XStreamAlias("application") 34 @JsonRootName("application") 35 public class Application { 36 private String name; 37 @XStreamOmitField 38 private volatile boolean isDirty; 39 @XStreamImplicit 40 private final Set<InstanceInfo> instances; 41 private AtomicReference<List<InstanceInfo>> shuffledInstances; 42 private Map<String, InstanceInfo> instancesMap; 43 44 public String toString() { 45 return "Application [name=" + this.name + ", isDirty=" + this.isDirty + ", instances=" + this.instances + ", shuffledInstances=" + this.shuffledInstances + ", instancesMap=" + this.instancesMap + "]"; 46 } 47 48 public Application() { 49 this.isDirty = false; 50 this.shuffledInstances = new AtomicReference(); 51 this.instances = new LinkedHashSet(); 52 this.instancesMap = new ConcurrentHashMap(); 53 } 54 55 public Application(String name) { 56 this.isDirty = false; 57 this.shuffledInstances = new AtomicReference(); 58 this.name = StringCache.intern(name); 59 this.instancesMap = new ConcurrentHashMap(); 60 this.instances = new LinkedHashSet(); 61 } 62 63 @JsonCreator 64 public Application(@JsonProperty("name") String name, @JsonProperty("instance") List<InstanceInfo> instances) { 65 this(name); 66 Iterator var3 = instances.iterator(); 67 68 while(var3.hasNext()) { 69 InstanceInfo instanceInfo = (InstanceInfo)var3.next(); 70 this.addInstance(instanceInfo); 71 } 72 73 } 74 75 public void addInstance(InstanceInfo i) { 76 this.instancesMap.put(i.getId(), i); 77 Set var2 = this.instances; 78 synchronized(this.instances) { 79 this.instances.remove(i); 80 this.instances.add(i); 81 this.isDirty = true; 82 } 83 } 84 85 public void removeInstance(InstanceInfo i) { 86 this.removeInstance(i, true); 87 } 88 89 @JsonProperty("instance") 90 public List<InstanceInfo> getInstances() { 91 return this.shuffledInstances.get() == null ? this.getInstancesAsIsFromEureka() : (List)this.shuffledInstances.get(); 92 } 93 94 @JsonIgnore 95 public List<InstanceInfo> getInstancesAsIsFromEureka() { 96 Set var1 = this.instances; 97 synchronized(this.instances) { 98 return new ArrayList(this.instances); 99 } 100 } 101 102 public InstanceInfo getByInstanceId(String id) { 103 return (InstanceInfo)this.instancesMap.get(id); 104 } 105 106 public String getName() { 107 return this.name; 108 } 109 110 public void setName(String name) { 111 this.name = StringCache.intern(name); 112 } 113 114 public int size() { 115 return this.instances.size(); 116 } 117 118 public void shuffleAndStoreInstances(boolean filterUpInstances) { 119 this._shuffleAndStoreInstances(filterUpInstances, false, (Map)null, (EurekaClientConfig)null, (InstanceRegionChecker)null); 120 } 121 122 public void shuffleAndStoreInstances(Map<String, Applications> remoteRegionsRegistry, EurekaClientConfig clientConfig, InstanceRegionChecker instanceRegionChecker) { 123 this._shuffleAndStoreInstances(clientConfig.shouldFilterOnlyUpInstances(), true, remoteRegionsRegistry, clientConfig, instanceRegionChecker); 124 } 125 126 private void _shuffleAndStoreInstances(boolean filterUpInstances, boolean indexByRemoteRegions, @Nullable Map<String, Applications> remoteRegionsRegistry, @Nullable EurekaClientConfig clientConfig, @Nullable InstanceRegionChecker instanceRegionChecker) { 127 Set var7 = this.instances; 128 ArrayList instanceInfoList; 129 synchronized(this.instances) { 130 instanceInfoList = new ArrayList(this.instances); 131 } 132 133 if (indexByRemoteRegions || filterUpInstances) { 134 Iterator it = instanceInfoList.iterator(); 135 136 label50: 137 while(true) { 138 while(true) { 139 if (!it.hasNext()) { 140 break label50; 141 } 142 143 InstanceInfo instanceInfo = (InstanceInfo)it.next(); 144 if (filterUpInstances && !InstanceStatus.UP.equals(instanceInfo.getStatus())) { 145 it.remove(); 146 } else if (indexByRemoteRegions && null != instanceRegionChecker && null != clientConfig && null != remoteRegionsRegistry) { 147 String instanceRegion = instanceRegionChecker.getInstanceRegion(instanceInfo); 148 if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { 149 Applications appsForRemoteRegion = (Applications)remoteRegionsRegistry.get(instanceRegion); 150 if (null == appsForRemoteRegion) { 151 appsForRemoteRegion = new Applications(); 152 remoteRegionsRegistry.put(instanceRegion, appsForRemoteRegion); 153 } 154 155 Application remoteApp = appsForRemoteRegion.getRegisteredApplications(instanceInfo.getAppName()); 156 if (null == remoteApp) { 157 remoteApp = new Application(instanceInfo.getAppName()); 158 appsForRemoteRegion.addApplication(remoteApp); 159 } 160 161 remoteApp.addInstance(instanceInfo); 162 this.removeInstance(instanceInfo, false); 163 it.remove(); 164 } 165 } 166 } 167 } 168 } 169 170 Collections.shuffle(instanceInfoList); 171 this.shuffledInstances.set(instanceInfoList); 172 } 173 174 private void removeInstance(InstanceInfo i, boolean markAsDirty) { 175 this.instancesMap.remove(i.getId()); 176 Set var3 = this.instances; 177 synchronized(this.instances) { 178 this.instances.remove(i); 179 if (markAsDirty) { 180 this.isDirty = true; 181 } 182 183 } 184 } 185 }
2. 源碼解析
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) 建立並執行一些線程容器
this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build()); this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());
1 1、一個從註冊中心獲取服務列表的任務,頻率:registryFetchIntervalSeconds(默認30s) 2 if (clientConfig.shouldFetchRegistry()) { 3 // registry cache refresh timer 4 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); 5 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); 6 scheduler.schedule( 7 new TimedSupervisorTask( 8 "cacheRefresh", 9 scheduler, 10 cacheRefreshExecutor, 11 registryFetchIntervalSeconds, 12 TimeUnit.SECONDS, 13 expBackOffBound, 14 new CacheRefreshThread() 15 ), 16 registryFetchIntervalSeconds, TimeUnit.SECONDS); 17 } 18 ``` 19 20 21 22 2、客戶端啓動一個心跳線程,向註冊中心發送心跳,頻率:renewalIntervalInSecs(默認30s) 23 // Heartbeat timer 24 scheduler.schedule( 25 new TimedSupervisorTask( 26 "heartbeat", 27 scheduler, 28 heartbeatExecutor, 29 renewalIntervalInSecs, 30 TimeUnit.SECONDS, 31 expBackOffBound, 32 new HeartbeatThread() 33 ), 34 renewalIntervalInSecs, TimeUnit.SECONDS); 35 ``` 36 37 38 39 40 3、客戶端向註冊中心註冊本身,頻率:InstanceInfoReplicationIntervalSeconds(默認30s)定時的上報本身的信息 。
註冊是爲了定時的向註冊中心上報本身的信息,因此他須要不停的註冊,而不是註冊一次 41 instanceInfoReplicator = new InstanceInfoReplicator( 42 this, 43 instanceInfo, 44 clientConfig.getInstanceInfoReplicationIntervalSeconds(), 45 2); // burstSize 46 ```
(1)客戶端經過http請求將 「服務名稱」+「ip」+"端口號"(instance)發送給服務端(post請求,參數
-- 服務端根據appName、instanceInfo獲取Instance
-- 對比同一instanceInfo節點下的的的節點(服務端存儲的和客戶端新傳過去的)的時間戳誰
1 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { 2 try { 3 this.read.lock(); 4 Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName()); 5 EurekaMonitors.REGISTER.increment(isReplication); 6 if (gMap == null) { 7 ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap(); 8 gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap); 9 if (gMap == null) { 10 gMap = gNewMap; 11 } 12 } 13 14 Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId()); 15 if (existingLease != null && existingLease.getHolder() != null) { 16 Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp(); 17 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); 18 logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 19 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 20 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 21 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 22 registrant = (InstanceInfo)existingLease.getHolder(); 23 } 24 } else { 25 Object var6 = this.lock; 26 synchronized(this.lock) { 27 if (this.expectedNumberOfRenewsPerMin > 0) { 28 this.expectedNumberOfRenewsPerMin += 2; 29 this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold()); 30 } 31 } 32 33 logger.debug("No previous lease information found; it is new registration"); 34 } 35 36 Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration); 37 if (existingLease != null) { 38 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 39 } 40 41 ((Map)gMap).put(registrant.getId(), lease); 42 AbstractInstanceRegistry.CircularQueue var20 = this.recentRegisteredQueue; 43 synchronized(this.recentRegisteredQueue) { 44 this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); 45 } 46 47 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 48 logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId()); 49 if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) { 50 logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 51 this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 52 } 53 } 54 55 InstanceStatus overriddenStatusFromMap = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId()); 56 if (overriddenStatusFromMap != null) { 57 logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 58 registrant.setOverriddenStatus(overriddenStatusFromMap); 59 } 60 61 InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication); 62 registrant.setStatusWithoutDirty(overriddenInstanceStatus); 63 if (InstanceStatus.UP.equals(registrant.getStatus())) { 64 lease.serviceUp(); 65 } 66 67 registrant.setActionType(ActionType.ADDED); 68 this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease)); 69 registrant.setLastUpdatedTimestamp(); 70 this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 71 logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication}); 72 } finally { 73 this.read.unlock(); 74 } 75 76 }
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; EurekaHttpResponse var10; try { WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); this.addExtraHeaders(requestBuilder); response = (ClientResponse)requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } var10 = eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", new Object[]{this.serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()}); } if (response != null) { response.close(); } } return var10; }
-- 判斷實例是否存在,若是存在就進行續約,若是不存在續約失敗
-- 續約(更新時間戳lastUpdateTimestamp)
public void renew() { this.lastUpdateTimestamp = System.currentTimeMillis() + this.duration; }
public void cancel() { if (this.evictionTimestamp <= 0L) { this.evictionTimestamp = System.currentTimeMillis(); } }
public boolean isExpired(long additionalLeaseMs) { return this.evictionTimestamp > 0L || System.currentTimeMillis() > this.lastUpdateTimestamp + this.duration + additionalLeaseMs; }
二 Ribbon
1. 一些分析
1 @Bean 2 @ConditionalOnMissingBean 3 public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) { 4 return new RestTemplateCustomizer() { 5 public void customize(RestTemplate restTemplate) { 6 List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors()); 7 list.add(loadBalancerInterceptor); 8 restTemplate.setInterceptors(list); 9 } 10 }; 11 }
LoadBalancerInterceptor 實際上 是一個 ClientHttpRequestInterceptor,源代碼,
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package org.springframework.cloud.client.loadbalancer; 7 8 import java.io.IOException; 9 import java.net.URI; 10 import org.springframework.http.HttpRequest; 11 import org.springframework.http.client.ClientHttpRequestExecution; 12 import org.springframework.http.client.ClientHttpRequestInterceptor; 13 import org.springframework.http.client.ClientHttpResponse; 14 import org.springframework.util.Assert; 15 16 public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { 17 private LoadBalancerClient loadBalancer; 18 private LoadBalancerRequestFactory requestFactory; 19 20 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { 21 this.loadBalancer = loadBalancer; 22 this.requestFactory = requestFactory; 23 } 24 25 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { 26 this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); 27 } 28 29 public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { 30 URI originalUri = request.getURI(); 31 String serviceName = originalUri.getHost(); 32 Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); 33 return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); 34 } 35 }
1 public ClientHttpResponse execute(HttpRequest request, final byte[] body) throws IOException { 2 if (this.iterator.hasNext()) { 3 ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next(); 4 return nextInterceptor.intercept(request, body, this); 5 } else { 6 ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), request.getMethod()); 7 Iterator var4 = request.getHeaders().entrySet().iterator(); 8 9 while(var4.hasNext()) { 10 Entry<String, List<String>> entry = (Entry)var4.next(); 11 List<String> values = (List)entry.getValue(); 12 Iterator var7 = values.iterator(); 13 14 while(var7.hasNext()) { 15 String value = (String)var7.next(); 16 delegate.getHeaders().add((String)entry.getKey(), value); 17 } 18 } 19 20 if (body.length > 0) { 21 if (delegate instanceof StreamingHttpOutputMessage) { 22 StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate; 23 streamingOutputMessage.setBody(new Body() { 24 public void writeTo(OutputStream outputStream) throws IOException { 25 StreamUtils.copy(body, outputStream); 26 } 27 }); 28 } else { 29 StreamUtils.copy(body, delegate.getBody()); 30 } 31 } 32 33 return delegate.execute(); 34 } 35 }
行代碼是ClientHttpRequestInterceptor.java中的public ClientHttpResponse intercept(
HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException
```properties DynamicServerListLoadBalancer:{ NFLoadBalancer: name = eureka - provider, current list of Servers = [localhost: 8072, localhost: 8071], Load balancer stats = Zone stats: { unknown = [Zone: unknown;Instance count: 2;Active connections count: 0;Circuit breaker tripped count: 0;Active connections per server: 0.0;] }, Server stats: [ [Server: localhost: 8072;Zone: UNKNOWN;Total Requests: 0;Successive connection failure: 0;Total blackout seconds: 0;Last connection made: Thu Jan 01 08: 00: 00 CST 1970;First connection made: Thu Jan 01 08: 00: 00 CST 1970;Active Connections: 0;total failure count in last(1000) msecs: 0;average resp time: 0.0;90 percentile resp time: 0.0;95 percentile resp time: 0.0;min resp time: 0.0;max resp time: 0.0;stddev resp time: 0.0], [Server: localhost: 8071;Zone: UNKNOWN;Total Requests: 0;Successive connection failure: 0;Total blackout seconds: 0;Last connection made: Thu Jan 01 08: 00: 00 CST 1970;First connection made: Thu Jan 01 08: 00: 00 CST 1970;Active Connections: 0;total failure count in last(1000) msecs: 0;average resp time: 0.0;90 percentile resp time: 0.0;95 percentile resp time: 0.0;min resp time: 0.0;max resp time: 0.0;stddev resp time: 0.0] ] }ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@5353dd09
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId); Server server = this.getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } else { RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server)); return this.execute(serviceId, ribbonServer, request); } }
public Server chooseServer(Object key) { if (this.counter == null) { this.counter = this.createCounter(); } this.counter.increment(); if (this.rule == null) { return null; } else { try { return this.rule.choose(key); } catch (Exception var3) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3}); return null; } } }