Eureka Server Replicate

      For ease of explanation, here is the diagram from the previous post again.

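      The previous post covered how an Application Service registers with Eureka Server. Once registration completes, the other servers also need to receive the same registration information, because Eureka Servers form a peer-to-peer cluster. Compared with ZooKeeper, Eureka does not aim for strong consistency; it treats A (availability) and P (partition tolerance) as more important.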

      On that basis, Eureka synchronizes registration information between servers by replication.



1. Replication runs after the register method completes.

The PeerAwareInstanceRegistryImpl class:


 public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
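
The lease-duration fallback in register can be isolated as a small sketch. LeaseDurationSketch and resolveLeaseDuration are hypothetical names used only for illustration, not part of Eureka; the value 90 mirrors Lease.DEFAULT_DURATION_IN_SECS.

```java
// Hypothetical stand-in for illustration; not the real Eureka classes.
final class LeaseDurationSketch {
    // Mirrors Lease.DEFAULT_DURATION_IN_SECS, which is 90 seconds in Eureka.
    static final int DEFAULT_DURATION_IN_SECS = 90;

    // Returns the client-supplied duration when it is present and positive,
    // otherwise falls back to the default, as register() does.
    static int resolveLeaseDuration(Integer clientDurationInSecs) {
        if (clientDurationInSecs != null && clientDurationInSecs > 0) {
            return clientDurationInSecs;
        }
        return DEFAULT_DURATION_IN_SECS;
    }
}
```

A client that sends no lease info, or a non-positive duration, therefore gets the default lease; only a positive client value overrides it.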


2. The replication steps in detail

The replication method: it replicates all Eureka actions to the peer nodes, except for replication traffic itself.
 private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If this request is itself a replication, do not replicate it again
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // Iterate over every node in the Eureka cluster and replicate to it
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }



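Tips:

        Eureka Server does not replicate a request a second time; see the code above.

        The method checks isReplication. If the incoming request is already a replication, it is not replicated again. Otherwise the method iterates over the nodes in the Eureka Server cluster, skips its own URL, and replicates to each remaining node in turn.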
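
To see why the isReplication flag keeps replication from bouncing between servers, here is a minimal toy model. ToyPeer and all of its members are hypothetical and only imitate the control flow of replicateToPeers; this is a sketch under that assumption, not Eureka's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of a peer cluster; not the real Eureka classes.
final class ToyPeer {
    final String name;
    final Set<String> registry = new HashSet<>();
    final List<ToyPeer> peers = new ArrayList<>();
    int replicationsReceived = 0;

    ToyPeer(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        registry.add(instanceId);
        if (isReplication) {
            replicationsReceived++;
            return; // already a replicated request: do not forward it again
        }
        for (ToyPeer peer : peers) {
            if (peer == this) {
                continue; // never replicate to yourself
            }
            peer.register(instanceId, true);
        }
    }

    // Builds a cluster in which every node knows the full node list, itself included.
    static List<ToyPeer> cluster(String... names) {
        List<ToyPeer> all = new ArrayList<>();
        for (String n : names) {
            all.add(new ToyPeer(n));
        }
        for (ToyPeer p : all) {
            p.peers.addAll(all);
        }
        return all;
    }
}
```

In a three-node cluster, a client registration on one node reaches each of the other two exactly once. Without the flag check, each receiving node would forward the request again and the nodes would replicate to one another indefinitely.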


The concrete replication actions include cancel, register, heartbeat, status update, and status-override deletion.

 private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }


Question:
         How does peerEurekaNodes obtain all the nodes in the cluster?

First, peerEurekaNodes is assigned in the init method of the PeerAwareInstanceRegistryImpl class.

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        initializedResponseCache();
        scheduleRenewalThresholdUpdateTask();
        initRemoteRegionRegistry();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

Its subclass AwsInstanceRegistry overrides init as well, delegating to super.init first.

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        super.init(peerEurekaNodes);
        this.awsAsgUtil = new AwsAsgUtil(serverConfig, clientConfig, this);
        // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
        // then we see if our ASG is UP, then we check the status of a potentially existing lease.
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
                new OverrideExistsRule(overriddenInstanceStatusMap), new AsgEnabledRule(this.awsAsgUtil),
                new LeaseExistsRule());
    }

Moving further up the call chain, in the DefaultEurekaServerContext class:

 @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }

Finally, EurekaBootStrap makes the call.

 protected void initEurekaServerContext() throws Exception {
  
        // Configuration setup omitted
        logger.info("Initializing the eureka client...");
  	
        // Create the group of peer nodes
        PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClientConfig,
                serverCodecs,
                applicationInfoManager
        );

        serverContext = new DefaultEurekaServerContext(
                eurekaServerConfig,
                serverCodecs,
                registry,
                peerEurekaNodes,
                applicationInfoManager
        );

        EurekaServerContextHolder.initialize(serverContext);

        serverContext.initialize();
        logger.info("Initialized server context");

        // Copy registry from neighboring eureka node
        int registryCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, registryCount);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }
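
The call chain traced above can be condensed into a small sketch that records the order of the calls. InitOrderSketch and its nested Peers, Registry, and Context classes are hypothetical stand-ins for PeerEurekaNodes, the registry, and DefaultEurekaServerContext; only the ordering is taken from the code shown in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that only record call order; not the real Eureka classes.
final class InitOrderSketch {
    static final List<String> calls = new ArrayList<>();

    static final class Peers {
        void start() {
            calls.add("peerEurekaNodes.start");
        }
    }

    static final class Registry {
        Peers peers;

        void init(Peers peers) {
            this.peers = peers; // the assignment seen in PeerAwareInstanceRegistryImpl.init
            calls.add("registry.init");
        }
    }

    static final class Context {
        private final Registry registry;
        private final Peers peers;

        Context(Registry registry, Peers peers) {
            this.registry = registry;
            this.peers = peers;
        }

        void initialize() {
            peers.start();        // start the peer node group first
            registry.init(peers); // then hand it to the registry
        }
    }

    // Plays the role of the bootstrap step: build both objects, then initialize the context.
    static Registry bootstrap() {
        calls.clear();
        Registry registry = new Registry();
        Peers peers = new Peers();
        new Context(registry, peers).initialize();
        return registry;
    }
}
```

The point of the ordering is that the registry only receives the peer node group after that group has been started, so by the time replicateToPeers runs, peerEurekaNodes is already populated.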