Redis分佈式鎖解決方案

咱們知道分佈式鎖的特性是排他、避免死鎖、高可用。分佈式鎖的實現能夠經過數據庫的樂觀鎖(經過版本號)或者悲觀鎖(經過for update)、Redis的setnx()命令、Zookeeper(在某個持久節點添加臨時有序節點,判斷當前節點是不是序列中最小的節點,若是不是則監聽比當前節點還要小的節點。若是是,獲取鎖成功。當被監聽的節點釋放了鎖(也就是被刪除),會通知當前節點。而後當前節點再嘗試獲取鎖,如此反覆)redis

redis.png

本篇文章,主要講如何用Redis的形式實現分佈式鎖。後續文章會講解熱點KEY讀取,緩存穿透和緩存雪崩的場景和解決方案、緩存更新策略等等知識點,理論知識點較多。spring

Redis配置

個人redis配置以下數據庫

spring.redis.host=
spring.redis.port=6379
#reids超時鏈接時間
spring.redis.timeout=100000
spring.redis.password=
#鏈接池最大鏈接數
spring.redis.pool.max-active=10000
#鏈接池最大空閒數
spring.redis.pool.max-idle=1000
#鏈接池最大等待時間
spring.redis.pool.max-wait=10000
複製代碼
@Component
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.redis")
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.timeout}")
    private int timeout;

    @Value("${spring.redis.pool.max-active}")
    private int poolMaxActive;

    @Value("${spring.redis.pool.max-idle}")
    private int poolMaxIdle;

    @Value("${spring.redis.pool.max-wait}")
    private int poolMaxWait;
}
複製代碼
@Component
public class RedisPoolFactory {

    @Autowired
    private RedisConfig redisConfig;

    @Bean
    public JedisPool jedisPoolFactory() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle());
        poolConfig.setMaxTotal(redisConfig.getPoolMaxActive());
        poolConfig.setTestOnBorrow(true);
        poolConfig.setMaxWaitMillis(redisConfig.getPoolMaxWait());
        JedisPool jp = new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort(),
                redisConfig.getTimeout(), redisConfig.getPassword(), 0);
        return jp;
    }

}
複製代碼

爲了區分不一樣模塊的key,我抽象出了一個KeyPrefix接口和BasePrefix類。apache

public interface KeyPrefix {

    int expireSeconds();

    String getPrefix();
}
複製代碼
/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: TODO
 * @date 2018/5/10 12:35
 */
public abstract class BasePrefix implements KeyPrefix {

    private int expireSeconds;

    private String prefix;

    public BasePrefix(int expireSeconds, String prefix) {
        this.expireSeconds = expireSeconds;
        this.prefix = prefix;
    }

    public BasePrefix(String prefix) {
        this(0, prefix);
    }

    @Override
    public int expireSeconds() {
        return expireSeconds;
    }

    @Override
    public String getPrefix() {
        String className = getClass().getSimpleName();
        return className + ":" + prefix;
    }

}
複製代碼

分佈式鎖分析與編碼

下面進入正文。由於分佈式系統之間是不一樣進程的,單機版的鎖沒法知足要求。因此咱們能夠藉助中間件Redis的setnx()命令實現分佈式鎖。setnx()命令只會對不存在的key設值,返回1表明獲取鎖成功。對存在的key設值,會返回0表明獲取鎖失敗。這裏的value是System.currentTimeMillis() (獲取鎖的時間)+鎖持有的時間。我這裏設置鎖持有的時間是200ms,實際業務執行的時間遠比這200ms要多的多,持有鎖的客戶端應該檢查鎖是否過時,保證鎖在釋放以前不會過時。由於客戶端故障的狀況多是很複雜的。好比如今有A,B倆個客戶端。A客戶端獲取了鎖,執行業務中作了騷操做致使阻塞了好久,時間應該遠遠超過200ms,當A客戶端從阻塞狀態下恢復繼續執行業務代碼時,A客戶端持有的鎖因爲過時已經被其餘客戶端佔有。這時候A客戶端執行釋放鎖的操做,那麼有可能釋放掉其餘客戶端的鎖。緩存

我這裏設置的客戶端等待鎖的時間是200ms。這裏經過輪詢的方式去讓客戶端獲取鎖。若是客戶端在200ms以內沒有鎖的話,直接返回false。實際場景要設置合適的客戶端等待鎖的時間,避免消耗CPU資源。bash

若是獲取鎖的邏輯只有這三行代碼的話,會形成死循環,明顯不符合分佈式鎖的特性。併發

if (jedis.setnx(realKey, value) == 1) {
                    return true;
                }

複製代碼

因此,咱們要加上鎖過時,而後獲取鎖的策略。經過realKey獲取當前的currentValue。currentValue也就是獲取鎖的時間 + 鎖持有的時間。 若是currentValue不等於null 且 currentValue 小於當前時間,說明鎖已通過期。這時候若是忽然來了C,D兩個客戶端獲取鎖的請求,不就讓C,D兩個客戶端都獲取鎖了嗎。若是防止這種現象發生,咱們採用getSet()命令來解決。getSet(key,value)的命令會返回key對應的value,而後再把key原來的值更新爲value。也就是說getSet()返回的是已過時的時間戳。若是這個已過時的時間戳等於currentValue,說明獲取鎖成功。app

假設客戶端A一開始持有鎖,保存在redis中的value(時間戳)等於T1。 這時候客戶端A的鎖已通過期,那麼C,D客戶端就能夠開始爭搶鎖了。currentValue是T1,C客戶端的value是T2,D客戶端的value是T3。首先C客戶端進入到String oldValue = jedis.getSet(realKey, value);這行代碼,得到的oldValue是T1,同時也會把realKey對應的value更新爲T2。再執行後續的代碼,oldValue等於currentValue,那麼客戶端C獲取鎖成功。接着D客戶端也執行到了String oldValue = jedis.getSet(realKey, value);這行代碼,獲取的oldValue是T2,同時也會把realKey對應的value更新爲T3。因爲oldValue不等於currentValue,那麼客戶端D獲取鎖失敗。分佈式

public boolean lock(KeyPrefix prefix, String key, String value) {
        Jedis jedis = null;
        Long lockWaitTimeOut = 200L;
        Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;

        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;

            for (;;) {
                if (jedis.setnx(realKey, value) == 1) {
                    return true;
                }

                String currentValue = jedis.get(realKey);

                // if lock is expired
                if (!StringUtils.isEmpty(currentValue) &&
                        Long.valueOf(currentValue) < System.currentTimeMillis()) {
                    // gets last lock time
                    String oldValue = jedis.getSet(realKey, value);

                    if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
                        return true;
                    }
                }

                lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();

                if (lockWaitTimeOut <= 0L) {
                    return false;
                }
            }
        } finally {
            returnToPool(jedis);
        }
    }
複製代碼

咱們講解了獲取的邏輯,接着講講釋放鎖的邏輯。咱們在這裏加上!StringUtils.isEmpty(currentValue) && value.equals(currentValue)判斷是爲了防止釋放了不屬於當前客戶端的鎖。仍是舉個例子,若是沒有這個邏輯,A客戶端調用unlock()方法以前,鎖忽然就過時了。這時候B客戶端發現鎖過時了,立馬獲取了鎖。而後A客戶端接着調用unlock()方法,卻釋放了本來屬於B客戶端的鎖。ide

public void unlock(KeyPrefix prefix, String key, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;
            String currentValue = jedis.get(realKey);

            if (!StringUtils.isEmpty(currentValue)
                    && value.equals(currentValue)) {
                jedis.del(realKey);
            }
        } catch (Exception ex) {
            log.info("unlock error");
        } finally {
            returnToPool(jedis);
        }
    }
複製代碼

編碼RedisController,模擬商品秒殺操做。測試分佈式鎖是否可行。(強調:這裏只是舉一個例子,更直觀的判斷分佈式鎖可行,不適合實際場景!!!!!實際上搶購,是直接將庫存放入到redis,是否結束標記放入到內存中,經過內存標記和redis中的decr()預減庫存,而後將秒殺消息入隊到消息隊列中,最後消費消息並落地到DB中)

/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: TODO
 * @date 2018/8/28 9:27
 */
@RestController
@RequestMapping("/redis")
public class RedisController {

    private static LongAdder longAdder = new LongAdder();
    private static Long LOCK_EXPIRE_TIME = 200L;
    private static Long stock = 10000L;

    @Autowired
    private RedisService redisService;

    static {
        longAdder.add(10000L);
    }

    @GetMapping("/v1/seckill")
    public String seckillV1() {
        Long time = System.currentTimeMillis() + LOCK_EXPIRE_TIME;
        if (!redisService.lock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time))) {
            return "人太多了,換個姿式操做一下";
        }

        if (longAdder.longValue() == 0L) {
            return "已搶光";
        }

        doSomeThing();

        if (longAdder.longValue() == 0L) {
            return "已搶光";
        }

        longAdder.decrement();

        redisService.unlock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time));

        Long stock = longAdder.longValue();
        Long bought = 10000L - stock;
        return "已搶" + bought + ", 還剩下" + stock;
    }

    @GetMapping("/detail")
    public String detail() {
        Long stock = longAdder.longValue();
        Long bought = 10000L - stock;
        return "已搶" + bought + ", 還剩下" + stock;
    }

    @GetMapping("/v2/seckill")
    public String seckillV2() {
        if (longAdder.longValue() == 0L) {
            return "已搶光";
        }

        doSomeThing();

        if (longAdder.longValue() == 0L) {
            return "已搶光";
        }

        longAdder.decrement();

        Long stock = longAdder.longValue();
        Long bought = 10000L - stock;
        return "已搶" + bought + ", 還剩下" + stock;
    }

    @GetMapping("/v3/seckill")
    public String seckillV3() {
        if (stock == 0) {
            return "已搶光";
        }

        doSomeThing();
        stock--;

        Long bought = 10000L - stock;
        return "已搶" + bought + ", 還剩下" + stock;
    }


    public void doSomeThing() {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

複製代碼

http://localhost:8081/redis/v1/seckill進行壓測,我使用的壓測工具是ab測試工具。這裏用10000個併發用戶,20000個請求來進行壓測。

ab -c 10000 -n 20000 http://localhost:8081/redis/v1/seckill
複製代碼

壓測結果以下:

E:\cmazxiaoma_download\httpd-2.4.34-o102o-x64-vc14\Apache24\bin>ab -c 10000 -n 2
0000 http://localhost:8081/redis/v1/seckill
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 2000 requests
Completed 4000 requests
Completed 6000 requests
Completed 8000 requests
Completed 10000 requests
Completed 12000 requests
Completed 14000 requests
Completed 16000 requests
Completed 18000 requests
Completed 20000 requests
Finished 20000 requests


Server Software:
Server Hostname:        localhost
Server Port:            8081

Document Path:          /redis/v1/seckill
Document Length:        22 bytes

Concurrency Level:      10000
Time taken for tests:   108.426 seconds
Complete requests:      20000
Failed requests:        19991
   (Connect: 0, Receive: 0, Length: 19991, Exceptions: 0)
Total transferred:      3420218 bytes
HTML transferred:       760218 bytes
Requests per second:    184.46 [#/sec] (mean)
Time per request:       54213.000 [ms] (mean)
Time per request:       5.421 [ms] (mean, across all concurrent requests)
Transfer rate:          30.80 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   6.3      0     549
Processing:  2393 36477 16329.1  45101   90269
Waiting:      182 36435 16351.4  45046   90267
Total:       2393 36477 16329.0  45101   90269

Percentage of the requests served within a certain time (ms)
  50%  45101
  66%  47680
  75%  49136
  80%  50392
  90%  53200
  95%  53743
  98%  54510
  99%  56014
 100%  90269 (longest request)
複製代碼

咱們再來看看是否有超賣現象,貌似仍是正常。

。


回溯分析

我打開RedisDesktopManager查看db0的key信息時,發現還有一個key沒有刪除掉。說明咱們寫的unlock()方法在1w併發用戶,2w請求下仍是存在問題。

image.png

仔細推敲本身以前寫的代碼發現(仍是拿上面的例子說事),客戶端D雖然獲取鎖失敗,可是以前進行了String oldValue = jedis.getSet(realKey, value);操做,仍是成功的更新了realKey對應的value。咱們進行unlock()操做時,釋放客戶端的鎖是根據value來標識當前客戶端的。一開始客戶端C的value是T2,因爲客戶端D的getSet()操做,覆蓋掉了客戶端C的value,讓其更新成T3。因爲value.equals(currentValue)條件不成立,因此不會執行到jedis.del(realKey)

其實lock()方法也經不起推敲: 1.分佈式各個系統時間不一致,若是要這樣作,只能進行時間同步。 2.當某個客戶端鎖過時時,多個客戶端開始爭搶鎖。雖然最後只有一個客戶端能成功鎖,可是獲取鎖失敗的客戶端能覆蓋獲取鎖成功客戶端的過時時間。 3.當客戶端的鎖過時時間被覆蓋,會形成鎖不具備標識性,會形成客戶端沒有釋放鎖。

因此咱們要重寫lock與unlock()的邏輯,看到網上已經有不少的解決方案。(不過也有不少錯誤案例)

咱們能夠經過redis的set(key,value,NX,EX,timeout)合併普通的set()和expire()操做,使其具備原子性。

/**
   * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1 * GB). * @param key * @param value * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key * if it already exist. * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds * @param time expire time in the units of <code>expx</code> * @return Status code reply */ public String set(final String key, final String value, final String nxxx, final String expx, final long time) { checkIsInMultiOrPipeline(); client.set(key, value, nxxx, expx, time); return client.getStatusCodeReply(); } 複製代碼

經過set(key,value,NX,EX,timeout)方法,咱們就能夠輕鬆實現分佈式鎖。值得注意的是這裏的value做爲客戶端鎖的惟一標識,不能重複。

public boolean lock1(KeyPrefix prefix, String key, String value, Long lockExpireTimeOut,
                         Long lockWaitTimeOut) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;
            Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;

            for (;;) {
                String result = jedis.set(realKey, value, "NX", "PX", lockExpireTimeOut);

                if ("OK".equals(result)) {
                    return true;
                }

                lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();

                if (lockWaitTimeOut <= 0L) {
                    return false;
                }
            }
        } catch (Exception ex) {
            log.info("lock error");
        } finally {
            returnToPool(jedis);
        }

        return false;
    }
複製代碼

咱們可使用lua腳本合併get()和del()操做,使其具備原子性。一切大功告成。

public boolean unlock1(KeyPrefix prefix, String key, String value) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String realKey = prefix.getPrefix() + key;

            String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

            Object result = jedis.eval(luaScript, Collections.singletonList(realKey),
                    Collections.singletonList(value));

            if ("1".equals(result)) {
                return true;
            }

        } catch (Exception ex) {
            log.info("unlock error");
        } finally {
            returnToPool(jedis);
        }
        return false;

    }
複製代碼

剛纔看了評論,看到了各位大佬提出的一系列問題。我作出如下解釋:

  1. 秒殺操做,我在這裏只是舉一個例子,更直觀的判斷分佈式鎖可行,不適合實際場景!!!!!實際上搶購,是將商品庫存放入到redis、將是否結束標記Flag放入到內存中,經過內存標記和redis中的decr()預減庫存,而後將秒殺消息入隊到消息隊列中,最後消費消息並落地到DB中。

2.請耐心讀完本篇文章。第一個案例代碼是錯誤的,我後續講解了如何發現和分析錯誤案例代碼的思路。 在此基礎下,推導出正確的代碼。

3.經過評論,我看到有一篇文章做者的思路是這樣的: 獲取鎖以後,經過標誌位和開啓新線程的方式輪詢去刷新當前客戶端持有鎖的時間,以保證在釋放鎖以前鎖不會過時,而後鎖釋放後,將標誌位置爲false,線程中止循環。可是這樣有一個問題:假如執行了lock()操做以後,客戶端因爲一些緣由阻塞了,那麼unlock()方法一直得不到執行,那麼標誌位一直爲true,開啓刷新過時時間的線程一直死循環,會形成資源的嚴重浪費。並且線程一直增長當前客戶端持有鎖的時間,會形成其餘客戶端一直拿不到鎖,並且形成死鎖。


尾言

你們好,我是cmazxiaoma(寓意是沉夢昂志的小馬),感謝各位閱讀本文章。 小弟不才。 若是您對這篇文章有什麼意見或者錯誤須要改進的地方,歡迎與我討論。 若是您以爲還不錯的話,但願大家能夠點個贊。 但願個人文章對你能有所幫助。 有什麼意見、看法或疑惑,歡迎留言討論。

最後送上:心之所向,素履以往。生如逆旅,一葦以航。

saoqi.png
相關文章
相關標籤/搜索