A look at Eureka's PeerAwareInstanceRegistryImpl

This article takes a look at Eureka's PeerAwareInstanceRegistryImpl.

EurekaServerAutoConfiguration

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    //......
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
            ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
    //......
}
The bean of interest here is PeerAwareInstanceRegistry; for the EurekaClient configuration, see EurekaClientAutoConfiguration.

InstanceRegistry

spring-cloud-netflix-eureka-server-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/server/InstanceRegistry.java

public class InstanceRegistry extends PeerAwareInstanceRegistryImpl
        implements ApplicationContextAware {
        //......
    private void handleCancelation(String appName, String id, boolean isReplication) {
        log("cancel " + appName + ", serverId " + id + ", isReplication " + isReplication);
        publishEvent(new EurekaInstanceCanceledEvent(this, appName, id, isReplication));
    }

    private void handleRegistration(InstanceInfo info, int leaseDuration,
            boolean isReplication) {
        log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                + ", leaseDuration " + leaseDuration + ", isReplication "
                + isReplication);
        publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                isReplication));
    }
        
    private void log(String message) {
        if (log.isDebugEnabled()) {
            log.debug(message);
        }
    }

    private void publishEvent(ApplicationEvent applicationEvent) {
        this.ctxt.publishEvent(applicationEvent);
    }
}
This class extends PeerAwareInstanceRegistryImpl. Its overridden methods mainly publish the corresponding event before the actual operation is executed.
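To make the "publish first, then delegate" pattern concrete, here is a minimal sketch that uses only the JDK. All class names in it (BaseRegistrySketch, EventPublishingRegistrySketch) are hypothetical, and the "event" is just a string appended to a list. It is a simplification of the idea, not the Spring Cloud implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the superclass that performs the real work.
class BaseRegistrySketch {
    // Records what happened, in order, so the sequence can be inspected.
    final List<String> log = new ArrayList<>();

    boolean cancel(String appName, String id, boolean isReplication) {
        log.add("cancelled " + appName + "/" + id);
        return true;
    }
}

// Hypothetical subclass: emits an "event" first, then delegates to super.
class EventPublishingRegistrySketch extends BaseRegistrySketch {
    @Override
    boolean cancel(String appName, String id, boolean isReplication) {
        // Stand-in for publishing an application event.
        log.add("event: cancel " + appName + ", serverId " + id
                + ", isReplication " + isReplication);
        return super.cancel(appName, id, isReplication);
    }
}

class EventPublishingDemo {
    public static void main(String[] args) {
        EventPublishingRegistrySketch registry = new EventPublishingRegistrySketch();
        registry.cancel("APP", "i-1", false);
        // The event entry appears before the entry written by the superclass.
        registry.log.forEach(System.out::println);
    }
}
```

The point of the ordering is that listeners are notified regardless of what the superclass call later returns.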

PeerAwareInstanceRegistryImpl

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    //......
}
This class implements InstanceRegistry methods such as openForTraffic, shutdown, statusUpdate and deleteStatusOverride, as well as PeerAwareInstanceRegistry methods such as init, syncUp, shouldAllowAccess, register(InstanceInfo info, boolean isReplication) and statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication).
AbstractInstanceRegistry, in turn, implements the basic LeaseManager operations (register, cancel, renew, evict) and some of the LookupService query operations.
On top of AbstractInstanceRegistry, PeerAwareInstanceRegistryImpl mainly adds the peer-related handling and the updating of the RenewalThreshold.

replicateToPeers

As shown below, PeerAwareInstanceRegistryImpl overrides the cancel, register, renew, statusUpdate and deleteStatusOverride methods. Each override first calls the superclass implementation and then calls replicateToPeers.

/**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
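This method copies the instance information to the other eureka server peers. The isReplication flag indicates whether the current request was itself replicated from another node.
As the code shows, if peerEurekaNodes is empty or isReplication is true, the method returns without replicating any further.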
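The effect of the isReplication guard is easiest to see in a toy cluster. The sketch below is a JDK-only model under simplifying assumptions: SketchNode is a hypothetical class, peers are called directly instead of over HTTP, and an instance is just a string id. It is not the eureka API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for one eureka server node.
class SketchNode {
    final String name;
    final List<SketchNode> peers = new ArrayList<>();
    final List<String> registry = new ArrayList<>();
    int replicationsReceived = 0;

    SketchNode(String name) {
        this.name = name;
    }

    // Same shape as register(info, isReplication): apply locally, then replicate.
    void register(String instanceId, boolean isReplication) {
        registry.add(instanceId);
        replicateToPeers(instanceId, isReplication);
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        if (isReplication) {
            replicationsReceived++;
        }
        // A replicated request is never replicated again; this breaks the loop.
        if (peers.isEmpty() || isReplication) {
            return;
        }
        for (SketchNode peer : peers) {
            if (peer == this) {
                continue; // do not replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }
}

class ReplicationGuardDemo {
    public static void main(String[] args) {
        SketchNode a = new SketchNode("a");
        SketchNode b = new SketchNode("b");
        SketchNode c = new SketchNode("c");
        SketchNode[] all = {a, b, c};
        // Every node lists every node (including itself) as a peer.
        for (SketchNode node : all) {
            for (SketchNode peer : all) {
                node.peers.add(peer);
            }
        }
        a.register("app-1", false); // a client registers with node a
        for (SketchNode node : all) {
            System.out.println(node.name + " holds " + node.registry);
        }
    }
}
```

Without the guard, b would forward the registration back to a and c, and the nodes would keep re-sending the same change to each other.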

replicateInstanceActionsToPeers

/**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }
Depending on the Action, this method calls the corresponding method on PeerEurekaNode.
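One detail worth noting in the code above is that the whole switch sits inside a try/catch, so a failure while talking to one peer is logged and does not stop replication to the remaining peers. The following sketch illustrates that dispatch-and-isolate shape with hypothetical types (SketchAction, SketchPeer, RecordingPeer). It collects errors in a list where the real code logs them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of the replication actions.
enum SketchAction { CANCEL, HEARTBEAT, REGISTER }

// Hypothetical stand-in for the operations a peer node exposes.
interface SketchPeer {
    void cancel(String id);
    void heartbeat(String id);
    void register(String id);
}

// Test double: records calls, or throws on every call when marked as failing.
class RecordingPeer implements SketchPeer {
    final List<String> calls = new ArrayList<>();
    final boolean failing;

    RecordingPeer(boolean failing) {
        this.failing = failing;
    }

    private void record(String call) {
        if (failing) {
            throw new IllegalStateException("peer unreachable");
        }
        calls.add(call);
    }

    public void cancel(String id) { record("cancel:" + id); }
    public void heartbeat(String id) { record("heartbeat:" + id); }
    public void register(String id) { record("register:" + id); }
}

class ActionDispatchSketch {
    final List<String> errors = new ArrayList<>();

    // Dispatches one action to one peer; any failure is captured, not rethrown.
    void replicate(SketchAction action, String id, SketchPeer peer) {
        try {
            switch (action) {
                case CANCEL:
                    peer.cancel(id);
                    break;
                case HEARTBEAT:
                    peer.heartbeat(id);
                    break;
                case REGISTER:
                    peer.register(id);
                    break;
            }
        } catch (Throwable t) {
            errors.add(action.name() + " failed: " + t.getMessage());
        }
    }

    public static void main(String[] args) {
        ActionDispatchSketch dispatcher = new ActionDispatchSketch();
        RecordingPeer bad = new RecordingPeer(true);
        RecordingPeer good = new RecordingPeer(false);
        dispatcher.replicate(SketchAction.REGISTER, "i-1", bad);
        dispatcher.replicate(SketchAction.REGISTER, "i-1", good);
        System.out.println("errors: " + dispatcher.errors);
        System.out.println("good peer calls: " + good.calls);
    }
}
```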

PeerEurekaNode

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java

HttpReplicationClient

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.java

protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/transport/JerseyReplicationClient.java

public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {

    private static final Logger logger = LoggerFactory.getLogger(JerseyReplicationClient.class);

    private final EurekaJerseyClient jerseyClient;
    private final ApacheHttpClient4 jerseyApacheClient;

    public JerseyReplicationClient(EurekaJerseyClient jerseyClient, String serviceUrl) {
        super(jerseyClient.getClient(), serviceUrl);
        this.jerseyClient = jerseyClient;
        this.jerseyApacheClient = jerseyClient.getClient();
    }

    @Override
    protected void addExtraHeaders(Builder webResource) {
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }

//......
}
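Here the PeerEurekaNode.HEADER_REPLICATION header is explicitly set to true. This marks every request sent through this client as replication traffic and tells the target eureka server not to replicate it again, which avoids an endless loop.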

InstanceResource

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java

/**
     * Handles cancellation of leases for this particular instance.
     *
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
     * @return response indicating whether the operation was a success or
     *         failure.
     */
    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            boolean isSuccess = registry.cancel(app.getName(), id,
                "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }

    }
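Taking cancel as an example, let's trace replication once more. The value of the PeerEurekaNode.HEADER_REPLICATION header is passed into registry.cancel, and this registry is the PeerAwareInstanceRegistryImpl, so the whole cycle is tied together.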
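The header round trip can be sketched in a few lines of plain Java. This is an illustration only: the header name below is a placeholder chosen for the sketch (the real code refers to the constant PeerEurekaNode.HEADER_REPLICATION), and a Map stands in for HTTP request headers.

```java
import java.util.HashMap;
import java.util.Map;

class ReplicationHeaderSketch {
    // Placeholder header name for this sketch only; not the real constant's value.
    static final String HEADER_REPLICATION = "x-sketch-replication";

    // Sender side: the extra header a replication client adds to every request.
    static Map<String, String> replicationRequestHeaders() {
        Map<String, String> headers = new HashMap<>();
        headers.put(HEADER_REPLICATION, "true");
        return headers;
    }

    // Receiver side: the same null-safe comparison used in cancelLease.
    static boolean isReplication(Map<String, String> headers) {
        return "true".equals(headers.get(HEADER_REPLICATION));
    }

    public static void main(String[] args) {
        // A request from a peer carries the header; a request from a client does not.
        System.out.println(isReplication(replicationRequestHeaders()));
        System.out.println(isReplication(new HashMap<>()));
    }
}
```

Writing the comparison as "true".equals(value) means a missing header (null) is treated as an ordinary, non-replicated request without a null check.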

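Summary

On top of AbstractInstanceRegistry, PeerAwareInstanceRegistryImpl mainly adds the peer-related handling and the updating of the RenewalThreshold. When it replicates to the other peers, it sends the PeerEurekaNode.HEADER_REPLICATION header set to true, marking the request as a replication request, so the receiver does not replicate it again to its own peers.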
