spring boot redis分佈式鎖

一. Redis 分佈式鎖的實現以及存在的問題

鎖是針對某個資源,保證其訪問的互斥性,在實際使用當中,這個資源通常是一個字符串。使用 Redis 實現鎖,主要是將資源放到 Redis 當中,利用其原子性,當其餘線程訪問時,若是 Redis 中已經存在這個資源,就不容許以後的一些操做。spring boot使用 Redis 的操做主要是經過 RedisTemplate 來實現,通常步驟以下:html

  1. 將鎖資源放入 Redis (注意是當key不存在時才能放成功,因此使用 setIfAbsent 方法):
redisTemplate.opsForValue().setIfAbsent("key", "value");
  1. 設置過時時間
redisTemplate.expire("key", 30000, TimeUnit.MILLISECONDS);
  1. 釋放鎖
redisTemplate.delete("key");

通常狀況下,這樣的實現就可以知足鎖的需求了,可是若是在調用 setIfAbsent 方法以後線程掛掉了,即沒有給鎖定的資源設置過時時間,默認是永不過時,那麼這個鎖就會一直存在。因此須要保證設置鎖及其過時時間兩個操做的原子性,spring data的 RedisTemplate 當中並無這樣的方法。可是在jedis當中是有這種原子操做的方法的,須要經過 RedisTemplate 的 execute 方法獲取到jedis裏操做命令的對象,代碼以下:java

String result = redisTemplate.execute(new RedisCallback<String>() {
    @Override public String doInRedis(RedisConnection connection) throws DataAccessException {
        JedisCommands commands = (JedisCommands) connection.getNativeConnection(); return commands.set(key, "鎖定的資源", "NX", "PX", expire);
    }
});

注意: Redis 從2.6.12版本開始 set 命令支持 NX 、 PX 這些參數來達到 setnx 、 setex 、 psetex 命令的效果,文檔參見: http://doc.redisfans.com/string/set.htmlredis

NX: 表示只有當鎖定資源不存在的時候才能 SET 成功。利用 Redis 的原子性,保證了只有第一個請求的線程才能得到鎖,而以後的全部線程在鎖定資源被釋放以前都不能得到鎖。spring

PX: expire 表示鎖定的資源的自動過時時間,單位是毫秒。具體過時時間根據實際場景而定sql

這樣在獲取鎖的時候就可以保證設置 Redis 值和過時時間的原子性,避免前面提到的兩次 Redis 操做期間出現意外而致使的鎖不能釋放的問題。可是這樣仍是可能會存在一個問題,考慮以下的場景順序:架構

  • 線程T1獲取鎖
  • 線程T1執行業務操做,因爲某些緣由阻塞了較長時間
  • 鎖自動過時,即鎖自動釋放了
  • 線程T2獲取鎖
  • 線程T1業務操做完畢,釋放鎖(實際上是釋放的線程T2的鎖)

按照這樣的場景順序,線程T2的業務操做實際上就沒有鎖提供保護機制了。因此,每一個線程釋放鎖的時候只能釋放本身的鎖,即鎖必需要有一個擁有者的標記,而且也須要保證釋放鎖的原子性操做。併發

所以在獲取鎖的時候,能夠生成一個隨機不惟一的串放入當前線程中,而後再放入 Redis 。釋放鎖的時候先判斷鎖對應的值是否與線程中的值相同,相同時才作刪除操做。app

Redis 從2.6.0開始經過內置的 Lua 解釋器,可使用 EVAL 命令對 Lua 腳本進行求值,文檔參見: http://doc.redisfans.com/script/eval.htmldom

所以咱們能夠經過 Lua 腳原本達到釋放鎖的原子操做,定義 Lua 腳本以下:異步

if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end

具體意思能夠參考上面提供的文檔地址

使用 RedisTemplate 執行的代碼以下:

// 使用Lua腳本刪除Redis中匹配value的key,能夠避免因爲方法執行時間過長而redis鎖自動過時失效的時候誤刪其餘線程的鎖 // spring自帶的執行腳本方法中,集羣模式直接拋出不支持執行腳本的異常,因此只能拿到原redis的connection來執行腳本 Long result = redisTemplate.execute(new RedisCallback<Long>() { public Long doInRedis(RedisConnection connection) throws DataAccessException {
        Object nativeConnection = connection.getNativeConnection(); // 集羣模式和單機模式雖然執行腳本的方法同樣,可是沒有共同的接口,因此只能分開執行 // 集羣模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
        } // 單機模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
        } return 0L;
    }
});

代碼中分爲集羣模式和單機模式,而且二者的方法、參數都同樣,緣由是spring封裝的執行腳本的方法中( RedisConnection 接口繼承於 RedisScriptingCommands 接口的 eval 方法),集羣模式的方法直接拋出了不支持執行腳本的異常(雖然實際是支持的),因此只能拿到 Redis 的connection來執行腳本,而 JedisCluster 和 Jedis 中的方法又沒有實現共同的接口,因此只能分開調用。

spring封裝的集羣模式執行腳本方法源碼:

# JedisClusterConnection.java /**
 * (non-Javadoc)
 * @see org.springframework.data.redis.connection.RedisScriptingCommands#eval(byte[], org.springframework.data.redis.connection.ReturnType, int, byte[][])
 */ @Override public  T eval(byte[] script, ReturnType returnType, int numKeys, byte[]... keysAndArgs) { throw new InvalidDataAccessApiUsageException("Eval is not supported in cluster environment.");
}

至此,咱們就完成了一個相對可靠的 Redis 分佈式鎖,可是,在集羣模式的極端狀況下,仍是可能會存在一些問題,好比以下的場景順序( 本文暫時不深刻開展 ):

  • 線程T1獲取鎖成功
  • Redis 的master節點掛掉,slave自動頂上
  • 線程T2獲取鎖,會從slave節點上去判斷鎖是否存在,因爲Redis的master slave複製是異步的,因此此時線程T2可能成功獲取到鎖

爲了能夠之後擴展爲使用其餘方式來實現分佈式鎖,定義了接口和抽象類,全部的源碼以下:

# DistributedLock.java 頂級接口 /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:05
 * @version 1.0.0
 */ public interface DistributedLock { public static final long TIMEOUT_MILLIS = 30000; public static final int RETRY_TIMES = Integer.MAX_VALUE; public static final long SLEEP_MILLIS = 500; public boolean lock(String key); public boolean lock(String key, int retryTimes); public boolean lock(String key, int retryTimes, long sleepMillis); public boolean lock(String key, long expire); public boolean lock(String key, long expire, int retryTimes); public boolean lock(String key, long expire, int retryTimes, long sleepMillis); public boolean releaseLock(String key);
}
# AbstractDistributedLock.java 抽象類,實現基本的方法,關鍵方法由子類去實現 /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:10:57
 * @version 1.0.0
 */ public abstract class AbstractDistributedLock implements DistributedLock {

    @Override public boolean lock(String key) { return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override public boolean lock(String key, int retryTimes) { return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
    }

    @Override public boolean lock(String key, int retryTimes, long sleepMillis) { return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
    }

    @Override public boolean lock(String key, long expire) { return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override public boolean lock(String key, long expire, int retryTimes) { return lock(key, expire, retryTimes, SLEEP_MILLIS);
    }

}
# RedisDistributedLock.java Redis分佈式鎖的實現 import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.StringUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisCommands; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:14
 * @version 1.0.0
 */ public class RedisDistributedLock extends AbstractDistributedLock { private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class); private RedisTemplate<Object, Object> redisTemplate; private ThreadLocal<String> lockFlag = new ThreadLocal<String>(); public static final String UNLOCK_LUA; static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    } public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) { super(); this.redisTemplate = redisTemplate;
    }

    @Override public boolean lock(String key, long expire, int retryTimes, long sleepMillis) { boolean result = setRedis(key, expire); // 若是獲取鎖失敗,按照傳入的重試次數進行重試 while((!result) && retryTimes-- > 0){ try {
                logger.debug("lock failed, retrying..." + retryTimes);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) { return false;
            }
            result = setRedis(key, expire);
        } return result;
    } private boolean setRedis(String key, long expire) { try { String result = redisTemplate.execute(new RedisCallback<String>() {
                @Override public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection(); String uuid = UUID.randomUUID().toString();
                    lockFlag.set(uuid); return commands.set(key, uuid, "NX", "PX", expire);
                }
            }); return !StringUtils.isEmpty(result);
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        } return false;
    }
    
    @Override public boolean releaseLock(String key) { // 釋放鎖的時候,有可能由於持鎖以後方法執行時間大於鎖的有效期,此時有可能已經被另一個線程持有鎖,因此不能直接刪除 try {
            List<String> keys = new ArrayList<String>();
            keys.add(key);
            List<String> args = new ArrayList<String>();
            args.add(lockFlag.get()); // 使用lua腳本刪除redis中匹配value的key,能夠避免因爲方法執行時間過長而redis鎖自動過時失效的時候誤刪其餘線程的鎖 // spring自帶的執行腳本方法中,集羣模式直接拋出不支持執行腳本的異常,因此只能拿到原redis的connection來執行腳本 Long result = redisTemplate.execute(new RedisCallback() { public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集羣模式和單機模式雖然執行腳本的方法同樣,可是沒有共同的接口,因此只能分開執行 // 集羣模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    } // 單機模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    } return 0L;
                }
            }); return result != null && result > 0;
        } catch (Exception e) {
            logger.error("release lock occured an exception", e);
        } return false;
    }
    
}

二. 基於 AOP 的 Redis 分佈式鎖

在實際的使用過程當中,分佈式鎖能夠封裝好後使用在方法級別,這樣就不用每一個地方都去獲取鎖和釋放鎖,使用起來更加方便。

  • 首先定義個註解:
import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:10:36
 * @version 1.0.0
 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface RedisLock { /** 鎖的資源,redis的key*/ String value() default "default"; /** 持鎖時間,單位毫秒*/ long keepMills() default 30000; /** 當獲取失敗時候動做*/ LockFailAction action() default LockFailAction.CONTINUE;
    
    public enum LockFailAction{ /** 放棄 */ GIVEUP, /** 繼續 */ CONTINUE;
    } /** 重試的間隔時間,設置GIVEUP忽略此項*/ long sleepMills() default 200; /** 重試次數*/ int retryTimes() default 5;
}
  • 裝配分佈式鎖的bean
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock; import com.itopener.lock.redis.spring.boot.autoconfigure.lock.RedisDistributedLock; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:31
 * @version 1.0.0
 */ @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class)
public class DistributedLockAutoConfiguration {
    
    @Bean @ConditionalOnBean(RedisTemplate.class)
    public DistributedLock redisDistributedLock(RedisTemplate redisTemplate){ return new RedisDistributedLock(redisTemplate);
    }
    
}
  • 定義切面(spring boot配置方式)
import java.lang.reflect.Method; import java.util.Arrays; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock; import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock.LockFailAction; import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:22
 * @version 1.0.0
 */ @Aspect @Configuration @ConditionalOnClass(DistributedLock.class) @AutoConfigureAfter(DistributedLockAutoConfiguration.class) public class DistributedLockAspectConfiguration { private final Logger logger = LoggerFactory.getLogger(DistributedLockAspectConfiguration.class); @Autowired private DistributedLock distributedLock; @Pointcut("@annotation(com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock)") private void lockPoint(){
        
    } @Around("lockPoint()") public Object around(ProceedingJoinPoint pjp) throws Throwable{
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String key = redisLock.value(); if(StringUtils.isEmpty(key)){
            Object[] args = pjp.getArgs();
            key = Arrays.toString(args);
        }
        int retryTimes = redisLock.action().equals(LockFailAction.CONTINUE) ? redisLock.retryTimes() : 0;
        boolean lock = distributedLock.lock(key, redisLock.keepMills(), retryTimes, redisLock.sleepMills()); if(!lock) {
            logger.debug("get lock failed : " + key); return null;
        } //獲得鎖,執行方法,釋放鎖 logger.debug("get lock success : " + key); try { return pjp.proceed();
        } catch (Exception e) {
            logger.error("execute locked method occured an exception", e);
        } finally {
            boolean releaseResult = distributedLock.releaseLock(key);
            logger.debug("release lock : " + key + (releaseResult ? " success" : " failed"));
        } return null;
    }
}
  • spring boot starter還須要在 resources/META-INF 中添加 spring.factories 文件
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAutoConfiguration,\ com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAspectConfiguration

這樣封裝以後,使用spring boot開發的項目,直接依賴這個starter,就能夠在方法上加 RedisLock 註解來實現分佈式鎖的功能了,固然若是須要本身控制,直接注入分佈式鎖的bean便可

@Autowired private DistributedLock distributedLock;

若是須要使用其餘的分佈式鎖實現,繼承 AbstractDistributedLock 後實現獲取鎖和釋放鎖的方法便可

歡迎歡迎學Java的朋友們加入java架構交流: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索