在當今這個時代,單體應用(standalone)已經不多了,java提供的synchronized已經不能知足需求,你們天然 而然的想到了分佈式鎖。談到分佈式鎖,比較流行的方法有3中:html
今天咱們重點說一下基於redis的分佈式鎖,redis分佈式鎖的實現咱們能夠參照redis的官方文檔。 實現Redis分佈式鎖的最簡單的方法就是在Redis中建立一個key,這個key有一個失效時間(TTL),以保證鎖最終會被自動釋放掉。當客戶端釋放資源(解鎖)的時候,會刪除掉這個key。java
獲取鎖使用命令:git
SET resource_name my_random_value NX PX 30000
這個命令僅在不存在key的時候才能被執行成功(NX選項),而且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是「my_random_value」(一個隨機值), 這個值在全部的客戶端必須是惟一的,全部同一key的獲取者(競爭者)這個值都不能同樣。github
value的值必須是隨機數主要是爲了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在而且存儲的值和我指定的值同樣才能告訴我刪除成功。redis
能夠經過如下Lua腳本實現:數據庫
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
使用這種方式釋放鎖能夠避免刪除別的客戶端獲取成功的鎖。舉個例子:客戶端A取得資源鎖,可是緊接着被一個其餘操做阻塞了,當客戶端A運行完畢其餘操做後要釋放鎖時, 原來的鎖早已超時而且被Redis自動釋放,而且在這期間資源鎖又被客戶端B再次獲取到。若是僅使用DEL命令將key刪除,那麼這種狀況就會把客戶端B的鎖給刪除掉。 使用Lua腳本就不會存在這種狀況,由於腳本僅會刪除value等於客戶端A的value的key(value至關於客戶端的一個簽名)。安全
這種方法已經足夠安全,若是擔憂redis故障轉移時,鎖失效的問題,請參照Redis官方文檔中的RedLock,這裏不作具體討論。app
知道了Redis鎖的實現原理,咱們再來看看如何實現。其實關鍵的步驟只有兩步:dom
你們在寫程序的時候是否是總忘記釋放鎖呢?就像之前對流操做時,忘記了關閉流。從java7開始,加入了try-with-resources的方式,它能夠 自動的執行close()方法,釋放資源,不再用寫finally塊了。咱們就按照這種思路編寫Redis鎖,在具體寫代碼以前,咱們先談談 Redis的客戶端,Redis的客戶端官方推薦有3種:分佈式
Redis官方比較推薦Redisson,可是Spring-data中並無這種方式,Spring-Data-Redis支持Jedis和Lecttuce兩種方式。 國內用的比較多的是Jedis,可是Spring-Data默認用Lecttuce。無論那麼多了,直接用Spring-Boot,配置好鏈接,直接使用就行了。
Redis鎖的try-with-resources實現:
public class RedisLock implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class); private RedisTemplate redisTemplate; private String lockKey; private String lockValue; private int expireTime; public RedisLock(RedisTemplate redisTemplate,String lockKey,String lockValue,int expireTime){ this.redisTemplate = redisTemplate; //redis key this.lockKey = lockKey; //redis value this.lockValue = lockValue; //過時時間 單位:s this.expireTime = expireTime; } /** * 獲取分佈式鎖 */ public boolean getLock(){ //獲取鎖的操做 return (boolean) redisTemplate.execute((RedisCallback) connection -> { //過時時間 單位:s Expiration expiration = Expiration.seconds(expireTime); //執行NX操做 SetOption setOption = SetOption.ifAbsent(); //序列化key byte[] serializeKey = redisTemplate.getKeySerializer().serialize(lockKey); //序列化value byte[] serializeVal = redisTemplate.getValueSerializer().serialize(lockValue); //獲取鎖 boolean result = connection.set(serializeKey, serializeVal, expiration, setOption); LOGGER.info("獲取redis鎖結果:" + result); return result; }); } /** * 自動釋放鎖 * @throws IOException */ @Override public void close() throws IOException { //釋放鎖的lua腳本 String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; RedisScript<String> redisScript = RedisScript.of(script,Boolean.class); //是否redis鎖 Boolean result = (Boolean) redisTemplate.execute(redisScript, Arrays.asList(lockKey), lockValue); LOGGER.info("釋放redis鎖結果:"+result); } }
只要實現了Closeable
接口,並重寫了close()
方法,就可使用try-with-resources的方式了。
具體的使用代碼以下:
@SpringBootApplication public class Application { private static final Logger LOGGER = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class, args); RedisTemplate redisTemplate = applicationContext.getBean("redisTemplate",RedisTemplate.class); try (RedisLock lock = new RedisLock(redisTemplate,"test_key","test_val",60)){ //獲取鎖 if (lock.getLock()){ //模擬執行業務 Thread.sleep(5*1000); LOGGER.info("獲取到鎖,執行業務操做耗時5s"); } }catch (Exception e){ LOGGER.error(e.getMessage(),e); } } }
這樣咱們就不用關心鎖的釋放問題了。