部門老大:redis 分佈式鎖再這麼用,我就勸退你

-若有不嚴謹或者錯誤之處,還望不吝賜教,輕點懟,人家仍是個孩子,嚶嚶嚶~面試

引言

最近項目上線的頻率頗高,連着幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前只能硬着頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫代碼,能夠直接叫作Bug。我就熬夜寫了一個bug被罵慘了。redis

因爲是作商城業務,要頻繁的對商品庫存進行扣減,應用是集羣部署,爲避免併發形成庫存超買超賣等問題,採用 redis 分佈式鎖加以控制。本覺得給扣庫存的代碼加上鎖lock.tryLock就萬事大吉了sql

/**
     * @author xiaofu
     * @description 扣減庫存
     * @date 2020/4/21 12:10
     */
   public String stockLock() {
        RLock lock = redissonClient.getLock("stockLock");
        try {
            /**
             * 獲取鎖
             */
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
              
                /**
                 * 扣減庫存
                 */
                。。。。。。
            } else {
                LOGGER.info("未獲取到鎖業務結束..");
            }
        } catch (Exception e) {
            LOGGER.info("處理異常", e);
        } 
        return "ok";
  }
複製代碼

結果業務代碼執行完之後我忘了釋放鎖lock.unlock(),致使redis線程池被打滿,redis服務大面積故障,形成庫存數據扣減混亂,被領導一頓臭罵,這個月績效~ 哎·~。數據庫

隨着 使用redis 鎖的時間越長,我發現 redis 鎖的坑遠比想象中要多。就算在面試題當中redis分佈式鎖的出鏡率也比較高,好比:「用鎖遇到過哪些問題?」 ,「又是如何解決的?」 基本都是一套連招問出來的。緩存

今天就分享一下我用redis 分佈式鎖的踩坑日記,以及一些解決方案,和你們一塊兒共勉。併發

1、鎖未被釋放

這種狀況是一種低級錯誤,就是我上邊犯的錯,因爲當前線程 獲取到redis 鎖,處理完業務後未及時釋放鎖,致使其它線程會一直嘗試獲取鎖阻塞,例如:用Jedis客戶端會報以下的錯誤信息異步

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
複製代碼

redis線程池已經沒有空閒線程來處理客戶端命令。分佈式

解決的方法也很簡單,只要咱們細心一點,拿到鎖的線程處理完業務及時釋放鎖,若是是重入鎖未拿到鎖後,線程能夠釋放當前鏈接而且sleep一段時間。ide

public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
                TODO .........
          } else {
                // 釋放當前redis鏈接
                redis.close();
                // 休眠1000毫秒
                sleep(1000);
          }
        }
    }
複製代碼

2、B的鎖被A給釋放了

咱們知道Redis實現鎖的原理在於 SETNX命令。當 key不存在時將 key的值設爲 value ,返回值爲 1;若給定的 key 已經存在,則 SETNX不作任何動做,返回值爲 0性能

SETNX key value
複製代碼

咱們來設想一下這個場景:AB兩個線程來嘗試給key myLock加鎖,A線程先拿到鎖(假如鎖3秒後過時),B線程就在等待嘗試獲取鎖,到這一點毛病沒有。

那若是此時業務邏輯比較耗時,執行時間已經超過redis鎖過時時間,這時A線程的鎖自動釋放(刪除key),B線程檢測到myLock這個key不存在,執行 SETNX命令也拿到了鎖。

可是,此時A線程執行完業務邏輯以後,仍是會去釋放鎖(刪除key),這就致使B線程的鎖被A線程給釋放了。

爲避免上邊的狀況,通常咱們在每一個線程加鎖時要帶上本身獨有的value值來標識,只釋放指定valuekey,不然就會出現釋放鎖混亂的場景。

3、數據庫事務超時

emm~ 聊redis鎖咋還扯到數據庫事務上來了?彆着急往下看,看下邊這段代碼:

@Transaction
   public void lock() {
   
        while (true) {
            boolean flag = this.getLock(key);
            if (flag) {
                insert();
            }
        }
    }
複製代碼

給這個方法添加一個@Transaction註解開啓事務,如代碼中拋出異常進行回滾,要知道數據庫事務但是有超時時間限制的,並不會無條件的一直等一個耗時的數據庫操做。

好比:咱們解析一個大文件,再將數據存入到數據庫,若是執行時間太長,就會致使事務超時自動回滾。

一旦你的key長時間獲取不到鎖,獲取鎖等待的時間遠超過數據庫事務超時時間,程序就會報異常。

通常爲解決這種問題,咱們就須要將數據庫事務改成手動提交、回滾事務。

@Autowired
    DataSourceTransactionManager dataSourceTransactionManager;

    @Transaction
    public void lock() {
        //手動開啓事務
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            while (true) {
                boolean flag = this.getLock(key);
                if (flag) {
                    insert();
                    //手動提交事務
                    dataSourceTransactionManager.commit(transactionStatus);
                }
            }
        } catch (Exception e) {
            //手動回滾事務
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }
複製代碼

4、鎖過時了,業務還沒執行完

這種狀況和咱們上邊提到的第二種比較相似,但解決思路上略有不一樣。

一樣是redis分佈式鎖過時,而業務邏輯沒執行完的場景,不過,這裏換一種思路想問題,redis鎖的過時時間再弄長點不就解決了嗎?

那仍是有問題,咱們能夠在加鎖的時候,手動調長redis鎖的過時時間,可這個時間多長合適?業務邏輯的執行時間是不可控的,調的過長又會影響操做性能。

要是redis鎖的過時時間可以自動續期就行了。

爲了解決這個問題咱們使用redis客戶端redissonredisson很好的解決了redis在分佈式環境下的一些棘手問題,它的宗旨就是讓使用者減小對Redis的關注,將更多精力用在處理業務邏輯上。

redisson對分佈式鎖作了很好封裝,只需調用API便可。

RLock lock = redissonClient.getLock("stockLock");
複製代碼

redisson在加鎖成功後,會註冊一個定時任務監聽這個鎖,每隔10秒就去查看這個鎖,若是還持有鎖,就對過時時間進行續期。默認過時時間30秒。這個機制也被叫作:「看門狗」,這名字。。。

舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過時時間再次重置成30秒。

經過分析下邊redisson的源碼實現能夠發現,不論是加鎖解鎖續約都是客戶端把一些複雜的業務邏輯,經過封裝在Lua腳本中發送給redis,保證這段複雜業務邏輯執行的原子性

@Slf4j
@Service
public class RedisDistributionLockPlus {
 
    /**
     * 加鎖超時時間,單位毫秒, 即:加鎖時間內執行完操做,若是未完成會有並發現象
     */
    private static final long DEFAULT_LOCK_TIMEOUT = 30;
 
    private static final long TIME_SECONDS_FIVE = 5 ;
 
    /**
     * 每一個key的過時時間 {@link LockContent}
     */
    private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
 
    /**
     * redis執行成功的返回
     */
    private static final Long EXEC_SUCCESS = 1L;
 
    /**
     * 獲取鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:超時時間
     */
    private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
            "if redis.call('exists', KEYS[1]) == 0 then " +
               "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
               "for k, v in pairs(t) do " +
                 "if v == 'OK' then return tonumber(ARGV[2]) end " +
               "end " +
            "return 0 end";
 
    /**
     * 釋放鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:業務耗時 arg3: 業務開始設置的timeout
     */
    private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "local ctime = tonumber(ARGV[2]) " +
            "local biz_timeout = tonumber(ARGV[3]) " +
            "if ctime > 0 then  " +
               "if redis.call('exists', KEYS[2]) == 1 then " +
                   "local avg_time = redis.call('get', KEYS[2]) " +
                   "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                   "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                   "else redis.call('del', KEYS[2]) end " +
               "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
            "end " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";
    /**
     * 續約lua腳本
     */
    private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
 
 
    private final StringRedisTemplate redisTemplate;
 
    public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        ScheduleTask task = new ScheduleTask(this, lockContentMap);
        // 啓動定時任務
        ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
    }
 
    /**
     * 加鎖
     * 取到鎖加鎖,取不到鎖一直等待知道得到鎖
     *
     * @param lockKey
     * @param requestId 全局惟一
     * @param expire   鎖過時時間, 單位秒
     * @return
     */
    public boolean lock(String lockKey, String requestId, long expire) {
        log.info("開始執行加鎖, lockKey ={}, requestId={}", lockKey, requestId);
        for (; ; ) {
            // 判斷是否已經有線程持有鎖,減小redis的壓力
            LockContent lockContentOld = lockContentMap.get(lockKey);
            boolean unLocked = null == lockContentOld;
            // 若是沒有被鎖,就獲取鎖
            if (unLocked) {
                long startTime = System.currentTimeMillis();
                // 計算超時時間
                long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
                String lockKeyRenew = lockKey + "_renew";
 
                RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
                List<String> keys = new ArrayList<>();
                keys.add(lockKey);
                keys.add(lockKeyRenew);
                Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
                if (null != lockExpire && lockExpire > 0) {
                    // 將鎖放入map
                    LockContent lockContent = new LockContent();
                    lockContent.setStartTime(startTime);
                    lockContent.setLockExpire(lockExpire);
                    lockContent.setExpireTime(startTime + lockExpire * 1000);
                    lockContent.setRequestId(requestId);
                    lockContent.setThread(Thread.currentThread());
                    lockContent.setBizExpire(bizExpire);
                    lockContent.setLockCount(1);
                    lockContentMap.put(lockKey, lockContent);
                    log.info("加鎖成功, lockKey ={}, requestId={}", lockKey, requestId);
                    return true;
                }
            }
            // 重複獲取鎖,在線程池中因爲線程複用,線程相等並不能肯定是該線程的鎖
            if (Thread.currentThread() == lockContentOld.getThread()
                      && requestId.equals(lockContentOld.getRequestId())){
                // 計數 +1
                lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
                return true;
            }
 
            // 若是被鎖或獲取鎖失敗,則等待100毫秒
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // 這裏用lombok 有問題
                log.error("獲取redis 鎖失敗, lockKey ={}, requestId={}", lockKey, requestId, e);
                return false;
            }
        }
    }
 
 
    /**
     * 解鎖
     *
     * @param lockKey
     * @param lockValue
     */
    public boolean unlock(String lockKey, String lockValue) {
        String lockKeyRenew = lockKey + "_renew";
        LockContent lockContent = lockContentMap.get(lockKey);
 
        long consumeTime;
        if (null == lockContent) {
            consumeTime = 0L;
        } else if (lockValue.equals(lockContent.getRequestId())) {
            int lockCount = lockContent.getLockCount();
            // 每次釋放鎖, 計數 -1,減到0時刪除redis上的key
            if (--lockCount > 0) {
                lockContent.setLockCount(lockCount);
                return false;
            }
            consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
        } else {
            log.info("釋放鎖失敗,不是本身的鎖。");
            return false;
        }
 
        // 刪除已完成key,先刪除本地緩存,減小redis壓力, 分佈式鎖,只有一個,因此這裏不加鎖
        lockContentMap.remove(lockKey);
 
        RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        keys.add(lockKeyRenew);
 
        Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
                Long.toString(lockContent.getBizExpire()));
        return EXEC_SUCCESS.equals(result);
 
    }
 
    /**
     * 續約
     *
     * @param lockKey
     * @param lockContent
     * @return true:續約成功,false:續約失敗(一、續約期間執行完成,鎖被釋放 二、不是本身的鎖,三、續約期間鎖過時了(未解決))
     */
    public boolean renew(String lockKey, LockContent lockContent) {
 
        // 檢測執行業務線程的狀態
        Thread.State state = lockContent.getThread().getState();
        if (Thread.State.TERMINATED == state) {
            log.info("執行業務的線程已終止,再也不續約 lockKey ={}, lockContent={}", lockKey, lockContent);
            return false;
        }
 
        String requestId = lockContent.getRequestId();
        long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
 
        RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
        log.info("續約結果,True成功,False失敗 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
        return EXEC_SUCCESS.equals(result);
    }
 
 
    static class ScheduleExecutor {
 
        public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
            long delay = unit.toMillis(initialDelay);
            long period_ = unit.toMillis(period);
            // 定時執行
            new Timer("Lock-Renew-Task").schedule(task, delay, period_);
        }
    }
 
    static class ScheduleTask extends TimerTask {
 
        private final RedisDistributionLockPlus redisDistributionLock;
        private final Map<String, LockContent> lockContentMap;
 
        public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
            this.redisDistributionLock = redisDistributionLock;
            this.lockContentMap = lockContentMap;
        }
 
        @Override
        public void run() {
            if (lockContentMap.isEmpty()) {
                return;
            }
            Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
            for (Map.Entry<String, LockContent> entry : entries) {
                String lockKey = entry.getKey();
                LockContent lockContent = entry.getValue();
                long expireTime = lockContent.getExpireTime();
                // 減小線程池中任務數量
                if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                    //線程池異步續約
                    ThreadPool.submit(() -> {
                        boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                        if (renew) {
                            long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                            lockContent.setExpireTime(expireTimeNew);
                        } else {
                            // 續約失敗,說明已經執行完 OR redis 出現問題
                            lockContentMap.remove(lockKey);
                        }
                    });
                }
            }
        }
    }
}

複製代碼

5、redis主從複製的坑

redis高可用最多見的方案就是主從複製(master-slave),這種模式也給redis分佈式鎖挖了一坑。

redis cluster集羣環境下,假如如今A客戶端想要加鎖,它會根據路由規則選擇一臺master節點寫入key mylock,在加鎖成功後,master節點會把key異步複製給對應的slave節點。

若是此時redis master節點宕機,爲保證集羣可用性,會進行主備切換slave變爲了redis masterB客戶端在新的master節點上加鎖成功,而A客戶端也覺得本身仍是成功加了鎖的。

此時就會致使同一時間內多個客戶端對一個分佈式鎖完成了加鎖,致使各類髒數據的產生。

至於解決辦法嘛,目前看尚未什麼根治的方法,只能儘可能保證機器的穩定性,減小發生此事件的機率。

總結

上面就是我在使用Redis 分佈式鎖時遇到的一些坑,有點小感慨,常常用一個方法填上這個坑,沒多久就發現另外一個坑又出來了,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,只不過是在權衡利弊後,選一個在接受範圍內的折中方案而已。


小福利:

關注個人公號,回覆【666】,幾百本各種技術電子書相送,噓~免費 送給你們,無套路自行領取

相關文章
相關標籤/搜索