基於Redis的分佈式鎖實現

前言

本篇文章主要介紹基於Redis的分佈式鎖實現究竟是怎麼一回事,其中參考了許多大佬寫的文章,算是對分佈式鎖作一個總結html

分佈式鎖概覽

在多線程的環境下,爲了保證一個代碼塊在同一時間只能由一個線程訪問,Java中咱們通常可使用synchronized語法和ReetrantLock去保證,這其實是本地鎖的方式。可是如今公司都是流行分佈式架構,在分佈式環境下,如何保證不一樣節點的線程同步執行呢?java

實際上,對於分佈式場景,咱們可使用分佈式鎖,它是控制分佈式系統之間互斥訪問共享資源的一種方式。git

好比說在一個分佈式系統中,多臺機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,若是沒有分佈式鎖機制保證,那麼那多臺機器上的多個服務可能進行併發插入操做,致使數據重複插入,對於某些不容許有多餘數據的業務來講,這就會形成問題。而分佈式鎖機制就是爲了解決相似這類問題,保證多個服務之間互斥的訪問共享資源,若是一個服務搶佔了分佈式鎖,其餘服務沒獲取到鎖,就不進行後續操做。大體意思以下圖所示(不必定準確):github

分佈式鎖

分佈式鎖的特色

分佈式鎖通常有以下的特色:redis

  • 互斥性: 同一時刻只能有一個線程持有鎖
  • 可重入性: 同一節點上的同一個線程若是獲取了鎖以後可以再次獲取鎖
  • 鎖超時:和J.U.C中的鎖同樣支持鎖超時,防止死鎖
  • 高性能和高可用: 加鎖和解鎖須要高效,同時也須要保證高可用,防止分佈式鎖失效
  • 具有阻塞和非阻塞性:可以及時從阻塞狀態中被喚醒

分佈式鎖的實現方式

咱們通常實現分佈式鎖有如下幾種方式:算法

  • 基於數據庫
  • 基於Redis
  • 基於zookeeper

本篇文章主要介紹基於Redis如何實現分佈式鎖spring

Redis的分佈式鎖實現

1. 利用setnx+expire命令 (錯誤的作法)

Redis的SETNX命令,setnx key value,將key設置爲value,當鍵不存在時,才能成功,若鍵存在,什麼也不作,成功返回1,失敗返回0 。 SETNX實際上就是SET IF NOT Exists的縮寫數據庫

由於分佈式鎖還須要超時機制,因此咱們利用expire命令來設置,因此利用setnx+expire命令的核心代碼以下:springboot

public boolean tryLock(String key,String requset,int timeout) {
    Long result = jedis.setnx(key, requset);
    // result = 1時,設置成功,不然設置失敗
    if (result == 1L) {
        return jedis.expire(key, timeout) == 1L;
    } else {
        return false;
    }
}
複製代碼

實際上上面的步驟是有問題的,setnx和expire是分開的兩步操做,不具備原子性,若是執行完第一條指令應用異常或者重啓了,鎖將沒法過時。bash

一種改善方案就是使用Lua腳原本保證原子性(包含setnx和expire兩條指令)

2. 使用Lua腳本(包含setnx和expire兩條指令)

代碼以下

public boolean tryLock_with_lua(String key, String UniqueId, int seconds) {
    String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
            "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
    List<String> keys = new ArrayList<>();
    List<String> values = new ArrayList<>();
    keys.add(key);
    values.add(UniqueId);
    values.add(String.valueOf(seconds));
    Object result = jedis.eval(lua_scripts, keys, values);
    //判斷是否成功
    return result.equals(1L);
}
複製代碼

3. 使用 set key value [EX seconds][PX milliseconds][NX|XX] 命令 (正確作法)

Redis在 2.6.12 版本開始,爲 SET 命令增長一系列選項:

SET key value[EX seconds][PX milliseconds][NX|XX]
複製代碼
  • EX seconds: 設定過時時間,單位爲秒
  • PX milliseconds: 設定過時時間,單位爲毫秒
  • NX: 僅當key不存在時設置值
  • XX: 僅當key存在時設置值

set命令的nx選項,就等同於setnx命令,代碼過程以下:

public boolean tryLock_with_set(String key, String UniqueId, int seconds) {
    return "OK".equals(jedis.set(key, UniqueId, "NX", "EX", seconds));
}
複製代碼

value必需要具備惟一性,咱們能夠用UUID來作,設置隨機字符串保證惟一性,至於爲何要保證惟一性?假如value不是隨機字符串,而是一個固定值,那麼就可能存在下面的問題:

  • 1.客戶端1獲取鎖成功
  • 2.客戶端1在某個操做上阻塞了太長時間
  • 3.設置的key過時了,鎖自動釋放了
  • 4.客戶端2獲取到了對應同一個資源的鎖
  • 5.客戶端1從阻塞中恢復過來,由於value值同樣,因此執行釋放鎖操做時就會釋放掉客戶端2持有的鎖,這樣就會形成問題

因此一般來講,在釋放鎖時,咱們須要對value進行驗證

釋放鎖的實現

釋放鎖時須要驗證value值,也就是說咱們在獲取鎖的時候須要設置一個value,不能直接用del key這種粗暴的方式,由於直接del key任何客戶端均可以進行解鎖了,因此解鎖時,咱們須要判斷鎖是不是本身的,基於value值來判斷,代碼以下:

public boolean releaseLock_with_lua(String key,String value) {
    String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
            "return redis.call('del',KEYS[1]) else return 0 end";
    return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
}
複製代碼

這裏使用Lua腳本的方式,儘可能保證原子性。

使用 set key value [EX seconds][PX milliseconds][NX|XX] 命令 看上去很OK,實際上在Redis集羣的時候也會出現問題,好比說A客戶端在Redis的master節點上拿到了鎖,可是這個加鎖的key尚未同步到slave節點,master故障,發生故障轉移,一個slave節點升級爲master節點,B客戶端也能夠獲取同個key的鎖,但客戶端A也已經拿到鎖了,這就致使多個客戶端都拿到鎖。

因此針對Redis集羣這種狀況,還有其餘方案

4. Redlock算法 與 Redisson 實現

Redis做者 antirez基於分佈式環境下提出了一種更高級的分佈式鎖的實現Redlock,原理以下:

下面參考文章Redlock:Redis分佈式鎖最牛逼的實現redis.io/topics/dist…

假設有5個獨立的Redis節點(注意這裏的節點能夠是5個Redis單master實例,也能夠是5個Redis Cluster集羣,但並非有5個主節點的cluster集羣):

  • 獲取當前Unix時間,以毫秒爲單位
  • 依次嘗試從5個實例,使用相同的key和具備惟一性的value(例如UUID)獲取鎖,當向Redis請求獲取鎖時,客戶端應該設置一個網絡鏈接和響應超時時間,這個超時時間應用小於鎖的失效時間,例如你的鎖自動失效時間爲10s,則超時時間應該在5~50毫秒之間,這樣能夠避免服務器端Redis已經掛掉的狀況下,客戶端還在死死地等待響應結果。若是服務端沒有在規定時間內響應,客戶端應該儘快嘗試去另一個Redis實例請求獲取鎖
  • 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就獲得獲取鎖使用的時間,當且僅當從大多數(N/2+1,這裏是3個節點)的Redis節點都取到鎖,而且使用的時間小於鎖失敗時間時,鎖纔算獲取成功。
  • 若是取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)
  • 若是某些緣由,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在全部的Redis實例上進行解鎖(即使某些Redis實例根本就沒有加鎖成功,防止某些節點獲取到鎖可是客戶端沒有獲得響應而致使接下來的一段時間不能被從新獲取鎖)

Redisson實現簡單分佈式鎖

對於Java用戶而言,咱們常用Jedis,Jedis是Redis的Java客戶端,除了Jedis以外,Redisson也是Java的客戶端,Jedis是阻塞式I/O,而Redisson底層使用Netty能夠實現非阻塞I/O,該客戶端封裝了鎖的,繼承了J.U.C的Lock接口,因此咱們能夠像使用ReentrantLock同樣使用Redisson,具體使用過程以下。

  1. 首先加入POM依賴
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.6</version>
</dependency>
複製代碼
  1. 使用Redisson,代碼以下(與使用ReentrantLock相似)
// 1. 配置文件
Config config = new Config();
config.useSingleServer()
        .setAddress("redis://127.0.0.1:6379")
        .setPassword(RedisConfig.PASSWORD)
        .setDatabase(0);
//2. 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);

//3. 設置鎖定資源名稱
RLock lock = redissonClient.getLock("redlock");
lock.lock();
try {
    System.out.println("獲取鎖成功,實現業務邏輯");
    Thread.sleep(10000);
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    lock.unlock();
}
複製代碼

關於Redlock算法的實現,在Redisson中咱們可使用RedissonRedLock來完成,具體使用細節能夠參考大佬的文章: mp.weixin.qq.com/s/8uhYult2h…

Redis實現的分佈式鎖輪子

下面利用SpringBoot + Jedis + AOP的組合來實現一個簡易的分佈式鎖。

1. 自定義註解

自定義一個註解,被註解的方法會執行獲取分佈式鎖的邏輯

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
    /** * 業務鍵 * * @return */
    String key();
    /** * 鎖的過時秒數,默認是5秒 * * @return */
    int expire() default 5;

    /** * 嘗試加鎖,最多等待時間 * * @return */
    long waitTime() default Long.MIN_VALUE;
    /** * 鎖的超時時間單位 * * @return */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
複製代碼

2. AOP攔截器實現

在AOP中咱們去執行獲取分佈式鎖和釋放分佈式鎖的邏輯,代碼以下:

@Aspect
@Component
public class LockMethodAspect {
    @Autowired
    private RedisLockHelper redisLockHelper;
    @Autowired
    private JedisUtil jedisUtil;
    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

    @Around("@annotation(com.redis.lock.annotation.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String value = UUID.randomUUID().toString();
        String key = redisLock.key();
        try {
            final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
            logger.info("isLock : {}",islock);
            if (!islock) {
                logger.error("獲取鎖失敗");
                throw new RuntimeException("獲取鎖失敗");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系統異常");
            }
        }  finally {
            logger.info("釋放鎖");
            redisLockHelper.unlock(jedis,key, value);
            jedis.close();
        }
    }
}

複製代碼

3. Redis實現分佈式鎖核心類

@Component
public class RedisLockHelper {
    private long sleepTime = 100;
    /** * 直接使用setnx + expire方式獲取分佈式鎖 * 非原子性 * * @param key * @param value * @param timeout * @return */
    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
        Long result = jedis.setnx(key, value);
        // result = 1時,設置成功,不然設置失敗
        if (result == 1L) {
            return jedis.expire(key, timeout) == 1L;
        } else {
            return false;
        }
    }

    /** * 使用Lua腳本,腳本中使用setnex+expire命令進行加鎖操做 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */
    public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
        String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
                "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(UniqueId);
        values.add(String.valueOf(seconds));
        Object result = jedis.eval(lua_scripts, keys, values);
        //判斷是否成功
        return result.equals(1L);
    }

    /** * 在Redis的2.6.12及之後中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */
    public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /** * 自定義獲取鎖的超時時間 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */
    public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
        long seconds = timeUnit.toSeconds(timeout);
        while (waitTime >= 0) {
            String result = jedis.set(key, value, "nx", "ex", seconds);
            if ("OK".equals(result)) {
                return true;
            }
            waitTime -= sleepTime;
            Thread.sleep(sleepTime);
        }
        return false;
    }
    /** * 錯誤的解鎖方法—直接刪除key * * @param key */
    public void unlock_with_del(Jedis jedis,String key) {
        jedis.del(key);
    }

    /** * 使用Lua腳本進行解鎖操縱,解鎖的時候驗證value值 * * @param jedis * @param key * @param value * @return */
    public boolean unlock(Jedis jedis,String key,String value) {
        String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}
複製代碼

4. Controller層控制

定義一個TestController來測試咱們實現的分佈式鎖

@RestController
public class TestController {
    @RedisLock(key = "redis_lock")
    @GetMapping("/index")
    public String index() {
        return "index";
    }
}
複製代碼

小結

分佈式鎖重點在於互斥性,在任意一個時刻,只有一個客戶端獲取了鎖。在實際的生產環境中,分佈式鎖的實現可能會更復雜,而我這裏的講述主要針對的是單機環境下的基於Redis的分佈式鎖實現,至於Redis集羣環境並無過多涉及,有興趣的朋友能夠查閱相關資料。

項目源碼地址:github.com/pjmike/redi…

參考資料 & 鳴謝

相關文章
相關標籤/搜索