【高併發】高併發分佈式鎖架構解密,不是全部的鎖都是分佈式鎖!!

寫在前面

最近,不少小夥伴留言說,在學習高併發編程時,不太明白分佈式鎖是用來解決什麼問題的,還有很多小夥伴甚至連分佈式鎖是什麼都不太明白。明明在生產環境上使用了本身開發的分佈式鎖,爲何還會出現問題呢?一樣的程序,加上分佈式鎖後,性能差了幾個數量級!這又是爲何呢?今天,咱們就來講說如何在高併發環境下實現分佈式鎖,不是全部的鎖都是高併發的。html

萬字長文,帶你深刻解密高併發環境下的分佈式鎖架構,不是全部的鎖都是分佈式鎖!!!前端

究竟什麼樣的鎖才能更好的支持高併發場景呢?今天,咱們就一塊兒解密高併發環境下典型的分佈式鎖架構,結合【高併發】專題下的其餘文章,學以至用。java

鎖用來解決什麼問題呢?

在咱們編寫的應用程序或者高併發程序中,不知道你們有沒有想過一個問題,就是咱們爲何須要引入鎖?鎖爲咱們解決了什麼問題呢?redis

在不少業務場景下,咱們編寫的應用程序中會存在不少的 資源競爭 的問題。而咱們在高併發程序中,引入鎖,就是爲了解決這些資源競爭的問題。算法

電商超賣問題

這裏,咱們能夠列舉一個簡單的業務場景。好比,在電子商務(商城)的業務場景中,提交訂單購買商品時,首先須要查詢相應商品的庫存是否足夠,只有在商品庫存數量足夠的前提下,才能讓用戶成功的下單。下單時,咱們須要在庫存數量中減去用戶下單的商品數量,並將庫存操做的結果數據更新到數據庫中。整個流程咱們能夠簡化成下圖所示。spring

不少小夥伴也留言說,讓我給出代碼,這樣可以更好的學習和掌握相關的知識。好吧,這裏,我也給出相應的代碼片斷吧。咱們可使用下面的代碼片斷來表示用戶的下單操做,我這裏將商品的庫存信息保存在了Redis中。數據庫

@RequestMapping("/submitOrder")
public String submitOrder(){
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
    }else{
        logger.debug("庫存不足,扣減庫存失敗");
        throw new OrderException("庫存不足,扣減庫存失敗");
    }
    return "success";
}

注意:上述代碼片斷比較簡單,只是爲了方便你們理解,真正項目中的代碼就不能這麼寫了。編程

上述的代碼看似是沒啥問題的,可是咱們不能只從代碼表面上來觀察代碼的執行順序。這是由於在JVM中代碼的執行順序未必是按照咱們書寫代碼的順序執行的。即便在JVM中代碼是按照咱們書寫的順序執行,那咱們對外提供的接口一旦暴露出去,就會有成千上萬的客戶端來訪問咱們的接口。因此說,咱們暴露出去的接口是會被併發訪問的。數組

試問,上面的代碼在高併發環境下是線程安全的嗎?答案確定不是線程安全的,由於上述扣減庫存的操做會出現並行執行的狀況。緩存

咱們可使用Apache JMeter來對上述接口進行測試,這裏,我使用Apache JMeter對上述接口進行測試。

在Jmeter中,我將線程的併發度設置爲3,接下來的配置以下所示。

以HTTP GET請求的方式來併發訪問提交訂單的接口。此時,運行JMeter來訪問接口,命令行會打印出下面的日誌信息。

庫存扣減成功,當前庫存爲:49
庫存扣減成功,當前庫存爲:49
庫存扣減成功,當前庫存爲:49

這裏,咱們明明請求了3次,也就是說,提交了3筆訂單,爲何扣減後的庫存都是同樣的呢?這種現象在電商領域有一個專業的名詞叫作 「超賣」

若是一個大型的高併發電商系統,好比淘寶、天貓、京東等,出現了超賣現象,那損失就沒法估量了!架構設計和開發電商系統的人員估計就要統統下崗了。因此,做爲技術人員,咱們必定要嚴謹的對待技術,嚴格作好系統的每個技術環節。

JVM中提供的鎖

JVM中提供的synchronized和Lock鎖,相信你們並不陌生了,不少小夥伴都會使用這些鎖,也能使用這些鎖來實現一些簡單的線程互斥功能。那麼,做爲立志要成爲架構師的你,是否瞭解過JVM鎖的底層原理呢?

JVM鎖原理

說到JVM鎖的原理,咱們就不得不限說說Java中的對象頭了。

Java中的對象頭

每一個Java對象都有對象頭。若是是⾮數組類型,則⽤2個字寬來存儲對象頭,若是是數組,則會⽤3個字寬來存儲對象頭。在32位處理器中,⼀個字寬是32位;在64位虛擬機中,⼀個字寬是64位。

對象頭的內容以下表 。

長度 內容 說明
32/64bit Mark Word 存儲對象的hashCode或鎖信息等
32/64bit Class Metadata Access 存儲到對象類型數據的指針
32/64bit Array length 數組的長度(若是是數組)

Mark Work的格式以下所示。

鎖狀態 29bit或61bit 1bit是不是偏向鎖? 2bit鎖標誌位
無鎖 0 01
偏向鎖 線程ID 1 01
輕量級鎖 指向棧中鎖記錄的指針 此時這一位不用於標識偏向鎖 00
重量級鎖 指向互斥量(重量級鎖)的指針 此時這一位不用於標識偏向鎖 10
GC標記 此時這一位不用於標識偏向鎖 11

能夠看到,當對象狀態爲偏向鎖時, Mark Word 存儲的是偏向的線程ID;當狀態爲輕量級鎖時, Mark Word 存儲的是指向線程棧中 Lock Record 的指針;當狀態爲重量級鎖時, Mark Word 爲指向堆中的monitor對象的指針 。

有關Java對象頭的知識,參考《深刻淺出Java多線程》。

JVM鎖原理

簡單點來講,JVM中鎖的原理以下。

在Java對象的對象頭上,有一個鎖的標記,好比,第一個線程執行程序時,檢查Java對象頭中的鎖標記,發現Java對象頭中的鎖標記爲未加鎖狀態,因而爲Java對象進行了加鎖操做,將對象頭中的鎖標記設置爲鎖定狀態。第二個線程執行一樣的程序時,也會檢查Java對象頭中的鎖標記,此時會發現Java對象頭中的鎖標記的狀態爲鎖定狀態。因而,第二個線程會進入相應的阻塞隊列中進行等待。

這裏有一個關鍵點就是Java對象頭中的鎖標記如何實現。

JVM鎖的短板

JVM中提供的synchronized和Lock鎖都是JVM級別的,你們都知道,當運行一個Java程序時,會啓動一個JVM進程來運行咱們的應用程序。synchronized和Lock在JVM級別有效,也就是說,synchronized和Lock在同一Java進程內有效。若是咱們開發的應用程序是分佈式的,那麼只是使用synchronized和Lock來解決分佈式場景下的高併發問題,就會顯得有點力不從心了。

synchronized和Lock支持JVM同一進程內部的線程互斥

synchronized和Lock在JVM級別可以保證高併發程序的互斥,咱們可使用下圖來表示。

可是,當咱們將應用程序部署成分佈式架構,或者將應用程序在不一樣的JVM進程中運行時,synchronized和Lock就不能保證分佈式架構和多JVM進程下應用程序的互斥性了。

synchronized和Lock不能實現多JVM進程之間的線程互斥

分佈式架構和多JVM進程的本質都是將應用程序部署在不一樣的JVM實例中,也就是說,其本質仍是多JVM進程。

分佈式鎖

咱們在實現分佈式鎖時,能夠參照JVM鎖實現的思想,JVM鎖在爲對象加鎖時,經過改變Java對象的對象頭中的鎖的標誌位來實現,也就是說,全部的線程都會訪問這個Java對象的對象頭中的鎖標誌位。

咱們一樣以這種思想來實現分佈式鎖,當咱們將應用程序進行拆分並部署成分佈式架構時,全部應用程序中的線程訪問共享變量時,都到同一個地方去檢查當前程序的臨界區是否進行了加鎖操做,而是否進行了加鎖操做,咱們在統一的地方使用相應的狀態來進行標記。

能夠看到,在分佈式鎖的實現思想上,與JVM鎖相差不大。而在實現分佈式鎖中,保存加鎖狀態的服務可使用MySQL、Redis和Zookeeper實現。

可是,在互聯網高併發環境中, 使用Redis實現分佈式鎖的方案是使用的最多的。 接下來,咱們就使用Redis來深刻解密分佈式鎖的架構設計。

Redis如何實現分佈式鎖

Redis命令

在Redis中,有一個不常使用的命令以下所示。

SETNX key value

這條命令的含義就是「SET if Not Exists」,即不存在的時候纔會設置值。

只有在key不存在的狀況下,將鍵key的值設置爲value。若是key已經存在,則SETNX命令不作任何操做。

這個命令的返回值以下。

  • 命令在設置成功時返回1。
  • 命令在設置失敗時返回0。

因此,咱們在分佈式高併發環境下,可使用Redis的SETNX命令來實現分佈式鎖。假設此時有線程A和線程B同時訪問臨界區代碼,假設線程A首先執行了SETNX命令,並返回結果1,繼續向下執行。而此時線程B再次執行SETNX命令時,返回的結果爲0,則線程B不能繼續向下執行。只有當線程A執行DELETE命令將設置的鎖狀態刪除時,線程B纔會成功執行SETNX命令設置加鎖狀態後繼續向下執行。

引入分佈式鎖

瞭解瞭如何使用Redis中的命令實現分佈式鎖後,咱們就能夠對下單接口進行改造了,加入分佈式鎖,以下所示。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量做爲商品的id
* 實際工做中,這個商品id是前端進行下單操做傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //經過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串「binghe」
    //實際上,value能夠爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
    }else{
        logger.debug("庫存不足,扣減庫存失敗");
        throw new OrderException("庫存不足,扣減庫存失敗");
    }
    //業務執行完成,刪除PRODUCT_ID key
    stringRedisTemplate.delete(PRODUCT_ID);
    return "success";
}

那麼,在上述代碼中,咱們加入了分佈式鎖的操做,那上述代碼是否可以在高併發場景下保證業務的原子性呢?答案是能夠保證業務的原子性。可是,在實際場景中,上面實現分佈式鎖的代碼是不可用的!!

假設當線程A首先執行stringRedisTemplate.opsForValue()的setIfAbsent()方法返回true,繼續向下執行,正在執行業務代碼時,拋出了異常,線程A直接退出了JVM。此時,stringRedisTemplate.delete(PRODUCT_ID);代碼還沒來得及執行,以後全部的線程進入提交訂單的方法時,調用stringRedisTemplate.opsForValue()的setIfAbsent()方法都會返回false。致使後續的全部下單操做都會失敗。這就是分佈式場景下的死鎖問題。

因此,上述代碼中實現分佈式鎖的方式在實際場景下是不可取的!!

引入try-finally代碼塊

說到這,相信小夥伴們都可以想到,使用try-finall代碼塊啊,接下來,咱們爲下單接口的方法加上try-finally代碼塊。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量做爲商品的id
* 實際工做中,這個商品id是前端進行下單操做傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //經過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串「binghe」
    //實際上,value能夠爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

那麼,上述代碼是否真正解決了死鎖的問題呢?咱們在寫代碼時,不能只盯着代碼自己,以爲上述代碼沒啥問題了。實際上,生產環境是很是複雜的。若是線程在成功加鎖以後,執行業務代碼時,還沒來得及執行刪除鎖標誌的代碼,此時,服務器宕機了,程序並無優雅的退出JVM。也會使得後續的線程進入提交訂單的方法時,因沒法成功的設置鎖標誌位而下單失敗。因此說,上述的代碼仍然存在問題。

引入Redis超時機制

在Redis中能夠設置緩存的自動過時時間,咱們能夠將其引入到分佈式鎖的實現中,以下代碼所示。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量做爲商品的id
* 實際工做中,這個商品id是前端進行下單操做傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //經過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串「binghe」
    //實際上,value能夠爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代碼中,咱們加入了以下一行代碼來爲Redis中的鎖標誌設置過時時間。

stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

此時,咱們設置的過時時間爲30秒。

那麼問題來了,這樣是否就真正的解決了問題呢?上述程序就真的沒有坑了嗎?答案是仍是有坑的!!

「坑位」分析

咱們在下單操做的方法中爲分佈式鎖引入了超時機制,此時的代碼仍是沒法真正避免死鎖的問題,那「坑位」到底在哪裏呢?試想,當程序執行完stringRedisTemplate.opsForValue().setIfAbsent()方法後,正要執行stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS)代碼時,服務器宕機了,你還別說,生產壞境的狀況很是複雜,就是這麼巧,服務器就宕機了。此時,後續請求進入提交訂單的方法時,都會由於沒法成功設置鎖標誌而致使後續下單流程沒法正常執行。

既然咱們找到了上述代碼的「坑位」,那咱們如何將這個」坑「填上?如何解決這個問題呢?別急,Redis已經提供了這樣的功能。咱們能夠在向Redis中保存數據的時候,能夠同時指定數據的超時時間。因此,咱們能夠將代碼改形成以下所示。

/**
* 爲了演示方便,我這裏就簡單定義了一個常量做爲商品的id
* 實際工做中,這個商品id是前端進行下單操做傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //經過stringRedisTemplate來調用Redis的SETNX命令,key爲商品的id,value爲字符串「binghe」
    //實際上,value能夠爲任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe", 30, TimeUnit.SECONDS);
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存爲:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代碼中,咱們在向Redis中設置鎖標誌位的時候就設置了超時時間。此時,只要向Redis中成功設置了數據,則即便咱們的業務系統宕機,Redis中的數據過時後,也會自動刪除。後續的線程進入提交訂單的方法後,就會成功的設置鎖標誌位,並向下執行正常的下單流程。

到此,上述的代碼基本上在功能角度解決了程序的死鎖問題,那麼,上述程序真的就完美了嗎?哈哈,不少小夥伴確定會說不完美!確實,上面的代碼還不是完美的,那你們知道哪裏不完美嗎?接下來,咱們繼續分析。

在開發集成角度分析代碼

在咱們開發公共的系統組件時,好比咱們這裏說的分佈式鎖,咱們確定會抽取一些公共的類來完成相應的功能來供系統使用。

這裏,假設咱們定義了一個RedisLock接口,以下所示。

public interface RedisLock{
    //加鎖操做
    boolean tryLock(String key, long timeout, TimeUnit unit);
    //解鎖操做
    void releaseLock(String key);
}

接下來,使用RedisLockImpl類實現RedisLock接口,提供具體的加鎖和解鎖實現,以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        return stringRedisTemplate.opsForValue().setIfAbsent(key, "binghe", timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        stringRedisTemplate.delete(key);
    }
}

在開發集成的角度來講,當一個線程從上到下執行時,首先對程序進行加鎖操做,而後執行業務代碼,執行完成後,再進行釋放鎖的操做。理論上,加鎖和釋放鎖時,操做的Redis Key都是同樣的。可是,若是其餘開發人員在編寫代碼時,並無調用tryLock()方法,而是直接調用了releaseLock()方法,而且他調用releaseLock()方法傳遞的key與你調用tryLock()方法傳遞的key是同樣的。那此時就會出現問題了,他在編寫代碼時,硬生生的將你加的鎖釋放了!!!

因此,上述代碼是不安全的,別人可以隨隨便便的將你加的鎖刪除,這就是鎖的誤刪操做,這是很是危險的,因此,上述的程序存在很嚴重的問題!!

那如何實現只有加鎖的線程才能進行相應的解鎖操做呢? 繼續向下看。

如何實現加鎖和解鎖的歸一化?

什麼是加鎖和解鎖的歸一化呢?簡單點來講,就是一個線程執行了加鎖操做後,後續必須由這個線程執行解鎖操做,加鎖和解鎖操做由同一個線程來完成。

爲了解決只有加鎖的線程才能進行相應的解鎖操做的問題,那麼,咱們就須要將加鎖和解鎖操做綁定到同一個線程中,那麼,如何將加鎖操做和解鎖操做綁定到同一個線程呢?其實很簡單,相信不少小夥伴都想到了—— 使用ThreadLocal實現 。沒錯,使用ThreadLocal類確實可以解決這個問題。

此時,咱們將RedisLockImpl類的代碼修改爲以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

上述代碼的主要邏輯爲:在對程序執行嘗試加鎖操做時,首先生成一個uuid,將生成的uuid綁定到當前線程,並將傳遞的key參數操做Redis中的key,生成的uuid做爲Redis中的Value,保存到Redis中,同時設置超時時間。當執行解鎖操做時,首先,判斷當前線程中綁定的uuid是否和Redis中存儲的uuid相等,只有兩者相等時,纔會執行刪除鎖標誌位的操做。這就避免了一個線程對程序進行了加鎖操做後,其餘線程對這個鎖進行了解鎖操做的問題。

繼續分析

咱們將加鎖和解鎖的方法改爲以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private String lockUUID;
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        lockUUID = uuid;
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        if(lockUUID.equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

相信不少小夥伴都會看出上述代碼存在什麼問題了!! 沒錯,那就是 線程安全的問題。

因此,這裏,咱們須要使用ThreadLocal來解決線程安全問題。

可重入性分析

在上面的代碼中,當一個線程成功設置了鎖標誌位後,其餘的線程再設置鎖標誌位時,就會返回失敗。還有一種場景就是在提交訂單的接口方法中,調用了服務A,服務A調用了服務B,而服務B的方法中存在對同一個商品的加鎖和解鎖操做。

因此,服務B成功設置鎖標誌位後,提交訂單的接口方法繼續執行時,也不能成功設置鎖標誌位了。也就是說,目前實現的分佈式鎖沒有可重入性。

這裏,就存在可重入性的問題了。咱們但願設計的分佈式鎖 具備可重入性 ,那什麼是可重入性呢?簡單點來講,就是同一個線程,可以屢次獲取同一把鎖,而且可以按照順序進行解決操做。

其實,在JDK 1.5以後提供的鎖不少都支持可重入性,好比synchronized和Lock。

如何實現可重入性呢?

映射到咱們加鎖和解鎖方法時,咱們如何支持同一個線程可以屢次獲取到鎖(設置鎖標誌位)呢?能夠這樣簡單的設計:若是當前線程沒有綁定uuid,則生成uuid綁定到當前線程,而且在Redis中設置鎖標誌位。若是當前線程已經綁定了uuid,則直接返回true,證實當前線程以前已經設置了鎖標誌位,也就是說已經獲取到了鎖,直接返回true。

結合以上分析,咱們將提交訂單的接口方法代碼改形成以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

這樣寫看似沒有啥問題,可是你們細想一下,這樣寫就真的OK了嗎?

可重入性的問題分析

既然上面分佈式鎖的可重入性是存在問題的,那咱們就來分析下問題的根源在哪裏!

假設咱們提交訂單的方法中,首先使用RedisLock接口對代碼塊添加了分佈式鎖,在加鎖後的代碼中調用了服務A,而服務A中也存在調用RedisLock接口的加鎖和解鎖操做。而屢次調用RedisLock接口的加鎖操做時,只要以前的鎖沒有失效,則會直接返回true,表示成功獲取鎖。也就是說,不管調用加鎖操做多少次,最終只會成功加鎖一次。而執行完服務A中的邏輯後,在服務A中調用RedisLock接口的解鎖方法,此時,會將當前線程全部的加鎖操做得到的鎖所有釋放掉。

咱們可使用下圖來簡單的表示這個過程。

那麼問題來了,如何解決可重入性的問題呢?

解決可重入性問題

相信不少小夥伴都可以想出使用計數器的方式來解決上面可重入性的問題,沒錯,就是使用計數器來解決。 總體流程以下所示。

那麼,體如今程序代碼上是什麼樣子呢?咱們來修改RedisLockImpl類的代碼,以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

至此,咱們基本上解決了分佈式鎖的可重入性問題。

說到這裏,我還要問你們一句,上面的解決問題的方案真的沒問題了嗎?

阻塞與非阻塞鎖

在提交訂單的方法中,當獲取Redis分佈式鎖失敗時,咱們直接返回了failure來表示當前請求下單的操做失敗了。試想,在高併發環境下,一旦某個請求得到了分佈式鎖,那麼,在這個請求釋放鎖以前,其餘的請求調用下單方法時,都會返回下單失敗的信息。在真實場景中,這是很是不友好的。咱們能夠將後續的請求進行阻塞,直到當前請求釋放鎖後,再喚醒阻塞的請求得到分佈式鎖來執行方法。

因此,咱們設計的分佈式鎖須要支持 阻塞和非阻塞 的特性。

那麼,如何實現阻塞呢?咱們可使用自旋來實現,繼續修改RedisLockImpl的代碼以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //若是獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

在分佈式鎖的設計中,阻塞鎖和非阻塞鎖 是很是重要的概念,你們必定要記住這個知識點。

鎖失效問題

儘管咱們實現了分佈式鎖的阻塞特性,可是還有一個問題是咱們不得不考慮的。那就是 鎖失效 的問題。

當程序執行業務的時間超過了鎖的過時時間會發生什麼呢? 想必不少小夥伴都可以想到,那就是前面的請求沒執行完,鎖過時失效了,後面的請求獲取到分佈式鎖,繼續向下執行了,程序沒法作到真正的互斥,沒法保證業務的原子性了。

那如何解決這個問題呢?答案就是:咱們必須保證在業務代碼執行完畢後,才能釋放分佈式鎖。 方案是有了,那如何實現呢?

說白了,咱們須要在業務代碼中,時不時的執行下面的代碼來保證在業務代碼沒執行完時,分佈式鎖不會因超時而被釋放。

springRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

這裏,咱們須要定義一個定時策略來執行上面的代碼,須要注意的是:咱們不能等到30秒後再執行上述代碼,由於30秒時,鎖已經失效了。例如,咱們能夠每10秒執行一次上面的代碼。

有些小夥伴說,直接在RedisLockImpl類中添加一個while(true)循環來解決這個問題,那咱們就這樣修改下RedisLockImpl類的代碼,看看有沒有啥問題。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //若是獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //定義更新鎖的過時時間
            while(true){
                Integer count = threadLocalInteger.get();
                //當前鎖已經被釋放,則退出循環
                if(count == 0 || count <= 0){
                    break;
                }
                springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                try{
                    //每隔10秒執行一次
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

相信小夥伴們看了代碼就會發現哪裏有問題了:更新鎖過時時間的代碼確定不能這麼去寫。由於這麼寫會 致使當前線程在更新鎖超時時間的while(true)循環中一直阻塞而沒法返回結果。 因此,咱們不能將當前線程阻塞,須要異步執行定時任務來更新鎖的過時時間。

此時,咱們繼續修改RedisLockImpl類的代碼,將定時更新鎖超時的代碼放到一個單獨的線程中執行,以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //若是獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //啓動新線程來執行定時任務,更新鎖過時時間
           new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate)).start();
        }else{
            isLocked = true;   
        }
        //加鎖成功後將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操做
        String uuid = stringRedisTemplate.opsForValue().get(key);
        if(threadLocal.get().equals(uuid)){
            Integer count = threadLocalInteger.get();
            //計數器減爲0時釋放鎖
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key); 
                //獲取更新鎖超時時間的線程並中斷
                long threadId = stringRedisTemplate.opsForValue().get(uuid);
                Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                if(updateLockTimeoutThread != null){
                     //中斷更新鎖超時時間的線程
                    updateLockTimeoutThread.interrupt();
                    stringRedisTemplate.delete(uuid);   
                }
            }
        }
    }
}

建立UpdateLockTimeoutTask類來執行更新鎖超時的時間。

public class UpdateLockTimeoutTask implements Runnable{
    //uuid
    private long uuid;
    private StringRedisTemplate stringRedisTemplate;
    public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate){
        this.uuid = uuid;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public void run(){
        //以uuid爲key,當前線程id爲value保存到Redis中
        stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
         //定義更新鎖的過時時間
        while(true){
            springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
            try{
                //每隔10秒執行一次
                Thread.sleep(10000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

接下來,咱們定義一個ThreadUtils工具類,這個工具類中有一個根據線程id獲取線程的方法getThreadByThreadId(long threadId)。

public class ThreadUtils{
    //根據線程id獲取線程句柄
    public static Thread getThreadByThreadId(long threadId){
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        while(group != null){
            Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
            int count = group.enumerate(threads, true);
            for(int i = 0; i < count; i++){
                if(threadId == threads[i].getId()){
                    return threads[i];
                }
            }
        }
    }
}

上述解決分佈式鎖失效的問題在分佈式鎖領域有一個專業的術語叫作 「異步續命」 。須要注意的是:當業務代碼執行完畢後,咱們須要中止更新鎖超時時間的線程。因此,這裏,我對程序的改動是比較大的,首先,將更新鎖超時的時間任務從新定義爲一個UpdateLockTimeoutTask類,並將uuid和StringRedisTemplate注入到任務類中,在執行定時更新鎖超時時間時,首先將當前線程保存到Redis中,其中Key爲傳遞進來的uuid。

在首先獲取分佈式鎖後,從新啓動線程,並將uuid和StringRedisTemplate傳遞到任務類中執行任務。當業務代碼執行完畢後,調用releaseLock()方法釋放鎖時,咱們會經過uuid從Redis中獲取更新鎖超時時間的線程id,並經過線程id獲取到更新鎖超時時間的線程,調用線程的interrupt()方法來中斷線程。

此時,當分佈式鎖釋放後,更新鎖超時的線程就會因爲線程中斷而退出了。

實現分佈式鎖的基本要求

結合上述的案例,咱們能夠得出實現分佈式鎖的基本要求:

  • 支持互斥性
  • 支持鎖超時
  • 支持阻塞和非阻塞特性
  • 支持可重入性
  • 支持高可用

通用分佈式解決方案

在互聯網行業,分佈式鎖是一個繞不開的話題,同時,也有不少通用的分佈式鎖解決方案,其中,用的比較多的一種方案就是使用開源的Redisson框架來解決分佈式鎖問題。

有關Redisson分佈式鎖的使用方案你們能夠參考《【高併發】你知道嗎?你們都在使用Redisson實現分佈式鎖了!!

既然Redisson框架已經很牛逼了,咱們直接使用Redisson框架是否可以100%的保證分佈式鎖不出問題呢?答案是沒法100%的保證。由於在分佈式領域沒有哪一家公司或者架構師可以保證100%的不出問題,就連阿里這樣的大公司、阿里的首席架構師這樣的技術大牛也不敢保證100%的不出問題。

在分佈式領域,沒法作到100%無端障,咱們追求的是幾個9的目標,例如99.999%無端障。

CAP理論

在分佈式領域,有一個很是重要的理論叫作CAP理論。

  • C:Consistency(一致性)
  • A:Availability(可用性)
  • P:Partition tolerance(分區容錯性)

在分佈式領域中,是必需要保證分區容錯性的,也就是必需要保證「P」,因此,咱們只能保證CP或者AP。

這裏,咱們可使用Redis和Zookeeper來進行簡單的對比,咱們可使用Redis實現AP架構的分佈式鎖,使用Zookeeper實現CP架構的分佈式鎖。

  • 基於Redis的AP架構的分佈式鎖模型

在基於Redis實現的AP架構的分佈式鎖模型中,向Redis節點1寫入數據後,會當即返回結果,以後在Redis中會以異步的方式來同步數據。

  • 基於Zookeeper的CP架構的分佈式鎖模型

在基於Zookeeper實現的CP架構的分佈式模型中,向節點1寫入數據後,會等待數據的同步結果,當數據在大多數Zookeeper節點間同步成功後,纔會返回結果數據。

當咱們使用基於Redis的AP架構實現分佈式鎖時,須要注意一個問題,這個問題可使用下圖來表示。

也就是Redis主從節點之間的數據同步失敗,假設線程向Master節點寫入了數據,而Redis中Master節點向Slave節點同步數據失敗了。此時,另外一個線程讀取的Slave節點中的數據,發現沒有添加分佈式鎖,此時就會出現問題了!!!

因此,在設計分佈式鎖方案時,也須要注意Redis節點之間的數據同步問題。

紅鎖的實現

在Redisson框架中,實現了紅鎖的機制,Redisson的RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也能夠用來將多個RLock對象關聯爲一個紅鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。當紅鎖中超過半數的RLock加鎖成功後,纔會認爲加鎖是成功的,這就提升了分佈式鎖的高可用。

咱們可使用Redisson框架來實現紅鎖。

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

其實,在實際場景中,紅鎖是不多使用的。這是由於使用了紅鎖後會影響高併發環境下的性能,使得程序的體驗更差。因此,在實際場景中,咱們通常都是要保證Redis集羣的可靠性。同時,使用紅鎖後,當加鎖成功的RLock個數不超過總數的一半時,會返回加鎖失敗,即便在業務層面任務加鎖成功了,可是紅鎖也會返回加鎖失敗的結果。另外,使用紅鎖時,須要提供多套Redis的主從部署架構,同時,這多套Redis主從架構中的Master節點必須都是獨立的,相互之間沒有任何數據交互。

高併發「黑科技」與致勝奇招

假設,咱們就是使用Redis來實現分佈式鎖,假設Redis的讀寫併發量在5萬左右。咱們的商城業務須要支持的併發量在100萬左右。若是這100萬的併發所有打入Redis中,Redis極可能就會掛掉,那麼,咱們如何解決這個問題呢?接下來,咱們就一塊兒來探討這個問題。

在高併發的商城系統中,若是採用Redis緩存數據,則Redis緩存的併發處理能力是關鍵,由於不少的前綴操做都須要訪問Redis。而異步削峯只是基本的操做,關鍵仍是要保證Redis的併發處理能力。

解決這個問題的關鍵思想就是:分而治之,將商品庫存分開放。

暗度陳倉

咱們在Redis中存儲商品的庫存數量時,能夠將商品的庫存進行「分割」存儲來提高Redis的讀寫併發量。

例如,原來的商品的id爲10001,庫存爲1000件,在Redis中的存儲爲(10001, 1000),咱們將原有的庫存分割爲5份,則每份的庫存爲200件,此時,咱們在Redia中存儲的信息爲(10001_0, 200),(10001_1, 200),(10001_2, 200),(10001_3, 200),(10001_4, 200)。

此時,咱們將庫存進行分割後,每一個分割後的庫存使用商品id加上一個數字標識來存儲,這樣,在對存儲商品庫存的每一個Key進行Hash運算時,得出的Hash結果是不一樣的,這就說明,存儲商品庫存的Key有很大機率不在Redis的同一個槽位中,這就可以提高Redis處理請求的性能和併發量。

分割庫存後,咱們還須要在Redis中存儲一份商品id和分割庫存後的Key的映射關係,此時映射關係的Key爲商品的id,也就是10001,Value爲分割庫存後存儲庫存信息的Key,也就是10001_0,10001_1,10001_2,10001_3,10001_4。在Redis中咱們可使用List來存儲這些值。

在真正處理庫存信息時,咱們能夠先從Redis中查詢出商品對應的分割庫存後的全部Key,同時使用AtomicLong來記錄當前的請求數量,使用請求數量對從Redia中查詢出的商品對應的分割庫存後的全部Key的長度進行求模運算,得出的結果爲0,1,2,3,4。再在前面拼接上商品id就能夠得出真正的庫存緩存的Key。此時,就能夠根據這個Key直接到Redis中獲取相應的庫存信息。

同時,咱們能夠將分隔的不一樣的庫存數據分別存儲到不一樣的Redis服務器中,進一步提高Redis的併發量。

移花接木

在高併發業務場景中,咱們能夠直接使用Lua腳本庫(OpenResty)從負載均衡層直接訪問緩存。

這裏,咱們思考一個場景:若是在高併發業務場景中,商品被瞬間搶購一空。此時,用戶再發起請求時,若是系統由負載均衡層請求應用層的各個服務,再由應用層的各個服務訪問緩存和數據庫,其實,本質上已經沒有任何意義了,由於商品已經賣完了,再經過系統的應用層進行層層校驗已經沒有太多意義了!!而應用層的併發訪問量是以百爲單位的,這又在必定程度上會下降系統的併發度。

爲了解決這個問題,此時,咱們能夠在系統的負載均衡層取出用戶發送請求時攜帶的用戶id,商品id和活動id等信息,直接經過Lua腳本等技術來訪問緩存中的庫存信息。若是商品的庫存小於或者等於0,則直接返回用戶商品已售完的提示信息,而不用再通過應用層的層層校驗了。

寫在最後

若是以爲文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公衆號,跟冰河學習高併發編程技術。

最後,附上併發編程須要掌握的核心技能知識圖,祝你們在學習併發編程時,少走彎路。

相關文章
相關標籤/搜索