關於緩存的使用,我的經驗仍是比較欠缺,對於緩存在應用系統中的使用也只是前幾個月在公司實習的時候,簡單的使用過,且使用的都是人家把框架搭建好的,至於緩存在併發狀況下會產生的一系列問題都已經被框架處理好了,我所作的只是set和get,至於使用時緩存在併發狀況下到底會出現什麼樣的問題,該如何去解決和避免這些問題,沒有去深究。java
秉着「學而時習之」的態度(T_T本身太懶,厚着臉皮),這兩天在鼓搗redis,至於redis的基本使用仍是挺簡單的,今天要說的是我在這個過程當中看到網上博客一直提的關於緩存使用的各類問題,看到好多前輩在高併發下使用緩存都踩了很多大坑,總結他人的經驗也是給本身之後警醒。今天這篇博客只講我對一個問題的理解與思路的想法,並不會去羅列緩存在各類場景下各類解決方案以及各類解決方案之間的優劣,我沒有實際解決緩存問題的經驗,不敢妄自下結論。如下是我的學習過程的記錄,但願各路大俠交流學習。git
場景描述:高併發狀況緩存未命中從而訪問數據庫形成壓力陡增崩潰github
最終解決方案:java中利用讀寫鎖處理併發併發情形redis
業務場景假設:現有一張商品庫存表,業務請求須要根據商品id查詢相應商品的庫存狀況,服務端收到請求返回對應商品的庫存。數據庫
首先,咱們知道使用緩存的基本流程,首先根據key查詢緩存,查詢成功直接返回,查詢失敗(緩存未命中),則查詢數據庫獲得結果寫入緩存再返回。根據前面的場景假設和緩存使用邏輯,請看下面的一段代碼:緩存
1 /** 2 * 根據商品id(也是主鍵)查詢商品庫存記錄 3 */ 4 public GoodsStock selectByPrimaryKey(Integer id) { 5 GoodsStock result; 6 //緩存中查找 7 String goodsStockJsonStr = RedisCache.get(id); 8 9 //緩存中查找成功 10 if(!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 11 logger.info("=====query from cache====="); 12 return JSONObject.parseObject(goodsStockJsonStr,GoodsStock.class); 13 } 14 //沒有命中緩存,數據庫中查找,並將結果寫入緩存 15 logger.info("=====query from DB====="); 16 result = goodsStockMapper.selectByPrimaryKey(id); 17 //查詢結果寫入緩存 18 RedisCache.set(id, JSONArray.toJSONString(result)); 19 return result; 20 }
以上代碼運行結果,第一次運行緩存中參照失敗是從數據庫中查找,後面每次運行查找相同的id,都是從緩存中獲得(這裏咱先不討論緩存的失效時間之類),只查詢了數據庫一次,因爲每次運行都是單個請求,這段代碼沒有任何問題,如今在多線程下測試這個查詢服務,看看會出現什麼狀況:多線程
1 /** 2 * 10個線程併發調用服務 3 */ 4 @Test 5 public void testMultiThreadQuery() throws Exception{ 6 for(int i = 0; i < 10; i++) { 7 new Thread(new QueryTask()).start(); 8 countDownLatch.countDown(); //啓動線程達到10個時,10個線程同時執行查詢 9 } 10 Thread.sleep(5000); 11 } 12 13 private class QueryTask implements Runnable { 14 @Override 15 public void run() { 16 try { 17 countDownLatch.await(); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 GoodsStock goodsStock = goodsStockService.selectByPrimaryKey(GOODS_ID); 22 } 23 }
運行前咱們先將緩存清空,讓服務請求緩存是出現緩存未命中的狀況,正常狀況是隻要有一個請求查詢出現緩存未命中,那麼就回去查詢數據庫,查詢成功後將結果寫入緩存,這樣後續的請求再查詢統一記錄時,就應該直接從緩存返回,而再也不去查詢數據庫。咱們來看看運行結果,運行結果也在預料之中。併發
1918 [Thread-12] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-5] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-13] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-8] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-7] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-9] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-6] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-4] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-10] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 1918 [Thread-11] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB=====
咱們能夠看到,10個同時到達的請求基本上都是去查詢的數據庫,這點很好理解,由於10個請求同時到達,同時查詢緩存,同時發現緩存沒命中,同時去查數據庫。在這種狀況下,原本後面的請求應該讀取緩存從而達到減輕數據庫壓力的效果,然而在前面這麼多「同時」的情形下,緩存失去了它原有的效果。若是這裏不僅10個請求同時到達,而是在相似秒殺場景下同時有成千上萬個請求到達,那麼數據庫確定不能承受之重直至崩潰。這種場景就很相似於高併發狀況下的緩存擊穿(緩存擊穿是指在高併發狀況下,大量請求查詢一個並不存在的數據,因爲數據不存在,確定會出現緩存不命中,而後去查詢數據庫,而後致使數據庫崩潰。) app
既然咱們清楚得知道問題出如今同時查詢數據庫這裏,那麼很容易就想到利用鎖機制,只讓一個請求去查詢數據庫。框架
利用java提供的鎖機制,讓全部請求到達查詢服務時,若緩存沒有命中,就去競爭一把鎖,獲得鎖的請求才去查詢數據庫,並將查詢結果寫回緩存,後面的請求就直接從緩存中讀取,併發狀況下改進代碼以下:
1 /** 2 * 根據商品id(也是主鍵)查詢商品庫存記錄 3 */ 4 public GoodsStock selectByPrimaryKey(Integer id) { 5 GoodsStock result; 6 //緩存中查找 7 String goodsStockJsonStr = RedisCache.get(id); 8 9 //緩存中查找成功 10 if(!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 11 logger.info("=====query from cache====="); 12 return JSONObject.parseObject(goodsStockJsonStr,GoodsStock.class); 13 } 14 //沒有命中緩存,這裏加鎖去數據庫中查找,並將結果寫入緩存 15 //後續得到鎖的線程會直接從緩存中讀取,而再也不是訪問數據庫 16 synchronized(this) { 17 goodsStockJsonStr = RedisCache.get(id); 18 if(!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 19 logger.info("=====query from cache====="); 20 return JSONObject.parseObject(goodsStockJsonStr,GoodsStock.class); 21 } 22 logger.info("=====query from DB====="); 23 result = goodsStockMapper.selectByPrimaryKey(id); 24 //查詢結果寫入緩存 25 RedisCache.set(id, JSONArray.toJSONString(result)); 26 } 27 return result; 28 }
這裏,咱們對緩存未命中查詢數據庫的部分進行加鎖進行同步處理,同步代碼塊中再查詢了一次緩存,這樣就保證了同時到達但未得到鎖的線程後面會直接讀取緩存中的數據而再也不訪問數據庫。從而大量減小了同一時刻對數據庫的訪問量。
咱們看看運行結果,能夠發現,只有第一次查詢是從數據庫中查詢,後續查詢全來自緩存:
1 1907 [Thread-11] INFO g.s.impl.GoodsStockServiceImpl - =====query from DB===== 2 2550 [Thread-12] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 3 2578 [Thread-8] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 4 2579 [Thread-7] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 5 2580 [Thread-10] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 6 2581 [Thread-13] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 7 2581 [Thread-5] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 8 2581 [Thread-4] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 9 2582 [Thread-6] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache===== 10 2582 [Thread-9] INFO g.s.impl.GoodsStockServiceImpl - =====query from cache=====
至此,上面提到的在併發的狀況查詢緩存的問題基本上能夠解決,可是咱們都知道,在java中sychronized屬於重量級鎖,讀寫鎖更適合這樣的場景。
這個地方爲甚麼加上讀寫鎖的性能就更高些,這裏涉及到java中的鎖機制問題,就不展開寫,待後面研究清楚再另外單獨記錄。
1 /** 2 * 根據商品id(也是主鍵)查詢商品庫存記錄 3 */ 4 public GoodsStock selectByPrimaryKey(Integer id) { 5 GoodsStock result; 6 readWriteLock.readLock().lock();//添加讀鎖 7 try { 8 //緩存中查找 9 String goodsStockJsonStr = RedisCache.get(id); 10 //緩存中查找成功 11 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 12 logger.info("=====query from cache====="); 13 result = JSONObject.parseObject(goodsStockJsonStr, GoodsStock.class); 14 } else { 15 //若緩存讀取失敗,則須要去數據庫中查詢 16 readWriteLock.readLock().unlock();//釋放讀鎖 17 readWriteLock.writeLock().lock();//添加寫鎖 18 try { 19 goodsStockJsonStr = RedisCache.get(id); 20 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 21 logger.info("=====query from cache====="); 22 return JSONObject.parseObject(goodsStockJsonStr, GoodsStock.class); 23 } 24 logger.info("=====query from DB====="); 25 result = goodsStockMapper.selectByPrimaryKey(id); 26 //查詢結果寫入緩存 27 RedisCache.set(id, JSONArray.toJSONString(result)); 28 } finally { 29 readWriteLock.writeLock().unlock(); 30 readWriteLock.readLock().lock(); 31 } 32 } 33 } finally { 34 readWriteLock.readLock().unlock(); 35 } 36 return result; 37 }
這個地方補充一下,從上面的代碼咱們能夠看到,其實整個查詢方法,主要的業務代碼只有一行:
1 result = goodsStockMapper.selectByPrimaryKey(id);
剩餘的其餘代碼都是無關於業務的其餘處理,咱們在業務中應該儘可能將非業務的代碼抽離出來包裝,使真正的業務代碼簡單高效。對於相似以上這種場景,咱們可使用模板方法,在此簡單補充一下:
查詢業務的模板方法:
1 /** 2 * 併發處理的緩存查詢模板方法 3 * @param queryKey 查詢鍵值 4 * @param expire 緩存過時時間 5 * @param unit 時間單位 6 * @param typeReference 傳入泛型類型的類對象 7 * @param cacheLoadable 業務回調類 8 * @param <T> 9 * @return 10 */ 11 public <T> T queryByCache(String queryKey, long expire, TimeUnit unit, 12 TypeReference<T> typeReference, CacheLoadable<T> cacheLoadable) { 13 T result; 14 readWriteLock.readLock().lock();//添加讀鎖 15 try { 16 //緩存中查找 17 String goodsStockJsonStr = RedisCache.get(queryKey); 18 //緩存中查找成功 19 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 20 logger.info("=====query from cache====="); 21 result = JSONObject.parseObject(goodsStockJsonStr, typeReference); 22 } else { 23 //若緩存讀取失敗,則須要去數據庫中查詢 24 readWriteLock.readLock().unlock();//釋放讀鎖 25 readWriteLock.writeLock().lock();//添加寫鎖 26 try { 27 goodsStockJsonStr = RedisCache.get(queryKey); 28 if (!StringUtils.isEmpty(goodsStockJsonStr) && !"null".equals(goodsStockJsonStr)) { 29 logger.info("=====query from cache====="); 30 return JSONObject.parseObject(goodsStockJsonStr, typeReference); 31 } 32 logger.info("=====query from DB====="); 33 //這裏調用業務傳入的回調方法,真正處理業務的地方只有這一行 34 result = cacheLoadable.load(); 35 RedisCache.set(queryKey, JSONArray.toJSONString(result)); 36 } finally { 37 readWriteLock.writeLock().unlock(); 38 readWriteLock.readLock().lock(); 39 } 40 } 41 } finally { 42 readWriteLock.readLock().unlock(); 43 } 44 return result; 45 }
而後咱們再業務使用的時候,只須要像以下調用便可:
1 public GoodsStock queryByTemplate(Integer id) { 2 return cacheServiceTemplate.queryByCache(String.valueOf(id), 0, null, 3 new TypeReference<GoodsStock>() {}, new CacheLoadable<GoodsStock>() { 4 @Override 5 public GoodsStock load() { 6 return goodsStockMapper.selectByPrimaryKey(id); 7 } 8 }); 9 }
文章中完整源碼:https://github.com/Gonjan/javaPractice/tree/master/src
寫到最後,這篇文章也沒多少乾貨,其實就是將本身動手實踐的一部分記錄下來而已,看來畢竟是博客寫得太少,沒有啥章法,想到哪兒寫到哪,比較亂,寫着寫着就跑偏了(T_T真是哭死),還需多多練習才行。