SpringBoot 集成 redis 分佈式鎖優化

繼上一篇 SpringBoot 集成 redis 分佈式鎖 寫到最後,咱們發現這種分佈式鎖也存在着缺陷,若是A在 setnx 成功後,A成功獲取鎖了,也就是鎖已經存到 Redis 裏面了,此時服務器異常關閉或是重啓,將不會執行咱們的設置過時時間操做,也就不會設置鎖的有效期,這樣的話鎖就不會釋放了,就會產生死鎖。java

爲了解決上篇出現的死鎖問題,提出了雙重防死鎖,能夠更好的解決死鎖問題。redis

原理圖以下: spring

在这里插å
¥å›¾ç‰‡æè¿°

過程分析

一、當A經過 setnx(lockkey,currenttime+timeout) 命令能成功設置 lockkey 時,即返回值爲1;apache

二、當A經過 setnx(lockkey,currenttime+timeout) 命令不能成功設置 lockkey 時,這是不能直接判定獲取鎖失敗;由於咱們在設置鎖時,設置了鎖的超時時間 timeout,當前時間大於 redis 中存儲鍵值爲 lockkey 的 value 值時,能夠認爲上一任的擁有者對鎖的使用權已經失效了,A就能夠強行擁有該鎖;具體斷定過程以下;服務器

三、A經過 get(lockkey),獲取 redis 中的存儲鍵值爲 lockkey 的 value 值,即獲取鎖的相對時間 lockvalueA;分佈式

四、lockvalueA!=null && currenttime>lockvalue,A經過當前的時間與鎖設置的時間作比較,若是當前時間已經大於鎖設置的時間臨界,便可以進一步判斷是否能夠獲取鎖,不然說明該鎖還在被佔用,A就還不能獲取該鎖,結束,獲取鎖失敗;優化

五、步驟4返回結果爲 true 後,經過 getSet 設置新的超時時間,並返回舊值 lockvalueB,以做判斷,由於在分佈式環境,在進入這裏時可能另外的進程獲取到鎖並對值進行了修改,只有舊值與返回的值一致才能說明中間未被其餘進程獲取到這個鎖;this

六、lockvalueB == null || lockvalueA==lockvalueB,判斷:若果 lockvalueB 爲null,說明該鎖已經被釋放了,此時該進程能夠獲取鎖;舊值與返回的 lockvalueB 一致說明中間未被其餘進程獲取該鎖,能夠獲取鎖;不然不能獲取鎖,結束,獲取鎖失敗。spa

代碼實現

項目代碼結構圖code

1570783101690

把上篇的攔截器類(LockMethodInterceptor)的代碼修改以下:

package com.tuhu.twosample.chen.distributed.interceptor;

import com.tuhu.twosample.chen.distributed.annotation.CacheLock;
import com.tuhu.twosample.chen.distributed.common.CacheKeyGenerator;
import org.apache.logging.log4j.util.PropertiesUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;

/**
 * @author chendesheng
 * @create 2019/10/11 16:11
 */
@Aspect
@Configuration
public class LockMethodInterceptor {

    @Autowired
    public LockMethodInterceptor(StringRedisTemplate lockRedisTemplate, CacheKeyGenerator cacheKeyGenerator) {
        this.lockRedisTemplate = lockRedisTemplate;
        this.cacheKeyGenerator = cacheKeyGenerator;
    }

    private final StringRedisTemplate lockRedisTemplate;
    private final CacheKeyGenerator cacheKeyGenerator;

    @Around("execution(public * *(..)) && @annotation(com.tuhu.twosample.chen.distributed.annotation.CacheLock)")
    public Object interceptor(ProceedingJoinPoint pjp) {

        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        CacheLock lock = method.getAnnotation(CacheLock.class);

        if (StringUtils.isEmpty(lock.prefix())) {
            throw new RuntimeException("lock key can't be null...");
        }
        final String lockKey = cacheKeyGenerator.getLockKey(pjp);
        final long lockTime = lock.expire();
        try {
            //key不存在才能設置成功,得到了分佈式鎖,設置鎖過時時間
            final Boolean success = lockRedisTemplate.opsForValue().setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()+lockTime));
            if (success) {
                lockRedisTemplate.expire(lockKey, lock.expire(), lock.timeUnit());
            } else {

                String lockValueA = lockRedisTemplate.opsForValue().get(lockKey);
                //查到鎖的值並與當前時間比較檢查其是否已經超時,若超時則能夠從新獲取鎖
                if (lockValueA!=null && System.currentTimeMillis() > Long.valueOf(lockValueA)){
                    //經過用當前時間戳 getAndSet 操做會給對應的key設置新的值並返回舊值,這是一個原子操做
                    String lockValueB = lockRedisTemplate.opsForValue().getAndSet(lockKey,String.valueOf(System.currentTimeMillis()+lockTime));
                    //redis返回nil,則說明該值已經無效
                    if (lockValueB == null && StringUtils.pathEquals(lockValueA,lockValueB)){
                        //獲取鎖成功
                        lockRedisTemplate.expire(lockKey, lock.expire(), lock.timeUnit());
                    }else {
                        //獲取鎖失敗
                        throw new RuntimeException("請勿重複請求");
                    }

                }
                //按理來講 咱們應該拋出一個自定義的 CacheLockException 異常;
                throw new RuntimeException("請勿重複請求");
            }
            try {
                return pjp.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系統異常");
            }
        } finally {

            //若是演示的話須要註釋該代碼;實際應該放開
            // lockRedisTemplate.delete(lockKey);

        }
    }

}
複製代碼

這樣咱們雙重防死鎖的 redis 分佈式鎖也已經實現了。

優化點

加入了超時時間判斷鎖是否超時了,即便A在成功設置了鎖以後,服務器就當即出現宕機或是重啓,也不會出現死鎖問題;由於B在嘗試獲取鎖的時候,若是不能setnx成功,會去獲取 redis 中鎖的超時時間與當前的系統時間作比較,若是當前的系統時間已經大於鎖超時時間,說明A已經對鎖的使用權失效,B能繼續判斷可否獲取鎖,解決了redis分佈式鎖的死鎖問題。

相關文章
相關標籤/搜索