聊聊redisson的分佈式鎖

本文主要研究一下redisson的分佈式鎖html

maven

<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.8.1</version>
		</dependency>
複製代碼

實例

@Test
    public void testDistributedLock(){
        Config config = new Config();
//        config.setTransportMode(TransportMode.EPOLL);
        config.useSingleServer()
                .setAddress("redis://192.168.99.100:6379");
        RedissonClient redisson = Redisson.create(config);


        IntStream.rangeClosed(1,5)
                .parallel()
                .forEach(i -> {
                    executeLock(redisson);
                });

        executeLock(redisson);
    }

    public void executeLock(RedissonClient redisson){
        RLock lock = redisson.getLock("myLock");
        boolean locked = false;
        try{
            LOGGER.info("try lock");
            locked = lock.tryLock();
//            locked = lock.tryLock(1,2,TimeUnit.MINUTES);
            LOGGER.info("get lock result:{}",locked);
            if(locked){
                TimeUnit.HOURS.sleep(1);
                LOGGER.info("get lock and finish");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            LOGGER.info("enter unlock");
            if(locked){
                lock.unlock();
            }
        }
    }
複製代碼

源碼解析

RedissonLock.tryLock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.javajava

@Override
    public boolean tryLock() {
        return get(tryLockAsync());
    }

    @Override
    public RFuture<Boolean> tryLockAsync() {
        return tryLockAsync(Thread.currentThread().getId());
    }

    @Override
    public RFuture<Boolean> tryLockAsync(long threadId) {
        return tryAcquireOnceAsync(-1, null, threadId);
    }

    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }

    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }

        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
複製代碼
  • 這裏leaseTime沒有設置的話,默認是-1,使用的是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),默認爲30秒
  • tryLockInnerAsync使用的是一段lua腳本,該腳本有3個參數,第一個參數爲KEYS數組,後面幾個參數爲ARGV數組的元素
  • 這裏key的值爲調用方指定的這個redissonLock的名稱,兩個變量,第一個爲leaseTime,第二個爲鎖的名稱,使用redissonLock的id+線程id
  • lua腳本第一個方法判斷redissonLock的hashmap是否存在,若是不存在則建立,該hashmap有一個entry的key爲鎖名稱,valude爲1,以後設置該hashmap失效時間爲leaseTime
  • lua腳本第二個方法是在redissonLock的hashmap存在的狀況下,將該鎖名的value增1,同時設置失效時間爲leaseTime
  • 最後返回該redissonLock名稱的key的ttl
  • 執行成功以後判斷ttl是否還有值,有的話則調用scheduleExpirationRenewal,防止lock未執行完就失效
  • scheduleExpirationRenewal是註冊一個延時任務,在internalLockLeaseTime / 3的時候觸發,執行的方法是renewExpirationAsync,將該鎖失效時間重置回internalLockLeaseTime
  • scheduleExpirationRenewal裏頭給scheduleExpirationRenewal任務增長listener,若是設置成功以後還會再次遞歸調用scheduleExpirationRenewal從新註冊延時任務
  • tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法是指定自動解鎖時間時調用的方法,它與tryAcquireOnceAsync的區別在於,它對ttl的方回值採用long值來判斷,若是是null,才執行延長失效時間的定時任務,而tryAcquireOnceAsync方法採用的是BooleanNullReplayConvertor,只要返回不是null,則返回true

RedissonLock.unlock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.javanode

@Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

    @Override
    public RFuture<Void> unlockAsync(final long threadId) {
        final RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

    String getChannelName() {
        return prefixName("redisson_lock__channel", getName());
    }

    void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = expirationRenewalMap.get(getEntryName());
        if (task != null && (threadId == null || task.getThreadId() == threadId)) {
            expirationRenewalMap.remove(getEntryName());
            task.getTimeout().cancel();
        }
    }
複製代碼
  • unlockInnerAsync經過lua腳原本釋放鎖,該lua使用兩個key,一個是redissonLock名稱,一個是channelName
  • 該lua使用的變量有三個,一個是pubSub的unlockMessage,默認爲0,一個是internalLockLeaseTime,默認爲commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),一個是鎖名稱
  • 若是該redissonLock不存在,則直接發佈unlock消息返回1;若是該鎖不存在則返回nil;
  • 若是該鎖存在則將其計數-1,若是counter大於0,則重置下失效時間,返回0;若是counter不大於0,則刪除該redissonLock鎖,發佈unlockMessage,返回1;若是上面條件都沒有命中返回nil
  • unlockAsync裏頭對unlockInnerAsync註冊了FutureListener,主要是調用cancelExpirationRenewal,取消掉scheduleExpirationRenewal任務

LockPubSub

redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.javaredis

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            value.getLatch().release();
        }
    }

}
複製代碼
  • 接收到unlockMessage的時候,會調用RedissonLockEntry的listener,而後觸發latch的release
  • tryAcquireOnceAsync這個方法默認沒有建立LockPubSub,並且沒有指定自動解鎖時間,則定時任務會一直延長失效時間,這個可能存在鎖一直沒釋放的風險

小結

加鎖有以下注意事項:數組

  • 加鎖須要設置超時時間,防止出現死鎖
  • 加鎖以及設置超時時間的時候,須要保證兩個操做的原子性,於是最好使用lua腳本或者使用支持NX以及EX的set方法
  • 加鎖的時候須要把加鎖的調用方信息,好比線程id給記錄下來,這個在解鎖的時候須要使用
  • 對於加鎖時長不肯定的任務,爲防止任務未執行完致使超時被釋放,須要對還沒有運行完的任務延長失效時間

解鎖有以下注意事項:安全

  • 解鎖一系列操做(判斷key是否存在,存在的話刪除key等)須要保證原子性,於是最好使用lua腳本
  • 解鎖須要判斷調用方是否與加鎖時記錄的是否一致,防止鎖被誤刪
  • 若是有延續失效時間的延時任務,在解鎖的時候,須要終止掉該任務

doc

相關文章
相關標籤/搜索