不建議爲了看源碼而看源碼,這樣作無心義!其次,有時候瞭解工做機制便可,不必定得深刻源碼的每個細節。更多的時候,看源碼,只是工做須要。所以,源碼分析,不會貼大量的源碼,而只是給了流程 + 代碼入口。看源碼的工做,仍是得本身去看。java
EurekaServerConfig:eureka server 配置類的 抽象json
CodecWrapper:定義如何編碼,解碼。可經過 eureka server 的 jsonCodecName 修改 CodecWrapperapi
CodecWrappers:CodecWrapper 工具類緩存
CodecWrapper:編解碼組件網絡
ServerCodecs:獲取 編碼、解碼器app
PeerEurekaNodes:管理 PeerEurekaNode框架
PeerEurekaNode:表示一個 eureka-server函數
EurekaServerContext: 容器,上下文,可獲取本地 Eureka 相關的類(被容器管理,可注入)工具
InstanceInfo:實例的抽象,即一個 client源碼分析
ReplicationClientAdditionalFilters:向別的 eureka-server 發起請求時,會執行過濾。
RefreshablePeerEurekaNodes:PeerEurekaNodes 的繼承類, eureka-server 注入的 PeerEurekaNodes 實際上是 RefreshablePeerEurekaNodes 。 見名知意,即 可刷新的 PeerEurekaNodes 。 是否刷新邏輯,見:RefreshablePeerEurekaNodes#onApplicationEvent 。
EurekaHttpClient:發送 http 請求抽象。
ResponseCache:響應請求緩存類。緩存的類型,參考 com.netflix.eureka.registry.Key
netflix 封裝了一個相似線程池的任務組件,用於提交 eureka-server 之間的操做任務。
AcceptorExecutor:任務接收器
TaskDispatcher:任務分發器
TaskDispatchers:任務分發器的工廠類
TaskProcessor:任務處理器
TaskHolder:對任務的再封裝,依賴 task。
大體的交互圖以下:
代碼入口:ApplicationResource#addInstance
ConcurrentHashMap<String /* appName */, Map<String /* instanceId=ip + ":" + appName + ":" + port */, Lease<InstanceInfo>>>
代碼入口:InstanceResource#renewLease
流程比較簡單,主要是修改 Lease 的 lastUpdateTimestamp 字段。一樣的會向其餘 eureka-server 節點發送續期請求。
代碼入口:ApplicationsResource#getContainers
MeasuredRate renewsLastMin; // 統計每分鐘的心跳包 volatile int numberOfRenewsPerMinThreshold; // 每分鐘 client 應該續期的最小次數 volatile int expectedNumberOfClientsSendingRenews; //註冊的 client 數量
eureka-server 在必定時間內(默認90s)沒有收到 client 心跳,會剔除該實例。可是發生網絡分區故障時,client 沒法與 server 通訊,此時不該該剔除該 client。所以有了 eureka-server 的自我保護機制。 eureka-server 在進入自我保護機制時,不會剔除 client。當網絡故障恢復以後,會自動退出自我保護機制。
AbstractInstanceRegistry
MeasuredRate renewsLastMin; // 統計每分鐘的心跳包 volatile int numberOfRenewsPerMinThreshold; // 每分鐘 client 應該續期的最小次數 volatile int expectedNumberOfClientsSendingRenews; //註冊的 client 數量
numberOfRenewsPerMinThreshold 參數的計算依賴於 expectedNumberOfClientsSendingRenews。
先看, expectedNumberOfClientsSendingRenews 相關的方法
PeerAwareInstanceRegistryImpl#openForTraffic()
// 初始化 expectedNumberOfClientsSendingRenews = 1 this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold();
AbstractInstanceRegistry#register()
synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } }
PeerAwareInstanceRegistryImpl#updateRenewalThreshold
synchronized (lock) { // Update threshold only if the threshold is greater than the // current expected threshold or if self preservation is disabled. if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); } }
PeerAwareInstanceRegistryImpl#cancel
synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } }
PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask
每隔15分鐘,調用 PeerAwareInstanceRegistryImpl#updateRenewalThreshold 更新 expectedNumberOfClientsSendingRenews
每次更新了 expectedNumberOfClientsSendingRenews,必然會調用 updateRenewsPerMinThreshold() 方法,更新 numberOfRenewsPerMinThreshold。
AbstractInstanceRegistry#updateRenewsPerMinThreshold
protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); // this.expectedNumberOfClientsSendingRenews * (60.0 / 30 ) * 0.85 }
renewsLastmin,在 client 續期時,會調用 MeasuredRate#increment(), 將 currentBucket 值 + 1
服務端接收客戶端續期請求 代碼入口:AbstractInstanceRegistry#renew
MeasuredRate#increment
public void increment() { currentBucket.incrementAndGet(); }
MeasureRate 在初始化時,會啓動一個定時器,每隔 60s。便會將 currentBucket 清 0
代碼入口: MeasuredRate#start
MeasureRate 初始化代碼入口:AbstractInstanceRegistry 構造函數
在啓動 eureka-server 時,會初始化一個定時器,每隔 60s 剔除未及時發送心跳包的 client。
代碼入口:AbstractInstanceRegistry#postInit
真正執行剔除邏輯的代碼入口:AbstractInstanceRegistry#evict(long additionalLeaseMs)
public void evict(long additionalLeaseMs) { // ... if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // ... }
PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }
上面介紹了自我保護機制的相關參數,當 isLeaseExpirationEnabled() 返回 false 時,會剔除過時的 client。從方法看,只要開啓自我保護,必定返回 true,所以不會剔除過時的 client。
更多內容,關注公衆號