隨着業務愈來愈複雜,應用服務都會朝着分佈式、集羣方向部署,而分佈式CAP原則告訴咱們,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。html
不少場景中,須要使用分佈式事務、分佈式鎖等技術來保證數據最終一致性。有的時候,咱們須要保證某一方法同一時刻只能被一個線程執行。
在單機(單進程)環境中,JAVA提供了不少併發相關API,但在多機(多進程)環境中就無能爲力了。java
對於分佈式鎖,最好可以知足如下幾點git
能夠保證在分佈式部署的應用集羣中,同一個方法在同一時間只能被一臺機器上的一個線程執行
這把鎖要是一把可重入鎖(避免死鎖)
這把鎖最好是一把阻塞鎖
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的性能要好
針對分佈式鎖,目前有如下幾種實現方案(From: http://www.hollischuang.com/a...)github
基於數據庫實現分佈式鎖
基於緩存實現分佈式鎖
基於zookeeper實現分佈式鎖
對於第一種(基於數據庫)及第三種(基於zookeeper)的實現方式能夠參考博文http://www.hollischuang.com/a...,本篇文章介紹如何基於redis實現分佈式鎖redis
首先奉上源碼 https://github.com/manerfan/m...spring
鎖的實現主要基於redis的SETNX
命令(SETNX詳細解釋參考這裏),咱們來看SETNX
的解釋數據庫
SETNX key value
將 key 的值設爲 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不作任何動做。
SETNX 是『SET if Not eXists』(若是不存在,則 SET)的簡寫。返回值:
設置成功,返回 1 。
設置失敗,返回 0 。api
使用SETNX
完成同步鎖的流程及事項以下:緩存
SETNX
命令獲取鎖,若返回0(key已存在,鎖已存在)則獲取失敗,反之獲取成功SETNX
命令老是返回0而進入死鎖狀態,須要爲該key設置一個「合理」的過時時間DEL
命令將鎖數據刪除/** * 同步鎖 * * @property key Redis key * @property stringRedisTemplate RedisTemplate * @property expire Redis TTL/秒 * @property safetyTime 安全時間/秒 */ class SyncLock( private val key: String, private val stringRedisTemplate: StringRedisTemplate, private val expire: Long, private val safetyTime: Long )
key
reids中的key,對應java api synchronized的對象expire
reids中key的過時時間safetyTime
下文介紹其做用安全
private val value: String get() = Thread.currentThread().name /** * 嘗試獲取鎖(當即返回) * * @return 是否獲取成功 */ fun tryLock(): Boolean { val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false if (locked) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) } return locked }
這裏使用setIfAbsent
函數(對應SETNX
命令)嘗試設置key的值爲value(當前線程id+線程名),若成功則同時設置key的過時時間並返回true,不然返回false
private val waitMillisPer: Long = 10 /** * 嘗試獲取鎖,並至多等待timeout時長 * * @param timeout 超時時長 * @param unit 時間單位 * * @return 是否獲取成功 */ fun tryLock(timeout: Long, unit: TimeUnit): Boolean { val waitMax = unit.toMillis(timeout) var waitAlready: Long = 0 while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) { Thread.sleep(waitMillisPer) waitAlready += waitMillisPer } if (waitAlready < waitMax) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) return true } return false }
這裏使用while循環不斷嘗試鎖的獲取,並至多嘗試timeout時長,在timeout時間內若成功則同時設置key的過時時間並返回true,不然返回false
其實以上兩種tryLock
函數仍是有一種可能即是,在調用setIfAbsent
後、調用expire
以前若服務出現異常,也將致使該鎖(key)沒法釋放(過時或刪除),使得其餘線程/進程再沒法獲取鎖而進入死循環,爲了不此問題的產生,咱們引入了safetyTime
該參數的做用爲,從獲取鎖開始直到safetyTime時長,若仍未獲取成功則認爲某一線程/進程出現異常致使數據不正確,此時強制獲取,其實現以下
/** * 獲取鎖 */ fun lock() { val waitMax = TimeUnit.SECONDS.toMillis(safetyTime) var waitAlready: Long = 0 while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) { Thread.sleep(waitMillisPer) waitAlready += waitMillisPer } // stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS) }
這裏一樣使用while循環不斷嘗試鎖的獲取,但至多等待safetyTime時長,最終不管是否成功,均使用SETEX
命令將key設置爲當前先線程對應的value,並同時設置該key的過時時間
/** * 釋放鎖 */ fun unLock() { stringRedisTemplate.opsForValue()[key]?.let { if (it == value) { stringRedisTemplate.delete(key) } } }
鎖的釋放使用DEL
命令刪除key,但須要注意的是,釋放鎖時只能釋放本線程持有的鎖
若expire設置不合理,如expire設置爲10秒,結果在獲取鎖後線程運行了20秒,該鎖有可能已經被其餘線程強制獲取,即該key表明的鎖已經不是當前線程所持有的鎖,此時便不能冒然刪除該key,而只能釋放本線程持有的鎖。
爲了更好的與spring集成,咱們建立一個工廠類來輔助建立同步鎖實例
/** * SyncLock同步鎖工廠類 */ @Component class SyncLockFactory { @Autowired private lateinit var stringRedisTemplate: StringRedisTemplate private val syncLockMap = mutableMapOf<String, SyncLock>() /** * 建立SyncLock * * @param key Redis key * @param expire Redis TTL/秒,默認10秒 * @param safetyTime 安全時間/秒,爲了防止程序異常致使死鎖,在此時間後強制拿鎖,默認 expire * 5 秒 */ @Synchronized fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock { if (!syncLockMap.containsKey(key)) { syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime) } return syncLockMap[key]!! } }
在spring框架下能夠更方便的使用
@Component class SomeLogic: InitializingBean { @Autowired lateinit var syncLockFactory: SyncLockFactory lateinit var syncLock override fun afterPropertiesSet() { syncLock = syncLockFactory.build("lock:some:name", 10) } fun someFun() { syncLock.lock() try { // some logic } finally { syncLock.unlock() } } }
藉助spring aop框架,咱們能夠將SyncLock的使用進一步簡化
/** * 同步鎖註解 * * @property key Redis key * @property expire Redis TTL/秒,默認10秒 */ @Target(AnnotationTarget.FUNCTION) @Retention(AnnotationRetention.RUNTIME) annotation class SyncLockable( val key: String, val expire: Long = 10 )
/** * 同步鎖註解處理 */ @Aspect @Component class SyncLockHandle { @Autowired private lateinit var syncLockFactory: SyncLockFactory /** * 在方法上執行同步鎖 */ @Around("@annotation(syncLockable)") fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? { val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire) try { lock.lock() return jp.proceed() } finally { lock.unLock() } } }
如此一來,咱們即可以按照以下方式使用SyncLock
@Component class SomeLogic { @SyncLockable("lock:some:name", 10) fun someFun() { // some logic } }
是否是顯得更加方便!