前面講了redis的使用,本篇聊聊如何使用利用redis的客戶端Redisson去實現分佈式鎖,而且分析他的源碼,剖析如何實現,源碼中包含一些點,咱們也會聊到html
- Lua腳本
- Redis的Pub&Sub
- 時間輪
實際上分佈式鎖和咱們以前講的排它鎖同樣(同一時間只能有一個線程/進程訪問一個資源),只不是以前的Synchronize和ReentrantLock是一種線程之間的,而分佈式鎖是進程之間的,咱們看他們兩種實現鎖的方式的時候發現,他們都是有一個標識去存儲是否能夠訪問,MarkWord以及AQS中鎖的狀態,那咱們是否是也能夠把把鎖的狀態存儲在一個地方,當咱們要訪問共享資源的時候去查詢是否有進程正在佔用鎖?是的Redis就是這樣一個第三方的東西,因此咱們能夠用它來作分佈式鎖。一樣能夠作分佈式鎖的第三方還有:MySQL、ZK、Etcd 等。以前講到setNx能夠實現分佈式鎖,setNx咱們說能夠知足共享互斥(當咱們設置的時候,若是有其餘進程正在更改他則返回0),而且知足是原子性,只要知足這個兩個特徵就能實現鎖的機制,前面咱們聊到CAS的時候也是同樣的原理特性。redis
public class RedisLockMain { private static RedissonClient redissonClient; static{ Config config=new Config(); config.useSingleServer().setAddress("redis://ip:6379"); redissonClient= Redisson.create(config); } public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateRepo"); for (int i = 0; i < 10; i++) { if(rLock.tryLock()){ //返回true,表示得到鎖成功 System.out.println("得到鎖成功"); }else{ System.out.println("得到鎖失敗"); } Thread.sleep(2000); rLock.unlock(); } } }
加鎖:數據庫
- 使用腳本進行判斷咱們要加鎖的那個 key,
- 不存在的話,進行加鎖,
- 加鎖的的時候會存儲當前的線程id,默認 這個鎖 key 的生存時間是 30 秒。
互斥鎖(若是此時有第二個客戶端請求加鎖):數組
- 由於他們執行的都是同一個Lua,首先仍是判斷key,
- 發現已經已經有人加鎖了,因此他就會執行Lua的下一行,會返回一個當前想操做鎖的過時時間。
- 若是得到鎖是失敗,那就對使用channel訂閱釋放鎖的事件,
- 若是得到了鎖的通知,則開始對鎖進行不斷的循環獲取
- 循環中嘗試得到鎖,而且得到鎖的剩餘時間,
- 若是拿到了鎖,就直接返回,沒有拿到鎖,那就繼續等待
鎖的續期機制:
由於怕產生死鎖,因此每個鎖都有過時時間,可是程序若是執行的時間,比過時的時間還要長,簡而言之就是,好比過時時間是30s,而程序執行了32s,這個時候可能別的進程就會搶到鎖,那就有可能兩個進程同時執行一個邏輯,那就有問題,這裏就有一個續約機制,只要咱們搶到鎖那就有會啓動一個續約機制,叫作看門狗(Watch Dog )底層是時間輪,下面會講,他其實就是一個定時任務,當咱們的得到鎖後,他會把會將持有鎖的線程放入到一個
RedissonLock.EXPIRATION_RENEWAL_MAP
裏面,每過10s看門狗就去咱們存儲得到鎖的線程id的map中進行遍歷,而後拿他去redis中查詢,看他是否還在持有(持有的話就證實程序還在運行),若是持有,看門狗就會延長鎖的時間app釋放鎖:分佈式
- 刪除鎖
- 廣播釋放鎖的消息
- 取消 Watch Dog 機制
嘗試搶佔鎖ide
//帶有超時時間的邏輯 @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); //嘗試得到鎖 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } //下面邏輯競爭鎖的時候再看 }
搶佔鎖邏輯源碼分析
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //leaseTime就是租約時間,就是redis key的過時時間。 //若是設置了過時時間 if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //若是沒設置了過時時間,則從配置中獲取key超時時間,默認是30s過時 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //當tryLockInnerAsync執行結束後,觸發下面回調 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired //表示第一次設置鎖鍵 if (ttlRemaining == null) { //表示設置過超時時間,更新internalLockLeaseTime, 並返回 if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { //leaseTime=-1,啓動Watch Dog scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
LUA腳本實現加鎖的操做ui
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
鎖的釋放流程this
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
Redis提供了一組命令可讓開發者實現「發佈/訂閱」模式(publish/subscribe) . 該模式一樣能夠實現進程間的消息傳遞:發佈/訂閱模式包含兩種角色, 分別是發佈者和訂閱者。訂閱者能夠訂閱一個或多個頻道,而發佈者能夠向指定的頻道發送消息,全部訂閱此頻道的訂閱者都會收到該消息發佈者發佈消息的命令:PUBLISH, 用法是 :PUBLISH channel message好比向channel.1發一條消息:hello ->PUBLISH channel.1 「hello」 這樣就實現了消息的發送,該命令的返回值表示接收到這條消息的訂閱者數量訂閱者訂閱消息的命令是:SUBSCRIBE channel [channel …]好比訂閱channel.1 SUBSCRIBE channel.1
//帶有超時時間的邏輯 @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); //去訂閱,若是搶佔分佈式鎖的線程釋放了鎖,這邊就會收到這個消息 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,說明已經超 過了客戶端設置的最大wait time,則直接返回false,取消訂閱,再也不繼續申請鎖了。 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { //取消訂閱 if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } //表示搶佔鎖失敗 acquireFailed(waitTime, unit, threadId); return false; } //收到訂閱的消息後,走這裏的邏輯 try { //判斷是否超時,若是等待超時,返回獲的鎖失敗 time -= System.currentTimeMillis() - current; //若是time小於零咱們競爭鎖是失敗的 if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } //這裏不斷的循環是搶佔鎖 while (true) { long currentTime = System.currentTimeMillis(); //搶佔鎖的時候會返回一個過時時間 ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired //若是是空則表示得到鎖 if (ttl == null) { return true; } //判斷是否超時,若是超時,表示獲取鎖失敗 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 經過信號量(共享鎖)阻塞,等待解鎖消息. (減小申請鎖調用的頻率) // 若是剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取 一個許可(除非被中斷或者一直沒有可用的許可)。 // 不然就在wait time 時間範圍內等待能夠經過信號量 currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新等待時間(最大等待時間-已經消耗的阻塞時間) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { //取消訂閱 unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //若是沒設置了過時時間,則從配置中獲取key超時時間,默認是30s過時 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } return ttlRemainingFuture; }
//這裏指的是續約機制 protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); //EXPIRATION_RENEWAL_MAP表示的是須要續約的key ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { // 第一次加鎖的時候會調用,內部會啓動WatchDog entry.addThreadId(threadId); renewExpiration(); } }
//真正續約 private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //這裏使用的是一個時間輪的機制 //至關於咱們去添加一個過時時間的任務,延遲10s鍾去執行一個這個任務 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { //從這個map中獲取一個過時的entry ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { //當方法執行完成後,再次調用,這樣就成了一個週期執行了 // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
//若是這個key不爲空,則給他增長一個時間 protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
他至關於JavaScript同樣,是一種腳本語言,在Redis中咱們能夠把多個Redis操做封裝成一個LUA指令,從而能夠保證原子性。
EVAL命令-執行腳本 :EVAL] [腳本內容] [key參數的數量] [key …] [arg …]
好比咱們要給redis上存儲一個key爲lua value爲hello 那語句就是:eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
咱們經過lua腳原本實現一個訪問頻率限制功能:【tonumber 只是將傳遞進來的數值轉換爲數字而已】@RestController public class LuaController { @Autowired RedissonClient redissonClient; private final String LIMIT_LUA="local times=redis.call('incr',KEYS[1])\n" + "if times==1 then\n" + " redis.call('expire',KEYS[1],ARGV[1])\n" + "end\n" + "if times > tonumber(ARGV[2]) then\n" + " return 0\n" + "end \n" + "return 1"; @GetMapping("/lua/{id}") public String lua(@PathVariable("id")Integer id) throws ExecutionException, InterruptedException { RScript rScript=redissonClient.getScript(); List<Object> keys= Arrays.asList("LIMIT:"+id); //設置10s即爲過時 能夠訪問三次 【腳本類型-讀寫類型】 【LUA腳本】 【返回類型】 【keys】 【value】 RFuture<Object> future=rScript.evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3); return future.get().toString(); } }這樣每隔10s就能夠從新訪問
LUA的原子性:他真的太暴力了!! 當redis正在執行一段lua命令的時候,咱們不能對Redis進行任何操做!! 這就是原子性的緣由。Redis提供了lua-time-limit參數限制腳本的最長運行時間,默認是5秒鐘。咱們能夠配置這個來防止沒法訪問redis.當腳本運行時間超過這個限制後,Redis將開始接受其餘命令但不會執行(以確保腳本的原子性),而是返回BUSY的錯誤,咱們可使用script kill 的命令終止當前執行的腳本,而後redis即恢復正常。
咱們上面說到的看門狗就用到了時間輪,不斷的查看時間。時間輪是由一個環形數組組成,能夠想象它是一個時間表盤,每一個數字時間的間隔就是時間槽,這個槽叫作bucket,槽裏面就是我們須要執行的任務。咱們能夠設置他的時間槽,和每一個槽的時間單位,好比設置他爲1s,設置8個時間槽。而後這個時間輪就開始執行,那整個流程執行完成就是8s。執行的時候,依次訪問每一個數組元素,而後執行元素中我們的任務。咱們可使用他去實現定時關單,由於有時候訂單交易量特別多,直接去輪詢數據的效率有點低,這裏直接使用時間輪,就省去了數據庫的壓力。圖是copy的。
使用:
先構建一個HashedWheelTimer時間輪
- tickDuration: 100 ,表示每一個時間格表明當前時間輪的基本時間跨度,這裏是100ms,也就是指針100ms跳動一次,每次跳動一個窗格
- ticksPerWheel:1024,表示時間輪上一共有多少個窗格,分配的窗格越多,佔用內存空間就越大
- leakDetection:是否開啓內存泄漏檢測。
- maxPendingTimeouts[可選參數],最大容許等待的任務數,默認沒有限制
經過newTimeout()把須要延遲執行的任務添加到時間輪中- 下面的任務就會根據我們傳遞進來的數據進行延遲執行。
@RestController @RequestMapping("/timer") public class HashedWheelController { //時間輪的定義 HashedWheelTimer hashedWheelTimer=new HashedWheelTimer( new DefaultThreadFactory("demo-timer"), 100, TimeUnit.MILLISECONDS,1024,false); @GetMapping("/{delay}") public void tick(@PathVariable("delay")Long delay){ //SCHEDULED(定時執行的線程) //Timer(Java原生定時任務執行) //訂單關單 System.out.println("CurrentTime:"+new Date()); hashedWheelTimer.newTimeout(timeout -> { System.out.println("Begin Execute:"+new Date()); },delay,TimeUnit.SECONDS); } }時間輪的原理解析建立時間輪:時間輪本質上是一個環狀數組,好比咱們初始化時間輪時:ticksPerWheel=8,那麼意味着這個環狀數組的長度是8。HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];添加任務當經過newTimeout()方法添加一個延遲任務時,該任務首先會加入到一個阻塞隊列中中。而後會有一個定時任務從該隊列獲取任務,添加到時間輪的指定位置:
- 根據任務的延遲時間計算要多少個tick才能執行
- 計算當前任務須要轉動多少圈才能執行
- 經過ticks取模mask,獲得一個下標
- 把任務添加到指定數組下標位置
//當前任務的開始執行時間除以每一個窗口的時間間隔,獲得一個calculated值(表示須要通過多少 tick,指針沒跳動一個窗格,tick會遞增),單位爲nanos(微毫秒) long calculated = timeout.deadline / tickDuration; //計算當前任務須要在時間輪中經歷的圈數,由於當前任務執行時間有可能大於完整一圈的時間,因此 須要計算通過幾圈以後才能執行該任務。 timeout.remainingRounds = (calculated - tick) / wheel.length; //取最大的一個tick,有可能當前任務在隊列中已通過了執行時間,這種狀況下直接用calculated這 個值就沒意義了。 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. int stopIndex = (int) (ticks & mask); //經過ticks取模mask,獲得一個下標 HashedWheelBucket bucket = wheel[stopIndex]; //把任務添加到指定數組下標位置任務執行:
- Worker線程按照每次間隔時間轉動後,獲得該時間窗格中的任務鏈表,而後從鏈表的head開始逐個取出任務,有兩個判斷條件:
- 當前任務須要轉動的圈數爲0,表示任務是當前圈開始執行
- 當前任務達到了delay時間,也就是 timeout.deadline <= deadline
- 最終調用timeout.expire()方法執行任務。