在咱們編程的過程當中會遇到一些在程序中須要重試使用的數據,在這種狀況下咱們就能夠考慮利用緩存(內存)的優點來提供程序訪問這些數據的一個性能了。利用了緩存能夠在必定程度上緩解很大的性能消耗:java
網絡傳輸開銷算法
數據序列化反序列話spring
數據庫、文件系統數據訪問慢數據庫
緩存器是利用內存進行數據存儲的,在存儲容量上有必定的限制,因此咱們在咱們使用緩存的時候也分兩種場景:編程
全量數據緩存數組
緩存熱數據,這也是基於緩存容量的一個考慮緩存
好了本篇咱們就來聊聊寫程序過程當中常能用到的本地緩存的方式。網絡
緩存數據的存儲格式通常都是以Key-Value的方式,那這裏咱們主要來討論下Map的實現ConcurrentHashMap實現的緩存。數據結構
String key = StringUtils.EMPTY; ConcurrentMap<String, String> localCache = new ConcurrentHashMap(); if(StringUtils.isEmpty(localCache.get(key))) { String value = queryFromDB(key); localCache.put(key,value); return value; } return localCache.get(key);
這樣就能構造一個很是簡單的緩存。app
注意:這個緩存仍是有很是多的問題
沒有一個清除緩存的策略,最終全部被訪問過得數據都會全量給緩存起來,直到顯式清除。
同時緩存沒命中的狀況下須要應用顯式去加載(queryFromDB
)。
好了主角要登場了,先簡單介紹下這個cache的一些用法,這個cache比較好的解決了我上面提到經過Map用做緩存的兩個缺陷。
LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .removalListener(MY_LISTENER) .build( new CacheLoader<Key, Graph>() { public Graph load(Key key) throws AnyException { return createExpensiveGraph(key); } });
經過這種方式一個緩存就已經建立好了,上面定義的load函數在緩存中不存在key對應的value的時候會去執行將數據load放到緩存中。
其底層存儲採用基於數組的java.util.concurrent.atomic.AtomicReferenceArray
進行緩存元素的存取。
先分析下load函數是怎麼被執行的:graphs.getUnchecked(new Key());
從緩存中獲取數據,若是沒有進行put操做,首次get的時候緩存中沒有其緩存值,這個時候必然要觸發load函數進行value load了,那咱們就從get函數進行深刻分析(分析源碼基於16.0.1)。
com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader<? super K,V>) V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { checkNotNull(key); checkNotNull(loader); try { if (count != 0) { // read-volatile // don't call getLiveEntry, which would ignore loading values ReferenceEntry<K, V> e = getEntry(key, hash); if (e != null) { long now = map.ticker.read(); V value = getLiveValue(e, now); if (value != null) { recordRead(e, now); statsCounter.recordHits(1); return scheduleRefresh(e, key, hash, value, now, loader); } ValueReference<K, V> valueReference = e.getValueReference(); if (valueReference.isLoading()) { return waitForLoadingValue(e, key, valueReference); } } } // at this point e is either null or expired; return lockedGetOrLoad(key, hash, loader); } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof Error) { throw new ExecutionError((Error) cause); } else if (cause instanceof RuntimeException) { throw new UncheckedExecutionException(cause); } throw ee; } finally { postReadCleanup(); } }
首次調用會執行lockedGetOrLoad函數
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { ReferenceEntry<K, V> e; ValueReference<K, V> valueReference = null; LoadingValueReference<K, V> loadingValueReference = null; boolean createNewEntry = true; lock(); try { // re-read ticker once inside the lock long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count - 1; AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { valueReference = e.getValueReference(); if (valueReference.isLoading()) { createNewEntry = false; } else { V value = valueReference.get(); if (value == null) { enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED); } else if (map.isExpired(e, now)) { // This is a duplicate check, as preWriteCleanup already purged expired // entries, but let's accomodate an incorrect expiration queue. enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED); } else { recordLockedRead(e, now); statsCounter.recordHits(1); // we were concurrent with loading; don't consider refresh return value; } // immediately reuse invalid entries writeQueue.remove(e); accessQueue.remove(e); this.count = newCount; // write-volatile } break; } } if (createNewEntry) { loadingValueReference = new LoadingValueReference<K, V>(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); } }
最後調用loadSync(key, hash, loadingValueReference, loader);
進行進行數據load。
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) { stopwatch.start(); V previousValue = oldValue.get(); try { if (previousValue == null) { V newValue = loader.load(key); return set(newValue) ? futureValue : Futures.immediateFuture(newValue); } ListenableFuture<V> newValue = loader.reload(key, previousValue); if (newValue == null) { return Futures.immediateFuture(null); } // To avoid a race, make sure the refreshed value is set into loadingValueReference // *before* returning newValue from the cache query. return Futures.transform(newValue, new Function<V, V>() { @Override public V apply(V newValue) { LoadingValueReference.this.set(newValue); return newValue; } }); } catch (Throwable t) { if (t instanceof InterruptedException) { Thread.currentThread().interrupt(); } return setException(t) ? futureValue : fullyFailedFuture(t); } }
執行loader.load
將數據load進緩存,可能你會想若是這個時候從DB或其餘非內存存儲中也沒找到數據,這個時候LocalLoadingCache
是怎麼處理的呢?其實在這種狀況下只須要throw異常信息就好,這樣LocalLoadingCache
會放棄緩存。
可是讀源代碼細心的你可能會發如今lockedGetOrLoad
中會先newEntry後面才load
if (createNewEntry) { loadingValueReference = new LoadingValueReference<K, V>(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); }
其實實現很簡單他在cache到異常信息後又會對緩存中的entry進行remove操做,當時找這段異常被cache的代碼也是找了好久時間了。
com.google.common.cache.LocalCache.Segment#getAndRecordStats V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference, ListenableFuture<V> newValue) throws ExecutionException { V value = null; try { value = getUninterruptibly(newValue); if (value == null) { throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + "."); } statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos()); storeLoadedValue(key, hash, loadingValueReference, value); return value; } finally { if (value == null) { statsCounter.recordLoadException(loadingValueReference.elapsedNanos()); removeLoadingValue(key, hash, loadingValueReference); } } }
執行removeLoadingValue將load異常後的key刪除。
從用法那小結能夠看到咱們在建立緩存的時候除了load還有一些其餘特性以下:
maximumSize(10000) expireAfterWrite(10, TimeUnit.MINUTES)
這又是什麼意思呢?這其實就是LocalLoadingCache
提供的緩存策略。
maximumSize(10000)
設置緩存能保存的最多元素數量。expireAfterWrite(10, TimeUnit.MINUTES)
設置元素在寫後多久進行銷燬。
其實還有maximumWeight、expireAfterAccess
兩種元素過時策略。
maximumSize
是maximumWeight
的一種特殊形式,將全部的元素設置weight爲1,也即就轉化爲能存儲元素個數的上限值了。
expireAfterAccess
和expireAfterWrite
基本就一個意思,只是內部用了兩種不一樣的計數方式(經過不一樣的queue進行管理,被訪問/修改進行入隊操做)進行訪問、寫操做的記錄。
很少說讓源碼說話。
根據過時時間進行緩存的淘汰策略思路:在進行get/put操做完成後對隊列(每次對緩存的操做頭會被其記錄下來)進行一次遍歷,而後按照過時時間淘汰過時的元素。
根據元素個數上限進行清理的策略思路:在load新緩存值的時候比對下是否緩存容量(元素個數)已經達到上限,若是達到上限按照LRU算法進行淘汰元素。
過時時間淘汰策略
從分析load那小結咱們已經展現過get的代碼,其中最後finally中有段postReadCleanup();
方法,深刻下去方法體就否則看出:
@GuardedBy("Segment.this") void expireEntries(long now) { drainRecencyQueue(); ReferenceEntry<K, V> e; while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } }
進行過時key清除策略,從這段代碼也能看出我爲何說expireAfterAccess
和expireAfterWrite
基本就一個意思了吧。
其實還有一種清除緩存的策略:基於引用的回收可是還沒研究清除不便多說,這個策略清除的時機和過時時間策略同樣。
@GuardedBy("Segment.this") void drainReferenceQueues() { if (map.usesKeyReferences()) { drainKeyReferenceQueue(); } if (map.usesValueReferences()) { drainValueReferenceQueue(); } }
容量回收策略
在新key對應的value load完後須要將value存放到緩存中去,插入完成後會進行容量的check若是超過容量限制會執行淘汰策略。對應源碼:
com.google.common.cache.LocalCache.Segment#storeLoadedValue boolean storeLoadedValue(K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) { lock(); try { long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count + 1; if (newCount > this.threshold) { // ensure capacity expand(); newCount = this.count + 1; } AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { ValueReference<K, V> valueReference = e.getValueReference(); V entryValue = valueReference.get(); // replace the old LoadingValueReference if it's live, otherwise // perform a putIfAbsent if (oldValueReference == valueReference || (entryValue == null && valueReference != UNSET)) { ++modCount; if (oldValueReference.isActive()) { RemovalCause cause = (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED; enqueueNotification(key, hash, oldValueReference, cause); newCount--; } setValue(e, key, newValue, now); this.count = newCount; // write-volatile evictEntries(); return true; } // the loaded value was already clobbered valueReference = new WeightedStrongValueReference<K, V>(newValue, 0); enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED); return false; } } ++modCount; ReferenceEntry<K, V> newEntry = newEntry(key, hash, first); setValue(newEntry, key, newValue, now); table.set(index, newEntry); this.count = newCount; // write-volatile evictEntries(); return true; } finally { unlock(); postWriteCleanup(); } }
上面的存儲操做最終在進行setValue後會執行:
com.google.common.cache.LocalCache.Segment#evictEntries @GuardedBy("Segment.this") void evictEntries() { if (!map.evictsBySize()) { return; } drainRecencyQueue(); while (totalWeight > maxSegmentWeight) { ReferenceEntry<K, V> e = getNextEvictable(); if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) { throw new AssertionError(); } } } // TODO(fry): instead implement this with an eviction head ReferenceEntry<K, V> getNextEvictable() { for (ReferenceEntry<K, V> e : accessQueue) { int weight = e.getValueReference().getWeight(); if (weight > 0) { return e; } } throw new AssertionError(); }
這裏最終會根據LRU從緩存中將最近沒有使用過的元素進行剔除操做。
在LocalLoadingCache中提供了在元素被移除的時候供應用進行回調的函數,這個函數經過removalListener進行註冊,當有元素從緩存中淘汰後就會觸發其進行調用。
接着上面移除元素進行分析函數removeEntry
@GuardedBy("Segment.this") boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) { int newCount = this.count - 1; AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) { if (e == entry) { ++modCount; ReferenceEntry<K, V> newFirst = removeValueFromChain( first, e, e.getKey(), hash, e.getValueReference(), cause); newCount = this.count - 1; table.set(index, newFirst); this.count = newCount; // write-volatile return true; } } return false; }
最終會調用
@GuardedBy("Segment.this") void enqueueNotification(@Nullable K key, int hash, ValueReference<K, V> valueReference, RemovalCause cause) { totalWeight -= valueReference.getWeight(); if (cause.wasEvicted()) { statsCounter.recordEviction(); } if (map.removalNotificationQueue != DISCARDING_QUEUE) { V value = valueReference.get(); RemovalNotification<K, V> notification = new RemovalNotification<K, V>(key, value, cause); map.removalNotificationQueue.offer(notification); } }
將創建一個RemovalNotification隊列進行保存刪除元素。
在讀/寫完成後會進行通知
com.google.common.cache.LocalCache.Segment#postWriteCleanup /** * Performs routine cleanup following a write. */ void postWriteCleanup() { runUnlockedCleanup(); } void cleanUp() { long now = map.ticker.read(); runLockedCleanup(now); runUnlockedCleanup(); }
runUnlockedCleanup
源碼會回調com.google.common.cache.RemovalListener#onRemoval
進行緩存元素刪除後置處理。
void processPendingNotifications() { RemovalNotification<K, V> notification; while ((notification = removalNotificationQueue.poll()) != null) { try { removalListener.onRemoval(notification); } catch (Throwable e) { logger.log(Level.WARNING, "Exception thrown by removal listener", e); } } }
以爲圖不夠清晰能夠點擊查看大圖。
本篇也主要是對LocalLoadingCache從運用這個層次更向前走了一步,對咱們使用過程其邏輯背後的實現進行了必定深刻分析。我在初次看到這個方式也是很疑惑其底層究竟是如何實現的,因而有了這篇文章,經過源碼進行跟蹤分析其背後的實現邏輯。
後面還會分析org.springframework.cache.guava.GuavaCacheManager
如何將GuavaCache進行管理的,經過和spring更好的結合而消除顯式調用cache get/put的方式。