Hystrix請求合併與請求緩存(一):請求緩存

前言

國慶長假結束後,筆者一直在於假期綜合症纏鬥,特別是週六上班。。。java

相信你們對Hystrix都很熟悉,它的源碼大量使用RxJava,正好筆者的老本行是Android開發工程師,之前也略微接觸過,想分享下本身看完Hystix的請求合併與請求緩存部分源碼的一些收穫。git

Hystrix簡介

  • Hystrix由Netflix開源,官方定義以下:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.github

  • 筆者理解:在分佈式環境中,錯誤不可避免,Hystrix提供了一種隔離、降級、熔斷等機制。

一、隔離:經過隔離,避免服務之間相互影響,一個服務不可用,不會影響別的服務,避免了服務雪崩。 二、降級:分佈式環境中,服務不可用的狀況沒法避免,降級機制能夠給出更加友好的交互(默認值、異常返回)。 三、熔斷:熔斷機制能夠避免在服務不可用時,服務調用方還在調用不可用的服務,致使資源消耗、耗時增長。 四、提供可視化的監控,Hystrix Dashboard。 四、固然,還有筆者今天要講的請求合併與請求緩存緩存

  • 請求合併與請求緩存,對應於官方給出的**What does it do?**的第3項:

Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.數據結構

  • 如下都是經過官方給的測試用例做爲入口,查找源碼並進行分析。

一、請求緩存:CommandUsingRequestCache 二、請求合併:CommandCollapserGetValueForKeyapp

請求緩存

  • 請求緩存的例子在CommandUsingRequestCache,繼承自HystrixCommand,和通常的Command一致。
  • 那麼,使用緩存和不使用緩存代碼層面有何不一樣呢?

一、初始化HystrixRequestContext 二、重寫getCacheKey分佈式

HystrixRequestContext

  • HystrixRequestContext.initializeContext代碼在HystrixRequestContext中,從類名能夠看出這是個請求上下文,保存一些請求的信息。測試

  • 從源碼能夠看出,new出一個HystrixRequestContext,塞入ThreadLocal變量中。ui

private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();

/** * Call this at the beginning of each request (from parent thread) * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from * the parent thread. * <p> * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b> * <p> * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context. */
    public static HystrixRequestContext initializeContext() {
        HystrixRequestContext state = new HystrixRequestContext();
        requestVariables.set(state);
        return state;
    }
複製代碼
  • 那麼,HystrixRequestContext存儲上下文的數據結構是怎樣的呢?
// 每一個HystrixRequestContext實例,都會有一個ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();

/** 刪除ConcurrentMap中存儲的全部鍵值對,若是初始化了HystrixRequestContext對象,沒有調用shutdown方法,確實會致使內存泄漏,由於state還在。 */
    public void shutdown() {
        if (state != null) {
            for (HystrixRequestVariableDefault<?> v : state.keySet()) {
                // for each RequestVariable we call 'remove' which performs the shutdown logic
                try {
                    HystrixRequestVariableDefault.remove(this, v);
                } catch (Throwable t) {
                    HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
                }
            }
            // null out so it can be garbage collected even if the containing object is still
            // being held in ThreadLocals on threads that weren't cleaned up
            state = null;
        }
    }
複製代碼
  • 這個ConcurrentHashMap裏存的HystrixRequestVariableDefault及靜態內部類HystrixRequestVariableDefault.LazyInitializer又是什麼呢?

HystrixRequestVariableDefault

  • HystrixRequestVariableDefault其實就是存儲了泛型Tvalue,而且封裝了initialValuegetset方法。
  • LazyInitializer顧名思義就是爲了懶漢式初始化value,而設計的內部類。
// 做用一:做爲內部類調用HystrixRequestVariableDefault.initialValue方法,經過維護initialized布爾值,使HystrixRequestVariableDefault.initialValue方法只調用一次。
// 做用二:new一個LazyInitializer對象或LazyInitializer被垃圾回收時不會調用HystrixRequestVariableDefault.initialValue方法,也就是說對於業務初始化邏輯的影響被排除。
// 做用三:調用get方法時,能夠經過CAS樂觀鎖的方式實現value的獲取,具體請參照get方法。
static final class LazyInitializer<T> {
        // @GuardedBy("synchronization on get() or construction")
        private T value;

        /* * Boolean to ensure only-once initialValue() execution instead of using * a null check in case initialValue() returns null */
        // @GuardedBy("synchronization on get() or construction")
        private boolean initialized = false;

        private final HystrixRequestVariableDefault<T> rv;

		// 不會調用HystrixRequestVariableDefault.initialValue,不會更新initialized值
        private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
            this.rv = rv;
        }

		// 不會調用HystrixRequestVariableDefault.initialValue,只能經過set方式調用
        private LazyInitializer(HystrixRequestVariableDefault<T> rv, T value) {
            this.rv = rv;
            this.value = value;
            this.initialized = true;
        }
		// 若是未初始化(沒有調用過set方法)過,則返回HystrixRequestVariableDefault.initialValue的值,初始化過則返回初始化的值
        public synchronized T get() {
            if (!initialized) {
                value = rv.initialValue();
                initialized = true;
            }
            return value;
        }
    }
複製代碼
  • get方法,先從ConcurrentHashMap中取出對應的LazyInitializer,若是爲空則使用CAS樂觀鎖的方式,new一個LazyInitializer並存入ConcurrentHashMap,最後返回調用LazyInitializer.get()並返回
public T get() {
		// 當前線程的HystrixRequestContext爲null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> 爲null
        if (HystrixRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;

        // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
        LazyInitializer<?> v = variableMap.get(this);
        if (v != null) {
            return (T) v.get();
        }

        /* * 樂觀鎖方式(CAS)new一個LazyInitializer,放進ConcurrentHashMap * 這裏值得注意的是,不調用LazyInitializer.get方法是不會執行HystrixRequestVariableDefault.initialValue,故當putIfAbsent失敗時,能夠樂觀地放棄該實例,使該實例被GC。 * 無論哪一個LazyInitializer實例的get方法被調用,HystrixRequestVariableDefault.initialValue也只會被調用一次。 */
        LazyInitializer<T> l = new LazyInitializer<T>(this);
        LazyInitializer<?> existing = variableMap.putIfAbsent(this, l);
        if (existing == null) {
            /* * We won the thread-race so can use 'l' that we just created. */
            return l.get();
        } else {
            /* * We lost the thread-race so let 'l' be garbage collected and instead return 'existing' */
            return (T) existing.get();
        }
    }
複製代碼

各種之間的關係

  • 一個request(不侷限於一個線程) -> HystrixRequestContext -> ConcurrentHashMap<HystrixRequestVariableDefault, HystrixRequestVariableDefault.LazyInitializer>
  • 也就是說每一個request都有一個ConcurrentHashMap<HystrixRequestVariableDefault, HystrixRequestVariableDefault.LazyInitializer> map。

獲取緩存

  • getCacheKey重寫了AbstractCommand.getCacheKey方法,AbstractCommandHystrixCommand的基類。
    enter image description here
  • 根據上圖,咱們能夠看出execute方法,最終調用toObservable方法,而toObservable方法在AbstractCommand中,所以咱們能夠初步判定在AbstractCommand.toObservable方法中,會與HystrixRequestVariableDefault或者其實現的接口產生關聯,進行緩存的讀取和寫入。

*AbstractCommand.toObservable的關鍵代碼以下:this

final String cacheKey = getCacheKey();

                /* 若是開啓了緩存功能,從緩存讀取 */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

				// 緩存對象
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放進緩存
                if (requestCacheEnabled && cacheKey != null) {
                    // 包裝成緩存Observable對象
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
複製代碼
  • 接下來,咱們就只要尋找HystrixRequestCacheHystrixRequestVariableDefault之間的關聯了,AbstractCommand構造器中經過HystrixRequestCache.getInstance構造了HystrixRequestCache對象。
// 又是CAS,putIfAbsent。。。
 private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
        HystrixRequestCache c = caches.get(rcKey);
        if (c == null) {
            HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
            HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
            if (existing == null) {
                // we won so use the new one
                c = newRequestCache;
            } else {
                // we lost so use the existing
                c = existing;
            }
        }
        return c;
    }
複製代碼
  • 來看HystrixRequestCache的值是怎麼存儲的,看HystrixRequestCache.putIfAbsent
HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
		// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包裝成緩存key
        ValueCacheKey key = getRequestCacheKey(cacheKey);
        if (key != null) {
            // 尋找緩存,關鍵代碼
            ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
            if (cacheInstance == null) {
                throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
            }
            HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
            if (alreadySet != null) {
                // someone beat us so we didn't cache this
                return alreadySet;
            }
        }
        // we either set it in the cache or do not have a cache key
        return null;
    }
複製代碼
  • requestVariableInstance.get(key)HystrixRequestVariableHolder中的方法。
// 找到了關聯。。。這裏有HystrixRequestVariable
 private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
 // 
 public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
        
        RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
        HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
        if (rvInstance == null) {
            requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
            /* * 內存泄漏檢測, */
            if (requestVariableInstance.size() > 100) {
                logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
            }
        }
        // HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>的map,再從ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>中根據重寫的getCacheKey構造出ValueCacheKey,拿出緩存值。
        return (T) requestVariableInstance.get(key).get();
    }
複製代碼

獲取緩存過程當中各個對象的對應關係

  • 一個commandKey
  • 一個HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>
  • 一個ConcurrentHashMap<RVCacheKey, HystrixRequestVariable> requestVariableInstance = new ConcurrentHashMap>()

請求緩存總結

最後,再總結下請求緩存機制,一個request對應一個HystrixRequestContextHystrixRequestVariable中存儲緩存值,經過重寫getCacheKey構造對應RVCacheKey,經過HystrixRequestCacheHystrixRequestVariableHolder拿到HystrixRequestVariable的值。

總結

看了源碼才發現,做者有以下感覺:

一、各類ConcurrentHashMap 二、終於RxJava第一次看到在非Android領域運用 三、懶加載+CAS伴隨整個流程,後續也會考慮這種非鎖實現

參考文獻

相關文章
相關標籤/搜索