本篇咱們着重分析Eureka服務端的邏輯實現,主要涉及到服務的註冊流程分析。java
在Eureka的服務治理中,會涉及到下面一些概念:算法
服務註冊:Eureka Client會經過發送REST請求的方式向Eureka Server註冊本身的服務,提供自身的元數據,好比 IP 地址、端口、運行情況指標的URL、主頁地址等信息。Eureka Server接收到註冊請求後,就會把這些元數據信息存儲在一個ConcurrentHashMap中。spring
服務續約:在服務註冊後,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處於可用狀態,防止被剔除。Eureka Client在默認的狀況下會每隔30秒發送一次心跳來進行服務續約。json
服務同步:Eureka Server之間會互相進行註冊,構建Eureka Server集羣,不一樣Eureka Server之間會進行服務同步,用來保證服務信息的一致性。緩存
獲取服務:服務消費者(Eureka Client)在啓動的時候,會發送一個REST請求給Eureka Server,獲取上面註冊的服務清單,而且緩存在Eureka Client本地,默認緩存30秒。同時,爲了性能考慮,Eureka Server也會維護一份只讀的服務清單緩存,該緩存每隔30秒更新一次。服務器
服務調用:服務消費者在獲取到服務清單後,就能夠根據清單中的服務列表信息,查找到其餘服務的地址,從而進行遠程調用。Eureka有Region和Zone的概念,一個Region能夠包含多個Zone,在進行服務調用時,優先訪問處於同一個Zone中的服務提供者。網絡
服務下線:當Eureka Client須要關閉或重啓時,就不但願在這個時間段內再有請求進來,因此,就須要提早先發送REST請求給Eureka Server,告訴Eureka Server本身要下線了,Eureka Server在收到請求後,就會把該服務狀態置爲下線(DOWN),並把該下線事件傳播出去。app
服務剔除:有時候,服務實例可能會由於網絡故障等緣由致使不能提供服務,而此時該實例也沒有發送請求給Eureka Server來進行服務下線,因此,還須要有服務剔除的機制。Eureka Server在啓動的時候會建立一個定時任務,每隔一段時間(默認60秒),從當前服務清單中把超時沒有續約(默認90秒)的服務剔除。負載均衡
自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網絡一段時間內發生了異常,全部的服務都沒可以進行續約,Eureka Server就把全部的服務都剔除了,這樣顯然不太合理。因此,就有了自我保護機制,當短期內,統計續約失敗的比例,若是達到必定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常後,再退出自我保護機制。ide
上一篇中咱們搭建了一個簡單的Eureka客戶端和服務端。若是你有啓動過觀看啓動日誌不難發現:
這裏有個EurekaServerBootstrap
類,啓動日誌中給出:Setting the eureka configuration..,Initialized server context
。看起來這個應該是個啓動類,跟進去看一下,有個很顯眼的方法:
這個方法的調用先按住不表,咱們先從啓動類上添加的 EnableEurekaServer
註解着手,看看爲何添加了一個註解就能激活 Rureka。
從server啓動類上的EnableEurekaServer
註解進入:
接下來引用了EurekaServerMarkerConfiguration
,看到在這個註解上有個註釋:啓用這個註解的目的是爲了激活:EurekaServerAutoConfiguration類;
進入EurekaServerAutoConfiguration看到在類頭部有一個註解:
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
即EurekaServerAutoConfiguration
啓動的條件是EurekaServerMarkerConfiguration
註解先加載。
上面這一張圖標識出了從啓動註解到預啓動類的流程,可是你會發現實際上 EurekaServerAutoConfiguration 也沒有作什麼事情:配置初始化,啓動一些基本的過濾器。一樣在類頭部的引用上有一個Import註解:
@Import(EurekaServerInitializerConfiguration.class)
因此在 EurekaServerAutoConfiguration 初始化的時候,會引用到 EurekaServerInitializerConfiguration,激活它的初始化。EurekaServerInitializerConfiguration 實現了SmartLifecycle.start方法,在spring 初始化的時候會被啓動,激活 run 方法。能夠看到在 run 方法中調用的就是:
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
即咱們上面截圖中的EurekaServerBootstrap.contextInitialized()
方法。
總體的調用流程以下:
具體的初始化信息見下圖:
在上面講到Eureka server啓動過程當中,啓動一個Eureka Client的時候,initEurekaServerContext()
裏面會進行服務同步和服務剔除,syncUp()方法所屬的類是:PeerAwareInstanceRegistry,即server端的服務註冊邏輯都在這裏面。由於沒有使用AWS的服務器,因此默認實例化的實現類爲:PeerAwareInstanceRegistryImpl。
PeerAwareInstanceRegistry registry; if (isAws(applicationInfoManager.getInfo())) { registry = new AwsInstanceRegistry( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager); awsBinder.start(); } else { registry = new PeerAwareInstanceRegistryImpl( eurekaServerConfig, eurekaClient.getEurekaClientConfig(), serverCodecs, eurekaClient ); }
PeerAwareInstanceRegistryImpl 繼承了一個抽象類 AbstractInstanceRegistry:
@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { }
AbstractInstanceRegistry中的實現邏輯是真正的服務註冊存儲所在地:
public abstract class AbstractInstanceRegistry implements InstanceRegistry { private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class); private static final String[] EMPTY_STR_ARRAY = new String[0]; private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>(); protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder .newBuilder().initialCapacity(500) .expireAfterAccess(1, TimeUnit.HOURS) .<String, InstanceStatus>build().asMap(); .... .... .... }
全部的服務實例信息都保存在 server 本地的map當中。因此在server端啓動的時候會去拉別的server上存儲的client實例,而後存儲到本地緩存。
若是是某個client主動發出了註冊請求,那麼是如何註冊到服務端呢?
仍是查看日誌:啓動服務端,而後再啓動客戶端,查看服務端日誌:
這裏能看到剛纔啓動的客戶端已經在服務端註冊了,註冊邏輯走的類是:AbstractInstanceRegistry。
上面也提到 是服務註冊的邏輯實現類,完成保存客戶端信息的方法是:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { ...... }
代碼就不貼了,主要實現的邏輯是保存當前註冊的客戶端信息。咱們知道客戶端是發送了一次http請求給服務端,那麼真正的註冊邏輯應該是從一個http請求的接收處進來的。跟着使用了register方法的地方去找,PeerAwareInstanceRegistryImpl裏面有調用:
@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); //將新節點信息告訴別的服務端 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
這裏沒有改寫父類的register邏輯,下面還多了一句:replicateToPeers,這裏主要作的邏輯是:給兄弟 server節點發送register 請求,告訴他們有客戶端來註冊。
繼續看誰調用了這裏,能夠找到:ApplicationResource 的addInstance方法調用了:
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { ...... registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
而這裏很顯然是一個接口,邏輯就很清晰了:
另外,咱們查看addInstance方法被誰調用的過程當中發現:PeerReplicationResource--->batchReplication 方法也調用了註冊的邏輯。
這個方法一看居然解答了以前個人疑惑:服務端之間是如何發送心跳的。原來實現是在這裏。經過dispatch方法來區分當前的調用是何種請求,
能夠看到,服務註冊,心跳檢測,服務取消,服務下線,服務剔除的入口都在這裏:
@Path("batch") @POST public Response batchReplication(ReplicationList replicationList) { try { ReplicationListResponse batchResponse = new ReplicationListResponse(); for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) { try { batchResponse.addResponse(dispatch(instanceInfo)); } catch (Exception e) { batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null)); logger.error("{} request processing failed for batch item {}/{}", instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e); } } return Response.ok(batchResponse).build(); } catch (Throwable e) { logger.error("Cannot execute batch Request", e); return Response.status(Status.INTERNAL_SERVER_ERROR).build(); } } private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { ApplicationResource applicationResource = createApplicationResource(instanceInfo); InstanceResource resource = createInstanceResource(instanceInfo, applicationResource); String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); String instanceStatus = toString(instanceInfo.getStatus()); Builder singleResponseBuilder = new Builder(); switch (instanceInfo.getAction()) { case Register: singleResponseBuilder = handleRegister(instanceInfo, applicationResource); break; case Heartbeat: singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); break; case Cancel: singleResponseBuilder = handleCancel(resource); break; case StatusUpdate: singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); break; case DeleteStatusOverride: singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); break; } return singleResponseBuilder.build(); }
從這個入口進去,你們能夠跟蹤一下感興趣的邏輯。