Eureka Client Lease Renewal and Server-Side Expired Lease Eviction: Source Code Analysis

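In an earlier article, "EurekaClient Auto-Configuration and Startup Flow Analysis", we noted that constructing DiscoveryClient not only runs the registration flow but also schedules a heartbeat task: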

scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

HeartbeatThread, together with the renew() method it calls, looks like this:

private class HeartbeatThread implements Runnable {

        public void run() {
            // Renew the lease
            if (renew()) {
                // Renewal succeeded: record the timestamp
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

 boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Send the renewal (heartbeat) request
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                // Register again
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

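The heartbeat sends the renewal request directly. If the server answers 404, meaning it holds no lease for this instance, the client registers again; any other non-200 status, or an exception, counts as a failed renewal.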
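
To make the branching in renew() easier to follow, here is a minimal sketch of the same decision with the HTTP and registration calls replaced by plain functions. HeartbeatSketch and its two parameters are hypothetical names used only for illustration; they are not part of the Eureka API.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical, simplified model of the client-side renewal decision.
final class HeartbeatSketch {
    private HeartbeatSketch() {}

    /**
     * @param sendHeartbeat stand-in for the heartbeat HTTP call; returns the status code
     * @param register      stand-in for the registration call; returns true on success
     * @return true when the lease is considered renewed
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                // The server has no lease for this instance: register again.
                return register.getAsBoolean();
            }
            // Only 200 counts as a successful renewal.
            return status == 200;
        } catch (RuntimeException e) {
            // Transport failure: report failure and let the next scheduled run retry.
            return false;
        }
    }
}
```

With these stand-ins, a 200 response yields true, a 404 yields whatever the registration returns, and a 500 or a thrown exception yields false.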

Server-side handling of renewal requests

On the server, the endpoint that accepts renewal requests lives in the InstanceResource class:

@PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Renew the lease
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Renewal failed
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Compare the client's lastDirtyTimestamp with the server's copy; if they disagree, the client has to register again
        Response response = null;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }

After the renewal there is also a timestamp comparison, which we will not go into here. Let's keep following the renewal path:

public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            // Replicate to the other nodes in the cluster
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

Cluster replication was covered in an earlier article, so we skip it here. The core renewal logic is below:

public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // Look up the existing lease
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // No lease exists
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // Get the instance that holds the lease
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            // Set the instance status
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    // Overwrite the current status
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            // Update the lease's last-update timestamp
            leaseToRenew.renew();
            return true;
        }
    }

For readers of the earlier articles, the overall flow is straightforward.
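
The lookup-then-touch flow above can be sketched with a two-level map, application name to instance id to lease. This is a simplified illustration, not the Eureka implementation: RegistrySketch and LeaseSketch are hypothetical names, the status-override handling and metrics are left out, the current time is passed in explicitly so the behaviour is deterministic, and renew() here simply stores that time.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, stripped-down registry: appName -> (instanceId -> lease).
final class RegistrySketch {

    // Minimal lease: only the fields needed to show a renewal.
    static final class LeaseSketch {
        final long durationMs;
        volatile long lastUpdateTimestamp;

        LeaseSketch(long durationMs, long now) {
            this.durationMs = durationMs;
            this.lastUpdateTimestamp = now;
        }

        void renew(long now) {
            lastUpdateTimestamp = now;
        }
    }

    private final Map<String, Map<String, LeaseSketch>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long durationMs, long now) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                .put(id, new LeaseSketch(durationMs, now));
    }

    /** Returns false when no lease exists; the endpoint would then answer 404. */
    boolean renew(String appName, String id, long now) {
        Map<String, LeaseSketch> leases = registry.get(appName);
        LeaseSketch lease = (leases == null) ? null : leases.get(id);
        if (lease == null) {
            return false;
        }
        lease.renew(now);
        return true;
    }

    /** Returns the last update time of a lease, or -1 if the lease is unknown. */
    long lastUpdate(String appName, String id) {
        Map<String, LeaseSketch> leases = registry.get(appName);
        LeaseSketch lease = (leases == null) ? null : leases.get(id);
        return (lease == null) ? -1L : lease.lastUpdateTimestamp;
    }
}
```

A false return here corresponds to the NOT_FOUND response of the endpoint, which in turn makes the client register again.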

Server-side expired lease eviction

Readers of "Eureka Application Registration and Cluster Data Synchronization: Source Code Analysis" should be familiar with the following line:

int registryCount = registry.syncUp();

That line starts the cluster data synchronization. The statement right after it leads into the server's expired-lease eviction logic:

registry.openForTraffic(applicationInfoManager, registryCount);

At its end, openForTraffic calls postInit, and postInit schedules an EvictionTask, the timer task responsible for evicting leases that have expired:

evictionTimer.schedule(evictionTaskRef.get(),
        serverConfig.getEvictionIntervalTimerInMs(),
        serverConfig.getEvictionIntervalTimerInMs());

Here is that task:

class EvictionTask extends TimerTask {

   @Override
   public void run() {
       try {
           // Compensation time in milliseconds
           long compensationTimeMs = getCompensationTimeMs();
           logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
           // Eviction logic
           evict(compensationTimeMs);
       } catch (Throwable e) {
           logger.error("Could not run the evict task", e);
       }
   }

}

The compensation time is obtained like this:

long getCompensationTimeMs() {
            long currNanos = getCurrentTimeNano();
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }

            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            // current time - time of the last execution - eviction interval
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }
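
A small worked example may help here. The sketch below repeats the same arithmetic as a pure function; CompensationSketch and its parameter names are illustrative only, and the timestamps are passed in rather than read from a clock.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, pure-function version of the compensation-time arithmetic.
final class CompensationSketch {
    private CompensationSketch() {}

    /**
     * @param lastNanos  time of the previous run in nanoseconds, 0 if there was none
     * @param currNanos  time of the current run in nanoseconds
     * @param intervalMs configured eviction interval in milliseconds
     * @return how many milliseconds the current run is late, never negative
     */
    static long compensationMs(long lastNanos, long currNanos, long intervalMs) {
        if (lastNanos == 0L) {
            return 0L; // first run: nothing to compensate
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        return Math.max(0L, elapsedMs - intervalMs);
    }
}
```

For instance, with a 60,000 ms interval, a run that starts 63,500 ms after the previous one gets 3,500 ms of compensation, while a run that starts on time or early gets 0. The lateness is later added to every lease's expiry time, so instances are not evicted merely because the eviction task itself was delayed, for example by a GC pause.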

Next comes the core eviction logic:

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // 1. Collect all expired leases
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // 2. Compute how many leases may be evicted
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        
        // 3. Evict
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }

The eviction runs in three steps:

  1. Collect all expired leases. Whether a lease has expired is decided by isExpired:
public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0
            || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}

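In words: eviction timestamp > 0 || current time > (last update time + lease duration + compensation time)

  2. Compute how many leases may be evicted. getRenewalPercentThreshold() defaults to 0.85, so by default a single run evicts at most the smaller of the number of expired leases and 15% of all registered instances.
  3. Evict. The leases to evict are picked at random, so that a single application does not lose all of its instances in one run. Evicting a lease is exactly the reverse of registering one: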
protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }
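
The three steps described above can be condensed into a small, side-effect-free sketch: the expiry predicate, the eviction limit, and the random pick. EvictionSketch and its method names are hypothetical. Unlike the original, the current time and the Random instance are passed in, the input list is copied rather than shuffled in place, and nothing is actually cancelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical condensed model of the eviction steps.
final class EvictionSketch {
    private EvictionSketch() {}

    // Step 1: a lease is expired if it was cancelled or its time has run out.
    static boolean isExpired(long now, long evictionTimestamp, long lastUpdateTimestamp,
                             long durationMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || now > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    // Step 2: at most (size - floor(size * threshold)) leases may go in one run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Step 3: partial Knuth shuffle; the first toEvict slots end up holding a
    // uniformly random selection from the expired leases.
    static <T> List<T> pickToEvict(List<T> expired, int evictionLimit, Random random) {
        List<T> pool = new ArrayList<>(expired);
        int toEvict = Math.max(0, Math.min(pool.size(), evictionLimit));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }
}
```

With 100 registered instances and the default threshold of 0.85, evictionLimit returns 15, so even if 40 leases have expired only 15 randomly chosen ones are removed in that run and the rest wait for later runs. The expiry predicate also shows what the compensation time buys: a lease last updated at time 0 with a 90,000 ms duration counts as expired at time 92,000 without compensation, but not when the eviction task ran 3,500 ms late.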
