在單機應用的場景下,咱們常使用的鎖主要是synchronized與Lock;可是在分佈式橫行的大環境下,顯然僅僅這兩種鎖已經沒法知足咱們的需求;git
需求:秒殺場景下,有若干服務實例,假設有2個,那麼分別會有若干請求分別請求這2個服務實例。要求只能有一個請求秒殺成功,本質是秒殺方法在同一時間內只能被同一個線程執行,這就須要使用到分佈式鎖。redis
咱們已知道set用於設置String類型的key/value值,以下:數據庫
127.0.0.1:6379> set name gaoyuan OK 127.0.0.1:6379> get name "gaoyuan"
在redis2.6.12版本以前,分佈式鎖常使用setnx來實現。setnx是set if not exists
的意思,也就是當值不存在時,才能夠建立成功,這樣就能保證在同一時間只能有個設置成功。緩存
可是,setnx沒法在插入值的同時設置超時時間,setnx 與 expire 是兩條獨立的語句,這樣加鎖操做就是非原子性的,那麼就會帶來問題。(好比,當setnx成功後,準備執行expire前,程序忽然出現錯誤,則添加的數據就沒法清除了,由於沒有超時時間,不會自動清除)微信
在redis2.6.12版本以後,redis支持經過set在設置值得同時設置超時時間,此操做是原子操做。dom
// 設置lock的值爲123,存在6秒 127.0.0.1:6379> set lock 123 EX 6 NX OK // 6秒內,重複設置lock的值爲123,返回nil(也就是null) 127.0.0.1:6379> set lock 123 EX 6 NX (nil) // 6秒內,獲取值,可以獲取到 127.0.0.1:6379> get lock "123" // 6秒後,獲取值,獲取爲nil,又能夠從新set值了 127.0.0.1:6379> get lock (nil)
下面咱們利用set的特性來實現分佈式鎖。分佈式
咱們先構造一個對象 MyThread
ide
class MyThread implements Runnable{ int i = 0; @Override public void run() { try { for(int j=0;j<10;j++){ i = i + 1; // 這裏延時,爲了讓其餘線程進行干擾 TimeUnit.MILLISECONDS.sleep(10); i = i - 1; System.out.println("i=" + i); } }catch (Exception e){ e.printStackTrace(); } } }
執行lua
ExecutorService executorService = Executors.newFixedThreadPool(3); MyThread myThread = new MyThread(); executorService.submit(myThread); executorService.submit(myThread); executorService.submit(myThread); executorService.shutdown();
輸出.net
i=0 i=0 i=0 i=3 i=3 i=3 i=4 i=4 ...
能夠看出,i竟然會出現不等於0的狀況。
獲取鎖的方法
/** * 獲取鎖 * 利用set key value [EX seconds] [PX milliseconds] [NX|XX] 命令實現鎖機制 * @author GaoYuan */ public static String tryLock(Jedis jedis, int timeout) throws Exception{ if(timeout == 0){ timeout = 5000; } String returnId = null; // 生成隨機標識 String id = UUID.randomUUID().toString(); // 設置鎖超時10秒 int lockExpireMs = 10000; long startTime = System.currentTimeMillis(); // 超時時間內循環獲取 while ((System.currentTimeMillis() - startTime) < timeout){ String result = jedis.set(lockKey, id, "NX", "PX", lockExpireMs); if(result != null){ returnId = id; break; } TimeUnit.MILLISECONDS.sleep(100); } if(returnId == null){ // 獲取鎖超時,拋出異常 throw new Exception("獲取鎖超時"); } // 將set的值返回,用於後續的解鎖 return returnId; }
釋放鎖的方法(釋放鎖的方式有兩種)
釋放方法一:
/** * 釋放鎖 - 利用redis的watch + del * @author GaoYuan */ public static boolean unLock(Jedis jedis, String id){ boolean result = false; while(true){ if(jedis.get(lockKey) == null){ return false; } // 配置監聽 jedis.watch(lockKey); // 這裏確保是加鎖者進行解鎖 if(id!=null && id.equals(jedis.get(lockKey))){ Transaction transaction = jedis.multi(); transaction.del(lockKey); List<Object> results = transaction.exec(); if(results == null){ continue; } result = true; } // 釋放監聽 jedis.unwatch(); break; } return result; }
釋放方法二:
/** * 釋放鎖 - 利用lua腳本 * @author GaoYuan */ public static boolean unLockByLua(Jedis jedis, String id){ String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(id)); if (Objects.equals(1, result)) { return true; } return false; }
改造以前的例子
class MyThread implements Runnable{ int i = 0; @Override public void run() { try { for(int j=0;j<10;j++){ Jedis jedis = new Jedis(JedisConfig.HOST, JedisConfig.PORT); try { // 嘗試獲取鎖,有超時時間 String id = RedisLock.tryLock(jedis,5000); i = i + 1; // 這裏延時,爲了讓其餘線程進行干擾(固然,加鎖就不會有干擾) TimeUnit.MILLISECONDS.sleep(10); i = i - 1; // 加鎖後,指望值 i=0 System.out.println("i=" + i); // 釋放鎖 RedisLock.unLock(jedis, id); }catch (Exception e){ // e.printStackTrace(); System.out.println("獲取鎖超時"); } } }catch (Exception e){ e.printStackTrace(); } } }
運行輸出
i=0 i=0 i=0 i=0 i=0 i=0 ...
將run方法中的延時時間設置成1秒(1000)後,會打印超時的狀況
i=0 i=0 i=0 獲取鎖超時 獲取鎖超時 i=0 ...
至此利用jedis實現了分佈式鎖。
完整代碼見: https://gitee.com/gmarshal/foruo-demo/tree/master/foruo-demo-redis/foruo-demo-redis-lock
https://my.oschina.net/gmarshal/blog/2120428
歡迎關注個人我的微信訂閱號:(聽說這個頭像程序猿專用)