以商品超賣爲例講解分佈式鎖

本案例主要講解Redis實現分佈式鎖的兩種實現方式:Jedis實現、Redisson實現。網上關於這方面講解太多了,Van自認爲文筆沒他們好,仍是用示例代碼說明。java

1、jedis 實現

該方案只考慮Redis單機部署的場景mysql

1.1 加鎖

1.1.1 原理

jedis.set(String key, String value, String nxxx, String expx, int time)
複製代碼
  1. key: 使用key來當鎖,由於key是惟一的;
  2. value: 我傳的是惟一值(UUID),不少童鞋可能不明白,有key做爲鎖不就夠了嗎,爲何還要用到value?緣由是分佈式鎖要知足解鈴還須繫鈴人:經過給value賦值爲requestId,咱們就知道這把鎖是哪一個請求加的了,在解鎖的時候要驗證value值,不能誤解鎖;
  3. nxxx: 這個參數我填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做;
  4. expx: 這個參數我傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定;
  5. time: 與第四個參數相呼應,表明key的過時時間。

1.1.2 小結

  • set()加入了NX參數,能夠保證若是已有key存在,則函數不會調用成功,也就是隻有一個客戶端能持有鎖,知足互斥性;
  • 其次,因爲咱們對鎖設置了過時時間,即便鎖的持有者後續發生崩潰而沒有解鎖,鎖也會由於到了過時時間而自動解鎖(即key被刪除),不會發生死鎖;
  • 最後,由於咱們將value賦值爲requestId,表明加鎖的客戶端請求標識,那麼在客戶端在解鎖的時候就能夠進行校驗是不是同一個客戶端。

1.2 釋放鎖

釋放鎖時須要驗證value值,也就是說咱們在獲取鎖的時候須要設置一個value,不能直接用del key這種粗暴的方式,由於直接del key任何客戶端均可以進行解鎖了,因此解鎖時,咱們須要判斷鎖是不是本身的(基於value值來判斷)git

  1. 首先,寫了一個簡單Lua腳本代碼,做用是:獲取鎖對應的value值,檢查是否與requestId相等,若是相等則刪除鎖(解鎖);
  2. 而後,將Lua代碼傳到jedis.eval()方法裏,並使參數KEYS[1]賦值爲lockKeyARGV[1]賦值爲requestIdeval()方法是將Lua代碼交給Redis服務端執行。

1.3 案例(家庭多人領取獎勵的場景)

這裏放出的是關鍵代碼,詳細可運行的代碼可至文末地址下載示例代碼。github

1.3.1 準備

該案例模擬家庭內多人經過領取一個獎勵,可是隻能有一我的能領取成功,不能重複領取(以前作過獎勵模塊的需求)redis

  • family_reward_record
CREATE TABLE `family_reward_record` (
  `id` bigint(10) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
  `family_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品名稱',
  `reward_type` int(10) NOT NULL DEFAULT '1' COMMENT '商品庫存數量',
  `state` int(1) NOT NULL DEFAULT '0' COMMENT '商品狀態',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '入庫時間',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=270 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='家庭領取獎勵表(家庭內多人只能有一我的能領取成功,不能重複領取)';
複製代碼
  • application.yml
spring:
  datasource:
    url: jdbc:mysql://47.98.178.84:3306/dev
    username: dev
    password: password
    driver-class-name: com.mysql.jdbc.Driver
  redis:
    host: 47.98.178.84
    port: 6379
    password: password
    timeout: 2000
# mybatis
mybatis:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: cn.van.mybatis.demo.entity
複製代碼

1.3.2 核心實現

  • Jedis 單機配置類 - RedisConfig.java
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.timeout}")
    private int timeout;

    @Bean
    public JedisPool redisPoolFactory() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        if (StringUtils.isEmpty(password)) {
            return new JedisPool(jedisPoolConfig, host, port, timeout);
        }
        return new JedisPool(jedisPoolConfig, host, port, timeout, password);
    }

    @Bean(name = "redisTemplate")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
        objectMapper.enableDefaultTyping(DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<Object> jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        jsonRedisSerializer.setObjectMapper(objectMapper);
        redisTemplate.setDefaultSerializer(jsonRedisSerializer);

        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
複製代碼
  • 分佈式鎖工具類 - RedisDistributedLock.java
@Component
public class RedisDistributedLock {
    /** * 成功獲取鎖標示 */
    private static final String LOCK_SUCCESS = "OK";
    /** * 成功解鎖標示 */
    private static final Long RELEASE_SUCCESS = 1L;

    @Autowired
    private JedisPool jedisPool;

    /** * redis 數據存儲過時時間 */
    final int expireTime = 500;

    /** * 嘗試獲取分佈式鎖 * @param lockKey 鎖 * @param lockValue 請求標識 * @return 是否獲取成功 */
    public boolean tryLock(String lockKey, String lockValue) {
        Jedis jedis = null;
        try{
            jedis = jedisPool.getResource();
            String result = jedis.set(lockKey, lockValue, "NX", "PX", expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return true;
            }
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
        return false;
    }

    /** * 釋放分佈式鎖 * @param lockKey 鎖 * @param lockValue 請求標識 * @return 是否釋放成功 */
    public boolean unLock(String lockKey, String lockValue) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
        return false;
    }
}
複製代碼
  • 不加鎖時:模擬 familyId = 1 的家庭同時領取獎勵
@Override
public HttpResult receiveAward() {
    Long familyId = 1L;
    Map<String, Object> params = new HashMap<String, Object>(16);
    params.put("familyId", familyId);
    params.put("rewardType", 1);
    int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params);
    if (count == 0) {
        FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now());
        int num = familyRewardRecordMapper.insert(recordDO);
        if (num == 1) {
            return HttpResult.success();
        }
        return HttpResult.failure(-1, "記錄插入失敗");
    }
    return HttpResult.success("該記錄已存在");
}
複製代碼
  • 加鎖的實現:模擬 familyId = 2 的家庭同時領取獎勵
@Override
public HttpResult receiveAwardLock() {
    Long familyId = 2L;
    Map<String, Object> params = new HashMap<String, Object>(16);
    params.put("familyId", familyId);
    params.put("rewardType", 1);
    int count = familyRewardRecordMapper.selectCountByFamilyIdAndRewardType(params);
    if (count == 0) {
        // 沒有記錄則建立領取記錄
        FamilyRewardRecordDO recordDO = new FamilyRewardRecordDO(familyId,1,0,LocalDateTime.now());
        // 分佈式鎖的key(familyId + rewardType)
        String lockKey = recordDO.getFamilyId() + "_" + recordDO.getRewardType();
        // 分佈式鎖的value(惟一值)
        String lockValue = createUUID();
        boolean lockStatus = redisLock.tryLock(lockKey, lockValue);
        // 鎖被佔用
        if (!lockStatus) {
            log.info("鎖已經佔用了");
            return HttpResult.failure(-1,"失敗");
        }
        // 無論多個請求,加鎖以後,只會有一個請求能拿到鎖,進行插入操做
        log.info("拿到了鎖,當前時刻:{}",System.currentTimeMillis());

        int num = familyRewardRecordMapper.insert(recordDO);
        if (num != 1) {
            log.info("數據插入失敗!");
            return HttpResult.failure(-1, "數據插入失敗!");
        }
        log.info("數據插入成功!準備解鎖...");
        boolean unLockState = redisLock.unLock(lockKey,lockValue);
        if (!unLockState) {
            log.info("解鎖失敗!");
            return HttpResult.failure(-1, "解鎖失敗!");
        }
        log.info("解鎖成功!");
        return HttpResult.success();
    }
    log.info("該記錄已存在");
    return HttpResult.success("該記錄已存在");
}
private String createUUID() {
    UUID uuid = UUID.randomUUID();
    String str = uuid.toString().replace("-", "_");
    return str;
}
複製代碼

1.3.3 測試

我採用的是JMeter工具進行測試,加鎖和不加鎖的狀況都設置成:五次併發請求。算法

1.3.3.1 不加鎖

/** * 家庭成員領取獎勵(不加鎖) * @return */
@PostMapping("/receiveAward")
public HttpResult receiveAward() {
    return redisLockService.receiveAward();
}
複製代碼

1.3.3.2 加鎖

/** * 家庭成員領取獎勵(加鎖) * @return */
@PostMapping("/receiveAwardLock")
public HttpResult receiveAwardLock() {
    return redisLockService.receiveAwardLock();
}
複製代碼

經過對比,說明分佈式鎖起做用了。spring

1.4 小結

我上家使用的就是這種加鎖方式,看上去很OK,實際上在Redis集羣的時候會出現問題,好比:sql

A客戶端在Redismaster節點上拿到了鎖,可是這個加鎖的key尚未同步到slave節點,master故障,發生故障轉移,一個slave節點升級爲master節點,B客戶端也能夠獲取同個key的鎖,但客戶端A也已經拿到鎖了,這就致使多個客戶端都拿到鎖。json

正由於如此,Redis做者antirez基於分佈式環境下提出了一種更高級的分佈式鎖的實現方式:Redlock緩存

2、Redlock實現

2.1 原理

antirez提出的Redlock算法大概是這樣的:

Redis的分佈式環境中,咱們假設有NRedis master。這些節點徹底互相獨立,不存在主從複製或者其餘集羣協調機制。咱們確保將在N個實例上使用與在Redis單實例下相同方法獲取和釋放鎖。如今咱們假設有5Redis master節點,同時咱們須要在5臺服務器上面運行這些Redis實例,這樣保證他們不會同時都宕掉。

2.1.1 加鎖

爲了取到鎖,客戶端應該執行如下操做(RedLock算法加鎖步驟):

  1. 獲取當前Unix時間,以毫秒爲單位;
  2. 依次嘗試從5個實例,使用相同的key和具備惟一性的value(例如UUID)獲取鎖。當向Redis請求獲取鎖時,客戶端應該設置一個網絡鏈接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間爲10秒,則超時時間應該在5-50毫秒之間。這樣能夠避免服務器端Redis已經掛掉的狀況下,客戶端還在死死地等待響應結果。若是服務器端沒有在規定時間內響應,客戶端應該儘快嘗試去另一個Redis實例請求獲取鎖;
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就獲得獲取鎖使用的時間。當且僅當從大多數(N/2+1,這裏是3個節點)的Redis節點都取到鎖,而且使用的時間小於鎖失效時間時,鎖纔算獲取成功;
  4. 若是取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
  5. 若是由於某些緣由,獲取鎖失敗(沒有在至少N/2+1Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在全部的Redis實例上進行解鎖(即使某些Redis實例根本就沒有加鎖成功,防止某些節點獲取到鎖可是客戶端沒有獲得響應而致使接下來的一段時間不能被從新獲取鎖)。

2.1.2 解鎖

向全部的Redis實例發送釋放鎖命令便可,不用關心以前有沒有從Redis實例成功獲取到鎖.

2.2 案例(商品超賣爲例)

這部分以最多見的案例:搶購時的商品超賣(庫存數減小爲負數)爲例

2.2.1 準備

  • good
CREATE TABLE `good` (
                      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵id',
                      `good_name` varchar(255) NOT NULL COMMENT '商品名稱',
                      `good_counts` int(255) NOT NULL COMMENT '商品庫存',
                      `create_time` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '建立時間',
                      PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COMMENT='商品表';
-- 插入兩條測試數據
INSERT INTO `good` VALUES (1, '哇哈哈', 5, '2019-09-20 17:39:04');
INSERT INTO `good` VALUES (2, '衛龍', 5, '2019-09-20 17:39:06');
複製代碼
  • 配置文件跟上面同樣

2.2.2 核心實現

  • Redisson 配置類 RedissonConfig.java

我這裏配置的是單機,更多配置詳見github.com/redisson/re…

@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;

    /** * RedissonClient,單機模式 * @return * @throws IOException */
    @Bean
    public RedissonClient redissonSentinel() {
        //支持單機,主從,哨兵,集羣等模式,此爲單機模式
        
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setPassword(password);
        return Redisson.create(config);
    }
}
複製代碼
  • 不加鎖時
@Override
public HttpResult saleGoods(){
    // 以指定goodId = 1:哇哈哈爲例
    Long goodId = 1L;
    GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId);
    int goodStock = goodDO.getGoodCounts();
    if (goodStock >= 1) {
        goodMapper.saleOneGood(goodId);
    }
    return HttpResult.success();
}
複製代碼
  • 加鎖
@Override
public HttpResult saleGoodsLock(){
    // 以指定goodId = 2:衛龍爲例
    Long goodId = 2L;
    GoodDO goodDO = goodMapper.selectByPrimaryKey(goodId);
    int goodStock = goodDO.getGoodCounts();
    String key = goodDO.getGoodName();
    log.info("{}剩餘總庫存,{}件", key,goodStock);
    // 將商品的實時庫存放在redis 中,便於讀取
    stringRedisTemplate.opsForValue().set(key, Integer.toString(goodStock));
    // redisson 鎖 的key
    String lockKey = goodDO.getId() +"_" + key;
    RLock lock = redissonClient.getLock(lockKey);
    // 設置60秒自動釋放鎖 (默認是30秒自動過時)
    lock.lock(60, TimeUnit.SECONDS);
    // 此步開始,串行銷售
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
    // 若是緩存中庫存量大於1,能夠繼續銷售
    if (stock >= 1) {
        goodDO.setGoodCounts(stock - 1);
        int num = goodMapper.saleOneGood(goodId);
        if (num == 1) {
            // 減庫存成功,將緩存同步
            stringRedisTemplate.opsForValue().set(key,Integer.toString((stock-1)));
        }
        log.info("{},當前庫存,{}件", key,stock);
    }
    lock.unlock();
    return HttpResult.success();
}
複製代碼

2.3 測試

採用的是JMeter工具進行測試,初始化的時候兩個商品的庫存設置都是:5;因此這裏加鎖和不加鎖的狀況都設置成:十次併發請求。

2.3.1 不加鎖

/** * 售賣商品(不加鎖) * @return */
@PostMapping("/saleGoods")
public HttpResult saleGoods() {
    return redisLockService.saleGoods();
}
複製代碼

2.3.2 加鎖

/** * 售賣商品(加鎖) * @return */
@PostMapping("/saleGoodsLock")
public HttpResult saleGoodsLock() {
    return redisLockService.saleGoodsLock();
}
複製代碼

2.3.3 小結

經過2.3.12.3.2的結果對比很明顯:前者出現了超賣狀況,庫存數賣到了-5,這是決不容許的;而加了鎖的狀況後,庫存只會減小到0,便再也不銷售。

3、總結

再次說明:以上代碼不全,如需嘗試,請前往Van 的 Github 查看完整示例代碼

第一種基於Redis的分佈式鎖並不適合用於生產環境。Redisson 可用於生產環境。固然,分佈式的選擇還有Zookeeper的選項,Van後續會整理出來供你們參考。

3.1 示例源碼地址

github.com/vanDusty/Sp…

3.2 技術交流

  1. 風塵博客
  2. 風塵博客-博客園
  3. 風塵博客-CSDN

關注公衆號,瞭解更多:

風塵博客
相關文章
相關標籤/搜索