Redis修行 — 分佈式鎖

java

常見的實現方式

  • 基於數據庫的分佈式鎖
  • 基於緩存的分佈式鎖(redis,memcached等)
  • 基於ZooKeeper的分佈式鎖(臨時有序節點)

本文主要介紹經過Redis本身去實現分佈式鎖以及使用開源框架Redisson去實現分佈式鎖,基於數據庫和Zookeeper方式簡要帶過。nginx

特性

  • 互斥性:只能有一個客戶端持有鎖
  • 防死鎖:客戶端在持有鎖期間崩潰,未能解鎖,也有其餘方式去解鎖,不影響其餘客戶端獲取鎖
  • 只有加鎖的人才能釋放鎖

原理

分佈式鎖本質上能夠理解爲是一個全部客戶端共享的全局變量,當這個全局變量存在時,說明已經有客戶端獲取到了鎖,其餘客戶端只能等它釋放鎖(刪除這個全局變量)後才能獲取到鎖(設置全局變量)。git

基於Redis實現分佈式鎖

按照上面的特性和理論,咱們整理一下基本思路:github

  • 指定一個key做爲鎖標記,存入Redis中,指定一個惟一的用戶標識做爲value
  • 當key不存在時才能設置值,確保同一時間只有一個客戶端得到鎖,知足互斥性特性
  • 設置一個過時時間,防止因系統異常致使沒能刪除這個key,知足防死鎖特性
  • 當處理完業務以後須要清除這個key來釋放鎖。
  • 清除key時須要校驗value值,須要知足只有加鎖的人才能釋放鎖

獲取鎖

使用如下指令:web

SET mylock userId NX PX 10000
複製代碼
  • mylock爲鎖對應的key
  • userId爲惟一的用戶標識,用於刪除時校驗
  • NX表示只有當key不存在時才能set成功,確保只有一個客戶端可以請求成功
  • PX 10000表示這個鎖有一個10秒的自動過時時間

釋放鎖

當業務完成後刪除key來釋放鎖,能夠執行如下lua腳本:redis

if redis.call("get",KEYS[1]) == ARGV[1then
    return redis.call("del",KEYS[1])
else
    return 0
end
複製代碼

執行以上腳本時,須要將mylock做爲KEYS[1]傳進去,將userId做爲ARGV[1]傳進去spring

注意點

  • 必需要給鎖加一個過時時間:這樣即便中間系統異常了,等過時時間到了,也能夠自動釋放鎖,防止出現死鎖現象
  • 獲取鎖時不能分紅先設置key,再設置過時時間兩步去執行,錯誤示例以下:
    # 當key不存在時設置值
    setnx mylock userId
    # 設置過時時間
    expire mylock 10
複製代碼

這樣會存在一個問題,若是系統在執行完setnx以後異常了,expire指令就沒法執行,一樣會出現死鎖現象sql

  • 有必要將value設置爲一個惟一的用戶標識,用於保證所要釋放的鎖是本身創建的,由於在極端的狀況下會出現下列狀況:

A成功獲取了鎖數據庫

A在某個操做上被阻塞了好久apache

A的鎖到達過時時間

B獲取了鎖

A從阻塞中恢復了,執行釋放鎖操做,把B的鎖釋放了,致使B操做不受保護

  • 釋放鎖操做須要保證操做時原子性的,須要經過Lua腳原本實現。它將GET、判斷是否相同、DEL三個步驟以一個原子性的方式去完成。若是按邏輯分開執行一樣會出現相似上面的問題:

A先判斷當前鎖的值,肯定了是本身建的鎖,準備釋放鎖了

由於網路問題或者系統卡頓致使A被阻塞了

A的鎖過時了

B獲取鎖

A從阻塞中恢復了

A調用DEL釋放了B的鎖

缺陷

從上面的描述能夠看出來,當出現系統阻塞或者網絡延遲等狀況下,可能業務尚未執行完成,鎖就過時自動釋放了,這時它的業務操做時不受保護的。

代碼實現

本文樣例基於SpringBoot實現

pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- redis Lettuce 模式 鏈接池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
複製代碼
yml配置文件
spring:
  redis:
    # Redis數據庫索引(默認爲0)
    database: 0
    # Redis服務器地址
    host: localhost
    # Redis服務器鏈接端口
    port: 6379
    # Redis服務器鏈接密碼(默認爲空)
    # password: admin
    # 鏈接超時時間(毫秒)
    timeout: 3000ms
    lettuce:
      pool:
        # 鏈接池最大鏈接數(使用負值表示沒有限制)
        max-active: 20
        # 鏈接池最大阻塞等待時間(使用負值表示沒有限制)
        max-wait: 3000ms
        # 鏈接池中的最大空閒鏈接(負數沒有限制)
        max-idle: 8
        # 鏈接池中的最小空閒鏈接
        min-idle: 0
複製代碼
鎖操做
@Component
public class RedisLock {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 加鎖
     */

    public boolean tryLock(String key, String value) {
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, value, 5, TimeUnit.SECONDS);
        if (isLocked == null) {
            return false;
        }
        return isLocked;
    }

    /**
     * 解鎖
     */

    public Boolean unLock(String key, String value) {
        // 執行 lua 腳本
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        // 指定 lua 腳本
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("redis/unLock.lua")));
        // 指定返回類型
        redisScript.setResultType(Long.class);
        // 參數一:redisScript,參數二:key列表,參數三:arg(可多個)
        Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
        return result != null && result > 0;
    }
}
複製代碼

釋放鎖須要執行Lua腳本,路徑爲:resources/redis/unLock.lua

if redis.call("get",KEYS[1]) == ARGV[1then
  return redis.call("del",KEYS[1])
else
  return 0
end
複製代碼
測試

模擬一個減庫存的操做,先在redis中設置庫存量50,key爲productKey,建立訪問接口:

@RestController
@RequestMapping("/redis")
public class RedisController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String PRODUCT_KEY = "productKey";

    private static final String LOCK_KEY = "redisLock";

    @Autowired
    private RedisLock redisLock;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/lock")
    public void lockTest() throws InterruptedException {
        // 用戶惟一標識
        String lockValue = UUID.randomUUID().toString().replace("-""");
        Random random = new Random();
        int sleepTime;
        while (true) {
            if (redisLock.tryLock(LOCK_KEY, lockValue)) {
                logger.info("[{}]成功獲取鎖", lockValue);
                break;
            }
            sleepTime = random.nextInt(1000);
            Thread.sleep(sleepTime);
            logger.info("[{}]獲取鎖失敗,{}毫秒後從新嘗試獲取鎖", lockValue, sleepTime);
        }
        // 剩餘庫存
        String products = stringRedisTemplate.opsForValue().get(PRODUCT_KEY);
        if (products == null) {
            logger.info("[{}]獲取剩餘庫存失敗,釋放鎖:{} @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
            return;
        }
        int surplus = Integer.parseInt(products);
        if (surplus <= 0) {
            logger.info("[{}]庫存不足,釋放鎖:{} ##########################################", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
            return;
        }

        logger.info("[{}]當前庫存[{}],操做:庫存-1", lockValue, surplus);
        stringRedisTemplate.opsForValue().decrement(PRODUCT_KEY);
        logger.info("[{}]操做完成,開始釋放鎖,釋放結果:{}", lockValue, redisLock.unLock(LOCK_KEY, lockValue));
    }
}
複製代碼

啓動項目,使用JMeter進行併發測試,設置1秒60次請求,觀察控制檯輸出和最終redis中庫存數量

Redisson 實現

Redisson是【Redis官方推薦】官網推薦分佈式鎖實現的方案。使用起來也很簡單。這裏只作簡單演示,具體能夠看官方文檔

Redis son 莫非是redis親兒子的意思

pom.xml

直接引入redisson-spring-boot-starter,它包含了對spring-boot-starter-webspring-boot-starter-data-redis的依賴

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.12.0</version>
</dependency>
複製代碼

建立配置文件

@Configuration
public class RedissonConfig {
    /**
     * 這裏只配置單節點的,支持集羣、哨兵等方式配置
     * 能夠用Config.fromYAML加載yml文件中的配置
     */

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6379")
                .setDatabase(0);
        return Redisson.create(config);
    }
}
複製代碼

注意這裏的address須要以 redis://host:port 的格式

建立測試接口

@RestController
@RequestMapping("/redisson")
public class RedissonController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String PRODUCT_KEY = "productKey";

    private static final String LOCK_KEY = "redissonLock";

    @Autowired
    private RedissonClient redissonClient;

    @RequestMapping("/lock")
    public void lock() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        // 設置5秒過時時間
        lock.lock(5, TimeUnit.SECONDS);
        String lockValue = lock.toString();
        logger.info("[{}]成功獲取鎖,開始執行業務。。。", lockValue);

        RAtomicLong atomicLong = redissonClient.getAtomicLong(PRODUCT_KEY);
        long surplus = atomicLong.get();
        if (surplus <= 0) {
            lock.unlock();
            logger.info("[{}]庫存不足,釋放鎖 ##########################################", lockValue);
            return;
        }
        logger.info("[{}]當前庫存[{}],庫存 -1,剩餘庫存[{}]", lockValue, surplus, atomicLong.decrementAndGet());

        logger.info("[{}]操做完成,釋放鎖", lockValue);
        lock.unlock();
    }
}
複製代碼

啓動項目,使用JMeter進行併發測試,一樣設置1秒60次請求,觀察控制檯輸出和最終redis中庫存數量

基於數據庫實現分佈式鎖

經過惟一索引的方式

# 創建一張記錄鎖信息的表
lockName -- 鎖名稱。 加上惟一索引,確保只能有一個客戶端得到鎖
creater -- 建立人,只有建立者才能解鎖
expire -- 過時時間
複製代碼
  • 執行前先插入鎖數據,lockName作了惟一性約束,若是多個請求同時提交只會有一個請求提交成功。
  • 執行完後刪除鎖
  • 能夠經過定時任務方式去刪除已過時的數據,防止死鎖

經過樂觀鎖的形式

  • 在須要操做的表中加一個字段version
  • 操做任務前先查詢到當前version的值
    select version from product where product_name = '電腦'
複製代碼
  • 更新數據時,將前面查出來的version的值做爲條件
    update product set product_count = product_count - 1version = version + 1 where product_name = '電腦' and version = ${version}
複製代碼

這樣若是在這期間數據被修改了,那麼version的值就不一致了,更新操做會失敗。這樣就確保了在你業務期間沒有其餘人修改過數據。

基於 ZooKeeper 的分佈式鎖

ZooKeeper的分佈式鎖主要是經過建立臨時有序節點的方式實現的:

  • 發起加鎖請求,在ZooKeeper中建立一個臨時有序節點
  • 判斷本身建立的節點是不是最小序號
  • 若是是最小的,則成功獲取鎖
  • 若是不是最小的,則在它的上一節點加上一個監聽器
  • 處理完業務後,釋放鎖,即刪除對應的節點
  • ZooKeeper通知監聽這個節點的監聽器,你的前面已經沒有其餘節點了,你能夠獲取鎖了
  • 對應節點獲取鎖

能夠發現,ZooKeeper的方式獲取鎖是有序的,先請求的先獲取鎖,而經過redis的方式是無序的,誰先搶到誰得到鎖

訪問源碼

全部代碼均上傳至Github上,方便你們訪問

>>>>>> Redis實現分佈式鎖 <<<<<<

平常求贊

創做不易,若是各位以爲有幫助,求點贊 支持

新建公衆號,求關注

相關文章
相關標籤/搜索