RedissonLock分佈式鎖源碼分析

最近碰到的一個問題,Java代碼中寫了一個定時器,分佈式部署的時候,多臺同時執行的話就會出現重複的數據,爲了不這種狀況,以前是經過在配置文件裏寫上能夠執行這段代碼的IP,代碼中判斷若是跟這個IP相等,則執行,不然不執行,想一想也是一種比較簡單的方式吧,可是感受很low很low,因此改用分佈式鎖。
目前分佈式鎖經常使用的三種方式:1.數據庫的鎖;2.基於Redis的分佈式鎖;3.基於ZooKeeper的分佈式鎖。其中數據庫中的鎖有共享鎖和排他鎖,這兩種都沒法直接解決數據庫的單點和可重入的問題,因此,本章仍是來說講基於Redis的分佈式鎖,也能夠用其餘緩存(Memcache、Tair等)來實現。html

1、實現分佈式鎖的要求

  1. 互斥性。在任什麼時候候,當且僅有一個客戶端可以持有鎖。
  2. 不能有死鎖。持有鎖的客戶端崩潰後,後續客戶端可以加鎖。
  3. 容錯性。大部分Redis或者ZooKeeper節點可以正常運行。
  4. 加鎖解鎖相同。加鎖的客戶端和解鎖的客戶端必須爲同一個客戶端,不能讓其餘的解鎖了。

2、Redis實現分佈式鎖的經常使用命令

1.SETNX key val
當且僅當key不存在時,set一個key爲val的字符串,返回1;若key存在,則什麼都不作,返回0。
2.expire key timeout
爲key設置一個超時時間,單位爲second,超過這個時間鎖會自動釋放,避免死鎖。
3.delete key
刪除key,此處用來解鎖使用。
4.HEXISTS key field
當key 中存儲着field的時候返回1,若是key或者field至少有一個不存在返回0。
5.HINCRBY key field increment
將存儲在 key 中的哈希(Hash)對象中的指定字段 field 的值加上增量 increment。若是鍵 key 不存在,一個保存了哈希對象的新建將被建立。若是字段 field 不存在,在進行當前操做前,其將被建立,且對應的值被置爲 0。返回值是增量以後的值。java

3、常見寫法

由上面三個命令,咱們能夠很快的寫一個分佈式鎖出來:node

if (conn.setnx("lock","1").equals(1L)) { 
    //do something
    return true; 
} 
return false;

可是這樣也會存在問題,若是獲取該鎖的客戶端掛掉了怎麼辦?通常而言,咱們能夠經過設置expire的過時時間來防止客戶端掛掉所帶來的影響,能夠下降應用掛掉所帶來的影響,不過當時間失效的時候,要保證其餘客戶端只有一臺可以獲取。git

4、Redisson

Redisson在基於NIO的Netty框架上,充分的利用了Redis鍵值數據庫提供的一系列優點,在Java實用工具包中經常使用接口的基礎上,爲使用者提供了一系列具備分佈式特性的經常使用工具類。使得本來做爲協調單機多線程併發程序的工具包得到了協調分佈式多機多線程併發系統的能力,大大下降了設計和研發大規模分佈式系統的難度。同時結合各富特點的分佈式服務,更進一步簡化了分佈式環境中程序相互之間的協做。——摘自百度百科github

4.1 測試例子

先在pom引入Redssionredis

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.1</version>
</dependency>

起100個線程,同時對count進行操做,每次操做減1,加鎖的時候可以保持順序輸出,不加的話爲隨機。數據庫

public class RedissonTest implements Runnable {
    private static RedissonClient redisson;
    private static int count = 10000;

    private static void init() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://119.23.46.71:6340")
                .setPassword("root")
                .setDatabase(10);
        redisson = Redisson.create(config);
    }

    @Override
    public void run() {
        RLock lock = redisson.getLock("anyLock");
        lock.lock();
        count--;
        System.out.println(count);
        lock.unlock();
    }

    public static void main(String[] args) {
        init();
        for (int i = 0; i < 100; i++) {
            new Thread(new RedissonTest()).start();
        }
    }
}

輸出結果(部分結果):api

...
9930
9929
9928
9927
9926
9925
9924
9923
9922
9921

...

去掉lock.lock()和lock.unlock()以後(部分結果):緩存

...
9930
9931
9933
9935
9938
9937
9940
9941
9942
9944
9947
9946
9914
...

5、RedissonLock源碼分析

最新版的Redisson要求redis可以支持eval的命令,不然沒法實現,即Redis要求2.6版本以上。在lua腳本中能夠調用大部分的Redis命令,使用腳本的好處以下:
(1)減小網絡開銷:在Redis操做需求須要向Redis發送5次請求,而使用腳本功能完成一樣的操做只須要發送一個請求便可,減小了網絡往返時延。
(2)原子操做:Redis會將整個腳本做爲一個總體執行,中間不會被其餘命令插入。換句話說在編寫腳本的過程當中無需擔憂會出現競態條件,也就無需使用事務。事務能夠完成的全部功能均可以用腳原本實現。
(3)複用:客戶端發送的腳本會永久存儲在Redis中,這就意味着其餘客戶端(能夠是其餘語言開發的項目)能夠複用這一腳本而不須要使用代碼完成一樣的邏輯。安全

5.1 使用到的全局變量

全局變量:
expirationRenewalMap:存儲entryName和其過時時間,底層用的netty的PlatformDependent.newConcurrentHashMap()
internalLockLeaseTime:鎖默認釋放的時間:30 * 1000,即30秒
id:UUID,用做客戶端的惟一標識
PUBSUB:訂閱者模式,當釋放鎖的時候,其餘客戶端可以知道鎖已經被釋放的消息,並讓隊列中的第一個消費者獲取鎖。使用PUB/SUB消息機制的優勢:減小申請鎖時的等待時間、安全、 鎖帶有超時時間、鎖的標識惟一,防止死鎖 鎖設計爲可重入,避免死鎖。
commandExecutor:命令執行器,異步執行器

5.2 加鎖

以lock.lock()爲例,調用lock以後,底層使用的是lockInterruptibly,以後調用lockInterruptibly(-1, null);

(1)咱們來看一下lockInterruptibly的源碼,若是別的客戶端沒有加鎖,則當前客戶端進行加鎖而且訂閱,其餘客戶端嘗試加鎖,而且獲取ttl,而後等待已經加了鎖的客戶端解鎖。

//leaseTime默認爲-1
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();//獲取當前線程ID
    Long ttl = tryAcquire(leaseTime, unit, threadId);//嘗試加鎖
    // 若是爲空,當前線程獲取鎖成功,不然已經被其餘客戶端加鎖
    if (ttl == null) {
        return;
    }
    //等待釋放,並訂閱鎖
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
            // 從新嘗試獲取鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 成功獲取鎖
            if (ttl == null) {
                break;
            }
            // 等待鎖釋放
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 取消訂閱
        unsubscribe(future, threadId);
    }
}

(2)下面是tryAcquire的實現,調用的是tryAcquireAsync

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

(3)下面是tryAcquireAsync的實現,異步嘗試進行加鎖,嘗試加鎖的時候leaseTime爲-1。一般若是客戶端沒有加鎖成功,則會進行阻塞,leaseTime爲鎖釋放的時間。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {   //在lock.lock()的時候,已經聲明瞭leaseTime爲-1,嘗試加鎖
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    //監聽事件,訂閱消息
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                //獲取新的超時時間
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;  //返回ttl時間
}

(4)下面是tryLockInnerAsyncy異步加鎖,使用lua可以保證操做是原子性的

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

參數
KEYS[1](getName()) :須要加鎖的key,這裏須要是字符串類型。
ARGV[1](internalLockLeaseTime) :鎖的超時時間,防止死鎖
ARGV[2](getLockName(threadId)) :鎖的惟一標識,也就是剛纔介紹的 id(UUID.randomUUID()) + 「:」 + threadId
lua腳本解釋

--檢查key是否被佔用了,若是沒有則設置超時時間和惟一標識,初始化value=1
if (redis.call('exists', KEYS[1]) == 0) then
  redis.call('hset', KEYS[1], ARGV[2], 1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil; 
end; 
--若是鎖重入,須要判斷鎖的key field 都一致狀況下 value 加一 
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  redis.call('hincrby', KEYS[1], ARGV[2], 1);
  --鎖重入從新設置超時時間  
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end;
--返回剩餘的過時時間
return redis.call('pttl', KEYS[1]);

(5)流程圖

5.3 解鎖

解鎖的代碼很簡單,大意是將該節點刪除,併發布消息。
(1)unlock源碼

public void unlock() {
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }

(2)異步解鎖,並返回是否成功

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

輸入的參數有:
參數:
KEYS[1](getName()):須要加鎖的key,這裏須要是字符串類型。
KEYS[2](getChannelName()):redis消息的ChannelName,一個分佈式鎖對應惟一的一個 channelName:「redisson_lock__channel__{」 + getName() + 「}」
ARGV[1](LockPubSub.unlockMessage):redis消息體,這裏只須要一個字節的標記就能夠,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其餘訂閱解鎖消息的客戶端線程申請鎖。
ARGV[2](internalLockLeaseTime):鎖的超時時間,防止死鎖
ARGV[3](getLockName(threadId)) :鎖的惟一標識,也就是剛纔介紹的 id(UUID.randomUUID()) + 「:」 + threadId

此處lua腳本的做用:

--若是keys[1]不存在,則發佈消息,說明已經被解鎖了
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
--key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
--將value減1,這裏主要用在重入鎖
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
--刪除key並消息
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1;
end; 
return nil;

(3)刪除過時信息

void cancelExpirationRenewal() {
    Timeout task = expirationRenewalMap.remove(getEntryName());
    if (task != null) {
        task.cancel();
    }
}

總結

Redis2.6版本以後引入了eval,可以支持lua腳本,更好的保證了redis的原子性,並且redisson採用了大量異步的寫法來避免性能所帶來的影響。本文只是講解了下redisson的重入鎖,其還有公平鎖、聯鎖、紅鎖、讀寫鎖等,有興趣的能夠看下。感受這篇文章寫得也不是很好,畢竟netty還沒開始學,有些api也不太清楚,但願各位大佬可以建議建議~~

參考:
1.redisson
2.Redis分佈式鎖的正確實現方式
3.分佈式鎖的多種實現方式
4.用Redis構建分佈式鎖
5.基於Redis的分佈式鎖實現
6.基於Redis實現分佈式鎖,Redisson使用及源碼分析

相關文章
相關標籤/搜索