緩存穿透,是指查詢一個數據庫必定不存在的數據。正常的使用緩存流程大體是,數據查詢先進行緩存查詢,若是key不存在或者key已通過期,再對數據庫進行查詢,並把查詢到的對象,放進緩存。若是數據庫查詢對象爲空,則不放進緩存。java
本篇討論緩存擊穿的其中一個表現:web
對於一些設置了過時時間的key,若是這些key可能會在某些時間點被超高併發地訪問,是一種很是「熱點」的數據。這個時候,須要考慮另一個問題:緩存被「擊穿」的問題。redis
如何解決:使用mutex。簡單地來講,就是在緩存失效的時候(判斷拿出來的值爲空),不是當即去load db,而是先使用緩存工具的某些帶成功操做返回值的操做(好比Redis的SETNX或者Memcache的ADD)去set一個mutex key,當操做返回成功時,再進行load db的操做並回設緩存;不然,就重試整個get緩存的方法。相似下面的代碼:spring
public String get(key) { String value = redis.get(key); if (value == null) { //表明緩存值過時 //設置3min的超時,防止del操做失敗的時候,下次緩存過時一直不能load db if (redis.setnx(key_mutex, 1, 3 * 60) == 1) { //表明設置成功 value = db.get(key); redis.set(key, value, expire_secs); redis.del(key_mutex); } else { //這個時候表明同時候的其餘線程已經load db並回設到緩存了,這時候重試獲取緩存值便可 sleep(50); get(key); //重試 } } else { return value; } }
接下來,進行併發壓力測試和優化:數據庫
首先是不使用setNX進行併發壓力測試apache
代碼以下:後端
package cn.chinotan.controller; import lombok.extern.java.Log; import org.apache.catalina.servlet4preview.http.HttpServletRequest; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; /** * @program: test * @description: redis測試 * @author: xingcheng * @create: 2019-03-09 16:26 **/ @RestController @RequestMapping("/redis") @Log public class RedisController { @Autowired StringRedisTemplate redisTemplate; public static final String KEY = "chinotan:redis:pass"; public static final String VALUE = "redis-pass-value"; /** * 模擬耗時操做 3秒 */ public static final Long TIME_CONSUMING = 3 * 1000L; /** * VALUE緩存時間 5秒 */ public static final Long VALUE_TIME = 5 * 1000L; @GetMapping(value = "/pass") public Object hello(HttpServletRequest request) throws Exception { long cacheStart = System.currentTimeMillis(); String value = redisTemplate.opsForValue().get(KEY); long cacheEnd = System.currentTimeMillis(); if (StringUtils.isBlank(value)) { // 模擬耗時操做,從數據庫獲取 long start = System.currentTimeMillis(); TimeUnit.MILLISECONDS.sleep(TIME_CONSUMING); redisTemplate.opsForValue().set(KEY, VALUE, VALUE_TIME, TimeUnit.MILLISECONDS); long end = System.currentTimeMillis(); log.info("從數據庫中獲取耗時: " + (end - start) + "ms"); return VALUE; } else { log.info("從緩存中獲取耗時:" + (cacheEnd - cacheStart) + "ms"); return value; } } }
很簡單的一個get請求,先從緩存中獲取數據,若是數據不存在,則從數據庫獲取,這裏用緩存
TimeUnit.MILLISECONDS.sleep(TIME_CONSUMING);
來模擬一個複雜的從數據庫獲取數據的操做,耗時設定爲3秒鐘tomcat
本次測試採用的是springBoot2.0以上進行部署,jmeter進行壓力併發測試springboot
在壓力測試以前,進行springboot自帶的tomcat併發數和鏈接數調整以及redis鏈接池的調整
redis的鏈接池調整以下:
spring: redis: database: 0 host: 127.0.0.1 jedis: pool: #最大鏈接數據庫鏈接數 max-active: 5000 #最大等待鏈接中的數量 max-idle: 5000 #最大創建鏈接等待時間。若是超過此時間將接到異常。設爲-1表示無限制。 max-wait: -1 #最小等待鏈接中的數量,設 0 爲沒有限制 min-idle: 10 # lettuce: # pool: # max-active: 5000 # max-idle: 5000 # max-wait: -1 # min-idle: 10 # shutdown-timeout: 5000ms password: port: 6379 timeout: 5000
tomcat的調整以下:
server: port: 11111 tomcat: uri-encoding: UTF-8 max-threads: 500 max-connections: 10000
這樣redis和tomcat能夠支持大併發請求
設置完成後查看設置是否生效:
redis鏈接池不生效舉例以下:
必須和配置項相同才正確
以後進行壓測準備:下載jmeter,以後步驟以下
啓動後控制檯打印以下:
能夠看到大量併發過來後,會有屢次的查看操做,並無走到緩存,緩存命中率低,緩存的意義就少不少
下面進行優化:
package cn.chinotan.controller; import lombok.extern.java.Log; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import redis.clients.jedis.JedisCommands; import java.util.Objects; import java.util.concurrent.TimeUnit; /** * @program: test * @description: redis測試 * @author: xingcheng * @create: 2019-03-09 16:26 **/ @RestController @RequestMapping("/redis") @Log public class RedisController { @Autowired StringRedisTemplate redisTemplate; public static final String KEY = "chinotan:redis:pass"; public static final String NX_KEY = "chinotan:redis:nx"; public static final String VALUE = "redis-pass-value"; /** * 間隔時間 3秒 */ public static final Long NX_SLEEP_TIME = 50L; /** * 模擬耗時操做 3秒 */ public static final Long TIME_CONSUMING = 1 * 1000L; /** * VALUE緩存時間 5秒 */ public static final Long VALUE_TIME = 5 * 1000L; /** * 鎖緩存時間 5分鐘 */ public static final Long NX_TIME = 5 * 60L; @GetMapping(value = "/pass") public Object hello() throws Exception { long cacheStart = System.currentTimeMillis(); String value = redisTemplate.opsForValue().get(KEY); long cacheEnd = System.currentTimeMillis(); if (StringUtils.isBlank(value)) { long start = System.currentTimeMillis(); if (setNX(NX_KEY, NX_KEY)) { // 模擬耗時操做,從數據庫獲取 TimeUnit.MILLISECONDS.sleep(TIME_CONSUMING); redisTemplate.opsForValue().set(KEY, VALUE, VALUE_TIME, TimeUnit.MILLISECONDS); long end = System.currentTimeMillis(); redisTemplate.delete(NX_KEY); log.info("從數據庫中獲取耗時: " + (end - start) + "ms"); return VALUE; } else { TimeUnit.MILLISECONDS.sleep(NX_SLEEP_TIME); log.info("緩存穿透遞歸"); return hello(); } } else { log.info("從緩存中獲取耗時:" + (cacheEnd - cacheStart) + "ms"); return value; } } private boolean setNX(String key, String value) { Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(key, value); redisTemplate.expire(key, NX_TIME, TimeUnit.SECONDS); return aBoolean; } }
經過進行setNX命令操做,這個命令在緩存存在時不會進行覆蓋更新寫入操做,並返回false,緩存不存在纔會進行寫入並返回true,一般會被用來分佈式鎖的設計實現
進行優化後,大量的併發請求不會打到數據庫上,而是每隔50ms進行遞歸重試,這樣只有一個請求會請求數據庫,其餘請求只能從緩存中取數,大大增長了緩存的命中率
下面是壓測結果:
能夠看到從數據庫取數的操做日誌只有一條,從而避免了緩存擊穿的一個表現問題
RedisTemplate提供的setNX操做並非原子操做(一個是保存數據操做,一個是設置緩存時間操做,是兩個請求),在併發環境下可能會有問題,該如何解決呢,歡迎你們留言