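First, we all agree on one thing: a cache can be used to speed up access to a system.
Distributed caches are now so capable that, most of the time, we may not pay any attention to local caching.
In a high-concurrency scenario, relying only on a NoSQL-style cache such as redis may well be good enough.
But there is one problem to consider: a cache like redis is fast, yet it has its own bottleneck. If we store everything in it, then under high concurrency the application's bottleneck becomes the cache's bottleneck.
So, in some scenarios, we can keep part of the data in a local cache and avoid sending every request through to the redis layer.
This article treats the local cache as one level of a two-level cache that sits in front of redis, not as the only caching tool.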
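Using a local cache raises two concerns:
1. cache consistency;
2. concurrency safety;
Cache consistency is about whether the local cache stays consistent with the data in a cache middleware such as redis. If the inconsistency goes beyond what is acceptable, the gain in access speed is meaningless.
Concurrency safety is about thread safety when the local cache is accessed. If entries get mixed up under concurrent access, the consequences are serious.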
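What are the benefits of a local cache?
1. Less network I/O to the remote cache, so access is naturally faster;
2. Fewer concurrent requests to the remote cache, so the system as a whole can handle more concurrency;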
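Which scenarios suit a local cache?
1. Single-machine deployments, which need no discussion;
2. Read-heavy, write-light workloads (the usual case for any cache);
3. Workloads that can tolerate cache inconsistency for a certain time (with local caches, different machines in a distributed deployment may return different results);
4. Workloads with a very large volume of cache requests (if they all hit redis directly, redis comes under heavy pressure and the application responds more slowly);
So if you have a scenario like this, it is worth considering how a local cache could improve response time.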
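Implementing such a two-level cache yourself should not be hard either. Only two problems need to be solved:
1. the cache expiration policy;
2. cache thread safety;
The simplest and most direct approach to the first is a thread that refreshes the cache on a timer and deletes entries once their time is up. The safety problem can be handled with synchronized or with the lock utilities in java.util.concurrent.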
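The do-it-yourself approach described above can be sketched as follows. This is a minimal illustration, not production code: the class name `SimpleTtlCache`, the injectable `clock`, and the thread name are all assumptions made for the example. It uses a `ConcurrentHashMap` for thread safety and a scheduled daemon thread that deletes expired entries.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical hand-rolled local cache: a TTL per entry plus a periodic purge thread. */
class SimpleTtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long expiresAtMillis;
        Entry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> store = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so expiry can be tested without sleeping

    SimpleTtlCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        store.put(key, new Entry<>(value, clock.getAsLong() + ttlMillis));
    }

    /** Returns the value, or null when the key is absent or already expired. */
    V get(K key) {
        Entry<V> e = store.get(key);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() >= e.expiresAtMillis) {
            store.remove(key, e); // remove only if nobody replaced the entry meanwhile
            return null;
        }
        return e.value;
    }

    /** Deletes every expired entry; this is the job the timer thread runs. */
    void purgeExpired() {
        long now = clock.getAsLong();
        store.entrySet().removeIf(en -> now >= en.getValue().expiresAtMillis);
    }

    /** Starts a daemon thread that purges at a fixed period. */
    ScheduledExecutorService startPurger(long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "local-cache-purger");
            t.setDaemon(true);
            return t;
        });
        ses.scheduleAtFixedRate(this::purgeExpired, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }

    int size() {
        return store.size();
    }
}
```

In real use the clock would be `System::currentTimeMillis`. Note that this sketch has no size limit and no loader, which are two of the things guava adds.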
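Actually doing it, though, may not be that simple, and it is not what we want to focus on here.
Instead, let's look at how guava solves these problems and use its approach to broaden our own thinking.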
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
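First, the guava cache instance should be shared globally; otherwise caching loses its point.
Second, the expiration parameters should be configurable.
An example: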
@Component
@Slf4j
public class LocalEnhancedCacheHolder {

    @Value("${guava.cache.max.size}")
    private Integer maxCacheSize;

    @Value("${guava.cache.timeout}")
    private Integer guavaCacheTimeout;

    /**
     * String values, k->v. Only string reads and writes are supported, to keep things simple.
     */
    private LoadingCache<String, String> stringDbCacheContainer;

    /**
     * Hash data cache; shows how to use a multi-part key as the guava cache key.
     */
    private LoadingCache<HashDbItemEntry, byte[]> hashDbCacheContainer;

    // Note: get/set/remove/hgetValue below belong to the author's own redis wrapper.
    @Resource
    private RedisTemplate redisTemplate;

    /**
     * String marker for an empty value.
     */
    public static final String EMPTY_VALUE_STRING = "";

    /**
     * Byte marker for an empty value.
     */
    public static final byte[] EMPTY_VALUE_BYTES = new byte[0];

    @PostConstruct
    public void init() {
        Integer dbCount = 2;
        stringDbCacheContainer = CacheBuilder.newBuilder()
                .expireAfterWrite(guavaCacheTimeout, TimeUnit.SECONDS)
                .maximumSize(maxCacheSize)
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) throws Exception {
                        log.info("[cache] loading config from redis: {}", key);
                        String value = redisTemplate.get(key);
                        return StringUtils.defaultIfBlank(value, EMPTY_VALUE_STRING);
                    }
                });
        hashDbCacheContainer = CacheBuilder.newBuilder()
                .expireAfterWrite(guavaCacheTimeout, TimeUnit.SECONDS)
                .maximumSize(maxCacheSize / dbCount)
                .build(new CacheLoader<HashDbItemEntry, byte[]>() {
                    @Override
                    public byte[] load(HashDbItemEntry keyHolder) throws Exception {
                        log.info("[cache] loading config from redis: {}", keyHolder);
                        byte[] valueBytes = redisTemplate.hgetValue(
                                keyHolder.getBucketKey(), keyHolder.getSlotKey());
                        if (valueBytes == null) {
                            valueBytes = EMPTY_VALUE_BYTES;
                        }
                        return valueBytes;
                    }
                });
    }

    /**
     * Gets a cached value from the k-v cache.
     *
     * @param key key
     * @return the cached value; EMPTY_VALUE_STRING when redis has no value
     */
    public String getCache(String key) {
        try {
            return stringDbCacheContainer.get(key);
        } catch (ExecutionException e) {
            log.error("[cache] failed to get cache: {}, ex:{}", key, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Puts a value into the cache; for now this only writes to redis.
     *
     * @param key   cache key
     * @param value cache value
     */
    public void putCache(String key, String value) {
        redisTemplate.set(key, value, 0L);
    }

    /**
     * Puts a value into the cache with a timeout; for now this only writes to redis.
     *
     * @param key     cache key
     * @param value   cache value
     * @param timeout timeout in seconds
     */
    public void putCache(String key, String value, Long timeout) {
        redisTemplate.set(key, value, timeout);
    }

    /**
     * Deletes a single kv cache entry.
     *
     * @param key cache key
     */
    public void removeCache(String key) {
        redisTemplate.remove(key);
    }

    /**
     * Deletes kv cache entries in batch.
     *
     * @param keyList list of cache keys, deleted through a pipeline for better performance
     */
    public void removeCache(Collection<String> keyList) {
        redisTemplate.remove(keyList);
    }

    /**
     * Gets a cached value from the hash store.
     *
     * @param bucketKey bucket key, which maps to a set of k->v values
     * @param slotKey   slot key, which maps to one concrete cached value
     * @return the cached value
     */
    public byte[] getCacheFromHash(String bucketKey, String slotKey) {
        HashDbItemEntry entry = new HashDbItemEntry(bucketKey, slotKey);
        try {
            return hashDbCacheContainer.get(entry);
        } catch (ExecutionException e) {
            log.error("[cache] failed to get cache: {}, ex:{}", entry, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Key for the hash data structure.
     *
     * value is not stored for now; the entry is only used for lookups.
     */
    class HashDbItemEntry {
        private String bucketKey;
        private String slotKey;
        private Object value;

        public HashDbItemEntry(String bucketKey, String slotKey) {
            this.bucketKey = bucketKey;
            this.slotKey = slotKey;
        }

        public String getBucketKey() {
            return bucketKey;
        }

        public String getSlotKey() {
            return slotKey;
        }

        public Object getValue() {
            return value;
        }

        // equals & hashCode must be overridden, otherwise cached entries can never be reused
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            HashDbItemEntry that = (HashDbItemEntry) o;
            return Objects.equals(bucketKey, that.bucketKey)
                    && Objects.equals(slotKey, that.slotKey)
                    && Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(bucketKey, slotKey, value);
        }

        @Override
        public String toString() {
            return "HashDbItemEntry{"
                    + "bucketKey='" + bucketKey + '\''
                    + ", slotKey='" + slotKey + '\''
                    + ", value=" + value
                    + '}';
        }
    }
}
The example above shows two caches: a simple string -> string cache, and a (string, string) -> byte[] cache. The point is only that there is more than one way to cache.
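The comment on HashDbItemEntry says equals and hashCode must be overridden or the cache cannot be reused. The sketch below shows why, using a plain `HashMap` in place of the guava cache. The class names `NaiveKey` and `ValueKey` are made up for the illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class CompositeKeyDemo {
    /** Key without equals/hashCode: two instances with the same fields are different keys. */
    static final class NaiveKey {
        final String bucketKey;
        final String slotKey;
        NaiveKey(String bucketKey, String slotKey) {
            this.bucketKey = bucketKey;
            this.slotKey = slotKey;
        }
    }

    /** Key with value-based equality: same fields means same key. */
    static final class ValueKey {
        final String bucketKey;
        final String slotKey;
        ValueKey(String bucketKey, String slotKey) {
            this.bucketKey = bucketKey;
            this.slotKey = slotKey;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueKey)) return false;
            ValueKey that = (ValueKey) o;
            return Objects.equals(bucketKey, that.bucketKey)
                    && Objects.equals(slotKey, that.slotKey);
        }
        @Override
        public int hashCode() {
            return Objects.hash(bucketKey, slotKey);
        }
    }

    /** A second lookup with a fresh NaiveKey misses, so a cache would reload every time. */
    static boolean naiveHit() {
        Map<NaiveKey, String> m = new HashMap<>();
        m.put(new NaiveKey("user", "42"), "cached");
        return m.containsKey(new NaiveKey("user", "42"));
    }

    /** A second lookup with a fresh ValueKey hits. */
    static boolean valueHit() {
        Map<ValueKey, String> m = new HashMap<>();
        m.put(new ValueKey("user", "42"), "cached");
        return m.containsKey(new ValueKey("user", "42"));
    }
}
```

Because getCacheFromHash builds a new key object on every call, a key class without value-based equality would turn every read into a cache miss.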
We will use the simple string -> string cache to explain.
stringDbCacheContainer = CacheBuilder.newBuilder()
        .expireAfterWrite(guavaCacheTimeout, TimeUnit.SECONDS)
        .maximumSize(maxCacheSize)
        .build(new CacheLoader<String, String>() {
            @Override
            public String load(String key) throws Exception {
                log.info("[cache] loading config from redis: {}", key);
                String value = redisTemplate.get(key);
                return StringUtils.defaultIfBlank(value, EMPTY_VALUE_STRING);
            }
        });
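Here we create a cache container whose maximum capacity is maxCacheSize. Each key expires guavaCacheTimeout seconds after it is written, and the next read after expiry fetches the data from redisTemplate again.
With that, a complete two-level cache component is done, and you can use it directly in a project. Simple, isn't it?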
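The read path of the component can be illustrated without guava or redis. The sketch below is an assumption-laden stand-in: `ReadThroughSketch`, the `remote` function (playing the role of the redis lookup), and `remoteCalls` are hypothetical names, and expiry is deliberately left out. It shows two points from the example: a miss falls through to the remote cache exactly once, and a missing remote value is stored as an empty marker instead of null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ReadThroughSketch {
    /** Marker meaning "the remote cache has no value", like EMPTY_VALUE_STRING above. */
    static final String EMPTY = "";

    private final Map<String, String> local = new ConcurrentHashMap<>();
    private final Function<String, String> remote; // stand-in for the redis lookup; may return null
    final AtomicInteger remoteCalls = new AtomicInteger();

    ReadThroughSketch(Function<String, String> remote) {
        this.remote = remote;
    }

    /** Reads the local level first and falls through to the remote level on a miss. */
    String get(String key) {
        return local.computeIfAbsent(key, k -> {
            remoteCalls.incrementAndGet();
            String v = remote.apply(k);
            return v == null ? EMPTY : v; // cache the miss too, so it is not re-fetched
        });
    }

    /** Drops the local copy so the next get goes to the remote level again. */
    void invalidate(String key) {
        local.remove(key);
    }
}
```

In the component above, putCache and removeCache touch only redis, so a local copy stays visible until it expires. An explicit invalidation like the one sketched here would be one way to shorten that window on the machine that performs the write.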
@GwtCompatible(emulated = true) public final class CacheBuilder<K, V> { private static final int DEFAULT_INITIAL_CAPACITY = 16; private static final int DEFAULT_CONCURRENCY_LEVEL = 4; private static final int DEFAULT_EXPIRATION_NANOS = 0; private static final int DEFAULT_REFRESH_NANOS = 0; static final Supplier<? extends StatsCounter> NULL_STATS_COUNTER = Suppliers.ofInstance( new StatsCounter() { @Override public void recordHits(int count) {} @Override public void recordMisses(int count) {} @Override public void recordLoadSuccess(long loadTime) {} @Override public void recordLoadException(long loadTime) {} @Override public void recordEviction() {} @Override public CacheStats snapshot() { return EMPTY_STATS; } }); static final CacheStats EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0); static final Supplier<StatsCounter> CACHE_STATS_COUNTER = new Supplier<StatsCounter>() { @Override public StatsCounter get() { return new SimpleStatsCounter(); } }; enum NullListener implements RemovalListener<Object, Object> { INSTANCE; @Override public void onRemoval(RemovalNotification<Object, Object> notification) {} } enum OneWeigher implements Weigher<Object, Object> { INSTANCE; @Override public int weigh(Object key, Object value) { return 1; } } static final Ticker NULL_TICKER = new Ticker() { @Override public long read() { return 0; } }; private static final Logger logger = Logger.getLogger(CacheBuilder.class.getName()); static final int UNSET_INT = -1; boolean strictParsing = true; int initialCapacity = UNSET_INT; int concurrencyLevel = UNSET_INT; long maximumSize = UNSET_INT; long maximumWeight = UNSET_INT; Weigher<? super K, ? super V> weigher; Strength keyStrength; Strength valueStrength; long expireAfterWriteNanos = UNSET_INT; long expireAfterAccessNanos = UNSET_INT; long refreshNanos = UNSET_INT; Equivalence<Object> keyEquivalence; Equivalence<Object> valueEquivalence; RemovalListener<? super K, ? super V> removalListener; Ticker ticker; Supplier<? 
extends StatsCounter> statsCounterSupplier = NULL_STATS_COUNTER; // TODO(fry): make constructor private and update tests to use newBuilder CacheBuilder() {} /** * Constructs a new {@code CacheBuilder} instance with default settings, including strong keys, * strong values, and no automatic eviction of any kind. */ public static CacheBuilder<Object, Object> newBuilder() { return new CacheBuilder<Object, Object>(); } /** * Sets the minimum total size for the internal hash tables. For example, if the initial capacity * is {@code 60}, and the concurrency level is {@code 8}, then eight segments are created, each * having a hash table of size eight. Providing a large enough estimate at construction time * avoids the need for expensive resizing operations later, but setting this value unnecessarily * high wastes memory. * * @throws IllegalArgumentException if {@code initialCapacity} is negative * @throws IllegalStateException if an initial capacity was already set */ public CacheBuilder<K, V> initialCapacity(int initialCapacity) { checkState(this.initialCapacity == UNSET_INT, "initial capacity was already set to %s", this.initialCapacity); checkArgument(initialCapacity >= 0); this.initialCapacity = initialCapacity; return this; } int getInitialCapacity() { return (initialCapacity == UNSET_INT) ? DEFAULT_INITIAL_CAPACITY : initialCapacity; } /** * Guides the allowed concurrency among update operations. Used as a hint for internal sizing. The * table is internally partitioned to try to permit the indicated number of concurrent updates * without contention. Because assignment of entries to these partitions is not necessarily * uniform, the actual concurrency observed may vary. Ideally, you should choose a value to * accommodate as many threads as will ever concurrently modify the table. Using a significantly * higher value than you need can waste space and time, and a significantly lower value can lead * to thread contention. 
But overestimates and underestimates within an order of magnitude do not * usually have much noticeable impact. A value of one permits only one thread to modify the cache * at a time, but since read operations and cache loading computations can proceed concurrently, * this still yields higher concurrency than full synchronization. * * <p> Defaults to 4. <b>Note:</b>The default may change in the future. If you care about this * value, you should always choose it explicitly. * * <p>The current implementation uses the concurrency level to create a fixed number of hashtable * segments, each governed by its own write lock. The segment lock is taken once for each explicit * write, and twice for each cache loading computation (once prior to loading the new value, * and once after loading completes). Much internal cache management is performed at the segment * granularity. For example, access queues and write queues are kept per segment when they are * required by the selected eviction algorithm. As such, when writing unit tests it is not * uncommon to specify {@code concurrencyLevel(1)} in order to achieve more deterministic eviction * behavior. * * <p>Note that future implementations may abandon segment locking in favor of more advanced * concurrency controls. * * @throws IllegalArgumentException if {@code concurrencyLevel} is nonpositive * @throws IllegalStateException if a concurrency level was already set */ public CacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) { checkState(this.concurrencyLevel == UNSET_INT, "concurrency level was already set to %s", this.concurrencyLevel); checkArgument(concurrencyLevel > 0); this.concurrencyLevel = concurrencyLevel; return this; } int getConcurrencyLevel() { return (concurrencyLevel == UNSET_INT) ? DEFAULT_CONCURRENCY_LEVEL : concurrencyLevel; } /** * Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict * an entry before this limit is exceeded</b>. 
As the cache size grows close to the maximum, the * cache evicts entries that are less likely to be used again. For example, the cache may evict an * entry because it hasn't been used recently or very often. * * <p>When {@code size} is zero, elements will be evicted immediately after being loaded into the * cache. This can be useful in testing, or to disable caching temporarily without a code change. * * <p>This feature cannot be used in conjunction with {@link #maximumWeight}. * * @param size the maximum size of the cache * @throws IllegalArgumentException if {@code size} is negative * @throws IllegalStateException if a maximum size or weight was already set */ public CacheBuilder<K, V> maximumSize(long size) { checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s", this.maximumSize); checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s", this.maximumWeight); checkState(this.weigher == null, "maximum size can not be combined with weigher"); checkArgument(size >= 0, "maximum size must not be negative"); this.maximumSize = size; return this; } /** * Specifies the maximum weight of entries the cache may contain. Weight is determined using the * {@link Weigher} specified with {@link #weigher}, and use of this method requires a * corresponding call to {@link #weigher} prior to calling {@link #build}. * * <p>Note that the cache <b>may evict an entry before this limit is exceeded</b>. As the cache * size grows close to the maximum, the cache evicts entries that are less likely to be used * again. For example, the cache may evict an entry because it hasn't been used recently or very * often. * * <p>When {@code weight} is zero, elements will be evicted immediately after being loaded into * cache. This can be useful in testing, or to disable caching temporarily without a code * change. 
* * <p>Note that weight is only used to determine whether the cache is over capacity; it has no * effect on selecting which entry should be evicted next. * * <p>This feature cannot be used in conjunction with {@link #maximumSize}. * * @param weight the maximum total weight of entries the cache may contain * @throws IllegalArgumentException if {@code weight} is negative * @throws IllegalStateException if a maximum weight or size was already set * @since 11.0 */ @GwtIncompatible("To be supported") public CacheBuilder<K, V> maximumWeight(long weight) { checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s", this.maximumWeight); checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s", this.maximumSize); this.maximumWeight = weight; checkArgument(weight >= 0, "maximum weight must not be negative"); return this; } /** * Specifies the weigher to use in determining the weight of entries. Entry weight is taken * into consideration by {@link #maximumWeight(long)} when determining which entries to evict, and * use of this method requires a corresponding call to {@link #maximumWeight(long)} prior to * calling {@link #build}. Weights are measured and recorded when entries are inserted into the * cache, and are thus effectively static during the lifetime of a cache entry. * * <p>When the weight of an entry is zero it will not be considered for size-based eviction * (though it still may be evicted by other means). * * <p><b>Important note:</b> Instead of returning <em>this</em> as a {@code CacheBuilder} * instance, this method returns {@code CacheBuilder<K1, V1>}. From this point on, either the * original reference or the returned reference may be used to complete configuration and build * the cache, but only the "generic" one is type-safe. 
That is, it will properly prevent you from * building caches whose key or value types are incompatible with the types accepted by the * weigher already provided; the {@code CacheBuilder} type cannot do this. For best results, * simply use the standard method-chaining idiom, as illustrated in the documentation at top, * configuring a {@code CacheBuilder} and building your {@link Cache} all in a single statement. * * <p><b>Warning:</b> if you ignore the above advice, and use this {@code CacheBuilder} to build * a cache whose key or value type is incompatible with the weigher, you will likely experience * a {@link ClassCastException} at some <i>undefined</i> point in the future. * * @param weigher the weigher to use in calculating the weight of cache entries * @throws IllegalArgumentException if {@code size} is negative * @throws IllegalStateException if a maximum size was already set * @since 11.0 */ @GwtIncompatible("To be supported") public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> weigher( Weigher<? super K1, ? super V1> weigher) { checkState(this.weigher == null); if (strictParsing) { checkState(this.maximumSize == UNSET_INT, "weigher can not be combined with maximum size", this.maximumSize); } // safely limiting the kinds of caches this can produce @SuppressWarnings("unchecked") CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this; me.weigher = checkNotNull(weigher); return me; } // Make a safe contravariant cast now so we don't have to do it over and over. @SuppressWarnings("unchecked") <K1 extends K, V1 extends V> Weigher<K1, V1> getWeigher() { return (Weigher<K1, V1>) MoreObjects.firstNonNull(weigher, OneWeigher.INSTANCE); } /** * Specifies that each entry should be automatically removed from the cache once a fixed duration * has elapsed after the entry's creation, or the most recent replacement of its value. 
* * <p>When {@code duration} is zero, this method hands off to * {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specificed maximum * size or weight. This can be useful in testing, or to disable caching temporarily without a code * change. * * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or * write operations. Expired entries are cleaned up as part of the routine maintenance described * in the class javadoc. * * @param duration the length of time after an entry is created that it should be automatically * removed * @param unit the unit that {@code duration} is expressed in * @throws IllegalArgumentException if {@code duration} is negative * @throws IllegalStateException if the time to live or time to idle was already set */ public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) { checkState(expireAfterWriteNanos == UNSET_INT, "expireAfterWrite was already set to %s ns", expireAfterWriteNanos); checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit); this.expireAfterWriteNanos = unit.toNanos(duration); return this; } long getExpireAfterWriteNanos() { return (expireAfterWriteNanos == UNSET_INT) ? DEFAULT_EXPIRATION_NANOS : expireAfterWriteNanos; } /** * Specifies that each entry should be automatically removed from the cache once a fixed duration * has elapsed after the entry's creation, the most recent replacement of its value, or its last * access. Access time is reset by all cache read and write operations (including * {@code Cache.asMap().get(Object)} and {@code Cache.asMap().put(K, V)}), but not by operations * on the collection-views of {@link Cache#asMap}. * * <p>When {@code duration} is zero, this method hands off to * {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specificed maximum * size or weight. This can be useful in testing, or to disable caching temporarily without a code * change. 
* * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or * write operations. Expired entries are cleaned up as part of the routine maintenance described * in the class javadoc. * * @param duration the length of time after an entry is last accessed that it should be * automatically removed * @param unit the unit that {@code duration} is expressed in * @throws IllegalArgumentException if {@code duration} is negative * @throws IllegalStateException if the time to idle or time to live was already set */ public CacheBuilder<K, V> expireAfterAccess(long duration, TimeUnit unit) { checkState(expireAfterAccessNanos == UNSET_INT, "expireAfterAccess was already set to %s ns", expireAfterAccessNanos); checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit); this.expireAfterAccessNanos = unit.toNanos(duration); return this; } long getExpireAfterAccessNanos() { return (expireAfterAccessNanos == UNSET_INT) ? DEFAULT_EXPIRATION_NANOS : expireAfterAccessNanos; } /** * Specifies that active entries are eligible for automatic refresh once a fixed duration has * elapsed after the entry's creation, or the most recent replacement of its value. The semantics * of refreshes are specified in {@link LoadingCache#refresh}, and are performed by calling * {@link CacheLoader#reload}. * * <p>As the default implementation of {@link CacheLoader#reload} is synchronous, it is * recommended that users of this method override {@link CacheLoader#reload} with an asynchronous * implementation; otherwise refreshes will be performed during unrelated cache read and write * operations. * * <p>Currently automatic refreshes are performed when the first stale request for an entry * occurs. The request triggering refresh will make a blocking call to {@link CacheLoader#reload} * and immediately return the new value if the returned future is complete, and the old value * otherwise. 
* * <p><b>Note:</b> <i>all exceptions thrown during refresh will be logged and then swallowed</i>. * * @param duration the length of time after an entry is created that it should be considered * stale, and thus eligible for refresh * @param unit the unit that {@code duration} is expressed in * @throws IllegalArgumentException if {@code duration} is negative * @throws IllegalStateException if the refresh interval was already set * @since 11.0 */ @Beta @GwtIncompatible("To be supported (synchronously).") public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) { checkNotNull(unit); checkState(refreshNanos == UNSET_INT, "refresh was already set to %s ns", refreshNanos); checkArgument(duration > 0, "duration must be positive: %s %s", duration, unit); this.refreshNanos = unit.toNanos(duration); return this; } long getRefreshNanos() { return (refreshNanos == UNSET_INT) ? DEFAULT_REFRESH_NANOS : refreshNanos; } /** * Specifies a nanosecond-precision time source for use in determining when entries should be * expired. By default, {@link System#nanoTime} is used. * * <p>The primary intent of this method is to facilitate testing of caches which have been * configured with {@link #expireAfterWrite} or {@link #expireAfterAccess}. * * @throws IllegalStateException if a ticker was already set */ public CacheBuilder<K, V> ticker(Ticker ticker) { checkState(this.ticker == null); this.ticker = checkNotNull(ticker); return this; } Ticker getTicker(boolean recordsTime) { if (ticker != null) { return ticker; } return recordsTime ? Ticker.systemTicker() : NULL_TICKER; } /** * Specifies a listener instance that caches should notify each time an entry is removed for any * {@linkplain RemovalCause reason}. Each cache created by this builder will invoke this listener * as part of the routine maintenance described in the class documentation above. 
* * <p><b>Warning:</b> after invoking this method, do not continue to use <i>this</i> cache * builder reference; instead use the reference this method <i>returns</i>. At runtime, these * point to the same instance, but only the returned reference has the correct generic type * information so as to ensure type safety. For best results, use the standard method-chaining * idiom illustrated in the class documentation above, configuring a builder and building your * cache in a single statement. Failure to heed this advice can result in a {@link * ClassCastException} being thrown by a cache operation at some <i>undefined</i> point in the * future. * * <p><b>Warning:</b> any exception thrown by {@code listener} will <i>not</i> be propagated to * the {@code Cache} user, only logged via a {@link Logger}. * * @return the cache builder reference that should be used instead of {@code this} for any * remaining configuration and cache building * @throws IllegalStateException if a removal listener was already set */ @CheckReturnValue public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> removalListener( RemovalListener<? super K1, ? super V1> listener) { checkState(this.removalListener == null); // safely limiting the kinds of caches this can produce @SuppressWarnings("unchecked") CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this; me.removalListener = checkNotNull(listener); return me; } // Make a safe contravariant cast now so we don't have to do it over and over. @SuppressWarnings("unchecked") <K1 extends K, V1 extends V> RemovalListener<K1, V1> getRemovalListener() { return (RemovalListener<K1, V1>) MoreObjects.firstNonNull(removalListener, NullListener.INSTANCE); } /** * Enable the accumulation of {@link CacheStats} during the operation of the cache. Without this * {@link Cache#stats} will return zero for all statistics. Note that recording stats requires * bookkeeping to be performed with each operation, and thus imposes a performance penalty on * cache operation. 
* * @since 12.0 (previously, stats collection was automatic) */ public CacheBuilder<K, V> recordStats() { statsCounterSupplier = CACHE_STATS_COUNTER; return this; } boolean isRecordingStats() { return statsCounterSupplier == CACHE_STATS_COUNTER; } Supplier<? extends StatsCounter> getStatsCounterSupplier() { return statsCounterSupplier; } /** * Builds a cache, which either returns an already-loaded value for a given key or atomically * computes or retrieves it using the supplied {@code CacheLoader}. If another thread is currently * loading the value for this key, simply waits for that thread to finish and returns its * loaded value. Note that multiple threads can concurrently load values for distinct keys. * * <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be * invoked again to create multiple independent caches. * * @param loader the cache loader used to obtain new values * @return a cache having the requested features */ public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build( CacheLoader<? super K1, V1> loader) { checkWeightWithWeigher(); return new LocalCache.LocalLoadingCache<K1, V1>(this, loader); } /** * Builds a cache which does not automatically load values when keys are requested. * * <p>Consider {@link #build(CacheLoader)} instead, if it is feasible to implement a * {@code CacheLoader}. * * <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be * invoked again to create multiple independent caches. * * @return a cache having the requested features * @since 11.0 */ public <K1 extends K, V1 extends V> Cache<K1, V1> build() { checkWeightWithWeigher(); checkNonLoadingCache(); return new LocalCache.LocalManualCache<K1, V1>(this); } }
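As shown above, the builder pattern is used to create the LoadingCache<K, V>, with parameters such as the maximum size and the expiration time set along the way.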
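The builder's conventions are easier to see in a stripped-down form. The sketch below is not guava code: `MiniCacheBuilder` is a hypothetical class that only imitates three habits visible in the source above, namely the `UNSET_INT` sentinel, the "already set" state check, and returning `this` for chaining.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical miniature of the builder conventions seen in CacheBuilder. */
class MiniCacheBuilder {
    static final int UNSET_INT = -1;

    private long maximumSize = UNSET_INT;
    private long expireAfterWriteNanos = UNSET_INT;

    static MiniCacheBuilder newBuilder() {
        return new MiniCacheBuilder();
    }

    MiniCacheBuilder maximumSize(long size) {
        // each option may be set only once
        if (maximumSize != UNSET_INT) {
            throw new IllegalStateException("maximum size was already set to " + maximumSize);
        }
        if (size < 0) {
            throw new IllegalArgumentException("maximum size must not be negative");
        }
        this.maximumSize = size;
        return this;
    }

    MiniCacheBuilder expireAfterWrite(long duration, TimeUnit unit) {
        if (expireAfterWriteNanos != UNSET_INT) {
            throw new IllegalStateException(
                    "expireAfterWrite was already set to " + expireAfterWriteNanos + " ns");
        }
        if (duration < 0) {
            throw new IllegalArgumentException("duration cannot be negative: " + duration);
        }
        this.expireAfterWriteNanos = unit.toNanos(duration); // stored internally as nanoseconds
        return this;
    }

    long getMaximumSize() {
        return maximumSize;
    }

    /** An unset expiration falls back to a default of 0 nanoseconds. */
    long getExpireAfterWriteNanos() {
        return expireAfterWriteNanos == UNSET_INT ? 0 : expireAfterWriteNanos;
    }
}
```

One consequence of the "already set" check is that a partially configured builder cannot be reconfigured; configuring and building in a single chained statement avoids the issue.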
Reading from the cache is just a get call: stringDbCacheContainer.get(key);
// com.google.common.cache.LocalCache
// LoadingCache methods
@Override
public V get(K key) throws ExecutionException {
  // two sources of data: read it directly, or call load() to load it
  return localCache.getOrLoad(key);
}

// com.google.common.cache.LocalCache
V getOrLoad(K key) throws ExecutionException {
  return get(key, defaultLoader);
}

V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
  int hash = hash(checkNotNull(key));
  // remember ConcurrentHashMap? locate the segment first, then the entry
  return segmentFor(hash).get(key, hash, loader);
}

Segment<K, V> segmentFor(int hash) {
  // TODO(fry): Lazily create segments?
  return segments[(hash >>> segmentShift) & segmentMask];
}

// the core lookup logic lives in this get
// loading
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  checkNotNull(key);
  checkNotNull(loader);
  try {
    if (count != 0) { // read-volatile
      // don't call getLiveEntry, which would ignore loading values
      ReferenceEntry<K, V> e = getEntry(key, hash);
      if (e != null) {
        // if an entry exists, use the ticker to check whether it has expired and,
        // if not, return the value directly; the expiry logic is covered later
        long now = map.ticker.read();
        V value = getLiveValue(e, now);
        if (value != null) {
          recordRead(e, now);
          statsCounter.recordHits(1);
          return scheduleRefresh(e, key, hash, value, now, loader);
        }
        ValueReference<K, V> valueReference = e.getValueReference();
        if (valueReference.isLoading()) {
          return waitForLoadingValue(e, key, valueReference);
        }
      }
    }

    // first load, or after expiry: enter the loading logic (important)
    // at this point e is either null or expired;
    return lockedGetOrLoad(key, hash, loader);
  } catch (ExecutionException ee) {
    Throwable cause = ee.getCause();
    if (cause instanceof Error) {
      throw new ExecutionError((Error) cause);
    } else if (cause instanceof RuntimeException) {
      throw new UncheckedExecutionException(cause);
    }
    throw ee;
  } finally {
    postReadCleanup();
  }
}

// static class Segment<K, V> extends ReentrantLock
// Segment itself extends ReentrantLock, so LocalCache's locking is built on ReentrantLock
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
    throws ExecutionException {
  ReferenceEntry<K, V> e;
  ValueReference<K, V> valueReference = null;
  LoadingValueReference<K, V> loadingValueReference = null;
  boolean createNewEntry = true;

  lock();
  try {
    // re-read ticker once inside the lock
    long now = map.ticker.read();
    // purge expired data before updating the value
    preWriteCleanup(now);

    int newCount = this.count - 1;
    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);

    // walk the chain to handle hash collisions
    for (e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        valueReference = e.getValueReference();
        if (valueReference.isLoading()) {
          createNewEntry = false;
        } else {
          V value = valueReference.get();
          if (value == null) {
            enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
          } else if (map.isExpired(e, now)) {
            // This is a duplicate check, as preWriteCleanup already purged expired
            // entries, but let's accomodate an incorrect expiration queue.
            enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
          } else {
            recordLockedRead(e, now);
            statsCounter.recordHits(1);
            // we were concurrent with loading; don't consider refresh
            return value;
          }

          // immediately reuse invalid entries
          writeQueue.remove(e);
          accessQueue.remove(e);
          this.count = newCount; // write-volatile
        }
        break;
      }
    }

    // on a first load, create the Entry first, then enter the load() logic
    if (createNewEntry) {
      loadingValueReference = new LoadingValueReference<K, V>();

      if (e == null) {
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
      } else {
        e.setValueReference(loadingValueReference);
      }
    }
  } finally {
    unlock();
    postWriteCleanup();
  }

  if (createNewEntry) {
    try {
      // Synchronizes on the entry to allow failing fast when a recursive load is
      // detected. This may be circumvented when an entry is copied, but will fail fast most
      // of the time.
      // load the value from the data source synchronously, through the loader
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    } finally {
      // record a miss; the default stats counter is a no-op
      statsCounter.recordMisses(1);
    }
  } else {
    // The entry already exists. Wait for loading.
    // another thread is already loading this entry, so just wait for its result;
    // the implementation simply calls Future.get()
    return waitForLoadingValue(e, key, valueReference);
  }
}

// load the original value
// at most one of loadSync.loadAsync may be called for any given LoadingValueReference
V loadSync(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
    CacheLoader<? super K, V> loader) throws ExecutionException {
  // loadingValueReference holds the callback reference; load the original value
  ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
  // store the data into the cache for next time
  return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}

// load data through the loader
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
  stopwatch.start();
  V previousValue = oldValue.get();
  try {
    // if there was no previous value, load it and return
    if (previousValue == null) {
      V newValue = loader.load(key);
      return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
    }
    // otherwise (typically data without an expiry) reload; if reload() returns null,
    // return immediately. Override reload() to customize this step
    ListenableFuture<V> newValue = loader.reload(key, previousValue);
    if (newValue == null) {
      return Futures.immediateFuture(null);
    }
    // To avoid a race, make sure the refreshed value is set into loadingValueReference
    // *before* returning newValue from the cache query.
    return Futures.transform(newValue, new Function<V, V>() {
      @Override
      public V apply(V newValue) {
        LoadingValueReference.this.set(newValue);
        return newValue;
      }
    });
  } catch (Throwable t) {
    if (t instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    return setException(t) ? futureValue : fullyFailedFuture(t);
  }
}

/**
 * Waits uninterruptibly for {@code newValue} to be loaded, and then records loading stats.
 */
V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
    ListenableFuture<V> newValue) throws ExecutionException {
  V value = null;
  try {
    // wait synchronously for the load result; note that null is not allowed here and
    // raises an exception, possibly to guard against cache-penetration attacks
    value = getUninterruptibly(newValue);
    if (value == null) {
      throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
    }
    // record the successful load; an extension point, no-op by default
    statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
    // finally store the value into the cache container and return it (this is why the hash matters)
    storeLoadedValue(key, hash, loadingValueReference, value);
    return value;
  } finally {
    if (value == null) {
      statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
      removeLoadingValue(key, hash, loadingValueReference);
    }
  }
}

// com.google.common.util.concurrent.Uninterruptibles
/**
 * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
 * To get uninterruptibility and remove checked exceptions, see
 * {@link Futures#getUnchecked}.
 *
 * <p>If instead, you wish to treat {@link InterruptedException} uniformly
 * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
 * or {@link Futures#makeChecked}.
 *
 * @throws ExecutionException if the computation threw an exception
 * @throws CancellationException if the computation was cancelled
 */
public static <V> V getUninterruptibly(Future<V> future)
    throws ExecutionException {
  boolean interrupted = false;
  try {
    while (true) {
      try {
        return future.get();
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
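That is the whole process of fetching one cached value. To summarize:
1. The key's hash locates a segment, and the lookup first tries to read the result directly from that segment's map;
2. If the entry is missing or has expired, the client-supplied load() method is called to fetch the source data;
3. The result is stored in segment.map, at which point the local cache takes effect;
4. Hits, misses and read counts are recorded.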
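The "one thread loads, the others wait" part of this flow can be sketched with the JDK alone. This is a deliberately simplified illustration, not Guava's code: the class name `SingleLoadCacheSketch` and its methods are hypothetical, a single `ReentrantLock` stands in for a segment lock, a `CompletableFuture` stands in for `LoadingValueReference`, and expiry is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Hypothetical sketch of a load-once cache; NOT Guava's implementation. */
final class SingleLoadCacheSketch<K, V> {
    private final ReentrantLock lock = new ReentrantLock();
    // Each key maps to a future: either still loading or already completed.
    private final Map<K, CompletableFuture<V>> table = new HashMap<>();
    private final Function<K, V> loader;
    private final AtomicInteger loadCount = new AtomicInteger();

    SingleLoadCacheSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        CompletableFuture<V> future;
        boolean createNewEntry = false;
        lock.lock();
        try {
            future = table.get(key);
            if (future == null) {
                // First request for this key: publish a placeholder under the lock.
                future = new CompletableFuture<>();
                table.put(key, future);
                createNewEntry = true;
            }
        } finally {
            lock.unlock();
        }
        if (!createNewEntry) {
            // A value exists or another thread is loading it: wait for the result.
            return future.join();
        }
        // The load itself runs outside the lock so other keys are not blocked.
        loadCount.incrementAndGet();
        try {
            V value = loader.apply(key);
            if (value == null) {
                throw new IllegalStateException("loader returned null for key " + key);
            }
            future.complete(value);
            return value;
        } catch (RuntimeException | Error e) {
            // Drop the failed placeholder so a later call can retry the load.
            lock.lock();
            try {
                table.remove(key, future);
            } finally {
                lock.unlock();
            }
            future.completeExceptionally(e);
            throw e;
        }
    }

    int loadCount() {
        return loadCount.get();
    }

    public static void main(String[] args) {
        SingleLoadCacheSketch<String, String> cache =
                new SingleLoadCacheSketch<>(key -> "value-of-" + key);
        System.out.println(cache.get("a"));    // loads through the loader
        System.out.println(cache.get("a"));    // served from the local table
        System.out.println(cache.loadCount()); // the loader ran once
    }
}
```

As in the Guava code above, the lock is held only long enough to decide who loads; the slow call to the data source happens after the lock is released.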
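We already saw hints of how expiry works while reading the get() method above.
Two points need to be confirmed: 1. Is an asynchronous cleanup thread created to purge expired data? 2. What happens to the existing data during cleanup?
In fact, Guava does not create a cleanup thread: the cleanup is done by the calling thread, right before it loads data!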
// com.google.common.cache.LocalCache
// static class Segment<K, V> extends ReentrantLock
// The whole Segment extends ReentrantLock, so LocalCache's locking relies on ReentrantLock
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
        throws ExecutionException {
    ReferenceEntry<K, V> e;
    ValueReference<K, V> valueReference = null;
    LoadingValueReference<K, V> loadingValueReference = null;
    boolean createNewEntry = true;

    lock();
    try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        // Purge expired entries before updating the value
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        // Walk the bucket's linked list to handle hash collisions
        for (e = first; e != null; e = e.getNext()) {
            K entryKey = e.getKey();
            if (e.getHash() == hash && entryKey != null
                    && map.keyEquivalence.equivalent(key, entryKey)) {
                valueReference = e.getValueReference();
                if (valueReference.isLoading()) {
                    createNewEntry = false;
                } else {
                    V value = valueReference.get();
                    if (value == null) {
                        enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
                    } else if (map.isExpired(e, now)) {
                        // This is a duplicate check, as preWriteCleanup already purged expired
                        // entries, but let's accomodate an incorrect expiration queue.
                        enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
                    } else {
                        recordLockedRead(e, now);
                        statsCounter.recordHits(1);
                        // we were concurrent with loading; don't consider refresh
                        return value;
                    }

                    // immediately reuse invalid entries
                    writeQueue.remove(e);
                    accessQueue.remove(e);
                    this.count = newCount; // write-volatile
                }
                break;
            }
        }

        // On a first load, create the Entry first, then enter the load() logic
        if (createNewEntry) {
            loadingValueReference = new LoadingValueReference<K, V>();

            if (e == null) {
                e = newEntry(key, hash, first);
                e.setValueReference(loadingValueReference);
                table.set(index, e);
            } else {
                e.setValueReference(loadingValueReference);
            }
        }
    } finally {
        unlock();
        postWriteCleanup();
    }

    if (createNewEntry) {
        try {
            // Synchronizes on the entry to allow failing fast when a recursive load is
            // detected. This may be circumvented when an entry is copied, but will fail fast most
            // of the time.
            // Load the source value synchronously through the loader
            synchronized (e) {
                return loadSync(key, hash, loadingValueReference, loader);
            }
        } finally {
            // Record the miss; a no-op by default
            statsCounter.recordMisses(1);
        }
    } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
    }
}

// Let's look closely at how preWriteCleanup(now) purges expired data
/**
 * Performs routine cleanup prior to executing a write. This should be called every time a
 * write thread acquires the segment lock, immediately after acquiring the lock.
 *
 * <p>Post-condition: expireEntries has been run.
 */
@GuardedBy("this")
void preWriteCleanup(long now) {
    runLockedCleanup(now);
}

void runLockedCleanup(long now) {
    // Make sure once more that the lock is held while cleaning up
    if (tryLock()) {
        try {
            // If special reference types (weak/soft keys or values) are in use, clean those first
            drainReferenceQueues();
            // Purge expired entries, based on time
            expireEntries(now); // calls drainRecencyQueue
            // Reset the read counter
            readCount.set(0);
        } finally {
            unlock();
        }
    }
}

/**
 * Drain the key and value reference queues, cleaning up internal entries containing garbage
 * collected keys or values.
 */
@GuardedBy("this")
void drainReferenceQueues() {
    if (map.usesKeyReferences()) {
        drainKeyReferenceQueue();
    }
    if (map.usesValueReferences()) {
        drainValueReferenceQueue();
    }
}

@GuardedBy("this")
void expireEntries(long now) {
    // Bring the recent-access queue up to date
    drainRecencyQueue();

    ReferenceEntry<K, V> e;
    // Take entries from the head and remove them while they are expired:
    // first by write time, then by access time
    while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
            throw new AssertionError();
        }
    }
    while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
            throw new AssertionError();
        }
    }
}

// peek() of the access queue
@Override
public ReferenceEntry<K, V> peek() {
    ReferenceEntry<K, V> next = head.getNextInAccessQueue();
    return (next == head) ? null : next;
}

// Remove an entry for a given cause, e.g. an expired entry
@GuardedBy("this")
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
    int newCount = this.count - 1;
    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);

    for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
        if (e == entry) {
            ++modCount;
            // Call removeValueFromChain to remove the actual entry
            ReferenceEntry<K, V> newFirst = removeValueFromChain(
                    first, e, e.getKey(), hash, e.getValueReference(), cause);
            newCount = this.count - 1;
            table.set(index, newFirst);
            this.count = newCount; // write-volatile
            return true;
        }
    }

    return false;
}

@GuardedBy("this")
@Nullable
ReferenceEntry<K, V> removeValueFromChain(ReferenceEntry<K, V> first,
        ReferenceEntry<K, V> entry, @Nullable K key, int hash,
        ValueReference<K, V> valueReference, RemovalCause cause) {
    enqueueNotification(key, hash, valueReference, cause);
    // Remove the entry from both queues
    writeQueue.remove(entry);
    accessQueue.remove(entry);

    if (valueReference.isLoading()) {
        valueReference.notifyNewValue(null);
        return first;
    } else {
        return removeEntryFromChain(first, entry);
    }
}

@GuardedBy("this")
@Nullable
ReferenceEntry<K, V> removeEntryFromChain(ReferenceEntry<K, V> first,
        ReferenceEntry<K, V> entry) {
    int newCount = count;
    // In the simple case (entry is the head) the chain after it is returned as-is.
    // If other entries precede it (first != entry), each of them is copied in front of
    // newFirst, and the resulting head is returned.
    ReferenceEntry<K, V> newFirst = entry.getNext();
    for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) {
        // Copy e so that it points at newFirst; the preceding entries end up in reverse order
        ReferenceEntry<K, V> next = copyEntry(e, newFirst);
        if (next != null) {
            newFirst = next;
        } else {
            removeCollectedEntry(e);
            newCount--;
        }
    }
    this.count = newCount;
    return newFirst;
}
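With that, we have seen the complete expiry handling for a key. To summarize:
1. Cleanup is triggered on read, inside lockedGetOrLoad() and before the value is loaded;
2. ReentrantLock is used to make the update thread-safe;
3. The read counter is reset to zero and the element count is decreased.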
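The idea of purging expired entries on the caller's thread, under a lock and without any background thread, can be sketched with the JDK alone. The sketch below is an illustration under stated assumptions, not Guava's implementation: `LazyExpiryCacheSketch`, `getIfPresent`, `rawSize` and the injected `ticker` are hypothetical names, only write-time expiry is modeled, and reads always take the lock, which is simpler than what Guava does.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;

/** Hypothetical sketch of expire-on-access; NOT Guava's implementation. */
final class LazyExpiryCacheSketch<K, V> {
    private static final class Entry<K, V> {
        final K key;
        final V value;
        final long writeTime;

        Entry(K key, V value, long writeTime) {
            this.key = key;
            this.value = value;
            this.writeTime = writeTime;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<K, Entry<K, V>> table = new HashMap<>();
    // Entries in write order: the oldest write is always at the head.
    private final ArrayDeque<Entry<K, V>> writeQueue = new ArrayDeque<>();
    private final long ttl;
    private final LongSupplier ticker; // injected clock, so expiry can be demonstrated

    LazyExpiryCacheSketch(long ttl, LongSupplier ticker) {
        this.ttl = ttl;
        this.ticker = ticker;
    }

    void put(K key, V value) {
        lock.lock();
        try {
            long now = ticker.getAsLong();
            expireEntries(now); // cleanup first, then write
            Entry<K, V> old = table.get(key);
            if (old != null) {
                writeQueue.remove(old);
            }
            Entry<K, V> entry = new Entry<>(key, value, now);
            table.put(key, entry);
            writeQueue.addLast(entry);
        } finally {
            lock.unlock();
        }
    }

    V getIfPresent(K key) {
        lock.lock();
        try {
            expireEntries(ticker.getAsLong()); // cleanup is piggybacked on the read
            Entry<K, V> entry = table.get(key);
            return entry == null ? null : entry.value;
        } finally {
            lock.unlock();
        }
    }

    /** Number of stored entries, counted WITHOUT running any cleanup. */
    int rawSize() {
        lock.lock();
        try {
            return table.size();
        } finally {
            lock.unlock();
        }
    }

    // Must be called with the lock held: pop expired entries off the queue head.
    private void expireEntries(long now) {
        Entry<K, V> head;
        while ((head = writeQueue.peekFirst()) != null && now - head.writeTime >= ttl) {
            writeQueue.pollFirst();
            table.remove(head.key, head);
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        LazyExpiryCacheSketch<String, String> cache =
                new LazyExpiryCacheSketch<>(10L, () -> now[0]);
        cache.put("a", "1");
        now[0] = 10L; // the entry is now past its time-to-live
        System.out.println(cache.rawSize());         // still stored: nobody has touched the cache
        System.out.println(cache.getIfPresent("a")); // this access purges it
        System.out.println(cache.rawSize());
    }
}
```

Because the queue is ordered by write time, cleanup only has to look at the head and can stop at the first entry that is still live, which is the same reason `expireEntries` above only ever peeks.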
Putting a value works much like the put method of an ordinary map, so a quick look is enough!
// com.google.common.cache.LocalCache$LocalManualCache
@Override
public void put(K key, V value) {
    localCache.put(key, value);
}

// com.google.common.cache.LocalCache
@Override
public V put(K key, V value) {
    checkNotNull(key);
    checkNotNull(value);
    int hash = hash(key);
    return segmentFor(hash).put(key, hash, value, false);
}

// com.google.common.cache.LocalCache$Segment
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count + 1;
        if (newCount > this.threshold) { // ensure capacity
            expand();
            newCount = this.count + 1;
        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        // Look for an existing entry.
        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
            K entryKey = e.getKey();
            if (e.getHash() == hash && entryKey != null
                    && map.keyEquivalence.equivalent(key, entryKey)) {
                // We found an existing entry.

                ValueReference<K, V> valueReference = e.getValueReference();
                V entryValue = valueReference.get();

                if (entryValue == null) {
                    ++modCount;
                    if (valueReference.isActive()) {
                        enqueueNotification(key, hash, valueReference, RemovalCause.COLLECTED);
                        setValue(e, key, value, now);
                        newCount = this.count; // count remains unchanged
                    } else {
                        setValue(e, key, value, now);
                        newCount = this.count + 1;
                    }
                    this.count = newCount; // write-volatile
                    evictEntries();
                    return null;
                } else if (onlyIfAbsent) {
                    // Mimic
                    // "if (!map.containsKey(key)) ...
                    // else return map.get(key);
                    recordLockedRead(e, now);
                    return entryValue;
                } else {
                    // clobber existing entry, count remains unchanged
                    ++modCount;
                    enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);
                    setValue(e, key, value, now);
                    evictEntries();
                    return entryValue;
                }
            }
        }

        // Create a new entry.
        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, value, now);
        table.set(index, newEntry);
        newCount = this.count + 1;
        this.count = newCount; // write-volatile
        evictEntries();
        return null;
    } finally {
        unlock();
        postWriteCleanup();
    }
}
And with that, the Guava-based second-level cache is done.
As the old saying goes: be grateful for every bug you run into!