This article takes a look at Eureka's PeerAwareInstanceRegistryImpl.
    @Configuration
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
            InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
        //......

        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
                ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications(); // force initialization
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                    serverCodecs, this.eurekaClient,
                    this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }

        @Bean
        @ConditionalOnMissingBean
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
                ServerCodecs serverCodecs) {
            return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                    this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
        }

        @Bean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
                PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                    registry, peerEurekaNodes, this.applicationInfoManager);
        }

        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
                EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager,
                    this.eurekaClientConfig, this.eurekaServerConfig, registry,
                    serverContext);
        }

        //......
    }
The focus here is PeerAwareInstanceRegistry; for the EurekaClient configuration, see EurekaClientAutoConfiguration.
spring-cloud-netflix-eureka-server-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/server/InstanceRegistry.java
    public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
            implements ApplicationContextAware {
        //......

        private void handleCancelation(String appName, String id, boolean isReplication) {
            log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
            publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
        }

        private void handleRegistration(InstanceInfo info, int leaseDuration,
                boolean isReplication) {
            log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                    + ", leaseDuration " + leaseDuration + ", isReplication "
                    + isReplication);
            publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                    isReplication));
        }

        private void log(String message) {
            if (log.isDebugEnabled()) {
                log.debug(message);
            }
        }

        private void publishEvent(ApplicationEvent applicationEvent) {
            this.ctxt.publishEvent(applicationEvent);
        }
    }
This class extends PeerAwareInstanceRegistryImpl. Its overridden methods mainly publish the corresponding event before the actual operation runs.
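The publish-then-delegate pattern can be illustrated with a minimal, dependency-free sketch. The class names below are hypothetical stand-ins rather than the Spring Cloud classes, and a plain `Consumer<String>` plays the role of `ApplicationContext.publishEvent`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for PeerAwareInstanceRegistryImpl: it only records registrations.
class BaseRegistrySketch {
    final List<String> registered = new ArrayList<>();

    void register(String instanceId, boolean isReplication) {
        registered.add(instanceId);
    }
}

// Hypothetical stand-in for InstanceRegistry: publish an event, then delegate to super.
class EventPublishingRegistrySketch extends BaseRegistrySketch {
    // Plays the role of ApplicationContext.publishEvent (an assumption of this sketch).
    private final Consumer<String> publisher;

    EventPublishingRegistrySketch(Consumer<String> publisher) {
        this.publisher = publisher;
    }

    @Override
    void register(String instanceId, boolean isReplication) {
        // The event goes out first, so listeners run before the registry changes.
        publisher.accept("registered:" + instanceId + ":replication=" + isReplication);
        super.register(instanceId, isReplication);
    }
}

final class EventPublishingDemo {
    // Returns the events a listener saw after a single registration.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        EventPublishingRegistrySketch registry = new EventPublishingRegistrySketch(events::add);
        registry.register("app-1", false);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a real Spring application the listener side would typically be a bean that receives EurekaInstanceRegisteredEvent or EurekaInstanceCanceledEvent; that wiring is left out here to keep the sketch self-contained.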
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java
    @Singleton
    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry
            implements PeerAwareInstanceRegistry {
        //......
    }
This class implements InstanceRegistry methods such as openForTraffic, shutdown, statusUpdate, and deleteStatusOverride, as well as PeerAwareInstanceRegistry methods such as init, syncUp, shouldAllowAccess, register(InstanceInfo info, boolean isReplication), and statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication).
AbstractInstanceRegistry, in turn, implements the basic LeaseManager operations (register, cancel, renew, evict) along with some of the LookupService query operations.
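To make the four lease operations concrete, here is a deliberately simplified sketch. It is not the Eureka implementation: the class name, the single fixed lease duration, and the explicit `now` parameter are all assumptions made for illustration, and Eureka's real eviction logic applies further rules that are not modeled here.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, simplified lease table: instance id -> expiry timestamp in milliseconds.
final class LeaseManagerSketch {
    private final long durationMillis;
    private final Map<String, Long> expiryById = new HashMap<>();

    LeaseManagerSketch(long durationMillis) {
        this.durationMillis = durationMillis;
    }

    // register: create (or overwrite) a lease that expires durationMillis from now.
    void register(String id, long now) {
        expiryById.put(id, now + durationMillis);
    }

    // renew: extend an existing lease; an unknown id is reported as a failure.
    boolean renew(String id, long now) {
        if (!expiryById.containsKey(id)) {
            return false;
        }
        expiryById.put(id, now + durationMillis);
        return true;
    }

    // cancel: remove the lease explicitly; returns whether one existed.
    boolean cancel(String id) {
        return expiryById.remove(id) != null;
    }

    // evict: drop every lease whose expiry time has passed; returns how many were dropped.
    int evict(long now) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = expiryById.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= now) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    int size() {
        return expiryById.size();
    }

    public static void main(String[] args) {
        LeaseManagerSketch leases = new LeaseManagerSketch(100);
        leases.register("a", 0);
        leases.register("b", 0);
        leases.renew("a", 50); // "a" now expires at 150, "b" still at 100
        System.out.println("evicted=" + leases.evict(100) + ", remaining=" + leases.size());
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic; a real registry would read the system time instead.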
PeerAwareInstanceRegistryImpl builds on AbstractInstanceRegistry by adding peer-related handling and the updating of the renewal threshold (RenewalThreshold).
As shown in the figure below, PeerAwareInstanceRegistryImpl overrides the cancel, register, renew, statusUpdate, and deleteStatusOverride methods; each override first calls the superclass operation and then calls replicateToPeers.
    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
This method copies instance information to the peer Eureka servers. The isReplication flag indicates whether the current request was itself replicated from another node.
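As the code shows, if peerEurekaNodes is empty or isReplication is true, the method returns without replicating any further. It also skips any peer whose URL points back to this node.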
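The guard logic described above can be sketched in isolation. Everything in this example is hypothetical (the `Peer` class, URL-equality as the "is this me" check, and a plain counter in place of numberOfReplicationsLastMin); it only mirrors the order of the checks in replicateToPeers:

```java
import java.util.ArrayList;
import java.util.List;

final class ReplicationGuardSketch {
    // Hypothetical peer: records the instance ids it was asked to replicate.
    static final class Peer {
        final String url;
        final List<String> received = new ArrayList<>();

        Peer(String url) {
            this.url = url;
        }
    }

    private final String myUrl;
    private final List<Peer> peers;
    int replicationsSeen = 0; // loosely mirrors numberOfReplicationsLastMin

    ReplicationGuardSketch(String myUrl, List<Peer> peers) {
        this.myUrl = myUrl;
        this.peers = peers;
    }

    void replicateToPeers(String instanceId, boolean isReplication) {
        if (isReplication) {
            replicationsSeen++;
        }
        // A request that is already a replication is never forwarded again.
        if (peers.isEmpty() || isReplication) {
            return;
        }
        for (Peer peer : peers) {
            // Do not replicate to yourself.
            if (peer.url.equals(myUrl)) {
                continue;
            }
            peer.received.add(instanceId);
        }
    }

    public static void main(String[] args) {
        Peer self = new Peer("http://node-a/eureka/");
        Peer other = new Peer("http://node-b/eureka/");
        List<Peer> peers = new ArrayList<>();
        peers.add(self);
        peers.add(other);
        ReplicationGuardSketch registry = new ReplicationGuardSketch(self.url, peers);
        registry.replicateToPeers("app-1", false); // forwarded to node-b only
        registry.replicateToPeers("app-2", true);  // counted, not forwarded
        System.out.println(other.received + " " + self.received + " " + registry.replicationsSeen);
    }
}
```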
    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
This method calls a different PeerEurekaNode method depending on the Action.
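One detail of the switch is worth spelling out: Heartbeat, StatusUpdate, and DeleteStatusOverride first look up the instance in the local registry via getInstanceByAppAndId, while Register forwards the InstanceInfo it was given and Cancel needs only the app name and id. The sketch below encodes just that distinction; the enum and the helper method are hypothetical and exist only for illustration:

```java
final class ActionDispatchSketch {
    // Hypothetical copy of the action names that appear in the switch.
    enum Action { Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride }

    // True when the replicated call carries the InstanceInfo currently held in the local registry.
    static boolean usesRegistryLookup(Action action) {
        switch (action) {
            case Heartbeat:
            case StatusUpdate:
            case DeleteStatusOverride:
                return true;
            case Register:
            case Cancel:
                return false;
            default:
                throw new IllegalArgumentException("unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        for (Action action : Action.values()) {
            System.out.println(action + " -> " + usesRegistryLookup(action));
        }
    }
}
```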
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.java
    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient =
                JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/transport/JerseyReplicationClient.java
    public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient
            implements HttpReplicationClient {

        private static final Logger logger = LoggerFactory.getLogger(JerseyReplicationClient.class);

        private final EurekaJerseyClient jerseyClient;
        private final ApacheHttpClient4 jerseyApacheClient;

        public JerseyReplicationClient(EurekaJerseyClient jerseyClient, String serviceUrl) {
            super(jerseyClient.getClient(), serviceUrl);
            this.jerseyClient = jerseyClient;
            this.jerseyApacheClient = jerseyClient.getClient();
        }

        @Override
        protected void addExtraHeaders(Builder webResource) {
            webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
        }

        //......
    }
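Here the PeerEurekaNode.HEADER_REPLICATION header is explicitly set to true. This marks every request sent by this client as replication traffic and tells the target Eureka server not to replicate it again, which avoids an infinite loop.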
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java
    /**
     * Handles cancellation of leases for this particular instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                    "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }
    }
Taking cancel as an example, let's trace replication end to end. The value of the PeerEurekaNode.HEADER_REPLICATION header is passed into registry.cancel, and this registry is a PeerAwareInstanceRegistryImpl, which closes the loop.
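The round trip can be simulated without any HTTP at all. In the rough sketch below, two hypothetical nodes list each other as peers; `handleCancel` stands in for InstanceResource.cancelLease and `cancel` for the registry method. The header name is what I believe PeerEurekaNode.HEADER_REPLICATION resolves to, so treat that string as an assumption:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReplicationHeaderSketch {
    // Assumed value of PeerEurekaNode.HEADER_REPLICATION; verify against your Eureka version.
    static final String HEADER_REPLICATION = "x-netflix-discovery-replication";

    // Hypothetical node that combines the REST resource and the registry.
    static final class Node {
        final List<Node> peers = new ArrayList<>();
        final List<String> canceled = new ArrayList<>();
        int requestsHandled = 0;

        // Stands in for InstanceResource.cancelLease: read the header, call the registry.
        void handleCancel(String id, Map<String, String> headers) {
            requestsHandled++;
            boolean isReplication = "true".equals(headers.get(HEADER_REPLICATION));
            cancel(id, isReplication);
        }

        // Stands in for the registry's cancel: apply locally, then replicate unless already replicated.
        void cancel(String id, boolean isReplication) {
            canceled.add(id);
            if (isReplication) {
                return;
            }
            for (Node peer : peers) {
                Map<String, String> headers = new HashMap<>();
                headers.put(HEADER_REPLICATION, "true"); // what the replication client adds
                peer.handleCancel(id, headers);
            }
        }
    }

    // Returns how many requests each of the two nodes handled for a single client cancel.
    static int[] run() {
        Node a = new Node();
        Node b = new Node();
        a.peers.add(b);
        b.peers.add(a);
        a.handleCancel("app-1", new HashMap<>()); // client request, no replication header
        return new int[] { a.requestsHandled, b.requestsHandled };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("a=" + counts[0] + ", b=" + counts[1]);
    }
}
```

Each node handles exactly one request: node b applies the cancel but does not send it back to node a, because the header marks the request as a replication. Without the `isReplication` check in `cancel`, the two nodes would call each other without end.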
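To sum up, PeerAwareInstanceRegistryImpl builds on AbstractInstanceRegistry by adding peer-related handling and the updating of the renewal threshold. When it replicates to other peers, it sends the PeerEurekaNode.HEADER_REPLICATION header set to true to mark the request as a replication, so the receiver does not replicate it on to its own peers.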