GuavaCache

Google Guava LocalLoadingCache

前言

在咱們編程的過程當中會遇到一些在程序中須要重試使用的數據,在這種狀況下咱們就能夠考慮利用緩存(內存)的優點來提供程序訪問這些數據的一個性能了。利用了緩存能夠在必定程度上緩解很大的性能消耗:java

  • 網絡傳輸開銷算法

  • 數據序列化反序列話spring

  • 數據庫、文件系統數據訪問慢數據庫

緩存器是利用內存進行數據存儲的,在存儲容量上有必定的限制,因此咱們在咱們使用緩存的時候也分兩種場景:編程

  • 全量數據緩存數組

  • 緩存熱數據,這也是基於緩存容量的一個考慮緩存

好了本篇咱們就來聊聊寫程序過程當中常能用到的本地緩存的方式。網絡

JDK提供的數據結構(Map)

緩存數據的存儲格式通常都是以Key-Value的方式,那這裏咱們主要來討論下Map的實現ConcurrentHashMap實現的緩存。數據結構

String key = StringUtils.EMPTY;
ConcurrentMap<String, String> localCache  = new ConcurrentHashMap();
if(StringUtils.isEmpty(localCache.get(key))) {
    String value = queryFromDB(key);
    localCache.put(key,value);
    return value;
}
return localCache.get(key);

這樣就能構造一個很是簡單的緩存。app

注意:這個緩存仍是有很是多的問題

  • 沒有一個清除緩存的策略,最終全部被訪問過得數據都會全量給緩存起來,直到顯式清除。

  • 同時緩存沒命中的狀況下須要應用顯式去加載(queryFromDB )。

LocalLoadingCache

好了主角要登場了,先簡單介紹下這個cache的一些用法,這個cache比較好的解決了我上面提到經過Map用做緩存的兩個缺陷。

用法

LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder()
    .maximumSize(10000)
    .expireAfterWrite(10, TimeUnit.MINUTES)
    .removalListener(MY_LISTENER)
    .build(
        new CacheLoader<Key, Graph>() {
          public Graph load(Key key) throws AnyException {
            return createExpensiveGraph(key);
          }
        });

經過這種方式一個緩存就已經建立好了,上面定義的load函數在緩存中不存在key對應的value的時候會去執行將數據load放到緩存中。

其底層存儲採用基於數組的java.util.concurrent.atomic.AtomicReferenceArray進行緩存元素的存取。

load如何被加載

先分析下load函數是怎麼被執行的:graphs.getUnchecked(new Key());從緩存中獲取數據,若是沒有進行put操做,首次get的時候緩存中沒有其緩存值,這個時候必然要觸發load函數進行value load了,那咱們就從get函數進行深刻分析(分析源碼基於16.0.1)。

com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader<? super K,V>)

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // don't call getLiveEntry, which would ignore loading values
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();
            V value = getLiveValue(e, now);
            if (value != null) {
              recordRead(e, now);
              statsCounter.recordHits(1);
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // at this point e is either null or expired;
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
    }

首次調用會執行lockedGetOrLoad函數

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
        throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;

      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              createNewEntry = false;
            } else {
              V value = valueReference.get();
              if (value == null) {
                enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) {
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accomodate an incorrect expiration queue.
                enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
              } else {
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don't consider refresh
                return value;
              }

              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }

        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference<K, V>();

          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        postWriteCleanup();
      }

      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
      }
    }

最後調用loadSync(key, hash, loadingValueReference, loader);進行進行數據load。

public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      stopwatch.start();
      V previousValue = oldValue.get();
      try {
        if (previousValue == null) {
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return Futures.transform(newValue, new Function<V, V>() {
          @Override
          public V apply(V newValue) {
            LoadingValueReference.this.set(newValue);
            return newValue;
          }
        });
      } catch (Throwable t) {
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return setException(t) ? futureValue : fullyFailedFuture(t);
      }
    }

執行loader.load將數據load進緩存,可能你會想若是這個時候從DB或其餘非內存存儲中也沒找到數據,這個時候LocalLoadingCache是怎麼處理的呢?其實在這種狀況下只須要throw異常信息就好,這樣LocalLoadingCache會放棄緩存。

可是讀源代碼細心的你可能會發如今lockedGetOrLoad中會先newEntry後面才load

if (createNewEntry) {
      loadingValueReference = new LoadingValueReference<K, V>();

      if (e == null) {
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
      } else {
        e.setValueReference(loadingValueReference);
      }
    } finally {
        unlock();
        postWriteCleanup();
}

  if (createNewEntry) {
    try {
      // Synchronizes on the entry to allow failing fast when a recursive load is
      // detected. This may be circumvented when an entry is copied, but will fail fast most
      // of the time.
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    } finally {
      statsCounter.recordMisses(1);
    }
  } else {
    // The entry already exists. Wait for loading.
    return waitForLoadingValue(e, key, valueReference);
  }

其實實現很簡單他在cache到異常信息後又會對緩存中的entry進行remove操做,當時找這段異常被cache的代碼也是找了好久時間了。

com.google.common.cache.LocalCache.Segment#getAndRecordStats

V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
        ListenableFuture<V> newValue) throws ExecutionException {
      V value = null;
      try {
        value = getUninterruptibly(newValue);
        if (value == null) {
          throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
        storeLoadedValue(key, hash, loadingValueReference, value);
        return value;
      } finally {
        if (value == null) {
          statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
          removeLoadingValue(key, hash, loadingValueReference);
        }
      }
    }

執行removeLoadingValue將load異常後的key刪除。

緩存策略

從用法那小結能夠看到咱們在建立緩存的時候除了load還有一些其餘特性以下:

maximumSize(10000)
expireAfterWrite(10, TimeUnit.MINUTES)

這又是什麼意思呢?這其實就是LocalLoadingCache提供的緩存策略。

maximumSize(10000) 設置緩存能保存的最多元素數量。
expireAfterWrite(10, TimeUnit.MINUTES) 設置元素在寫後多久進行銷燬。

其實還有maximumWeight、expireAfterAccess兩種元素過時策略。

  • maximumSizemaximumWeight的一種特殊形式,將全部的元素設置weight爲1,也即就轉化爲能存儲元素個數的上限值了。

  • expireAfterAccessexpireAfterWrite基本就一個意思,只是內部用了兩種不一樣的計數方式(經過不一樣的queue進行管理,被訪問/修改進行入隊操做)進行訪問、寫操做的記錄。

很少說讓源碼說話。

  • 根據過時時間進行緩存的淘汰策略思路:在進行get/put操做完成後對隊列(每次對緩存的操做頭會被其記錄下來)進行一次遍歷,而後按照過時時間淘汰過時的元素。

  • 根據元素個數上限進行清理的策略思路:在load新緩存值的時候比對下是否緩存容量(元素個數)已經達到上限,若是達到上限按照LRU算法進行淘汰元素。

過時時間淘汰策略

從分析load那小結咱們已經展現過get的代碼,其中最後finally中有段postReadCleanup();方法,深刻下去方法體就否則看出:

@GuardedBy("Segment.this")
void expireEntries(long now) {
  drainRecencyQueue();

  ReferenceEntry<K, V> e;
  while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
  while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
}

進行過時key清除策略,從這段代碼也能看出我爲何說expireAfterAccessexpireAfterWrite基本就一個意思了吧。

其實還有一種清除緩存的策略:基於引用的回收可是還沒研究清除不便多說,這個策略清除的時機和過時時間策略同樣。

@GuardedBy("Segment.this")
void drainReferenceQueues() {
  if (map.usesKeyReferences()) {
    drainKeyReferenceQueue();
  }
  if (map.usesValueReferences()) {
    drainValueReferenceQueue();
  }
}

容量回收策略

在新key對應的value load完後須要將value存放到緩存中去,插入完成後會進行容量的check若是超過容量限制會執行淘汰策略。對應源碼:

com.google.common.cache.LocalCache.Segment#storeLoadedValue

boolean storeLoadedValue(K key, int hash, LoadingValueReference<K, V> oldValueReference,
        V newValue) {
      lock();
      try {
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count + 1;
        if (newCount > this.threshold) { // ensure capacity
          expand();
          newCount = this.count + 1;
        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            ValueReference<K, V> valueReference = e.getValueReference();
            V entryValue = valueReference.get();
            // replace the old LoadingValueReference if it's live, otherwise
            // perform a putIfAbsent
            if (oldValueReference == valueReference
                || (entryValue == null && valueReference != UNSET)) {
              ++modCount;
              if (oldValueReference.isActive()) {
                RemovalCause cause =
                    (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;
                enqueueNotification(key, hash, oldValueReference, cause);
                newCount--;
              }
              setValue(e, key, newValue, now);
              this.count = newCount; // write-volatile
              evictEntries();
              return true;
            }

            // the loaded value was already clobbered
            valueReference = new WeightedStrongValueReference<K, V>(newValue, 0);
            enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);
            return false;
          }
        }

        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, newValue, now);
        table.set(index, newEntry);
        this.count = newCount; // write-volatile
        evictEntries();
        return true;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }

上面的存儲操做最終在進行setValue後會執行:

com.google.common.cache.LocalCache.Segment#evictEntries

@GuardedBy("Segment.this")
void evictEntries() {
  if (!map.evictsBySize()) {
    return;
  }

  drainRecencyQueue();
  while (totalWeight > maxSegmentWeight) {
    ReferenceEntry<K, V> e = getNextEvictable();
    if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
      throw new AssertionError();
    }
  }
}
// TODO(fry): instead implement this with an eviction head
ReferenceEntry<K, V> getNextEvictable() {
  for (ReferenceEntry<K, V> e : accessQueue) {
    int weight = e.getValueReference().getWeight();
    if (weight > 0) {
      return e;
    }
  }
  throw new AssertionError();
}

這裏最終會根據LRU從緩存中將最近沒有使用過的元素進行剔除操做。

最後說下removalListener

在LocalLoadingCache中提供了在元素被移除的時候供應用進行回調的函數,這個函數經過removalListener進行註冊,當有元素從緩存中淘汰後就會觸發其進行調用。

接着上面移除元素進行分析函數removeEntry

@GuardedBy("Segment.this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
  int newCount = this.count - 1;
  AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
  int index = hash & (table.length() - 1);
  ReferenceEntry<K, V> first = table.get(index);

  for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
    if (e == entry) {
      ++modCount;
      ReferenceEntry<K, V> newFirst = removeValueFromChain(
          first, e, e.getKey(), hash, e.getValueReference(), cause);
      newCount = this.count - 1;
      table.set(index, newFirst);
      this.count = newCount; // write-volatile
      return true;
    }
  }

  return false;
}

最終會調用

@GuardedBy("Segment.this")
void enqueueNotification(@Nullable K key, int hash, ValueReference<K, V> valueReference,
    RemovalCause cause) {
  totalWeight -= valueReference.getWeight();
  if (cause.wasEvicted()) {
    statsCounter.recordEviction();
  }
  if (map.removalNotificationQueue != DISCARDING_QUEUE) {
    V value = valueReference.get();
    RemovalNotification<K, V> notification = new RemovalNotification<K, V>(key, value, cause);
    map.removalNotificationQueue.offer(notification);
  }
}

將創建一個RemovalNotification隊列進行保存刪除元素。

在讀/寫完成後會進行通知

com.google.common.cache.LocalCache.Segment#postWriteCleanup

 /**
 * Performs routine cleanup following a write.
 */
void postWriteCleanup() {
  runUnlockedCleanup();
}

void cleanUp() {
  long now = map.ticker.read();
  runLockedCleanup(now);
  runUnlockedCleanup();
}

runUnlockedCleanup源碼會回調com.google.common.cache.RemovalListener#onRemoval進行緩存元素刪除後置處理。

void processPendingNotifications() {
    RemovalNotification<K, V> notification;
    while ((notification = removalNotificationQueue.poll()) != null) {
      try {
        removalListener.onRemoval(notification);
      } catch (Throwable e) {
        logger.log(Level.WARNING, "Exception thrown by removal listener", e);
      }
    }
  }

最後類圖一張

clipboard.png
以爲圖不夠清晰能夠點擊查看大圖。

總結

本篇也主要是對LocalLoadingCache從運用這個層次更向前走了一步,對咱們使用過程其邏輯背後的實現進行了必定深刻分析。我在初次看到這個方式也是很疑惑其底層究竟是如何實現的,因而有了這篇文章,經過源碼進行跟蹤分析其背後的實現邏輯。

後面還會分析org.springframework.cache.guava.GuavaCacheManager如何將GuavaCache進行管理的,經過和spring更好的結合而消除顯式調用cache get/put的方式。

相關文章
相關標籤/搜索