前提, 應用服務是分佈式或多服務, 而這些"多"有共同的"redis";
(2017-12-04) 笑哭, 寫這篇以前一直以爲應該有大神已經寫好了, 但未找到. 其實redis官網已經給出了實現(百度、阿里都是用的這套): Redis分佈式鎖、Distributed locks with Redis
java版本的名字叫redisson, 其github: https://github.com/redisson/redisson
GitHub: https://github.com/vergilyn/SpringBootDemo
代碼結構:
html
(具體實現思路參考: 分佈式鎖的實現、如何用消息系統避免分佈式事務?)java
一、基於數據庫
能夠用數據庫的行鎖for update
, 或專門新建一張鎖控制表
來實現.
過於依賴數據庫, 且健壯性也不是特別好, 徹底能夠把此種方案捨棄.
(話說都涉及到分佈式或多服務器,基本主要仍是用redis、memcached或其餘緩存服務實現併發鎖)
二、基於ZooKeeper實現分佈式鎖
並未去研究, 參考上面的博客連接.
三、基於redis實現
redis實現的複雜度不算高, 只是須要注意一些實現細節. 健壯性貌似只比zookeeper差點, 但徹底可接受.git
一、主要的redis核心命令: 利用redis是單線程的特性, 用setnx、getset、time來實現.
二、思路: redis的key-value就表明一個對象鎖, 當此key存在說明鎖已被獲取, 其他相同對象操做則須要等待獲取鎖.
三、須要注意的細節:
1) 鎖的釋放, 要特別避免死鎖出現, 主要是特殊狀況下如何釋放鎖.
2) 等待獲取鎖的線程, 最好有超時機制.
3) 注意多服務器之間的時間是否同步.
4) 注意獲取鎖操做別佔用或建立太多的鏈接(即便及時關閉了鏈接), 很影響系統的性能.github
/** * 鎖的策略參考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基於redis分佈式鎖實現「秒殺」</a> * FIXME 此方式加鎖策略存在必定缺陷: 在setIfAbsent()以後expire()執行以前程序異常 鎖不會被釋放. 雖然出現概率極低 * * @param timeout timeout的時間範圍內輪詢鎖, 單位: 秒 * @param expire 設置鎖超時時間 * @return true, 獲取鎖成功; false, 獲取鎖失敗. */ public boolean lock(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); // 用nanos、mills具體看需求. timeout = TimeUnit.SECONDS.toNanos(timeout); try { // 在timeout的時間範圍內不斷輪詢鎖 while (System.nanoTime() - beginTime < timeout) { // 鎖不存在的話,設置鎖並設置鎖過時時間,即加鎖 if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) { this.redisClient.expire(key, expire, unit);//設置鎖失效時間, 防止永久阻塞 this.lock = true; return true; } // 短暫休眠後輪詢,避免可能的活鎖 System.out.println("get lock waiting..."); Thread.sleep(30, RANDOM.nextInt(30)); } } catch (Exception e) { throw new RuntimeException("locking error", e); } return false; }
以上鎖策略已經很完美, 1) 指定了獲取鎖的超時時間; 2) 設置了鎖的失效, 防止永久阻塞;
但可能有極端狀況, 即setIfAbsent()
成功, expire()
執行以前, 若是出現異常狀況, 致使expire()
沒有執行, 因此此時會出現永久阻塞. (道理是很難遇到這狀況)redis
/** * 特別注意: 若是多服務器之間存在時間差, 並不建議用System.nanoTime()、System.currentTimeMillis(). * 更好的是統一用redis-server的時間, 但只能獲取到milliseconds. * 鎖的策略參考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis實現分佈式鎖</a> * * @param timeout 獲取鎖超時, 單位: 毫秒 * @param expire 鎖失效時常, 單位: 毫秒 * @return true, 獲取鎖成功; false, 獲取鎖失敗. */ public boolean lockB(long timeout, long expire) { long bt = System.currentTimeMillis(); long lockVal; String lockExpireTime; try { while (!this.lock) { if(System.currentTimeMillis() - bt > timeout){ throw new RedisLockException("get lock timeout!"); } // 鎖的鍵值: {當前時間} + {失效時常} = {鎖失效時間} lockVal = getRedisTime() + expire; // 1. 嘗試獲取鎖 boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + ""); if (ifAbsent) { // 設置成功, 表示得到鎖 // 這種策略下, 是否設置key失效不過重要. 由於, 正常流程中最後會釋放鎖(del-key); 若是是異常狀況下未釋放鎖, 後面的代碼也會判斷鎖是否失效. // 設置的好處: 能減小redis的內存消耗, 及時清理無效的key(暫時只想到這) // this.redisClient.expire(key, timeout, TimeUnit.SECONDS); this.lock = true; return true; } lockExpireTime = this.redisClient.opsForValue().get(this.key); long curTime = getRedisTime(); // curTime > expireVal: 表示此鎖已無效 /* 在鎖無效的前提下, 嘗試獲取鎖: (必定要用)getAndSet() * * 假設鎖已失效, 且未正常expire. 此時C一、C2同時執行到此, C2先執行getAndSet(key, time-02), C2獲取到鎖 * 此時C1.getAndSet(key, time-01)返回的是time-02, 顯然curTime > time-02: false. * 因此, C1並未獲取到鎖. 但C1修改了key的值爲: time-01. * 但由於C一、C2是同時執行到此, 因此time-0一、time-02的值近視相等. * (若多服務器存在時間差, 那這個差值有問題, 因此服務器時間若是不一樣步則不能用System.nanoTime()、System.currentTimeMillis(), 該用redis-server time.) */ if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { // getset必須在{curTime > expireVal} 判斷以後; 不然, 可能出現死循環 lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + ""); if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { // this.redisClient.expire(key, timeout, TimeUnit.SECONDS); // 是否設置失效不重要, 理由同上. this.lock = true; return true; } } // 鎖被佔用, 短暫休眠等待輪詢 System.out.println(this + ": get lock waiting..."); Thread.sleep(40); } } catch (Exception e) { e.printStackTrace(); throw new RedisLockException("locking error", e); } System.out.println(this + ": get lock error."); return false; }
此種鎖策略特別要注意:
1) 若是多服務器之間時間不一樣步, 那麼能夠用redis-server的時間.
2) getset的調用必須在curTime > lockExpireTime
的前提下, 不然會出現死循環.
3) 併發時getset產生的偏差, 徹底可忽略.
4) 特別要注意redis鏈接的釋放, 不然很容易佔用過多的redis鏈接數.spring
public class RedisLock { private String key; private boolean lock = false; private final StringRedisTemplate redisClient; private final RedisConnection redisConnection; /** * @param purpose 鎖前綴 * @param key 鎖定的ID等東西 */ public RedisLock(String purpose, String key, StringRedisTemplate redisClient) { if (redisClient == null) { throw new IllegalArgumentException("redisClient 不能爲null!"); } this.key = purpose + "_" + key + "_redis_lock"; this.redisClient = redisClient; this.redisConnection = redisClient.getConnectionFactory().getConnection(); } /** * 鎖的策略參考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基於redis分佈式鎖實現「秒殺」</a> * FIXME 此方式加鎖策略存在必定缺陷: 在setIfAbsent()以後expire()執行以前程序異常 鎖不會被釋放. 雖然出現概率極低 * * @param timeout timeout的時間範圍內輪詢鎖, 單位: 秒 * @param expire 設置鎖超時時間 * @return true, 獲取鎖成功; false, 獲取鎖失敗. */ public boolean lockA(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); // 用nanos、mills具體看需求. timeout = unit.toNanos(timeout); try { // 在timeout的時間範圍內不斷輪詢鎖 while (System.nanoTime() - beginTime < timeout) { // 鎖不存在的話,設置鎖並設置鎖過時時間,即加鎖 if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) { this.redisClient.expire(key, expire, unit);//設置鎖失效時間, 防止永久阻塞 this.lock = true; return true; } // 短暫休眠後輪詢,避免可能的活鎖 System.out.println("get lock waiting..."); Thread.sleep(30); } } catch (Exception e) { throw new RedisLockException("locking error", e); } return false; } /** * 特別注意: 若是多服務器之間存在時間差, 並不建議用System.nanoTime()、System.currentTimeMillis(). * 更好的是統一用redis-server的時間, 但只能獲取到milliseconds. * 鎖的策略參考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis實現分佈式鎖</a> * * @param timeout 獲取鎖超時, 單位: 毫秒 * @param expire 鎖失效時常, 單位: 毫秒 * @return true, 獲取鎖成功; false, 獲取鎖失敗. */ public boolean lockB(long timeout, long expire) { long bt = System.currentTimeMillis(); long lockVal; String lockExpireTime; try { while (!this.lock) { if(System.currentTimeMillis() - bt > timeout){ throw new RedisLockException("get lock timeout!"); } // 鎖的鍵值: {當前時間} + {失效時常} = {鎖失效時間} lockVal = getRedisTime() + expire; // 1. 嘗試獲取鎖 boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + ""); if (ifAbsent) { // 設置成功, 表示得到鎖 this.lock = true; return true; } lockExpireTime = this.redisClient.opsForValue().get(this.key); long curTime = getRedisTime(); if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + ""); if (curTime > NumberUtils.toLong(lockExpireTime, 0)) { this.lock = true; return true; } } // 鎖被佔用, 短暫休眠等待輪詢 System.out.println(this + ": get lock waiting..."); Thread.sleep(40); } } catch (Exception e) { e.printStackTrace(); throw new RedisLockException("locking error", e); } System.out.println(this + ": get lock error."); return false; } /** * @return current redis-server time in milliseconds. */ private long getRedisTime() { return this.redisConnection.time(); } private void closeConnection(){ if(!this.redisConnection.isClosed()){ this.redisConnection.close(); } } /** 釋放鎖 */ public void unlock() { if (this.lock) { redisClient.delete(key); } } public boolean isLock() { return lock; } }
@Target(ElementType.PARAMETER) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisLockedKey { /** * 複雜對象中須要加鎖的成員變量 */ String field() default ""; }
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisDistributedLock { /** 鎖key的前綴 */ String lockedPrefix() default ""; /** 輪詢鎖的時間超時時常, 單位: ms */ long timeout() default 2000; /** redis-key失效時常, 單位: ms */ int expireTime() default 1000; }
@Component @Aspect public class RedisDistributedLockAop { @Autowired private StringRedisTemplate redisTemplate; /** * 定義緩存邏輯 */ @Around("@annotation(com.vergilyn.demo.springboot.distributed.lock.annotation.RedisDistributedLock)") public void cache(ProceedingJoinPoint pjp) { Method method = getMethod(pjp); RedisDistributedLock cacheLock = method.getAnnotation(RedisDistributedLock.class); String key = getRedisKey(method.getParameterAnnotations(), pjp.getArgs()); RedisLock redisLock = new RedisLock(cacheLock.lockedPrefix(), key, redisTemplate); // boolean isLock = redisLock.lockB(cacheLock.timeout(), cacheLock.expireTime()); boolean isLock = redisLock.lockA(cacheLock.timeout(), cacheLock.expireTime(), TimeUnit.MILLISECONDS); if (isLock) { try { pjp.proceed(); return; } catch (Throwable e) { e.printStackTrace(); } finally { redisLock.unlock(); } } System.out.println("執行方法失敗"); } /** * 獲取被攔截的方法對象 */ private Method getMethod(ProceedingJoinPoint pjp) { Object[] args = pjp.getArgs(); Class[] argTypes = new Class[pjp.getArgs().length]; for (int i = 0; i < args.length; i++) { argTypes[i] = args[i].getClass(); } Method method = null; try { method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argTypes); } catch (NoSuchMethodException | SecurityException e) { e.printStackTrace(); } return method; } private String getRedisKey(Annotation[][] annotations, Object[] args){ if (null == args || args.length == 0) { throw new RedisLockException("方法參數爲空,沒有被鎖定的對象"); } if (null == annotations || annotations.length == 0) { throw new RedisLockException("沒有被註解的參數"); } // 只支持第一個註解爲RedisLockedKey的參數 for (int i = 0; i < annotations.length; i++) { for (int j = 0; j < annotations[i].length; j++) { if (annotations[i][j] instanceof RedisLockedKey) { //註解爲LockedComplexObject RedisLockedKey redisLockedKey = (RedisLockedKey) annotations[i][j]; String field = redisLockedKey.field(); try { // field存在, 表示取參數對象的相應field; if(StringUtils.isBlank(field)){ return args[i].toString(); }else { return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString(); } } catch (NoSuchFieldException | SecurityException e) { e.printStackTrace(); throw new RedisLockException("註解對象中不存在屬性: " + redisLockedKey.field()); } } } } throw new RedisLockException("未找到註解對象!"); } }
public class RedisLockException extends RuntimeException{ public RedisLockException(String msg, Throwable throwable) { super(msg, throwable); } public RedisLockException(String msg) { super(msg); } }
#### 視狀況調整 # 部分redis配置 spring.redis.database=0 spring.redis.host=127.0.0.1 # spring.redis.password= spring.redis.port=6379 # 鏈接池最大鏈接數(使用負值表示沒有限制) spring.redis.pool.max-active=1 spring.redis.pool.max-wait=-1 # 鏈接池中的最大空閒鏈接 spring.redis.pool.max-idle=4 # 鏈接池中的最小空閒鏈接 spring.redis.pool.min-idle=0 spring.redis.timeout=2000
@SpringBootApplication @EnableCaching public class DistributedLockApplication implements CommandLineRunner{ @Autowired StringRedisTemplate redisTemplate; @Autowired LockService lockService; @Autowired ThreadPoolTaskExecutor executor; @Bean public ThreadPoolTaskExecutor myExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心線程數 executor.setCorePoolSize(8); // 最大線程數 executor.setMaxPoolSize(12); // 運行線程滿時,等待隊列的大小 executor.setQueueCapacity(1000); executor.setThreadNamePrefix("vl-thread-"); // 池和隊列滿的策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 空閒線程清除時間 executor.setKeepAliveSeconds(60); // 是否容許釋放核心線程 executor.setAllowCoreThreadTimeOut(true); executor.initialize(); return executor; } public static void main(String[] args) { SpringApplication application = new SpringApplication(DistributedLockApplication.class); application.setAdditionalProfiles("redis"); application.run(args); } @Override public void run(String... args) throws Exception { System.out.println("run...."); for (int i = 0; i < 2; i++) { executor.execute(new Runnable() { @Override public void run() { // lockService.lockMethod(new LockBean(1L)); lockService.lockMethod("arg1", 1L); } }); } System.out.println(executor.getThreadPoolExecutor().getTaskCount()); } }
public interface LockService { public void lockMethod(String arg1,Long arg2); public void lockMethod(LockBean lockBean); }
@Service public class LockServiceImpl implements LockService { public static Map<Long, Integer> goods; static{ goods = new HashMap<>(); goods.put(1L, 100); goods.put(2L, 200); } @Override @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void lockMethod(String arg1, @RedisLockedKey Long arg2) { //最簡單的秒殺,這裏僅做爲demo示例 System.out.println("lockMethod, goods: " + reduceInventory(arg2)); } @Override @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void lockMethod(@RedisLockedKey(field = "idic")LockBean lockBean) { System.out.println("lockMethod bean, goods: " + reduceInventory(lockBean.getIdic())); } // 模擬秒殺操做,姑且認爲一個秒殺就是將庫存減一 private Integer reduceInventory(Long commodityId){ goods.put(commodityId, goods.get(commodityId) - 1); return goods.get(commodityId); } }
public class LockBean { private Long idic; public LockBean(){} public LockBean(long idic) { this.idic = idic; } public Long getIdic() { return idic; } public void setIdic(Long idic) { this.idic = idic; } }
以上只是簡單實現代碼, 若是用於實際項目中, 以上代碼存在不少性能問題, 具體性能問題:
1) 太頻繁的獲取redis鏈接、關閉鏈接.
lockA
: 每次while一定有一次setIfAbsent
, 可能會有expire
, 而後釋放鎖有delete
. 因此一次正常的流程就須要3個鏈接. 若是是併發同時競爭等待獲取鎖, 那麼性能影響也蠻大的.
lockB
: 這種策略要用到的鏈接更多, 而且若是是this.redisClient.getConnectionFactory().getConnection().time()
還要注意要手動釋放這個鏈接.
針對此問題, (我的)想到的可能的代碼改進方案, 每一個RedisLock
中用一個redisConnection
, 把全部的StringRedisTemplate
命令換成更底層的redisConnection
命令:數據庫
public class RedisLock { private String key; private boolean lock = false; private final RedisConnection redisConnection; public RedisLock(String purpose, String key, RedisConnection redisConnection) { if (redisConnection == null) { throw new IllegalArgumentException("redisConnection 不能爲null!"); } this.key = purpose + "_" + key + "_redis_lock"; this.redisConnection = redisConnection; } public boolean lockAc(long timeout, long expire, final TimeUnit unit) { long beginTime = System.nanoTime(); timeout = unit.toNanos(timeout); try { while (System.nanoTime() - beginTime < timeout) { if (this.redisConnection.setNX(this.key.getBytes(), "1".getBytes())) { this.redisConnection.expire(key.getBytes(), unit.toSeconds(expire)); this.lock = true; return true; } System.out.println("lockAc get lock waiting..."); Thread.sleep(30); } } catch (Exception e) { throw new RedisLockException("locking error", e); } return false; } private long getRedisTime() { return this.redisConnection.time(); } private void closeConnection(){ if(!this.redisConnection.isClosed()){ this.redisConnection.close(); } } public void unlock() { if (this.lock) { this.redisConnection.delete(key); } closeConnection(); // 用完必定要關閉, 這個位置不必定好, 可能在Aop調用unlock的finally處更好 } public boolean isLock() { return lock; } }
以上改進代碼依然可能存在的問題:
1) 鏈接極可能沒有正常關閉.
2) 鏈接依然過多, 假設併發有1000個, 那同樣會產生1000個鏈接, 且這些鏈接只會在競爭獲取鎖完後纔會釋放.(且產生了1000個RedisLock對象)
3) 是否能夠緩存註解對象?緩存
針對問題2)
, 主要想達到怎麼儘量減小redis鏈接?
好比: 有1000個併發, 其中200個是兌換商品A, 其中300個是兌換商品B, 其中500個是兌換商品C.springboot
一、是否能夠用單例模式
來實現RedisLock
?
對單例
、多線程
仍是很混亂, 很差說. 但若是可行, 會否太影響獲取鎖的性能?
好比兌換商品A的200個併發共用一個redisConnection, 感受仍是合理的, 畢竟互相之間是競爭關係.
但商品A、商品B、商品C若是也共用一個redisConnection, 是否是徹底不合理?
他們之間根本是"並行"的, 相互之間沒有一點聯繫.
二、因此, 是否更進一步的實現是: 同一個鎖競爭用相同的RedisLock
對象和RedisConnection
鏈接.
即競爭商品A的200個併發用同一個"redisConnection_A"、"redisLock_A", 商品B的300個併發用同一個"redisConnection_B"、"redisLock_B"?服務器
針對問題3)
, 在代碼RedisDistributedLockAop
中, 每次都會:
1) getMethod(pjp)
: 獲取攔截方法.
2) 經過攔截方法解析出getRedisKey
.
是否是能夠這麼實現, 相同的攔截方法只有第一次須要經過反射獲取. 以後直接從緩存(如map)中獲取到method
, 且由於同一個方法, 所能取field
也是同樣的.
好比, 有一下幾個方法都須要用到分佈式併發鎖:
@RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void a(String arg1, @RedisLockedKey Long arg2) { // ... } @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void b(@RedisLockedKey(field = "idic")LockBean lockBean) { // ... } @RedisDistributedLock(lockedPrefix="TEST_PREFIX") public void c(@RedisLockedKey(field = "xx")LockBean lockBean) { // ... }
// key: 完整方法名, 要惟一正確找到; value: 緩存的method Map<String, Method> methodCache = new HashMap<>; methodCache.put("com.service.aa.a()", method); methodCache.put("com.service.aa.b()", method); methodCache.put("com.service.bb.b()", method); // 而後, 同一個方法的註解內容相同, 因此徹底能夠直接調用, 省略RedisLockedKey的邏輯判斷 if(StringUtils.isBlank(field)){ return args[i].toString(); }else { return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString(); }
以上只是本身的構想, 這些構想的可行性, 代碼的具體實現還很難說...
(2017-12-04) 有空分析看下源碼redisson的實現思路, 對比下本身的不足之處.
多個線程之間不能共享鏈接, 參考: REDIS實踐之請勿踩多進程共用一個實例鏈接的坑