此次咱們來簡單說說分佈式鎖,我記得過去我也過一篇JMM的內存一致性算法,就是說拿到鎖的能夠繼續操做,沒拿到的自旋等待。html
思路與場景node
咱們在Zookeeper中提到過度布式鎖,這裏咱們先用redis實現一個簡單的分佈式鎖,這裏是咱們一個簡單的售賣減庫存的小實例,剩餘庫存假設存在數據庫內。web
@GetMapping(value = "/getLock") public String getLock() { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; }else{ System.out.println("剩餘庫存不足"); return "fail"; } }
這樣簡單的實現了一個售賣的過程,如今看來確實沒什麼問題的,可是若是是一個併發下的場景就可能會出現超賣的狀況了,咱們來改造一下代碼。redis
@GetMapping(value = "/getLock") public String getLock() { synchronized (this) { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } }
貌似這回就能夠了,能夠抗住高併發了,可是新的問題又來了,咱們若是是分佈式的場景下,synchronized關鍵字是不起做用的啊。也就是說仍是會出現超賣的狀況的啊,咱們再來改造一下算法
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xiaocai");//至關於咱們的setnx命令 if(!bool){ return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); stringRedisTemplate.delete(lockKey); return "success"; } else { System.out.println("剩餘庫存不足"); stringRedisTemplate.delete(lockKey); return "fail"; } }
此次咱們看來基本能夠了,使用咱們的setnx命令來作一次惟一的限制,萬一報錯了呢?解鎖怎麼辦?再來改造一下。spring
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xiaocai", 10, TimeUnit.SECONDS);//至關於咱們的setnx命令 try { if (!bool) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } finally { if (bool) { stringRedisTemplate.delete(lockKey); } } }
此次貌似真的能夠了,能夠加鎖,最後在finally解鎖,若是解鎖仍是不成功,咱們還設置了咱們的超時時間,貌似完美了,咱們再來提出一個場景。數據庫
就是什麼意思呢?咱們的線程來爭搶鎖,拿到鎖的線程開始執行,可是咱們並不知道什麼時候執行完成,咱們只是設定了10秒自動釋放掉鎖,若是說咱們的線程10秒尚未結束,其它線程會拿到鎖資源,開始執行代碼,可是過了一段時間(藍色線程還未執行完成),這時咱們的綠色線程執行完畢了,開始釋放鎖資源,他釋放的其實已經不是他本身的鎖了,他本身的鎖超時了,自動釋放了,實則綠色線程釋放的藍色的資源,這也就形成了釋放其它的鎖,其它的線程又會重複的拿到鎖,重複執行該操做。明顯有點亂了,這不合理,咱們來改善一下。json
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; String lockValue = UUID.randomUUID().toString(); Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);//至關於咱們的setnx命令 try { if (!bool) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } finally { if (lockValue.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } }
此次再來看一下流程,咱們設置一個UUID,設置爲鎖的值,也就是說,每次上鎖的UUID都是不一致的,咱們的線程A的鎖此次只能由咱們的線程A來釋放掉,不會形成釋放其它鎖的問題了,仍是上次的圖,咱們回過頭來看一下,10秒?真的合理嗎?萬一10秒尚未執行完成呢?有的人還會問,那設置100秒?萬一執行到delete操做的時候,服務宕機了呢?是否是還要等待100秒才能夠釋放鎖。別說那只是萬一,咱們的代碼但願達到咱們能力範圍以內的最嚴謹。此次來講一下咱們本節的其中一個重點,Lua腳本,後面會去說,咱們來先用咱們此次博文的Redisson吧api
Redissonspringboot
剛纔咱們提到了咱們鎖的時間設置,多長才是合理的,100秒?可能宕機,形成等待100秒自動釋放,1秒?線程可能執行不完,咱們可不能夠這樣來作呢?咱們設置一個30秒,或者說設置10秒,而後咱們給予一個固定時間來檢查咱們的主線程是否執行完成,執行完成再釋放咱們的鎖,思路有了,可是代碼實現起來並不簡單,彆着急,咱們已經有了現成的包供咱們使用的,就是咱們的Redisson,首先咱們來引入咱們的依賴,修改一下pom文件。
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.4</version> </dependency>
而後經過@Bean的方式注入容器,三種方式我都寫在上面了。
@Bean public Redisson redisson(){ Config config = new Config(); //主從(單機) config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); //哨兵 // config.useSentinelServers().setMasterName("mymaster"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1.1:26379"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1.2:26379"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1..3:26379"); // config.useSentinelServers().setDatabase(0); // //集羣 // config.useClusterServers() // .addNodeAddress("redis://192.168.0.1:8001") // .addNodeAddress("redis://192.168.0.2:8002") // .addNodeAddress("redis://192.168.0.3:8003") // .addNodeAddress("redis://192.168.0.4:8004") // .addNodeAddress("redis://192.168.0.5:8005") // .addNodeAddress("redis://192.168.0.6:8006"); // config.useSentinelServers().setPassword("xiaocai");//密碼設置 return (Redisson) Redisson.create(config); }
若是咱們的是springboot也能夠經過配置來實現的。
application.properties
## 由於springboot-data-redis 是用到了jedis,所已這裏得配置 spring.redis.database=10 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 ## jedis 哨兵配置 spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=192.168.1.241:26379,192.168.1.241:36379,192.168.1.241:46379 spring.redis.password=admin ## 關鍵地方 redisson spring.redis.redisson.config=classpath:redisson.json
redisson.json
## redisson.json 文件 { "sentinelServersConfig":{ "sentinelAddresses": ["redis://192.168.1.241:26379","redis://192.168.1.241:36379","redis://192.168.1.241:46379"], "masterName": "mymaster", "database": 0, "password":"admin" } }
這樣咱們就創建了咱們的Redisson的鏈接了,咱們來看一下如何使用吧。
package com.redisclient.cluster; import org.redisson.Redisson; import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RedisCluster { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private Redisson redisson; @GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; RLock redissonLock = redisson.getLock(lockKey); try { redissonLock.lock(); int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售賣成功,剩餘" + realStock + ""); return "success"; } else { System.out.println("剩餘庫存不足"); return "fail"; } } finally { redissonLock.unlock(); } } }
使用也是超級簡單的,Redisson還有重入鎖功能等等,有興趣的能夠去Redisson查看,地址:https://redisson.org/ 國外的地址打開可能會慢一些。Redis的分佈式鎖使用就差很少說到這裏了,咱們來回到咱們剛纔說到的Lua腳本這裏。
Lua腳本和管道
Lua腳本
lua腳本就是一個事務控制的過程,咱們能夠在lua腳本中寫一些列的命令,一次性的塞入到咱們的redis客戶端,保證了原子性,要麼都成功,要麼都失敗。好處在於減小與reidis的屢次鏈接,能夠替代redis的事務操做以及保證咱們的原子性。
String luaString = "";//Lua腳本 jedis.eval(luaString, Arrays.asList("keysList"),Arrays.asList("valueList"));
腳本我就不寫了(我也不熟悉),我來解釋一下eval的三個參數,第一個是咱們的寫好的腳本,而後咱們的腳本可能傳參數的,也就是咱們KEYS[1]或者是ARGV[4],意思就是咱們的KEYS[1]就是咱們的ArrayList("keysList")中的第一項,ARGV[4]就是咱們的ArrayList("valueList")的第四項。
管道
管道和咱們的和咱們的Lua腳本差很少,不同就是管道不會保證咱們的事務,也就是說咱們如今塞給管道10條命令 ,咱們執行到第三條時報錯了,後面的依然會執行,前面執行過的兩條仍是生效的。雖然能夠減小咱們的網絡開銷,也別一次塞太多命令進去,畢竟redis的是單線程的,不建議使用管道來操做redis,想深刻了解的能夠參照https://www.runoob.com/redis/redis-pipelining.html
redis的分佈式鎖差很少就說這麼多了,關鍵是實現思路,使用Redisson卻是很簡單的,還有咱們的Lua腳本和管道,Lua腳本能夠保證事務,管道一次性能夠執行多條命令,減小網絡開銷,但不建議使用,下次咱們來講下,大廠用redis的一些使用注意事項和優化吧。
最進弄了一個公衆號,小菜技術,歡迎你們的加入