「分佈式鎖」是用來解決分佈式應用中「併發衝突」的一種經常使用手段,實現方式通常有基於zookeeper及基於redis二種。具體到業務場景中,咱們要考慮二種狀況:html
1、搶不到鎖的請求,容許丟棄(即:忽略)java
好比:一些不是很重要的場景,好比「監控數據持續上報」,某一篇文章的「已讀/未讀」標識位更新,對於同一個id,若是併發的請求同時到達,只要有一個請求處理成功,就算成功。redis
用活動圖表示以下:spring
2、併發請求,不論哪一條都必需要處理的場景(即:不容許丟數據)併發
好比:一個訂單,客戶正在前臺修改地址,管理員在後臺同時修改備註。地址和備註字段的修改,都必須正確更新,這二個請求同時到達的話,若是不借助db的事務,很容易形成行鎖競爭,但用事務的話,db的性能顯然比不上redis輕量。app
解決思路:A,B二個請求,誰先搶到分佈式鎖(假設A先搶到鎖),誰先處理,搶不到的那個(即:B),在一旁不停等待重試,重試期間一旦發現獲取鎖成功,即表示A已經處理完,把鎖釋放了。這時B就能夠繼續處理了。less
但有二點要注意:dom
a、須要設置等待重試的最長時間,不然若是A處理過程當中有bug,一直卡死,或者未能正確釋放鎖,B就一直會等待重試,可是又永遠拿不到鎖。分佈式
b、等待最長時間,必須小於鎖的過時時間。不然,假設鎖2秒過時自動釋放,可是A還沒處理完(即:A的處理時間大於2秒),這時鎖會由於redis key過時「提早」誤釋放,B重試時拿到鎖,形成A,B同時處理。(注:可能有同窗會說,不設置鎖的過時時間,不就完了麼?理論上講,確實能夠這麼作,可是若是業務代碼有bug,致使處理完後沒有unlock,或者根本忘記了unlock,分佈式鎖就會一直沒法釋放。因此綜合考慮,給分佈式鎖加一個「保底」的過時時間,讓其始終有機會自動釋放,更爲靠譜)spring-boot
用活動圖表示以下:
寫了一個簡單的工具類:
package com.cnblogs.yjmyzz.redisdistributionlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.util.StringUtils; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * 利用redis獲取分佈式鎖 * * @author 菩提樹下的楊過 * @blog http://yjmyzz.cnblogs.com/ */ public class RedisLock { private StringRedisTemplate redisTemplate; private Logger logger = LoggerFactory.getLogger(this.getClass()); /** * simple lock嘗試獲取鍋的次數 */ private int retryCount = 3; /** * 每次嘗試獲取鎖的重試間隔毫秒數 */ private int waitIntervalInMS = 100; public RedisLock(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 利用redis獲取分佈式鎖(未獲取鎖的請求,容許丟棄!) * * @param redisKey 鎖的key值 * @param expireInSecond 鎖的自動釋放時間(秒) * @return * @throws DistributionLockException */ public String simpleLock(final String redisKey, final int expireInSecond) throws DistributionLockException { String lockValue = UUID.randomUUID().toString(); boolean flag = false; if (StringUtils.isEmpty(redisKey)) { throw new DistributionLockException("key is empty!"); } if (expireInSecond <= 0) { throw new DistributionLockException("expireInSecond must be bigger than 0"); } try { for (int i = 0; i < retryCount; i++) { boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS); if (success) { flag = true; break; } try { TimeUnit.MILLISECONDS.sleep(waitIntervalInMS); } catch (Exception ignore) { logger.warn("redis lock fail: " + ignore.getMessage()); } } if (!flag) { throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ..."); } return lockValue; } catch (DistributionLockException be) { throw be; } catch (Exception e) { logger.warn("get redis lock error, exception: " + e.getMessage()); throw e; } } /** * 利用redis獲取分佈式鎖(未獲取鎖的請求,將在timeoutSecond時間範圍內,一直等待重試) * * @param redisKey 鎖的key值 * @param expireInSecond 鎖的自動釋放時間(秒) * @param timeoutSecond 未獲取到鎖的請求,嘗試重試的最久等待時間(秒) * @return * @throws DistributionLockException */ public String lock(final String redisKey, final int expireInSecond, final int timeoutSecond) throws DistributionLockException { String lockValue = UUID.randomUUID().toString(); boolean flag = false; if (StringUtils.isEmpty(redisKey)) { throw new DistributionLockException("key is empty!"); } if (expireInSecond <= 0) { throw new DistributionLockException("expireInSecond must be greater than 0"); } if (timeoutSecond <= 0) { throw new DistributionLockException("timeoutSecond must be greater than 0"); } if (timeoutSecond >= expireInSecond) { throw new DistributionLockException("timeoutSecond must be less than expireInSecond"); } try { long timeoutAt = System.currentTimeMillis() + timeoutSecond * 1000; while (true) { boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, lockValue, expireInSecond, TimeUnit.SECONDS); if (success) { flag = true; break; } if (System.currentTimeMillis() >= timeoutAt) { break; } try { TimeUnit.MILLISECONDS.sleep(waitIntervalInMS); } catch (Exception ignore) { logger.warn("redis lock fail: " + ignore.getMessage()); } } if (!flag) { throw new DistributionLockException(Thread.currentThread().getName() + " cannot acquire lock now ..."); } return lockValue; } catch (DistributionLockException be) { throw be; } catch (Exception e) { logger.warn("get redis lock error, exception: " + e.getMessage()); throw e; } } /** * 鎖釋放 * * @param redisKey * @param lockValue */ public void unlock(final String redisKey, final String lockValue) { if (StringUtils.isEmpty(redisKey)) { return; } if (StringUtils.isEmpty(lockValue)) { return; } try { String currLockVal = redisTemplate.opsForValue().get(redisKey); if (currLockVal != null && currLockVal.equals(lockValue)) { boolean result = redisTemplate.delete(redisKey); if (!result) { logger.warn(Thread.currentThread().getName() + " unlock redis lock fail"); } else { logger.info(Thread.currentThread().getName() + " unlock redis lock:" + redisKey + " successfully!"); } } } catch (Exception je) { logger.warn(Thread.currentThread().getName() + " unlock redis lock error:" + je.getMessage()); } } }
而後寫個spring-boot來測試一下:
package com.cnblogs.yjmyzz.redisdistributionlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @SpringBootApplication public class RedisDistributionLockApplication { private static Logger logger = LoggerFactory.getLogger(RedisDistributionLockApplication.class); public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext applicationContext = SpringApplication.run(RedisDistributionLockApplication.class, args); //初始化 StringRedisTemplate redisTemplate = applicationContext.getBean(StringRedisTemplate.class); RedisLock redisLock = new RedisLock(redisTemplate); String lockKey = "lock:test"; CountDownLatch start = new CountDownLatch(1); CountDownLatch threadsLatch = new CountDownLatch(2); final int lockExpireSecond = 5; final int timeoutSecond = 3; Runnable lockRunnable = () -> { String lockValue = ""; try { //等待發令槍響,防止線程搶跑 start.await(); //容許丟數據的簡單鎖示例 lockValue = redisLock.simpleLock(lockKey, lockExpireSecond); //不容許丟數據的分佈式鎖示例 //lockValue = redisLock.lock(lockKey, lockExpireSecond, timeoutSecond); //停一下子,故意讓後面的線程搶不到鎖 TimeUnit.SECONDS.sleep(2); logger.info(String.format("%s get lock successfully, value:%s", Thread.currentThread().getName(), lockValue)); } catch (Exception e) { e.printStackTrace(); } finally { redisLock.unlock(lockKey, lockValue); //執行完後,計數減1 threadsLatch.countDown(); } }; Thread t1 = new Thread(lockRunnable, "T1"); Thread t2 = new Thread(lockRunnable, "T2"); t1.start(); t2.start(); //預備:開始! start.countDown(); //等待全部線程跑完 threadsLatch.await(); logger.info("======>done!!!"); } }
用2個線程模擬併發場景,跑起來後,輸出以下:
能夠看到T2線程沒搶到鎖,直接拋出了預期的異常。
把44行的註釋打開,即:換成不容許丟數據的模式,再跑一下:
能夠看到,T1先搶到鎖,而後通過2秒的處理後,鎖釋放,這時T2重試拿到了鎖,繼續處理,最終釋放。