緩存穿透問題分析壓測

緩存穿透

    緩存穿透,是指查詢一個數據庫必定不存在的數據。正常的使用緩存流程大體是,數據查詢先進行緩存查詢,若是key不存在或者key已通過期,再對數據庫進行查詢,並把查詢到的對象,放進緩存。若是數據庫查詢對象爲空,則不放進緩存。java

    本篇討論緩存擊穿的其中一個表現:web

    對於一些設置了過時時間的key,若是這些key可能會在某些時間點被超高併發地訪問,是一種很是「熱點」的數據。這個時候,須要考慮另一個問題:緩存被「擊穿」的問題。redis

  • 概念:緩存在某個時間點過時的時候,剛好在這個時間點對這個Key有大量的併發請求過來,這些請求發現緩存過時通常都會從後端DB加載數據並回設到緩存,這個時候大併發的請求可能會瞬間把後端DB壓垮。
  • 如何解決:使用mutex。簡單地來講,就是在緩存失效的時候(判斷拿出來的值爲空),不是當即去load db,而是先使用緩存工具的某些帶成功操做返回值的操做(好比Redis的SETNX或者Memcache的ADD)去set一個mutex key,當操做返回成功時,再進行load db的操做並回設緩存;不然,就重試整個get緩存的方法。相似下面的代碼:spring

public String get(key) {
    String value = redis.get(key);
    if (value == null) { //表明緩存值過時
        //設置3min的超時,防止del操做失敗的時候,下次緩存過時一直不能load db
        if (redis.setnx(key_mutex, 1, 3 * 60) == 1) {  //表明設置成功
            value = db.get(key);
                    redis.set(key, value, expire_secs);
                    redis.del(key_mutex);
            } else {  //這個時候表明同時候的其餘線程已經load db並回設到緩存了,這時候重試獲取緩存值便可
                    sleep(50);
                    get(key);  //重試
            }
        } else {
            return value;      
        }
}

接下來,進行併發壓力測試和優化:數據庫

首先是不使用setNX進行併發壓力測試apache

代碼以下:後端

package cn.chinotan.controller;

import lombok.extern.java.Log;
import org.apache.catalina.servlet4preview.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * @program: test
 * @description: redis測試
 * @author: xingcheng
 * @create: 2019-03-09 16:26
 **/
@RestController
@RequestMapping("/redis")
@Log
public class RedisController {

    @Autowired
    StringRedisTemplate redisTemplate;

    public static final String KEY = "chinotan:redis:pass";

    public static final String VALUE = "redis-pass-value";

    /**
     * 模擬耗時操做 3秒
     */
    public static final Long TIME_CONSUMING = 3 * 1000L;

    /**
     * VALUE緩存時間 5秒
     */
    public static final Long VALUE_TIME = 5 * 1000L;

    @GetMapping(value = "/pass")
    public Object hello(HttpServletRequest request) throws Exception {
        long cacheStart = System.currentTimeMillis();
        String value = redisTemplate.opsForValue().get(KEY);
        long cacheEnd = System.currentTimeMillis();
        if (StringUtils.isBlank(value)) {
            // 模擬耗時操做,從數據庫獲取
            long start = System.currentTimeMillis();
            TimeUnit.MILLISECONDS.sleep(TIME_CONSUMING);
            redisTemplate.opsForValue().set(KEY, VALUE, VALUE_TIME, TimeUnit.MILLISECONDS);
            long end = System.currentTimeMillis();
            log.info("從數據庫中獲取耗時: " + (end - start) + "ms");
            return VALUE;
        } else {
            log.info("從緩存中獲取耗時:" + (cacheEnd - cacheStart) + "ms");
            return value;
        }
    }

}

很簡單的一個get請求,先從緩存中獲取數據,若是數據不存在,則從數據庫獲取,這裏用緩存

TimeUnit.MILLISECONDS.sleep(TIME_CONSUMING);

來模擬一個複雜的從數據庫獲取數據的操做,耗時設定爲3秒鐘tomcat

本次測試採用的是springBoot2.0以上進行部署,jmeter進行壓力併發測試springboot

在壓力測試以前,進行springboot自帶的tomcat併發數和鏈接數調整以及redis鏈接池的調整

redis的鏈接池調整以下:

spring:
  redis:
    database: 0
    host: 127.0.0.1
    jedis:
      pool:
        #最大鏈接數據庫鏈接數
        max-active: 5000
        #最大等待鏈接中的數量
        max-idle: 5000
        #最大創建鏈接等待時間。若是超過此時間將接到異常。設爲-1表示無限制。
        max-wait: -1
        #最小等待鏈接中的數量,設 0 爲沒有限制
        min-idle: 10
#    lettuce:
#      pool:
#        max-active: 5000
#        max-idle: 5000
#        max-wait: -1
#        min-idle: 10
#      shutdown-timeout: 5000ms
    password:
    port: 6379
    timeout: 5000

tomcat的調整以下:

server:
  port: 11111
  tomcat: 
      uri-encoding: UTF-8
      max-threads: 500
      max-connections: 10000

這樣redis和tomcat能夠支持大併發請求

設置完成後查看設置是否生效:

redis鏈接池不生效舉例以下:

必須和配置項相同才正確

以後進行壓測準備:下載jmeter,以後步驟以下

啓動後控制檯打印以下:

能夠看到大量併發過來後,會有屢次的查看操做,並無走到緩存,緩存命中率低,緩存的意義就少不少

下面進行優化:

package cn.chinotan.controller;

import lombok.extern.java.Log;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.JedisCommands;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * @program: test
 * @description: redis測試
 * @author: xingcheng
 * @create: 2019-03-09 16:26
 **/
@RestController
@RequestMapping("/redis")
@Log
public class RedisController {

    @Autowired
    StringRedisTemplate redisTemplate;

    public static final String KEY = "chinotan:redis:pass";

    public static final String NX_KEY = "chinotan:redis:nx";

    public static final String VALUE = "redis-pass-value";

    /**
     * 間隔時間 3秒
     */
    public static final Long NX_SLEEP_TIME = 50L;

    /**
     * 模擬耗時操做 3秒
     */
    public static final Long TIME_CONSUMING = 1 * 1000L;

    /**
     * VALUE緩存時間 5秒
     */
    public static final Long VALUE_TIME = 5 * 1000L;

    /**
     * 鎖緩存時間 5分鐘
     */
    public static final Long NX_TIME = 5 * 60L;

    @GetMapping(value = "/pass")
    public Object hello() throws Exception {
        long cacheStart = System.currentTimeMillis();
        String value = redisTemplate.opsForValue().get(KEY);
        long cacheEnd = System.currentTimeMillis();
        if (StringUtils.isBlank(value)) {
            long start = System.currentTimeMillis();
            if (setNX(NX_KEY, NX_KEY)) {
                // 模擬耗時操做,從數據庫獲取
                TimeUnit.MILLISECONDS.sleep(TIME_CONSUMING);
                redisTemplate.opsForValue().set(KEY, VALUE, VALUE_TIME, TimeUnit.MILLISECONDS);
                long end = System.currentTimeMillis();
                redisTemplate.delete(NX_KEY);
                log.info("從數據庫中獲取耗時: " + (end - start) + "ms");
                return VALUE;
            } else {
                TimeUnit.MILLISECONDS.sleep(NX_SLEEP_TIME);
                log.info("緩存穿透遞歸");
                return hello();
            }

        } else {
            log.info("從緩存中獲取耗時:" + (cacheEnd - cacheStart) + "ms");
            return value;
        }
    }

    private boolean setNX(String key, String value) {
        Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(key, value);
        redisTemplate.expire(key, NX_TIME, TimeUnit.SECONDS);
        return aBoolean;
    }

}

經過進行setNX命令操做,這個命令在緩存存在時不會進行覆蓋更新寫入操做,並返回false,緩存不存在纔會進行寫入並返回true,一般會被用來分佈式鎖的設計實現

進行優化後,大量的併發請求不會打到數據庫上,而是每隔50ms進行遞歸重試,這樣只有一個請求會請求數據庫,其餘請求只能從緩存中取數,大大增長了緩存的命中率

下面是壓測結果:

能夠看到從數據庫取數的操做日誌只有一條,從而避免了緩存擊穿的一個表現問題

下一步優化方向:

RedisTemplate提供的setNX操做並非原子操做(一個是保存數據操做,一個是設置緩存時間操做,是兩個請求),在併發環境下可能會有問題,該如何解決呢,歡迎你們留言

相關文章
相關標籤/搜索