分佈式存儲-Redisson&分佈式鎖&源碼分析

分佈式存儲-Redisson&分佈式鎖&源碼分析

前面講了redis的使用,本篇聊聊如何使用利用redis的客戶端Redisson去實現分佈式鎖,而且分析他的源碼,剖析如何實現,源碼中包含一些點,咱們也會聊到html

  • Lua腳本
  • Redis的Pub&Sub
  • 時間輪 

分佈式鎖

實際上分佈式鎖和咱們以前講的排它鎖同樣(同一時間只能有一個線程/進程訪問一個資源),只不是以前的Synchronize和ReentrantLock是一種線程之間的,而分佈式鎖是進程之間的,咱們看他們兩種實現鎖的方式的時候發現,他們都是有一個標識去存儲是否能夠訪問,MarkWord以及AQS中鎖的狀態,那咱們是否是也能夠把把鎖的狀態存儲在一個地方,當咱們要訪問共享資源的時候去查詢是否有進程正在佔用鎖?是的Redis就是這樣一個第三方的東西,因此咱們能夠用它來作分佈式鎖。一樣能夠作分佈式鎖的第三方還有:MySQL、ZK、Etcd 等。以前講到setNx能夠實現分佈式鎖,setNx咱們說能夠知足共享互斥(當咱們設置的時候,若是有其餘進程正在更改他則返回0),而且知足是原子性,只要知足這個兩個特徵就能實現鎖的機制,前面咱們聊到CAS的時候也是同樣的原理特性。redis

 Redisson實現分佈式鎖

public class RedisLockMain {

    private static RedissonClient redissonClient;

    static{
        Config config=new Config();
        config.useSingleServer().setAddress("redis://ip:6379");
        redissonClient= Redisson.create(config);
    }

    public static void main(String[] args) throws InterruptedException {
       RLock rLock=redissonClient.getLock("updateRepo");
        for (int i = 0; i < 10; i++) {
            if(rLock.tryLock()){ //返回true,表示得到鎖成功
                System.out.println("得到鎖成功");
            }else{
                System.out.println("得到鎖失敗");
            }
            Thread.sleep(2000);
            rLock.unlock();
        }

    }
}

總體梳理

加鎖:數據庫

  • 使用腳本進行判斷咱們要加鎖的那個 key,
  • 不存在的話,進行加鎖,
  • 加鎖的的時候會存儲當前的線程id,默認 這個鎖 key 的生存時間是 30 秒。

互斥鎖(若是此時有第二個客戶端請求加鎖):數組

  • 由於他們執行的都是同一個Lua,首先仍是判斷key,
  • 發現已經已經有人加鎖了,因此他就會執行Lua的下一行,會返回一個當前想操做鎖的過時時間。
  • 若是得到鎖是失敗,那就對使用channel訂閱釋放鎖的事件,
  • 若是得到了鎖的通知,則開始對鎖進行不斷的循環獲取
  • 循環中嘗試得到鎖,而且得到鎖的剩餘時間,
  • 若是拿到了鎖,就直接返回,沒有拿到鎖,那就繼續等待

鎖的續期機制:

由於怕產生死鎖,因此每個鎖都有過時時間,可是程序若是執行的時間,比過時的時間還要長,簡而言之就是,好比過時時間是30s,而程序執行了32s,這個時候可能別的進程就會搶到鎖,那就有可能兩個進程同時執行一個邏輯,那就有問題,這裏就有一個續約機制,只要咱們搶到鎖那就有會啓動一個續約機制,叫作看門狗(Watch Dog )底層是時間輪,下面會講,他其實就是一個定時任務,當咱們的得到鎖後,他會把會將持有鎖的線程放入到一個 RedissonLock.EXPIRATION_RENEWAL_MAP裏面,每過10s看門狗就去咱們存儲得到鎖的線程id的map中進行遍歷,而後拿他去redis中查詢,看他是否還在持有(持有的話就證實程序還在運行),若是持有,看門狗就會延長鎖的時間app

釋放鎖:分佈式

  • 刪除鎖
  • 廣播釋放鎖的消息
  • 取消 Watch Dog 機制

源碼分析

嘗試搶佔鎖ide

//帶有超時時間的邏輯
@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
      //嘗試得到鎖
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //下面邏輯競爭鎖的時候再看
   }

搶佔鎖邏輯源碼分析

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //leaseTime就是租約時間,就是redis key的過時時間。
    //若是設置了過時時間
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //若是沒設置了過時時間,則從配置中獲取key超時時間,默認是30s過時
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    //當tryLockInnerAsync執行結束後,觸發下面回調
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        //表示第一次設置鎖鍵
        if (ttlRemaining == null) {
            //表示設置過超時時間,更新internalLockLeaseTime, 並返回
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                //leaseTime=-1,啓動Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

LUA腳本實現加鎖的操做ui

  • 判斷lock鍵是否存在,不存在直接調用hset存儲當前線程信息而且設置過時時間,返回nil,告訴客戶端直接獲取到鎖。
  • 判斷lock鍵是否存在,存在則將重入次數加1,並從新設置過時時間,返回nil,告訴客戶端直接獲取到鎖。 
  • 被其它線程已經鎖定,返回鎖有效期的剩餘時間,告訴客戶端須要等待。
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

鎖的釋放流程this

  • 若是lock鍵不存在,經過 publish 指令發送一個消息表示鎖已經可用。
  • 若是鎖不是被當前線程鎖定,則返回nil 
  • 因爲支持可重入,在解鎖時將重入次數須要減1 
  • 若是計算後的重入次數>0,則從新設置過時時間 
  • 若是計算後的重入次數<=0,則發消息說鎖已經可用 
  •     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
        }
    View Code
Redis中的Pub/Sub機制
Redis提供了一組命令可讓開發者實現「發佈/訂閱」模式(publish/subscribe) . 該模式一樣能夠實現進程間的消息傳遞
發佈/訂閱模式包含兩種角色, 分別是發佈者和訂閱者。訂閱者能夠訂閱一個或多個頻道,而發佈者能夠向指定的頻道發送消息,全部訂閱此頻道的訂閱者都會收到該消息 
發佈者發佈消息的命令:PUBLISH, 用法是 :PUBLISH channel message 
好比向channel.1發一條消息:hello  ->PUBLISH channel.1 「hello」 這樣就實現了消息的發送,該命令的返回值表示接收到這條消息的訂閱者數量
訂閱者訂閱消息的命令是:SUBSCRIBE channel [channel …] 
好比訂閱channel.1 SUBSCRIBE channel.1
 RedissonLock有競爭的狀況 有競爭的狀況在redis端的lua腳本是相同的,只是不一樣的條件執行不一樣的redis命令。當經過tryAcquire,發現鎖被其它線程申請時,須要進入等待競爭邏輯中
  • this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗
  • this.await返回true,進入循環嘗試獲取鎖。
//帶有超時時間的邏輯
@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
   
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //去訂閱,若是搶佔分佈式鎖的線程釋放了鎖,這邊就會收到這個消息
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,說明已經超 過了客戶端設置的最大wait time,則直接返回false,取消訂閱,再也不繼續申請鎖了。
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
             //取消訂閱
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            //表示搶佔鎖失敗
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        //收到訂閱的消息後,走這裏的邏輯
        try {
            //判斷是否超時,若是等待超時,返回獲的鎖失敗
            time -= System.currentTimeMillis() - current;
            //若是time小於零咱們競爭鎖是失敗的
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            //這裏不斷的循環是搶佔鎖
            while (true) {
                long currentTime = System.currentTimeMillis();
                //搶佔鎖的時候會返回一個過時時間
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                //若是是空則表示得到鎖
                if (ttl == null) {
                    return true;
                }
                //判斷是否超時,若是超時,表示獲取鎖失敗
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // 經過信號量(共享鎖)阻塞,等待解鎖消息. (減小申請鎖調用的頻率) // 若是剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取 一個許可(除非被中斷或者一直沒有可用的許可)。
                // 不然就在wait time 時間範圍內等待能夠經過信號量
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
// 更新等待時間(最大等待時間-已經消耗的阻塞時間)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            //取消訂閱
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
鎖過時了怎麼辦?
通常來講,咱們去得到分佈式鎖時,爲了不死鎖的狀況,咱們會對鎖設置一個超時時間,可是有一種狀況是,若是在指定時間內當前線程沒有執行完,因爲鎖超時致使鎖被釋放,那麼其餘線程就會拿到這
把鎖,從而致使一些故障。爲了不這種狀況, Redisson引入了一個Watch Dog機制,這個機制是針對分佈式鎖來實現鎖的自動續約,簡單來講,若是當前得到鎖的線程沒有執行完,那麼Redisson會自動給Redis中目標key延長超時時間。默認狀況下,看門狗的續期時間是30s,也能夠經過修改Confifig.lockWatchdogTimeout來另行指定。
 實際上,當咱們經過tryLock方法沒有傳遞超時時間時, 默認會設置一個30s的超時時間,避免出現死鎖的問題。 
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
  
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //若是沒設置了過時時間,則從配置中獲取key超時時間,默認是30s過時
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
   
    return ttlRemainingFuture;
}
因爲默認設置了一個30s的過時時間,爲了防止過時以後當前線程還未執行完, 因此經過定時任務對過時時間進行續約
  • 會先判斷在expirationRenewalMap中是否存在了entryName,這是個map結構,主要仍是判斷在這個服務實例中的加鎖客戶端的鎖key是否存在,
  • 若是已經存在了,就直接返回;主要是考慮到RedissonLock是可重入鎖
//這裏指的是續約機制
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    //EXPIRATION_RENEWAL_MAP表示的是須要續約的key
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        // 第一次加鎖的時候會調用,內部會啓動WatchDog
        entry.addThreadId(threadId);
        renewExpiration();
    }
}
定義一個定時任務(時間輪),該任務中調用 renewExpirationAsync 方法進行續約。
//真正續約
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    //這裏使用的是一個時間輪的機制
    //至關於咱們去添加一個過時時間的任務,延遲10s鍾去執行一個這個任務
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            //從這個map中獲取一個過時的entry
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    //當方法執行完成後,再次調用,這樣就成了一個週期執行了
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}
執行Lua腳本,對指定的key進行續約。 
//若是這個key不爲空,則給他增長一個時間
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

LUA腳本

他至關於JavaScript同樣,是一種腳本語言,在Redis中咱們能夠把多個Redis操做封裝成一個LUA指令,從而能夠保證原子性。 

EVAL命令-執行腳本 :EVAL] [腳本內容] [key參數的數量] [key …] [arg …]

好比咱們要給redis上存儲一個key爲lua value爲hello 那語句就是:eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello

咱們經過lua腳原本實現一個訪問頻率限制功能:【tonumber 只是將傳遞進來的數值轉換爲數字而已
@RestController
public class LuaController {

    @Autowired
    RedissonClient redissonClient;

    private final String LIMIT_LUA="local times=redis.call('incr',KEYS[1])\n" +
            "if times==1 then\n" +
            "    redis.call('expire',KEYS[1],ARGV[1])\n" +
            "end\n" +
            "if times > tonumber(ARGV[2]) then\n" +
            "    return 0\n" +
            "end \n" +
            "return 1";


    @GetMapping("/lua/{id}")
    public String lua(@PathVariable("id")Integer id) throws ExecutionException, InterruptedException {
        RScript rScript=redissonClient.getScript();
        List<Object> keys= Arrays.asList("LIMIT:"+id);
        //設置10s即爲過時  能夠訪問三次 【腳本類型-讀寫類型】  【LUA腳本】 【返回類型】 【keys】 【value】 
        RFuture<Object> future=rScript.evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);
        return future.get().toString();
    }
}

這樣每隔10s就能夠從新訪問

LUA的原子性:他真的太暴力了!! 當redis正在執行一段lua命令的時候,咱們不能對Redis進行任何操做!! 這就是原子性的緣由。Redis提供了lua-time-limit參數限制腳本的最長運行時間,默認是5秒鐘。咱們能夠配置這個來防止沒法訪問redis.當腳本運行時間超過這個限制後,Redis將開始接受其餘命令但不會執行(以確保腳本的原子性),而是返回BUSY的錯誤,咱們可使用script kill 的命令終止當前執行的腳本,而後redis即恢復正常。

 時間輪

咱們上面說到的看門狗就用到了時間輪,不斷的查看時間。時間輪是由一個環形數組組成,能夠想象它是一個時間表盤,每一個數字時間的間隔就是時間槽,這個槽叫作bucket,槽裏面就是我們須要執行的任務。咱們能夠設置他的時間槽,和每一個槽的時間單位,好比設置他爲1s,設置8個時間槽。而後這個時間輪就開始執行,那整個流程執行完成就是8s。執行的時候,依次訪問每一個數組元素,而後執行元素中我們的任務。咱們可使用他去實現定時關單,由於有時候訂單交易量特別多,直接去輪詢數據的效率有點低,這裏直接使用時間輪,就省去了數據庫的壓力。圖是copy的。

使用:

先構建一個HashedWheelTimer時間輪
  • tickDuration: 100 ,表示每一個時間格表明當前時間輪的基本時間跨度,這裏是100ms,也就是指針100ms跳動一次,每次跳動一個窗格
  • ticksPerWheel:1024,表示時間輪上一共有多少個窗格,分配的窗格越多,佔用內存空間就越大
  • leakDetection:是否開啓內存泄漏檢測。
  • maxPendingTimeouts[可選參數],最大容許等待的任務數,默認沒有限制
  • 經過newTimeout()把須要延遲執行的任務添加到時間輪中
  • 下面的任務就會根據我們傳遞進來的數據進行延遲執行
@RestController
@RequestMapping("/timer")
public class HashedWheelController {

    //時間輪的定義
    HashedWheelTimer hashedWheelTimer=new HashedWheelTimer(
            new DefaultThreadFactory("demo-timer"),
            100, TimeUnit.MILLISECONDS,1024,false);

    @GetMapping("/{delay}")
    public void tick(@PathVariable("delay")Long delay){
        //SCHEDULED(定時執行的線程)
        //Timer(Java原生定時任務執行)
        //訂單關單
        System.out.println("CurrentTime:"+new Date());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("Begin Execute:"+new Date());
        },delay,TimeUnit.SECONDS);
    }
}
時間輪的原理解析
建立時間輪:時間輪本質上是一個環狀數組,好比咱們初始化時間輪時:ticksPerWheel=8,那麼意味着這個環狀數組的長度是8。
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
添加任務
當經過newTimeout()方法添加一個延遲任務時,該任務首先會加入到一個阻塞隊列中中。而後會有一個定時任務從該隊列獲取任務,
添加到時間輪的指定位置:
  •   根據任務的延遲時間計算要多少個tick才能執行
  •   計算當前任務須要轉動多少圈才能執行
  •        經過ticks取模mask,獲得一個下標
  •   把任務添加到指定數組下標位置
//當前任務的開始執行時間除以每一個窗口的時間間隔,獲得一個calculated值(表示須要通過多少 tick,指針沒跳動一個窗格,tick會遞增),單位爲nanos(微毫秒) 
long calculated = timeout.deadline / tickDuration;
//計算當前任務須要在時間輪中經歷的圈數,由於當前任務執行時間有可能大於完整一圈的時間,因此 須要計算通過幾圈以後才能執行該任務。
timeout.remainingRounds = (calculated - tick) / wheel.length; //取最大的一個tick,有可能當前任務在隊列中已通過了執行時間,這種狀況下直接用calculated這 個值就沒意義了。 
final long ticks = Math.max(calculated, tick);
// Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask); //經過ticks取模mask,獲得一個下標 
HashedWheelBucket bucket = wheel[stopIndex]; //把任務添加到指定數組下標位置
任務執行:
  •   Worker線程按照每次間隔時間轉動後,獲得該時間窗格中的任務鏈表,而後從鏈表的head開始逐個取出任務,有兩個判斷條件:
  •   當前任務須要轉動的圈數爲0,表示任務是當前圈開始執行
  •   當前任務達到了delay時間,也就是 timeout.deadline <= deadline
  •   最終調用timeout.expire()方法執行任務。

 

相關文章
相關標籤/搜索