面試官:如何用Redis實現分佈式鎖?

點贊再看,養成習慣,微信搜索【三太子敖丙】關注這個互聯網苟且偷生的工具人。node

本文 GitHub github.com/JavaFamily 已收錄,有一線大廠面試完整考點、資料以及個人系列文章。git

前言

上一章節我提到了基於zk分佈式鎖的實現,這章節就來講一下基於Redis的分佈式鎖實現吧。程序員

在開始提到Redis分佈式鎖以前,我想跟你們聊點Redis的基礎知識。github

說一下Redis的兩個命令:web

SETNX key value
複製代碼

setnx 是SET if Not eXists(若是不存在,則 SET)的簡寫。面試

用法如圖,若是不存在set成功返回int的1,這個key存在了返回0。redis

SETEX key seconds value
複製代碼

將值 value 關聯到 key ,並將 key 的生存時間設爲 seconds (以秒爲單位)。服務器

若是 key 已經存在,setex命令將覆寫舊值。微信

有小夥伴確定會疑惑萬一set value 成功 set time失敗,那不就傻了麼,這啊Redis官網想到了。數據結構

setex是一個原子性(atomic)操做,關聯值和設置生存時間兩個動做會在同一時間內完成。

我設置了10秒的失效時間,ttl命令能夠查看倒計時,負的說明已經到期了。

跟你們講這兩個命名也是有緣由的,由於他們是Redis實現分佈式鎖的關鍵。

正文

開始前仍是看看場景:

我依然是建立了不少個線程去扣減庫存inventory,不出意外的庫存扣減順序變了,最終的結果也是不對的。

單機加synchronized或者Lock這些常規操做我就不說了好吧,結果確定是對的。

我先實現一個簡單的Redis鎖,而後咱們再實現分佈式鎖,可能更方便你們的理解。

還記得上面我說過的命令麼,實現一個單機的其實比較簡單,大家先思考一下,別往下看。

setnx

能夠看到,第一個成功了,沒釋放鎖,後面的都失敗了,至少順序問題問題是解決了,只要加鎖,縮放後面的拿到,釋放如此循環,就能保證按照順序執行。

可是大家也發現問題了,仍是同樣的,第一個仔set成功了,可是忽然掛了,那鎖就一直在那沒法獲得釋放,後面的線程也永遠得不到鎖,又死鎖了。

因此....

setex

知道我以前說這個命令的緣由了吧,設置一個過時時間,就算線程1掛了,也會在失效時間到了,自動釋放。

我這裏就用到了nx和px的結合參數,就是set值而且加了過時時間,這裏我還設置了一個過時時間,就是這時間內若是第二個沒拿到第一個的鎖,就退出阻塞了,由於多是客戶端斷連了。

加鎖

總體加鎖的邏輯比較簡單,你們基本上都能看懂,不過我拿到當前時間去減開始時間的操做感受有點笨, System.currentTimeMillis()消耗很大的。

/**
 * 加鎖
 *
 * @param id
 * @return
 */

public boolean lock(String id) {
    Long start = System.currentTimeMillis();
    try {
        for (; ; ) {
            //SET命令返回OK ,則證實獲取鎖成功
            String lock = jedis.set(LOCK_KEY, id, params);
            if ("OK".equals(lock)) {
                return true;
            }
            //不然循環等待,在timeout時間內仍未獲取到鎖,則獲取失敗
            long l = System.currentTimeMillis() - start;
            if (l >= timeout) {
                return false;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } finally {
        jedis.close();
    }
}
複製代碼

System.currentTimeMillis消耗大,每一個線程進來都這樣,我以前寫代碼,就會在服務器啓動的時候,開一個線程不斷去拿,調用方直接獲取值就行了,不過也不是最優解,日期類仍是有不少好方法的。

@Service
public class TimeServcie {
    private static long time;
    static {
        new Thread(new Runnable(){
            @Override
            public void run() {
                while (true){
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    long cur = System.currentTimeMillis();
                    setTime(cur);
                }
            }
        }).start();
    }

    public static long getTime() {
        return time;
    }

    public static void setTime(long time) {
        TimeServcie.time = time;
    }
}
複製代碼

解鎖

解鎖的邏輯更加簡單,就是一段Lua的拼裝,把Key作了刪除。

大家發現沒,我上面加鎖解鎖都用了UUID,這就是爲了保證,誰加鎖了誰解鎖,要是你刪掉了個人鎖,那不亂套了嘛。

LUA是原子性的,也比較簡單,就是判斷一下Key和咱們參數是否相等,是的話就刪除,返回成功1,0就是失敗。

/**
 * 解鎖
 *
 * @param id
 * @return
 */

public boolean unlock(String id) {
    String script =
            "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                    "   return redis.call('del',KEYS[1]) " +
                    "else" +
                    "   return 0 " +
                    "end";
    try {
        String result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();
        return "1".equals(result) ? true : false;
    } finally {
        jedis.close();
    }
}
複製代碼

驗證

咱們能夠用咱們寫的Redis鎖試試效果,能夠看到都按照順序去執行了

思考

你們是否是以爲完美了,可是上面的鎖,有很多瑕疵的,我沒思考不少點,你或許能夠思考一下,源碼我都開源到個人GItHub了。

並且,鎖通常都是須要可重入行的,上面的線程都是執行完了就釋放了,沒法再次進入了,進去也是從新加鎖了,對於一個鎖的設計來講確定不是很合理的。

我不打算手寫,由於都有現成的,別人幫咱們寫好了。

redisson

redisson的鎖,就實現了可重入了,可是他的源碼比較晦澀難懂。

使用起來很簡單,由於他們底層都封裝好了,你鏈接上你的Redis客戶端,他幫你作了我上面寫的一切,而後更完美。

簡單看看他的使用吧,跟正常使用Lock沒啥區別。

ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(inventory, inventory, 10L, SECONDS, linkedBlockingQueue);
long start = System.currentTimeMillis();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
final RedissonClient client = Redisson.create(config);
final RLock lock = client.getLock("lock1");

for (int i = 0; i <= NUM; i++) {
    threadPoolExecutor.execute(new Runnable() {
        public void run() {
            lock.lock();
            inventory--;
            System.out.println(inventory);
            lock.unlock();
        }
    });
}
long end = System.currentTimeMillis();
System.out.println("執行線程數:" + NUM + "   總耗時:" + (end - start) + "  庫存數爲:" + inventory);
複製代碼

上面能夠看到我用到了getLock,其實就是獲取一個鎖的實例。

RedissionLock也沒作啥,就是熟悉的初始化。

public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    //命令執行器
    this.commandExecutor = commandExecutor;
    //UUID字符串
    this.id = commandExecutor.getConnectionManager().getId();
    //內部鎖過時時間
    this.internalLockLeaseTime = commandExecutor.
                getConnectionManager().getCfg().getLockWatchdogTimeout();
    this.entryName = id + ":" + name;
}
複製代碼

加鎖

有沒有發現不少跟Lock不少類似的地方呢?

嘗試加鎖,拿到當前線程,而後我開頭說的ttl也看到了,是否是一切都是那麼熟悉?

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    
    //當前線程ID
    long threadId = Thread.currentThread().getId();
    //嘗試獲取鎖
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 若是ttl爲空,則證實獲取鎖成功
    if (ttl == null) {
        return;
    }
    //若是獲取鎖失敗,則訂閱到對應這個鎖的channel
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            //再次嘗試獲取鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            //ttl爲空,說明成功獲取鎖,返回
            if (ttl == null) {
                break;
            }
            //ttl大於0 則等待ttl時間後繼續嘗試獲取
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        //取消對channel的訂閱
        unsubscribe(future, threadId);
    }
    //get(lockAsync(leaseTime, unit));
}
複製代碼

獲取鎖

獲取鎖的時候,也比較簡單,你能夠看到,他也是不斷刷新過時時間,跟我上面不斷去拿當前時間,校驗過時是一個道理,只是我比較粗糙。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {

    //若是帶有過時時間,則按照普通方式獲取鎖
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    //先按照30秒的過時時間來執行獲取鎖的方法
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
        commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
        TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        
    //若是還持有這個鎖,則開啓定時任務不斷刷新該鎖的過時時間
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}
複製代碼

底層加鎖邏輯

你可能會想這麼多操做,在一塊兒不是原子性不仍是有問題麼?

大佬們確定想獲得呀,因此仍是LUA,他使用了Hash的數據結構。

主要是判斷鎖是否存在,存在就設置過時時間,若是鎖已經存在了,那對比一下線程,線程是一個那就證實能夠重入,鎖在了,可是不是當前線程,證實別人還沒釋放,那就把剩餘時間返回,加鎖失敗。

是否是有點繞,多理解一遍。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                            long threadId, RedisStrictCommand<T> command)
 
{

        //過時時間
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //若是鎖不存在,則經過hset設置它的值,並設置過時時間
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //若是鎖已存在,而且鎖的是當前線程,則經過hincrby給數值遞增1
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //若是鎖已存在,但並不是本線程,則返回過時時間ttl
                  "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
    }
複製代碼

解鎖

鎖的釋放主要是publish釋放鎖的信息,而後作校驗,同樣會判斷是否當前線程,成功就釋放鎖,還有個hincrby遞減的操做,鎖的值大於0說明是可重入鎖,那就刷新過時時間。

若是值小於0了,那刪掉Key釋放鎖。

是否是又和AQS很像了?

AQS就是經過一個volatile修飾status去看鎖的狀態,也會看數值判斷是不是可重入的。

因此我說代碼的設計,最後就萬劍歸一,都是同樣的。

public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    
    //解鎖方法
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            //獲取返回值
            Boolean opStatus = future.getNow();
            //若是返回空,則證實解鎖的線程和當前鎖不是同一個線程,拋出異常
            if (opStatus == null) {
                IllegalMonitorStateException cause = 
                    new IllegalMonitorStateException("
                        attempt to unlock lock, not locked by current thread by node id: "

                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            //解鎖成功,取消刷新過時時間的那個定時任務
            if (opStatus) {
                cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });

    return result;
}


protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
    
            //若是鎖已經不存在, 發佈鎖釋放的消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            //若是釋放鎖的線程和已存在鎖的線程不是同一個線程,返回null
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            //經過hincrby遞減1的方式,釋放一次鎖
            //若剩餘次數大於0 ,則刷新過時時間
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            //不然證實鎖已經釋放,刪除key併發布鎖釋放的消息
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}
複製代碼

總結

這個寫了比較久,可是不是由於複雜什麼的,是由於我的工做的緣由,最近事情不少嘛,仍是那句話,程序員纔是個人本職寫文章只是個愛好,不能本末倒置了。

你們會發現,你學懂一個技術棧以後,學新的會很快,並且也能發現他們的設計思想和技巧真的很巧妙,也總能找到類似點,和讓你驚歎的點。

就拿Doug Lea寫的AbstractQueuedSynchronizer(AQS)來講,他寫了一行代碼,你可能看幾天才能看懂,大佬們的思想是真的牛。

我看源碼有時候也頭疼,可是去谷歌一下,本身理解一下,忽然恍然大悟的時候以爲一切又很值。

學習就是一條時而鬱鬱寡歡,時而開環大笑的路,你們加油,咱們成長路上一塊兒共勉。

我是敖丙,一個在互聯網苟且偷生的工具人。

最好的關係是互相成就,你們的 「三連」就是丙丙創做的最大動力,咱們下期見!

注:若是本篇博客有任何錯誤和建議,歡迎人才們留言,你快說句話啊


文章持續更新,能夠微信搜索「 三太子敖丙 」第一時間閱讀,回覆【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板,本文 GitHub github.com/JavaFamily 已經收錄,有大廠面試完整考點,歡迎Star。

你知道的越多,你不知道的越多

相關文章
相關標籤/搜索