國慶長假結束後,筆者一直在於假期綜合症纏鬥,特別是週六上班。。。java
相信你們對Hystrix都很熟悉,它的源碼大量使用RxJava,正好筆者的老本行是Android開發工程師,之前也略微接觸過,想分享下本身看完Hystix的請求合併與請求緩存部分源碼的一些收穫。git
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.github
一、隔離:經過隔離,避免服務之間相互影響,一個服務不可用,不會影響別的服務,避免了服務雪崩。 二、降級:分佈式環境中,服務不可用的狀況沒法避免,降級機制能夠給出更加友好的交互(默認值、異常返回)。 三、熔斷:熔斷機制能夠避免在服務不可用時,服務調用方還在調用不可用的服務,致使資源消耗、耗時增長。 四、提供可視化的監控,Hystrix Dashboard。 四、固然,還有筆者今天要講的請求合併與請求緩存。緩存
Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.數據結構
一、請求緩存:CommandUsingRequestCache 二、請求合併:CommandCollapserGetValueForKeyapp
CommandUsingRequestCache
,繼承自HystrixCommand
,和通常的Command
一致。一、初始化
HystrixRequestContext
二、重寫getCacheKey
分佈式
HystrixRequestContext.initializeContext
代碼在HystrixRequestContext
中,從類名能夠看出這是個請求上下文,保存一些請求的信息。測試
從源碼能夠看出,new出一個HystrixRequestContext
,塞入ThreadLocal
變量中。ui
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();
/** * Call this at the beginning of each request (from parent thread) * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from * the parent thread. * <p> * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b> * <p> * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context. */
public static HystrixRequestContext initializeContext() {
HystrixRequestContext state = new HystrixRequestContext();
requestVariables.set(state);
return state;
}
複製代碼
HystrixRequestContext
存儲上下文的數據結構是怎樣的呢?// 每一個HystrixRequestContext實例,都會有一個ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();
/** 刪除ConcurrentMap中存儲的全部鍵值對,若是初始化了HystrixRequestContext對象,沒有調用shutdown方法,確實會致使內存泄漏,由於state還在。 */
public void shutdown() {
if (state != null) {
for (HystrixRequestVariableDefault<?> v : state.keySet()) {
// for each RequestVariable we call 'remove' which performs the shutdown logic
try {
HystrixRequestVariableDefault.remove(this, v);
} catch (Throwable t) {
HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
}
}
// null out so it can be garbage collected even if the containing object is still
// being held in ThreadLocals on threads that weren't cleaned up
state = null;
}
}
複製代碼
ConcurrentHashMap
裏存的HystrixRequestVariableDefault
及靜態內部類HystrixRequestVariableDefault.LazyInitializer
又是什麼呢?HystrixRequestVariableDefault
其實就是存儲了泛型T
的value
,而且封裝了initialValue
、get
、set
方法。LazyInitializer
顧名思義就是爲了懶漢式初始化value
,而設計的內部類。// 做用一:做爲內部類調用HystrixRequestVariableDefault.initialValue方法,經過維護initialized布爾值,使HystrixRequestVariableDefault.initialValue方法只調用一次。
// 做用二:new一個LazyInitializer對象或LazyInitializer被垃圾回收時不會調用HystrixRequestVariableDefault.initialValue方法,也就是說對於業務初始化邏輯的影響被排除。
// 做用三:調用get方法時,能夠經過CAS樂觀鎖的方式實現value的獲取,具體請參照get方法。
static final class LazyInitializer<T> {
// @GuardedBy("synchronization on get() or construction")
private T value;
/* * Boolean to ensure only-once initialValue() execution instead of using * a null check in case initialValue() returns null */
// @GuardedBy("synchronization on get() or construction")
private boolean initialized = false;
private final HystrixRequestVariableDefault<T> rv;
// 不會調用HystrixRequestVariableDefault.initialValue,不會更新initialized值
private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
this.rv = rv;
}
// 不會調用HystrixRequestVariableDefault.initialValue,只能經過set方式調用
private LazyInitializer(HystrixRequestVariableDefault<T> rv, T value) {
this.rv = rv;
this.value = value;
this.initialized = true;
}
// 若是未初始化(沒有調用過set方法)過,則返回HystrixRequestVariableDefault.initialValue的值,初始化過則返回初始化的值
public synchronized T get() {
if (!initialized) {
value = rv.initialValue();
initialized = true;
}
return value;
}
}
複製代碼
ConcurrentHashMap
中取出對應的LazyInitializer
,若是爲空則使用CAS樂觀鎖的方式,new一個LazyInitializer
並存入ConcurrentHashMap
,最後返回調用LazyInitializer.get()
並返回public T get() {
// 當前線程的HystrixRequestContext爲null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> 爲null
if (HystrixRequestContext.getContextForCurrentThread() == null) {
throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
// short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
LazyInitializer<?> v = variableMap.get(this);
if (v != null) {
return (T) v.get();
}
/* * 樂觀鎖方式(CAS)new一個LazyInitializer,放進ConcurrentHashMap * 這裏值得注意的是,不調用LazyInitializer.get方法是不會執行HystrixRequestVariableDefault.initialValue,故當putIfAbsent失敗時,能夠樂觀地放棄該實例,使該實例被GC。 * 無論哪一個LazyInitializer實例的get方法被調用,HystrixRequestVariableDefault.initialValue也只會被調用一次。 */
LazyInitializer<T> l = new LazyInitializer<T>(this);
LazyInitializer<?> existing = variableMap.putIfAbsent(this, l);
if (existing == null) {
/* * We won the thread-race so can use 'l' that we just created. */
return l.get();
} else {
/* * We lost the thread-race so let 'l' be garbage collected and instead return 'existing' */
return (T) existing.get();
}
}
複製代碼
getCacheKey
重寫了AbstractCommand.getCacheKey
方法,AbstractCommand
爲HystrixCommand
的基類。
execute
方法,最終調用toObservable
方法,而toObservable
方法在AbstractCommand
中,所以咱們能夠初步判定在AbstractCommand.toObservable
方法中,會與HystrixRequestVariableDefault
或者其實現的接口產生關聯,進行緩存的讀取和寫入。*AbstractCommand.toObservable
的關鍵代碼以下:this
final String cacheKey = getCacheKey();
/* 若是開啓了緩存功能,從緩存讀取 */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 緩存對象
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 放進緩存
if (requestCacheEnabled && cacheKey != null) {
// 包裝成緩存Observable對象
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
複製代碼
HystrixRequestCache
與 HystrixRequestVariableDefault
之間的關聯了,AbstractCommand
構造器中經過HystrixRequestCache.getInstance
構造了HystrixRequestCache
對象。// 又是CAS,putIfAbsent。。。
private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
HystrixRequestCache c = caches.get(rcKey);
if (c == null) {
HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
if (existing == null) {
// we won so use the new one
c = newRequestCache;
} else {
// we lost so use the existing
c = existing;
}
}
return c;
}
複製代碼
HystrixRequestCache
的值是怎麼存儲的,看HystrixRequestCache.putIfAbsent
。HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包裝成緩存key
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
// 尋找緩存,關鍵代碼
ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
if (cacheInstance == null) {
throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
}
HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
if (alreadySet != null) {
// someone beat us so we didn't cache this
return alreadySet;
}
}
// we either set it in the cache or do not have a cache key
return null;
}
複製代碼
requestVariableInstance.get(key)
爲HystrixRequestVariableHolder
中的方法。// 找到了關聯。。。這裏有HystrixRequestVariable
private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
//
public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
if (rvInstance == null) {
requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
/* * 內存泄漏檢測, */
if (requestVariableInstance.size() > 100) {
logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
}
}
// HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>的map,再從ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>中根據重寫的getCacheKey構造出ValueCacheKey,拿出緩存值。
return (T) requestVariableInstance.get(key).get();
}
複製代碼
最後,再總結下請求緩存機制,一個request對應一個HystrixRequestContext
、HystrixRequestVariable
中存儲緩存值,經過重寫getCacheKey
構造對應RVCacheKey
,經過HystrixRequestCache
的HystrixRequestVariableHolder
拿到HystrixRequestVariable
的值。
看了源碼才發現,做者有以下感覺:
一、各類ConcurrentHashMap 二、終於RxJava第一次看到在非Android領域運用 三、懶加載+CAS伴隨整個流程,後續也會考慮這種非鎖實現