基於redis的分佈式鎖實現

基於Redis實現分佈式鎖

隨着業務愈來愈複雜,應用服務都會朝着分佈式、集羣方向部署,而分佈式CAP原則告訴咱們,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。html

不少場景中,須要使用分佈式事務、分佈式鎖等技術來保證數據最終一致性。有的時候,咱們須要保證某一方法同一時刻只能被一個線程執行。
在單機(單進程)環境中,JAVA提供了不少併發相關API,但在多機(多進程)環境中就無能爲力了。java

對於分佈式鎖,最好可以知足如下幾點git

能夠保證在分佈式部署的應用集羣中,同一個方法在同一時間只能被一臺機器上的一個線程執行
這把鎖要是一把可重入鎖(避免死鎖)
這把鎖最好是一把阻塞鎖
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的性能要好

針對分佈式鎖,目前有如下幾種實現方案(From: http://www.hollischuang.com/a...github

基於數據庫實現分佈式鎖
基於緩存實現分佈式鎖
基於zookeeper實現分佈式鎖

對於第一種(基於數據庫)及第三種(基於zookeeper)的實現方式能夠參考博文http://www.hollischuang.com/a...,本篇文章介紹如何基於redis實現分佈式鎖redis

首先奉上源碼 https://github.com/manerfan/m...spring

分佈式同步鎖實現

實現思路

鎖的實現主要基於redis的SETNX命令(SETNX詳細解釋參考這裏),咱們來看SETNX的解釋數據庫

SETNX key value
將 key 的值設爲 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不作任何動做。
SETNX 是『SET if Not eXists』(若是不存在,則 SET)的簡寫。

返回值:
設置成功,返回 1 。
設置失敗,返回 0 。api

使用SETNX完成同步鎖的流程及事項以下:緩存

  1. 使用SETNX命令獲取鎖,若返回0(key已存在,鎖已存在)則獲取失敗,反之獲取成功
  2. 爲了防止獲取鎖後程序出現異常,致使其餘線程/進程調用SETNX命令老是返回0而進入死鎖狀態,須要爲該key設置一個「合理」的過時時間
  3. 釋放鎖,使用DEL命令將鎖數據刪除

實現過程

建立同步鎖實現類

/**
 * 同步鎖
 *
 * @property key Redis key
 * @property stringRedisTemplate RedisTemplate
 * @property expire Redis TTL/秒
 * @property safetyTime 安全時間/秒
 */
class SyncLock(
        private val key: String,
        private val stringRedisTemplate: StringRedisTemplate,
        private val expire: Long,
        private val safetyTime: Long
)

key reids中的key,對應java api synchronized的對象
expire reids中key的過時時間
safetyTime 下文介紹其做用安全

實現鎖的獲取功能

private val value: String get() = Thread.currentThread().name

/**
 * 嘗試獲取鎖(當即返回)
 *
 * @return 是否獲取成功
 */
fun tryLock(): Boolean {
    val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false
    if (locked) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
    }
    return locked
}

這裏使用setIfAbsent函數(對應SETNX命令)嘗試設置key的值爲value(當前線程id+線程名),若成功則同時設置key的過時時間並返回true,不然返回false

實現帶超時時間的鎖獲取功能

private val waitMillisPer: Long = 10

/**
 * 嘗試獲取鎖,並至多等待timeout時長
 *
 * @param timeout 超時時長
 * @param unit 時間單位
 *
 * @return 是否獲取成功
 */
fun tryLock(timeout: Long, unit: TimeUnit): Boolean {
    val waitMax = unit.toMillis(timeout)
    var waitAlready: Long = 0

    while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
        Thread.sleep(waitMillisPer)
        waitAlready += waitMillisPer
    }

    if (waitAlready < waitMax) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
        return true
    }
    return false
}

這裏使用while循環不斷嘗試鎖的獲取,並至多嘗試timeout時長,在timeout時間內若成功則同時設置key的過時時間並返回true,不然返回false

其實以上兩種tryLock函數仍是有一種可能即是,在調用setIfAbsent後、調用expire以前若服務出現異常,也將致使該鎖(key)沒法釋放(過時或刪除),使得其餘線程/進程再沒法獲取鎖而進入死循環,爲了不此問題的產生,咱們引入了safetyTime
該參數的做用爲,從獲取鎖開始直到safetyTime時長,若仍未獲取成功則認爲某一線程/進程出現異常致使數據不正確,此時強制獲取,其實現以下

實現帶保護功能的鎖獲取功能

/**
 * 獲取鎖
 */
fun lock() {
    val waitMax = TimeUnit.SECONDS.toMillis(safetyTime)
    var waitAlready: Long = 0

    while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
        Thread.sleep(waitMillisPer)
        waitAlready += waitMillisPer
    }

    // stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
    stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS)
}

這裏一樣使用while循環不斷嘗試鎖的獲取,但至多等待safetyTime時長,最終不管是否成功,均使用SETEX 命令將key設置爲當前先線程對應的value,並同時設置該key的過時時間

實現鎖的釋放功能

/**
 * 釋放鎖
 */
fun unLock() {
    stringRedisTemplate.opsForValue()[key]?.let {
        if (it == value) {
            stringRedisTemplate.delete(key)
        }
    }
}

鎖的釋放使用DEL命令刪除key,但須要注意的是,釋放鎖時只能釋放本線程持有的鎖
若expire設置不合理,如expire設置爲10秒,結果在獲取鎖後線程運行了20秒,該鎖有可能已經被其餘線程強制獲取,即該key表明的鎖已經不是當前線程所持有的鎖,此時便不能冒然刪除該key,而只能釋放本線程持有的鎖。


集成spring boot

爲了更好的與spring集成,咱們建立一個工廠類來輔助建立同步鎖實例

/**
 * SyncLock同步鎖工廠類
 */
@Component
class SyncLockFactory {
    @Autowired
    private lateinit var stringRedisTemplate: StringRedisTemplate

    private val syncLockMap = mutableMapOf<String, SyncLock>()

    /**
     * 建立SyncLock
     *
     * @param key Redis key
     * @param expire Redis TTL/秒,默認10秒
     * @param safetyTime 安全時間/秒,爲了防止程序異常致使死鎖,在此時間後強制拿鎖,默認 expire * 5 秒
     */
    @Synchronized
    fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock {
        if (!syncLockMap.containsKey(key)) {
            syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime)
        }
        return syncLockMap[key]!!
    }
}

在spring框架下能夠更方便的使用

@Component
class SomeLogic: InitializingBean {
  @Autowired
  lateinit var syncLockFactory: SyncLockFactory
  
  lateinit var syncLock

  override fun afterPropertiesSet() {
    syncLock = syncLockFactory.build("lock:some:name", 10)
  }

  fun someFun() {
    syncLock.lock()
    try {
      // some logic
    } finally {
      syncLock.unlock()
    }
  }
}

註解的實現

藉助spring aop框架,咱們能夠將SyncLock的使用進一步簡化

建立註解類

/**
 * 同步鎖註解
 *
 * @property key Redis key
 * @property expire Redis TTL/秒,默認10秒
 */
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class SyncLockable(
        val key: String,
        val expire: Long = 10
)

實現AOP

/**
 * 同步鎖註解處理
 */
@Aspect
@Component
class SyncLockHandle {
    @Autowired
    private lateinit var syncLockFactory: SyncLockFactory

    /**
     * 在方法上執行同步鎖
     */
    @Around("@annotation(syncLockable)")
    fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? {
        val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire)

        try {
            lock.lock()
            return jp.proceed()
        } finally {
            lock.unLock()
        }
    }
}

如此一來,咱們即可以按照以下方式使用SyncLock

@Component
class SomeLogic {
    @SyncLockable("lock:some:name", 10)
    fun someFun() {
        // some logic
    }
}

是否是顯得更加方便!


訂閱號

相關文章
相關標籤/搜索