Spring Cloud Eureka源碼分析---服務註冊

本篇咱們着重分析Eureka服務端的邏輯實現,主要涉及到服務的註冊流程分析。java

在Eureka的服務治理中,會涉及到下面一些概念:算法

服務註冊:Eureka Client會經過發送REST請求的方式向Eureka Server註冊本身的服務,提供自身的元數據,好比 IP 地址、端口、運行情況指標的URL、主頁地址等信息。Eureka Server接收到註冊請求後,就會把這些元數據信息存儲在一個ConcurrentHashMap中。spring

服務續約:在服務註冊後,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處於可用狀態,防止被剔除。Eureka Client在默認的狀況下會每隔30秒發送一次心跳來進行服務續約。json

服務同步:Eureka Server之間會互相進行註冊,構建Eureka Server集羣,不一樣Eureka Server之間會進行服務同步,用來保證服務信息的一致性。緩存

獲取服務:服務消費者(Eureka Client)在啓動的時候,會發送一個REST請求給Eureka Server,獲取上面註冊的服務清單,而且緩存在Eureka Client本地,默認緩存30秒。同時,爲了性能考慮,Eureka Server也會維護一份只讀的服務清單緩存,該緩存每隔30秒更新一次。服務器

服務調用:服務消費者在獲取到服務清單後,就能夠根據清單中的服務列表信息,查找到其餘服務的地址,從而進行遠程調用。Eureka有Region和Zone的概念,一個Region能夠包含多個Zone,在進行服務調用時,優先訪問處於同一個Zone中的服務提供者。網絡

服務下線:當Eureka Client須要關閉或重啓時,就不但願在這個時間段內再有請求進來,因此,就須要提早先發送REST請求給Eureka Server,告訴Eureka Server本身要下線了,Eureka Server在收到請求後,就會把該服務狀態置爲下線(DOWN),並把該下線事件傳播出去。app

服務剔除:有時候,服務實例可能會由於網絡故障等緣由致使不能提供服務,而此時該實例也沒有發送請求給Eureka Server來進行服務下線,因此,還須要有服務剔除的機制。Eureka Server在啓動的時候會建立一個定時任務,每隔一段時間(默認60秒),從當前服務清單中把超時沒有續約(默認90秒)的服務剔除。負載均衡

自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網絡一段時間內發生了異常,全部的服務都沒可以進行續約,Eureka Server就把全部的服務都剔除了,這樣顯然不太合理。因此,就有了自我保護機制,當短期內,統計續約失敗的比例,若是達到必定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常後,再退出自我保護機制。ide

1. 基本原理:

  1. Eureka Server 提供服務註冊服務,各個節點啓動後,會在Eureka Server中進行註冊,這樣Eureka Server中的服務註冊表中將會存儲全部可用服務節點的信息,服務節點的信息能夠在界面中直觀的看到;
  2. Eureka Client 是一個Java 客戶端,用於簡化與Eureka Server的交互,客戶端同時也具有一個內置的、使用輪詢負載算法的負載均衡器;
  3. 在應用啓動後,將會向Eureka Server發送心跳(默認週期爲30秒),若是Eureka Server在多個心跳週期沒有收到某個節點的心跳,Eureka Server 將會從服務註冊表中把這個服務節點移除(默認90秒);
  4. Eureka Server之間將會經過複製的方式完成數據的同步;
  5. Eureka Client具備緩存的機制,即便全部的Eureka Server 都掛掉的話,客戶端依然能夠利用緩存中的信息消費其它服務的API;

上一篇中咱們搭建了一個簡單的Eureka客戶端和服務端。若是你有啓動過觀看啓動日誌不難發現:

這裏有個EurekaServerBootstrap類,啓動日誌中給出:Setting the eureka configuration..,Initialized server context。看起來這個應該是個啓動類,跟進去看一下,有個很顯眼的方法:

這個方法的調用先按住不表,咱們先從啓動類上添加的 EnableEurekaServer註解着手,看看爲何添加了一個註解就能激活 Rureka。

從server啓動類上的EnableEurekaServer註解進入:

  1. 接下來引用了EurekaServerMarkerConfiguration,看到在這個註解上有個註釋:啓用這個註解的目的是爲了激活:EurekaServerAutoConfiguration類;

  2. 進入EurekaServerAutoConfiguration看到在類頭部有一個註解:

    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)

    EurekaServerAutoConfiguration啓動的條件是EurekaServerMarkerConfiguration註解先加載。

上面這一張圖標識出了從啓動註解到預啓動類的流程,可是你會發現實際上 EurekaServerAutoConfiguration 也沒有作什麼事情:配置初始化,啓動一些基本的過濾器。一樣在類頭部的引用上有一個Import註解:

@Import(EurekaServerInitializerConfiguration.class)

因此在 EurekaServerAutoConfiguration 初始化的時候,會引用到 EurekaServerInitializerConfiguration,激活它的初始化。EurekaServerInitializerConfiguration 實現了SmartLifecycle.start方法,在spring 初始化的時候會被啓動,激活 run 方法。能夠看到在 run 方法中調用的就是:

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);

即咱們上面截圖中的EurekaServerBootstrap.contextInitialized()方法。

總體的調用流程以下:

具體的初始化信息見下圖:

2. 服務註冊實現

2.1 server端啓動時同步別的server上的client

在上面講到Eureka server啓動過程當中,啓動一個Eureka Client的時候,initEurekaServerContext()裏面會進行服務同步和服務剔除,syncUp()方法所屬的類是:PeerAwareInstanceRegistry,即server端的服務註冊邏輯都在這裏面。由於沒有使用AWS的服務器,因此默認實例化的實現類爲:PeerAwareInstanceRegistryImpl。

PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
    registry = new AwsInstanceRegistry(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
    awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
    awsBinder.start();
} else {
    registry = new PeerAwareInstanceRegistryImpl(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
}

PeerAwareInstanceRegistryImpl 繼承了一個抽象類 AbstractInstanceRegistry:

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    
}

AbstractInstanceRegistry中的實現邏輯是真正的服務註冊存儲所在地:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class);

    private static final String[] EMPTY_STR_ARRAY = new String[0];
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
    protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
            .newBuilder().initialCapacity(500)
            .expireAfterAccess(1, TimeUnit.HOURS)
            .<String, InstanceStatus>build().asMap();

    
 ....
 ....
 ....
}

全部的服務實例信息都保存在 server 本地的map當中。因此在server端啓動的時候會去拉別的server上存儲的client實例,而後存儲到本地緩存。

2.2 client主動註冊

若是是某個client主動發出了註冊請求,那麼是如何註冊到服務端呢?

仍是查看日誌:啓動服務端,而後再啓動客戶端,查看服務端日誌:

這裏能看到剛纔啓動的客戶端已經在服務端註冊了,註冊邏輯走的類是:AbstractInstanceRegistry。

上面也提到 是服務註冊的邏輯實現類,完成保存客戶端信息的方法是:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        ......
    }

代碼就不貼了,主要實現的邏輯是保存當前註冊的客戶端信息。咱們知道客戶端是發送了一次http請求給服務端,那麼真正的註冊邏輯應該是從一個http請求的接收處進來的。跟着使用了register方法的地方去找,PeerAwareInstanceRegistryImpl裏面有調用:

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    //將新節點信息告訴別的服務端
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

這裏沒有改寫父類的register邏輯,下面還多了一句:replicateToPeers,這裏主要作的邏輯是:給兄弟 server節點發送register 請求,告訴他們有客戶端來註冊。

繼續看誰調用了這裏,能夠找到:ApplicationResource 的addInstance方法調用了:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
......
  registry.register(info, "true".equals(isReplication));
  return Response.status(204).build();  // 204 to be backwards compatible
}

而這裏很顯然是一個接口,邏輯就很清晰了:

另外,咱們查看addInstance方法被誰調用的過程當中發現:PeerReplicationResource--->batchReplication 方法也調用了註冊的邏輯。

這個方法一看居然解答了以前個人疑惑:服務端之間是如何發送心跳的。原來實現是在這裏。經過dispatch方法來區分當前的調用是何種請求,

能夠看到,服務註冊,心跳檢測,服務取消,服務下線,服務剔除的入口都在這裏:

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error("{} request processing failed for batch item {}/{}",
                             instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}


private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
        ApplicationResource applicationResource = createApplicationResource(instanceInfo);
        InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);

        String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
        String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
        String instanceStatus = toString(instanceInfo.getStatus());

        Builder singleResponseBuilder = new Builder();
        switch (instanceInfo.getAction()) {
            case Register:
                singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                break;
            case Heartbeat:
                singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                break;
            case Cancel:
                singleResponseBuilder = handleCancel(resource);
                break;
            case StatusUpdate:
                singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                break;
            case DeleteStatusOverride:
                singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                break;
        }
        return singleResponseBuilder.build();
    }

從這個入口進去,你們能夠跟蹤一下感興趣的邏輯。

相關文章
相關標籤/搜索