通常咱們對緩存讀操做的時候有這麼一個固定的套路:html
代碼例子:java
1 @Override 2 public R selectOrderById(Integer id) { 3 //查詢緩存 4 Object redisObj = valueOperations.get(String.valueOf(id)); 5 6 //命中緩存 7 if(redisObj != null) { 8 //正常返回數據 9 return new R().setCode(200).setData(redisObj).setMsg("OK"); 10 } 11 Order order = orderMapper.selectOrderById(id); 12 if (order != null) { 13 valueOperations.set(String.valueOf(id), order); //加入緩存 14 return new R().setCode(200).setData(order).setMsg("OK"); 15 } 16 return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查詢無果"); 17 }
但這樣寫的代碼是不行的,這代碼裏就有咱們緩存的三大問題的兩大問題.穿透,擊穿.git
第一種狀況:Redis掛掉了,請求所有走數據庫.github
第二種狀況:緩存數據設置的過時時間是相同的,而後恰好這些數據刪除了,所有失效了,這個時候所有請求會到數據庫redis
緩存雪崩若是發生了,頗有可能會把咱們的數據庫搞垮,致使整個服務器癱瘓.算法
對於第二種狀況,很是好解決:sql
在存緩存的時候給過時時間加上一個隨機值,這樣大幅度的減小緩存同時過時.shell
第一種狀況:數據庫
事發前:實現Redis的高可用(主從架構+Sentinel 或者Redis Cluster),儘可能避免Redis掛掉這種狀況發生。
事發中:萬一Redis真的掛了,咱們能夠設置本地緩存(ehcache)+限流(hystrix),儘可能避免咱們的數據庫被幹掉(起碼能保證咱們的服務仍是能正常工做的)
事發後:redis持久化,重啓後自動從磁盤上加載數據,快速恢復緩存數據。設計模式
好比你搶了你同事的女神,你同事很氣,想搞你,在你的項目裏,每次請求的ID爲負數.這個時候緩存確定是沒有的,緩存就沒用了,請求就會所有找數據庫,但數據庫也沒用這個值.因此每次返回空出去.
緩存穿透是指查詢一個必定不存在的數據。因爲緩存不命中,而且出於容錯考慮,若是從數據庫查不到數據則不寫入緩存,這將致使這個不存在的數據每次請求都要到數據庫去查詢,失去了緩存的意義。
這就是緩存穿透:
請求的數據在緩存大量不命中,致使請求走數據庫。
緩存穿透若是發生了,也可能把咱們的數據庫搞垮,致使整個服務癱瘓!
解決緩存穿透也有兩種方案:
緩存空對象代碼例子:
1 public R selectOrderById(Integer id) { 2 return cacheTemplate.redisFindCache(String.valueOf(id), 10, TimeUnit.MINUTES, new CacheLoadble<Order>() { 3 @Override 4 public Order load() { 5 return orderMapper.selectOrderById(id); 6 } 7 },false); 8 }
1 public R redisFindCache(String key, long expire, TimeUnit unit, CacheLoadble<T> cacheLoadble, boolean b) { 2 //查詢緩存 3 Object redisObj = valueOperations.get(String.valueOf(key)); 4 //命中緩存 5 if (redisObj != null) { 6 if(redisObj instanceof NullValueResultDO){ 7 return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查詢無果"); 8 } 9 //正常返回數據 10 return new R().setCode(200).setData(redisObj).setMsg("OK"); 11 } 12 try { 13 T load = cacheLoadble.load();//查詢數據庫 14 if (load != null) { 15 valueOperations.set(key, load, expire, unit); //加入緩存 16 return new R().setCode(200).setData(load).setMsg("OK"); 17 }else{ 18 valueOperations.set(key,new NullValueResultDO(),expire,unit); 19 } 20 21 } finally { 22 23 } 24 return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查詢無果"); 25 }
這裏封裝了一個模板redisFindCache,否則每個方法都要寫這個流程.注意在命中緩存時,要判斷數據是不是空對象.
空對象:
1 @Getter 2 @Setter 3 @ToString 4 public class NullValueResultDO{ 5 6 }
緩存空對象的缺點:有大量的空數據佔用redis的內存.治標不治本.
布隆過濾器:
有谷歌的guava,可是是單機版的,不支持分佈式.
也能夠用redis的位數組bit手寫一個分佈式布隆過濾器,代碼就不寫了.過程就是先把id(好比你是用id爲key的)存進布隆過濾器(會通過特定的算法),當咱們請求接口的時候先讓它查詢布隆過濾器,判斷數據是否存在.
上面的代碼還有個緩存擊穿(緩存當中沒有,數據庫中有)問題,就是併發的時候.好比99我的同時請求,仍是會打印99條sql語句,仍是會找數據庫.
這裏的代碼是用的分佈式鎖(互斥鎖)
1 public R redisFindCache(String key, long expire, TimeUnit unit, CacheLoadble<T> cacheLoadble,boolean b){ 2 //判斷是否走過濾器 3 if(b){ 4 //先走過濾器 5 boolean bloomExist = bloomFilter.isExist(String.valueOf(key)); 6 if(!bloomExist){ 7 return new R().setCode(600).setData(null).setMsg("查詢無果"); 8 } 9 } 10 //查詢緩存 11 Object redisObj = valueOperations.get(String.valueOf(key)); 12 //命中緩存 13 if(redisObj != null) { 14 //正常返回數據 15 return new R().setCode(200).setData(redisObj).setMsg("OK"); 16 } 17 // RLock lock0 = redisson.getLock("{taibai0}:" + key); 18 // RLock lock1 = redisson.getLock("{taibai1}:" + key); 19 // RLock lock2 = redisson.getLock("{taibai2}:" + key); 20 // RedissonMultiLock lock = new RedissonMultiLock(lock0,lock1, lock2); 21 try { 22 redisLock.lock(key);//上鎖 23 // lock.lock(); 24 //查詢緩存 25 redisObj = valueOperations.get(String.valueOf(key)); 26 //命中緩存 27 if(redisObj != null) { 28 //正常返回數據 29 return new R().setCode(200).setData(redisObj).setMsg("OK"); 30 } 31 T load = cacheLoadble.load();//查詢數據庫 32 if (load != null) { 33 valueOperations.set(key, load,expire, unit); //加入緩存 34 return new R().setCode(200).setData(load).setMsg("OK"); 35 } 36 return new R().setCode(500).setData(new NullValueResultDO()).setMsg("查詢無果"); 37 }finally { 38 redisLock.unlock(key);//解鎖 39 // lock.unlock(); 40 } 41 }
若是僅僅查詢的話,緩存的數據和數據庫的數據是沒問題的。可是,當咱們要更新時候呢?各類狀況極可能就形成數據庫和緩存的數據不一致了。
從理論上說,只要咱們設置了鍵的過時時間,咱們就能保證緩存和數據庫的數據最終是一致的。由於只要緩存數據過時了,就會被刪除。隨後讀的時候,由於緩存裏沒有,就能夠查數據庫的數據,而後將數據庫查出來的數據寫入到緩存中。
除了設置過時時間,咱們還須要作更多的措施來儘可能避免數據庫與緩存處於不一致的狀況發生。
通常來講,執行更新操做時,咱們會有兩種選擇:
首先,要明確的是,不管咱們選擇哪一個,咱們都但願這兩個操做要麼同時成功,要麼同時失敗。因此,這會演變成一個分佈式事務的問題。
因此,若是原子性被破壞了,可能會有如下的狀況:
若是第一步已經失敗了,咱們直接返回Exception出去就行了,第二步根本不會執行。
下面咱們具體來分析一下吧。
操做緩存也有兩種方案:
通常咱們都是採起刪除緩存緩存策略的,緣由以下:
基於這兩點,對於緩存在更新時而言,都是建議執行刪除操做!
正常狀況是這樣的:
若是原子性被破壞了:
若是在高併發的場景下,出現數據庫與緩存數據不一致的機率特別低,也不是沒有:
要達成上述狀況,仍是說一句機率特別低:
由於這個條件須要發生在讀緩存時緩存失效,並且併發着有一個寫操做。而實際上數據庫的寫操做會比讀操做慢得多,並且還要鎖表,而讀操做必需在寫操做前進入數據庫操做,而又要晚於寫操做更新緩存,全部的這些條件都具有的機率基本並不大。
對於這種策略,實際上是一種設計模式:Cache Aside Pattern
刪除緩存失敗的解決思路:
正常狀況是這樣的:
若是原子性被破壞了:
看起來是很美好,可是咱們在併發場景下分析一下,就知道仍是有問題的了:
因此也會致使數據庫和緩存不一致的問題。
併發下解決數據庫與緩存不一致的思路:
咱們能夠發現,兩種策略各自有優缺點:
在高併發下表現不如意,在原子性被破壞時表現優異
在高併發下表現優異,在原子性被破壞時表現不如意
能夠用databus或者阿里的canal監聽binlog進行更新。
參考資料:
https://coolshell.cn/articles/17416.html
https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/redis-consistence.md
https://zhuanlan.zhihu.com/p/48334686
https://blog.csdn.net/z50l2o08e2u4aftor9a/article/details/81008933