分佈式鎖實現(一):Redis

前言

單機環境下咱們能夠經過JAVA的Synchronized和Lock來實現進程內部的鎖,可是隨着分佈式應用和集羣環境的出現,系統資源的競爭從單進程多線程的競爭變成了多進程的競爭,這時候就須要分佈式鎖來保證。
實現分佈式鎖如今主流的方式大體有如下三種
1. 基於數據庫的索引和行鎖
2. 基於Redis的單線程原子操做:setNX
3. 基於Zookeeper的臨時有序節點

這篇文章咱們用Redis來實現,會基於現有的各類鎖實現來分析,最後分享Redission的鎖源碼分析來看下分佈式鎖的開源實現
複製代碼

設計實現

加鎖

1、 經過setNx和getSet來實現redis

這是如今網上大部分版本的實現方式,筆者以前項目裏面用到分佈式鎖也是經過這樣的方式實現數據庫

public boolean lock(Jedis jedis, String lockName, Integer expire) {

     //返回是否設置成功
     //setNx加鎖
     long now = System.currentTimeMillis();
     boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000)) == 1;

     if (!result) {
         //防止死鎖的容錯
         String timestamp = jedis.get(lockName);
         if (timestamp != null && Long.parseLong(timestamp) < now) {
             //不經過del方法來刪除鎖。而是經過同步的getSet
             String oldValue = jedis.getSet(lockName, String.valueOf(now + expire));
             if (oldValue != null && oldValue.equals(timestamp)) {
                 result = true;
                 jedis.expire(lockName, expire);
             }
         }
     }
     if (result) {
         jedis.expire(lockName, expire);
     }
     return result;
 }

複製代碼

代碼分析緩存

  1. 經過setNx命令老保證操做的原子性,獲取到鎖,而且把過時時間設置到value裏面安全

  2. 經過expire方法設置過時時間,若是設置過時時間失敗的話,再經過value的時間戳來和當前時間戳比較,防止出現死鎖bash

  3. 經過getSet命令在發現鎖過時未被釋放的狀況下,避免刪除了在這個過程當中有可能被其他的線程獲取到了鎖服務器

存在問題多線程

  1. 防止死鎖的解決方案是經過系統當前時間決定的,不過線上服務器系統時間通常來講都是一致的,這個不算是嚴重的問題
  2. 鎖過時的時候可能會有多個線程執行getSet命令,在競爭的狀況下,會修改value的時間戳,理論上來講會有偏差
  3. 鎖沒法具有客戶端標識,在解鎖的時候可能被其他的客戶端刪除同一個key
  4. 雖然有小問題,不過大致上來講這種分佈式鎖的實現方案基本上是符合要求的,可以作到鎖的互斥和避免死鎖

2、 經過Redis高版本的原子命令併發

jedis的set命令能夠自帶複雜參數,經過這些參數能夠實現原子的分佈式鎖命令異步

jedis.set(lockName, "", "NX", "PX", expireTime);
複製代碼

代碼分析分佈式

  1. redis的set命令能夠攜帶複雜參數,第一個是鎖的key,第二個是value,能夠存放獲取鎖的客戶端ID,經過這個校驗是否當前客戶端獲取到了鎖,第三個參數取值NX/XX,第四個參數 EX|PX,第五個就是時間

  2. NX:若是不存在就設置這個key XX:若是存在就設置這個key

  3. EX:單位爲秒,PX:單位爲毫秒

  4. 這個命令實質上就是把咱們以前的setNx和expire命令合併成一個原子操做命令,不須要咱們考慮set失敗或者expire失敗的狀況


解鎖

1、 經過Redis的del命令

public boolean unlock(Jedis jedis, String lockName) {
    	 jedis.del(lockName);
     	return true;
 }
複製代碼

代碼分析

經過redis的del命令能夠直接刪除鎖,可能會出現誤刪其餘線程已經存在的鎖的狀況

2、 Redis的del檢查

public static void unlock2(Jedis jedis, String lockKey, String requestId) {
       
   // 判斷加鎖與解鎖是否是同一個客戶端
   if (requestId.equals(jedis.get(lockKey))) {
       // 若在此時,這把鎖忽然不是這個客戶端的,則會誤解鎖
       jedis.del(lockKey);
   }

}
複製代碼

代碼分析

新增了requestId客戶端ID的判斷,但因爲不是原子操做,在多個進程下面的併發競爭狀況下,沒法保證安全

3、 Redis的LUA腳本

public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {

      String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
      Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(""));

      if (1L == (long) result) {
          return true;
      }
      return false;

  }
複製代碼

代碼分析

經過Lua腳原本保證操做的原子性,其實就是把以前的先判斷再刪除合併成一個原子性的腳本命令,邏輯就是,先經過get判斷value是否是相等,若相等就刪除,不然就直接return

Redission的分佈式鎖

Redission是redis官網推薦的一個redis客戶端,除了基於redis的基礎的CURD命令之外,重要的是就是Redission提供了方便好用的分佈式鎖API
複製代碼

1、 基本用法

RedissonClient redissonClient = RedissonTool.getInstance();

        RLock distribute_lock = redissonClient.getLock("distribute_lock");

        try {
            boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (distribute_lock.isLocked()) {
                distribute_lock.unlock();
            }
        }
複製代碼

代碼流程

  1. 經過redissonClient獲取RLock實例
  2. tryLock獲取嘗試獲取鎖,第一個是等待時間,第二個是鎖的超時時間,第三個是時間單位
  3. 執行完業務邏輯後,最終釋放鎖

2、 具體實現

咱們經過tryLock來分析redission分佈式的實現,lock方法跟tryLock差很少,只不過沒有最長等待時間的設置,會自旋循環等待鎖的釋放,直到獲取鎖爲止

long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        //獲取當前線程ID,用於實現可重入鎖
        final long threadId = Thread.currentThread().getId();
        //嘗試獲取鎖
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
        	//等待時間結束,返回獲取失敗
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //訂閱鎖的隊列,等待鎖被其他線程釋放後通知
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message,等待訂閱的隊列消息
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
複製代碼

代碼分析

  1. 首先tryAcquire嘗試獲取鎖,若返回ttl爲null,說明獲取到鎖了

  2. 判斷等待時間是否過時,若是過時,直接返回獲取鎖失敗

  3. 經過Redis的Channel訂閱監聽隊列,subscribe內部經過信號量semaphore,再經過await方法阻塞,內部實際上是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果,來保證訂閱成功,再判斷是否到了等待時間

  4. 再次嘗試申請鎖和等待時間的判斷,循環阻塞在這裏等待鎖釋放的消息RedissonLockEntry也維護了一個semaphore的信號量

  5. 不管是否釋放鎖,最終都要取消訂閱這個隊列消息

  6. redission內部的getEntryName是客戶端實例ID+鎖名稱來保證多個實例下的鎖可重入


tryAcquire獲取鎖

redisssion獲取鎖的核心代碼,內部實際上是異步調用,可是用get方法阻塞了

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
複製代碼
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
複製代碼
  1. tryLockInnerAsync方法內部是基於Lua腳原本獲取鎖的

    • 先判斷KEYS[1](鎖名稱)對應的key是否存在,不存在獲取到鎖,hset設置key的value,pexpire設置過時時間,返回null表示獲取到鎖
    • 存在的話,鎖被佔,hexists判斷是不是當前線程的鎖,如果的話,hincrby增長重入次數,從新設置過時時間,不是當前線程的鎖,返回當前鎖的過時時間
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    複製代碼
  2. Redission避免死鎖的解決方案:

    Redission爲了不鎖未被釋放,採用了一個特殊的解決方案,若未設置過時時間的話,redission默認的過時時間是30s,同時未避免鎖在業務未處理完成以前被提早釋放,Redisson在獲取到鎖且默認過時時間的時候,會在當前客戶端內部啓動一個定時任務,每隔internalLockLeaseTime/3的時間去刷新key的過時時間,這樣既避免了鎖提早釋放,同時若是客戶端宕機的話,這個鎖最多存活30s的時間就會自動釋放(刷新過時時間的定時任務進程也宕機)

    // lock acquired,獲取到鎖的時候設置按期更新時間的任務
                    if (ttlRemaining) {
                        scheduleExpirationRenewal(threadId);
                    }
                    
                    //expirationRenewalMap的併發安全MAP記錄設置過的緩存,避免併發狀況下重複設置任務,internalLockLeaseTime / 3的時間後從新設置過時時間
                       private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
                            
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }
    複製代碼

    unlock解鎖

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
        }
    複製代碼

    Redission的unlock解鎖也是基於Lua腳本實現的,內部邏輯是先判斷鎖是否存在,不存在說明已經被釋放了,發佈鎖釋放消息後返回,鎖存在再判斷當前線程是否鎖擁有者,不是的話,無權釋放返回,解鎖的話,會減去重入的次數,從新更新過時時間,若重入數撿完,刪除當前key,發佈鎖釋放消息

    寫在後面

    主要基於Redis來設計和實現分佈式鎖,經過經常使用的設計思路引伸到Redission的實現,不管是設計思路仍是代碼健壯性Redission的設計都是優秀的,值得學習,下一步會講解關於Zookeeper的分佈式鎖實現和相關開源源碼分析。

相關文章
相關標籤/搜索