Spring Cloud Eureka 全解 (3) - 核心流程-服務註冊與取消詳解

本文基於SpringCloud-Dalston.SR5node

關於服務註冊

開啓/關閉服務註冊配置:eureka.client.register-with-eureka = true (默認)spring

何時註冊?

  1. 應用第一次啓動時,初始化EurekaClient時,應用狀態改變:從STARTING變爲UP會觸發這個Listener,調用instanceInfoReplicator.onDemandUpdate(); 能夠推測出,實例狀態改變時,也會經過註冊接口更新實例狀態信息
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
  1. 定時任務,若是InstanceInfo發生改變,也會經過註冊接口更新信息
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        //若是實例信息發生改變,則須要調用register更新InstanceInfo
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
  1. 在定時renew時,若是renew接口返回404(表明這個實例在EurekaServer上面找不到),多是以前註冊失敗或者註冊過時致使的。這時須要調用register從新註冊
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        //若是renew接口返回404(表明這個實例在EurekaServer上面找不到),多是以前註冊失敗或者註冊過時致使的
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

向Eureka發送註冊請求EurekaServer發生了什麼?

主要有兩個存儲,一個是以前提到過的registry,還有一個最近變化隊列,後面咱們會知道,這個最近變化隊列裏面就是客戶端獲取增量實例信息的內容:緩存

# 總體註冊信息緩存
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
# 最近變化隊列
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

EurekaServer收到實例註冊主要分兩步:app

  • 調用父類方法註冊
  • 同步到其餘EurekaServer實例
public void register(InstanceInfo info, boolean isReplication) {
    int leaseDuration = 90;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    //調用父類方法註冊
    super.register(info, leaseDuration, isReplication);
    //同步到其餘EurekaServer實例
    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}

咱們先看同步到其餘EurekaServer實例ide

其實就是,註冊到的EurekaServer再依次調用其餘集羣內的EurekaServer的Register方法將實例信息同步過去this

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

而後看看調用父類方法註冊:url

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        //register雖然看上去好像是修改,可是這裏用的是讀鎖,後面會解釋
        read.lock();
        //從registry中查看這個app是否存在
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        //不存在就建立
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        //查看這個app的這個實例是否已存在
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        
        if (existingLease != null && (existingLease.getHolder() != null)) {
            //若是已存在,對比時間戳,保留比較新的實例信息......
        } else {
            // 若是不存在,證實是一個新的實例
            //更新自我保護監控變量的值的代碼.....
            
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        //放入registry
        gMap.put(registrant.getId(), lease);
        
        //加入最近修改的記錄隊列
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        //初始化狀態,記錄時間等相關代碼......
        
        //主動讓Response緩存失效
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}

總結起來,就是主要三件事:pwa

1.將實例註冊信息放入或者更新registrydebug

2.將實例註冊信息加入最近修改的記錄隊列code

3.主動讓Response緩存失效

咱們來類比下服務取消

服務取消CANCEL

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        //cancel雖然看上去好像是修改,可是這裏用的是讀鎖,後面會解釋
        read.lock();
        
        //從registry中剔除這個實例
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        if (leaseToCancel == null) {
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            //改變狀態,記錄狀態修改時間等相關代碼......
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                //加入最近修改的記錄隊列
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
            }
            //主動讓Response緩存失效
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        }
    } finally {
        read.unlock();
    }
}

總結起來,也是主要三件事:

1.從registry中剔除這個實例

2.將實例註冊信息加入最近修改的記錄隊列

3.主動讓Response緩存失效

這裏咱們注意到了這個最近修改隊列,咱們來詳細看看

最近修改隊列

這個最近修改隊列和消費者定時獲取服務實例列表有着密切的關係

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

這個RetentionTimeInMSInDeltaQueue默認是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默認是180s,官網寫錯了),能夠看出這個隊列是一個長度爲180s的滑動窗口,保存最近180s之內的應用實例信息修改,後面咱們會看到,客戶端調用獲取增量信息,實際上就是從這個queue中讀取,因此可能一段時間內讀取到的信息都是同樣的。

相關文章
相關標籤/搜索