咱們使用緩存的主要目是提高查詢速度和保護數據庫等稀缺資源不被佔滿。而緩存最多見的問題是緩存穿透、擊穿和雪崩,在高併發下這三種狀況都會有大量請求落到數據庫,致使數據庫資源佔滿,引發數據庫故障。今天我主要分享一下layering-cache緩存框架在這個三個問題上的實踐方案。java
在高併發下,查詢一個不存在的值時,緩存不會被命中,致使大量請求直接落到數據庫上,如活動系統裏面查詢一個不存在的活動。git
在高併發下,對一個特定的值進行查詢,可是這個時候緩存正好過時了,緩存沒有命中,致使大量請求直接落到數據庫上,如活動系統裏面查詢活動信息,可是在活動進行過程當中活動緩存忽然過時了。github
在高併發下,大量的緩存key在同一時間失效,致使大量的請求落到數據庫上,如活動系統裏面同時進行着很是多的活動,可是在某個時間點全部的活動緩存所有過時。redis
在layering-cache裏面結合了緩存NULL值,緩存預熱,限流、分級緩存和間接的實現"永不過時"等幾種方案來應對緩存穿透、擊穿和雪崩問題。算法
應對緩存穿透最有效的方法是直接緩存NULL值,可是緩存NULL的時間不能太長,不然NULL數據長時間得不到更新,也不能過短,不然達不到防止緩存擊穿的效果。數據庫
我在layering-cache對NULL值進行了特殊處理,一級緩存不容許存NULL值,二級緩存能夠配置緩存是否容許存NULL值,若是配置能夠容許存NULL值,框架還支持配置緩存非空值和NULL值之間的過時時間倍率,這使得咱們能精準的控制每個緩存的NULL值過時時間,控制粒度很是細。當NULL緩存過時我還可使用限流,緩存預熱等手段來防止穿透。緩存
示例:併發
@Cacheable(value = "people", key = "#person.id", depict = "用戶信息緩存", firstCache = @FirstCache(expireTime = 10, timeUnit = TimeUnit.MINUTES), secondaryCache = @SecondaryCache(expireTime = 10, timeUnit = TimeUnit.HOURS, isAllowNullValue = true, magnification = 10)) public Person findOne(Person person) { Person p = personRepository.findOne(Example.of(person)); logger.info("爲id、key爲:" + p.getId() + "數據作了緩存"); return p; }
在這個例子裏面isAllowNullValue = true表示容許換存NULL值,magnification = 10表示NULL值和非NULL值之間的時間倍率是10,也就是說當緩存值爲NULL是,二級緩存的有效時間將是1個小時。框架
應對緩存穿透的經常使用方法之一是限流,常見的限流算法有滑動窗口,令牌桶算法和漏桶算法,或者直接使用隊列、加鎖等,在layering-cache裏面我主要使用分佈式鎖來作限流。異步
layering-cache數據讀取流程:
下面是讀取數據的核心代碼:
private <T> T executeCacheMethod(RedisCacheKey redisCacheKey, Callable<T> valueLoader) { Lock redisLock = new Lock(redisTemplate, redisCacheKey.getKey() + "_sync_lock"); // 同一個線程循環20次查詢緩存,每次等待20毫秒,若是仍是沒有數據直接去執行被緩存的方法 for (int i = 0; i < RETRY_COUNT; i++) { try { // 先取緩存,若是有直接返回,沒有再去作拿鎖操做 Object result = redisTemplate.opsForValue().get(redisCacheKey.getKey()); if (result != null) { logger.debug("redis緩存 key= {} 獲取到鎖後查詢查詢緩存命中,不須要執行被緩存的方法", redisCacheKey.getKey()); return (T) fromStoreValue(result); } // 獲取分佈式鎖去後臺查詢數據 if (redisLock.lock()) { T t = loaderAndPutValue(redisCacheKey, valueLoader, true); logger.debug("redis緩存 key= {} 從數據庫獲取數據完畢,喚醒全部等待線程", redisCacheKey.getKey()); // 喚醒線程 container.signalAll(redisCacheKey.getKey()); return t; } // 線程等待 logger.debug("redis緩存 key= {} 從數據庫獲取數據未獲取到鎖,進入等待狀態,等待{}毫秒", redisCacheKey.getKey(), WAIT_TIME); container.await(redisCacheKey.getKey(), WAIT_TIME); } catch (Exception e) { container.signalAll(redisCacheKey.getKey()); throw new LoaderCacheValueException(redisCacheKey.getKey(), e); } finally { redisLock.unlock(); } } logger.debug("redis緩存 key={} 等待{}次,共{}毫秒,任未獲取到緩存,直接去執行被緩存的方法", redisCacheKey.getKey(), RETRY_COUNT, RETRY_COUNT * WAIT_TIME, WAIT_TIME); return loaderAndPutValue(redisCacheKey, valueLoader, true); }
當須要加載緩存的時候,須要獲取到鎖纔有權限到後臺去加載緩存數據,不然就會等待(同一個線程循環20次查詢緩存,每次等待20毫秒,若是仍是沒有數據直接去執行被緩存的方法,這個主要是爲了防止獲取到鎖而且去加載緩存的線程出問題,沒有返回而致使死鎖)。當獲取到鎖的線程執行完成會將獲取到的數據放到緩存中,而且喚醒全部等待線程。
這裏須要注意一下讓線程等待必定不能用Thread.sleep()
,我在使用Spring Redis Cache的時候,我發現當併發達到300左右,緩存一旦過時就會引發死鎖,緣由是使用的是sleep方法來讓沒有獲取到鎖的線程等待,當等待的線程不少的時候會產生大量上下文切換,致使獲取到鎖的線程一直獲取不到cpu的執行權,致使死鎖。在layering-cache裏面,咱們使用的是LockSupport.parkNanos
方法,它會釋放cpu資源, 由於咱們使用的是redis分佈式鎖,因此也不能使用wait-notify機制。
有效應對緩存的擊穿和雪崩的方式之一是緩存預加載。
@Cacheable(value = "people", key = "#person.id", depict = "用戶信息緩存", firstCache = @FirstCache(expireTime = 10, timeUnit = TimeUnit.MINUTES), secondaryCache = @SecondaryCache(expireTime = 10, preloadTime = 2,timeUnit = TimeUnit.HOURS,)) public Person findOne(Person person) { Person p = personRepository.findOne(Example.of(person)); logger.info("爲id、key爲:" + p.getId() + "數據作了緩存"); return p; }
在 layering-cache裏面二級緩存會配置兩個時間,expireTime
是緩存的過時時間,preloadTime
是緩存的刷新時間(預加載時間)。每次二級緩存被命中都會去檢查緩存的過去時間是否小於刷新時間,若是小於就會開啓一個異步線程預先去更新緩存,並將新的值放到緩存中,有效的保證了熱點數據**"永不過時"**。這裏預先更新緩存也是須要加鎖的,並非全部的線程都會落到庫上刷新緩存,若是沒有獲取到鎖就直接結束當前線程。
/** * 刷新緩存數據 */ private <T> void refreshCache(RedisCacheKey redisCacheKey, Callable<T> valueLoader, Object result) { Long ttl = redisTemplate.getExpire(redisCacheKey.getKey()); Long preload = preloadTime; // 容許緩存NULL值,則自動刷新時間也要除以倍數 boolean flag = isAllowNullValues() && (result instanceof NullValue || result == null); if (flag) { preload = preload / getMagnification(); } if (null != ttl && ttl > 0 && TimeUnit.SECONDS.toMillis(ttl) <= preload) { // 判斷是否須要強制刷新在開啓刷新線程 if (!getForceRefresh()) { logger.debug("redis緩存 key={} 軟刷新緩存模式", redisCacheKey.getKey()); softRefresh(redisCacheKey); } else { logger.debug("redis緩存 key={} 強刷新緩存模式", redisCacheKey.getKey()); forceRefresh(redisCacheKey, valueLoader); } } } /** * 硬刷新(執行被緩存的方法) * * @param redisCacheKey {@link RedisCacheKey} * @param valueLoader 數據加載器 */ private <T> void forceRefresh(RedisCacheKey redisCacheKey, Callable<T> valueLoader) { // 儘可能少的去開啓線程,由於線程池是有限的 ThreadTaskUtils.run(() -> { // 加一個分佈式鎖,只放一個請求去刷新緩存 Lock redisLock = new Lock(redisTemplate, redisCacheKey.getKey() + "_lock"); try { if (redisLock.lock()) { // 獲取鎖以後再判斷一下過時時間,看是否須要加載數據 Long ttl = redisTemplate.getExpire(redisCacheKey.getKey()); if (null != ttl && ttl > 0 && TimeUnit.SECONDS.toMillis(ttl) <= preloadTime) { // 加載數據並放到緩存 loaderAndPutValue(redisCacheKey, valueLoader, false); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } }); }
在緩存總量和併發量都很大的時候,這個時候緩存若是同時失效,緩存預熱將是一個很是慢長的過程,就好比說服務重啓或新上線一個新的緩存。這個時候咱們能夠採用切流的方式,讓緩存慢慢預熱,如開始切10%流量,觀察沒有異常後,再切30%流量,觀察沒有異常後,再切60%流量,而後全量。這種方式雖然有點繁瑣,可是一旦遇到異常咱們能夠快速的切迴流量,讓風險可控。
整體來講layering-cache在緩存穿透、擊穿和雪崩上是以預防爲主,補救爲輔。而在應對緩存的這些問題上其實也沒有一個徹底完美的方案,只有最適合本身業務系統的方案。目前若是直接使用layering-cache緩存框架已經基本能應對大部分的緩存問題了。