分佈式鎖通常有三種實現方式:1. 數據庫樂觀鎖;2. 基於 Redis 的分佈式鎖;3. 基於 ZooKeeper 的分佈式鎖。本文介紹基於 Redis 實現分佈式鎖。 html
關於實現分佈式鎖的三種方式,能夠參考網上的一些關於 分佈式鎖簡單入門以及三種實現方式介紹 (由於不少這種文章,我也不一一列舉了)redis
本文中的分佈式鎖經過註解的方式實現,能夠自定義重試次數,鎖超時時間等。 spring
在沒有使用分佈式鎖以前,若是有兩個線程併發操做同一條數據,可能會出現併發問題(髒讀、不可重複讀、幻讀)。 數據庫
舉個例子,下面是一段將數據的值 +1 的代碼: 緩存
public void addNum() {
try {
// 查詢數據
TE te = teManager.findOne(1L);
System.out.println("start:" + te.getNum());
// 模擬耗時操做
Thread.sleep(2000);
// 值加1
te.setNum(te.getNum() + 1);
// 保存數據
teManager.save(te);
System.out.println("end:" + te.getNum());
} catch (Exception e) {
}
}複製代碼
有兩個線程同時訪問: bash
15:56:06,241 INFO [stdout] (default task-39) start:0
15:56:07,548 INFO [stdout] (default task-40) start:0
15:56:08,242 INFO [stdout] (default task-39) end:1
15:56:09,555 INFO [stdout] (default task-40) end:1複製代碼
能夠看到 task-39 和 task-40 兩個線程讀取到的值均是 0,執行兩次後,值爲 1 ,並非想要的 2。 多線程
具體執行的狀況以下: 併發
若是是單機部署,那麼能夠用多線程的 18 般武藝來解決併發問題,好比加鎖等,改動以下:app
public synchronized void addNum() {
try {
// 查詢數據
TE te = teManager.findOne(1L);
System.out.println("start:" + te.getNum());
// 模擬耗時操做
Thread.sleep(2000);
// 值加1
te.setNum(te.getNum() + 1);
// 保存數據
teManager.save(te);
System.out.println("end:" + te.getNum());
} catch (Exception e) {
}
}複製代碼
加了一個關鍵字 synchronized 就能解決單機下的併發問題,結果以下: dom
16:09:49,539 INFO [stdout] (default task-46) start:0
16:09:51,541 INFO [stdout] (default task-46) end:1
16:09:51,592 INFO [stdout] (default task-47) start:1
16:09:53,597 INFO [stdout] (default task-47) end:2複製代碼
若是集羣部署的話,這種方式就沒法解決了(每臺機器的 JVM 沒法共享,沒法加鎖),只能使用分佈式鎖。
pom.xml:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.4.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.10.2</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.8.9</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.9</version>
</dependency>複製代碼
首先定義一個接口,提供加鎖、解鎖兩個方法。
public interface IDistributedLock {
public static final long TIMEOUT_MILLIS = 5000;
public static final int RETRY_TIMES = Integer.MAX_VALUE;
public static final long SLEEP_MILLIS = 500;
public boolean lock(String key);
public boolean lock(String key, int retryTimes);
public boolean lock(String key, int retryTimes, long sleepMillis);
public boolean lock(String key, long expire);
public boolean lock(String key, long expire, int retryTimes);
public boolean lock(String key, long expire, int retryTimes, long sleepMillis);
public boolean releaseLock(String key);
}複製代碼
定義一個抽象類,實現該接口:
public abstract class AbstractDistributedLockImpl implements IDistributedLock {
@Override
public boolean lock(String key) {
return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, int retryTimes) {
return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, int retryTimes, long sleepMillis) {
return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
}
@Override
public boolean lock(String key, long expire) {
return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, long expire, int retryTimes) {
return lock(key, expire, retryTimes, SLEEP_MILLIS);
}
}複製代碼
具體實現:
@Component
public class RedisDistributedLock extends AbstractDistributedLockImpl {
private RedisTemplate<Object, Object> redisTemplate;
private ThreadLocal<String> lockFlag = new ThreadLocal<>();
private static final String UNLOCK_LUA;
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
static {
/**
* Redis 從2.6.0開始經過內置的 Lua 解釋器,可使用 EVAL 命令對 Lua 腳本進行求值,文檔參見: http://doc.redisfans.com/script/eval.html
*/
UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
}
public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}
@Override
public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
boolean result = setRedis(key, expire);
// 若是獲取鎖失敗,按照傳入的重試次數進行重試
while ((!result) && retryTimes-- > 0) {
try {
System.out.println("lock failed, retrying..." + retryTimes);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
return false;
}
result = setRedis(key, expire);
}
return result;
}
/**
* 在獲取鎖的時候就可以保證設置 Redis 值和過時時間的原子性,避免前面提到的兩次 Redis 操做期間出現意外而致使的鎖不能釋放的問題。可是這樣仍是可能會存在一個問題,考慮以下的場景順序:
* <p>
* 1. 線程T1獲取鎖
* 2. 線程T1執行業務操做,因爲某些緣由阻塞了較長時間
* 3. 鎖自動過時,即鎖自動釋放了
* 4. 線程T2獲取鎖
* 5. 線程T1業務操做完畢,釋放鎖(實際上是釋放的線程T2的鎖)
* 6. 按照這樣的場景順序,線程T2的業務操做實際上就沒有鎖提供保護機制了。因此,每一個線程釋放鎖的時候只能釋放本身的鎖,即鎖必需要有一個擁有者的標記,而且也須要保證釋放鎖的原子性操做。
* <p>
* 所以在獲取鎖的時候,能夠生成一個隨機不惟一的串放入當前線程中,而後再放入 Redis 。釋放鎖的時候先判斷鎖對應的值是否與線程中的值相同,相同時才作刪除操做
*
* @param key redis key
* @return 是否釋放鎖成功
*/
@Override
public boolean releaseLock(String key) {
// 釋放鎖的時候,有可能由於持鎖以後方法執行時間大於鎖的有效期,此時有可能已經被另一個線程持有鎖,因此不能直接刪除
try {
List<String> keys = new ArrayList<>();
keys.add(key);
List<String> args = new ArrayList<>();
args.add(lockFlag.get());
// 使用lua腳本刪除redis中匹配value的key,能夠避免因爲方法執行時間過長而redis鎖自動過時失效的時候誤刪其餘線程的鎖
// spring自帶的執行腳本方法中,集羣模式直接拋出不支持執行腳本的異常,因此只能拿到原redis的connection來執行腳本
Long result = redisTemplate.execute((RedisCallback<Long>) redisConnection -> {
Object nativeConnection = redisConnection.getNativeConnection();
// 集羣模式和單機模式雖然執行腳本的方法同樣,可是沒有共同的接口,因此只能分開執行
// 集羣模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
// 單機模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
});
return result != null && result > 0;
} catch (Exception e) {
System.out.println("release lock occured an exception" + e);
} finally {
// 清除掉ThreadLocal中的數據,避免內存溢出
lockFlag.remove();
}
return false;
}
private boolean setRedis(String key, long expire) {
try {
String result = redisTemplate.execute((RedisCallback<String>) redisConnection -> {
JedisCommands commands = (JedisCommands) redisConnection.getNativeConnection();
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
return commands.set(key, uuid, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expire);
});
return !StringUtils.isEmpty(result);
} catch (Exception e) {
System.out.println("set redis occured an exception" + e);
}
return false;
}
}複製代碼
定義一個註解類:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface DistributeLock {
/**
* 鎖的資源,key。
* 支持spring El表達式
*/
@AliasFor("name")
String name() default "'default'";
/**
* 鎖的資源,value。
* 支持spring El表達式
*/
@AliasFor("value")
String value() default "'default'";
/**
* 持鎖時間,單位毫秒
*/
long keepMills() default 10000;
/**
* 當獲取失敗時候動做
*/
LockFailAction action() default LockFailAction.CONTINUE;
public enum LockFailAction {
/**
* 放棄
*/
GIVEUP,
/**
* 繼續
*/
CONTINUE;
}
/**
* 重試的間隔時間,設置GIVEUP忽略此項
*/
long sleepMills() default 400;
/**
* 重試次數
*/
int retryTimes() default 5;
}複製代碼
定義切面:
@Aspect
@Component
public class DistributedLockAspect {
@Autowired
private IDistributedLock distributedLock;
private ExpressionParser parser = new SpelExpressionParser();
private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
/**
* 定義切入點
*/
@Pointcut("@annotation(com.telehot.distributedlock.annotations.DistributeLock)")
private void lockPoint() {
}
/**
* 環繞通知
*
* @param pjp pjp
* @return 方法返回結果
* @throws Throwable throwable
*/
@Around("lockPoint()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
DistributeLock lockAction = method.getAnnotation(DistributeLock.class);
String logKey = getLogKey(lockAction, pjp, method);
int retryTimes = lockAction.action().equals(DistributeLock.LockFailAction.CONTINUE) ? lockAction.retryTimes() : 0;
boolean lock = distributedLock.lock(logKey, lockAction.keepMills(), retryTimes, lockAction.sleepMills());
if (!lock) {
System.out.println("get lock failed : " + logKey);
return null;
}
//獲得鎖,執行方法,釋放鎖
System.out.println("get lock success : " + logKey);
try {
return pjp.proceed();
} catch (Exception e) {
System.out.println("execute locked method occured an exception" + e);
} finally {
boolean releaseResult = distributedLock.releaseLock(logKey);
System.out.println("release lock : " + logKey + (releaseResult ? " success" : " failed"));
}
return null;
}
/**
* 得到分佈式緩存的key
*
* @param lockAction 註解對象
* @param pjp pjp
* @param method method
* @return String
*/
private String getLogKey(DistributeLock lockAction, ProceedingJoinPoint pjp, Method method) {
String name = lockAction.name();
String value = lockAction.value();
Object[] args = pjp.getArgs();
return parse(name, method, args) + "_" + parse(value, method, args);
}
/**
* 解析spring EL表達式
*
* @param key key
* @param method method
* @param args args
* @return parse result
*/
private String parse(String key, Method method, Object[] args) {
String[] params = discoverer.getParameterNames(method);
if (null == params || params.length == 0 || !key.contains("#")) {
return key;
}
EvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < params.length; i++) {
context.setVariable(params[i], args[i]);
}
return parser.parseExpression(key).getValue(context, String.class);
}
}複製代碼
配置文件:
application.properties:
redis.pool.min-idle=0
redis.pool.max-idle=8
redis.hostName=127.0.0.1
redis.port=6379
複製代碼
applicationContext.xml:
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="${redis.pool.max-idle}"/> <!-- 最大可以保持idel狀態的對象數 -->
<property name="minIdle" value="${redis.pool.min-idle}"/> <!-- 最小可以保持idel狀態的對象數 -->
</bean>
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<!-- redis 配置 -->
<constructor-arg index="0" ref="jedisPoolConfig"/>
<property name="hostName" value="${redis.hostName}"/>
<property name="port" value="${redis.port}"/>
</bean>
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashValueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
</bean>
<aop:aspectj-autoproxy proxy-target-class="true" />複製代碼
注意下,最後一行別忘了加上,否則 AOP 不會起做用。若是報錯,須要加上 xml 的命名空間,能夠自行百度。
修改以前的代碼:
@DistributeLock(name = "#key")
public void addNum(String key) {
try {
// 查詢數據
TE te = teManager.findOne(1L);
System.out.println("start:" + te.getNum());
// 模擬耗時操做
Thread.sleep(2000);
// 值加1
te.setNum(te.getNum() + 1);
// 保存數據
teManager.save(te);
System.out.println("end:" + te.getNum());
} catch (Exception e) {
}
}複製代碼
只須要加一個註解就行,能夠用固定的 key 值,也能夠用業務 ID 來做爲鎖的 key。這邊用了業務 ID 來當 key,註解中也能自定義重試次數、超時時間等。
運行結果:
16:31:11,093 INFO [stdout] (default task-60) get lock success : key1_'default'
16:31:11,102 INFO [stdout] (default task-60) start:0
16:31:12,413 INFO [stdout] (default task-61) lock failed, retrying...4
16:31:12,813 INFO [stdout] (default task-61) lock failed, retrying...3
16:31:13,103 INFO [stdout] (default task-60) end:1
16:31:13,104 INFO [stdout] (default task-60) release lock : key1_'default' success
16:31:13,214 INFO [stdout] (default task-61) get lock success : key1_'default'
16:31:13,222 INFO [stdout] (default task-61) start:1
16:31:15,223 INFO [stdout] (default task-61) end:2
16:31:15,225 INFO [stdout] (default task-61) release lock : key1_'default' success複製代碼
能夠看到 task-60 一開始就獲取到了鎖,而 task-61 一開始獲取鎖失敗,進行了重試,直到 task-60 運行完釋放鎖後,task-61 纔拿到鎖,繼續執行代碼。
總結