1.根據lockKey區進行setnx(set not exist,若是key值爲空,則正常設置,返回1,不然不會進行設置並返回0)操做,若是設置成功,表示已經得到鎖,不然並無獲取鎖。node
2.若是沒有得到鎖,去Redis上拿到該key對應的值,在該key上咱們存儲一個時間戳(用毫秒錶示,t1),爲了不死鎖以及其餘客戶端佔用該鎖超過必定時間(5秒),使用該客戶端當前時間戳,與存儲的時間戳做比較。redis
3.若是沒有超過該key的使用時限,返回false,表示其餘人正在佔用該key,不能強制使用;若是已經超過期限,那咱們就能夠進行解鎖,使用咱們的時間戳來代替該字段的值。算法
4.可是若是在setnx失敗後,get該值卻沒法拿到該字段時,說明操做以前該鎖已經被釋放,這個時候,最好的辦法就是從新執行一遍setnx方法來獲取其值以得到該鎖。session
釋放鎖:刪除redis中key異步
1 public class RedisKeyLock { 2 private static Logger logger = Logger.getLogger(RedisKeyLock.class); 3 private final static long ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000; 4 private final static int EXPIRE_IN_SECOND = 5;//鎖失效時間 5 private final static long WAIT_INTERVAL_IN_MS = 100; 6 private static RedisKeyLock lock; 7 private JedisPool jedisPool; 8 private RedisKeyLock(JedisPool pool){ 9 this.jedisPool = pool; 10 } 11 public static RedisKeyLock getInstance(JedisPool pool){ 12 if(lock == null){ 13 lock = new RedisKeyLock(pool); 14 } 15 return lock; 16 } 17 18 public void lock(final String redisKey) { 19 Jedis resource = null; 20 try { 21 long now = System.currentTimeMillis(); 22 resource = jedisPool.getResource(); 23 long timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS; 24 boolean flag = false; 25 while (true) { 26 String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000); 27 long ret = resource.setnx(redisKey, expireAt); 28 if (ret == 1) {//已獲取鎖 29 flag = true; 30 break; 31 } else {//未獲取鎖,重試獲取鎖 32 String oldExpireAt = resource.get(redisKey); 33 if (oldExpireAt != null && Long.parseLong(oldExpireAt) < now) { 34 oldExpireAt = resource.getSet(redisKey, expireAt); 35 if (Long.parseLong(oldExpireAt) < now) { 36 flag = true; 37 break; 38 } 39 } 40 } 41 if (timeoutAt < now) { 42 break; 43 } 44 TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS); 45 } 46 if (!flag) { 47 throw new RuntimeException("canot acquire lock now ..."); 48 } 49 } catch (JedisException je) { 50 logger.error("lock", je); 51 je.printStackTrace(); 52 if (resource != null) { 53 jedisPool.returnBrokenResource(resource); 54 } 55 } catch (Exception e) { 56 e.printStackTrace(); 57 logger.error("lock", e); 58 } finally { 59 if (resource != null) { 60 jedisPool.returnResource(resource); 61 } 62 } 63 } 64 public boolean unlock(final String redisKey) { 65 Jedis resource = null; 66 try { 67 resource = jedisPool.getResource(); 68 resource.del(redisKey); 69 return true; 70 } catch (JedisException je) { 71 je.printStackTrace(); 72 if (resource != null) { 73 jedisPool.returnBrokenResource(resource); 74 } 75 return false; 76 } catch (Exception e) { 77 logger.error("lock", e); 78 return false; 79 } finally { 80 if (resource != null) { 81 jedisPool.returnResource(resource); 82 } 83 } 84 } 85 }
另外一個版本:分佈式
SET my:lock 隨機值 NX PX 30000性能
這個的NX的意思就是隻有key不存在的時候纔會設置成功,PX 30000的意思是30秒後鎖自動釋放。別人建立的時候若是發現已經有了就不能加鎖了。ui
釋放鎖就是刪除key,可是通常能夠用lua腳本刪除,判斷value同樣才刪除this
爲啥要用隨機值呢?由於若是某個客戶端獲取到了鎖,可是阻塞了很長時間才執行完,此時可能已經自動釋放鎖了,此時可能別的客戶端已經獲取到了這個鎖,要是你這個時候直接刪除key的話會有問題,因此得用隨機值加上面的lua腳原本釋放鎖。(就是根據這個隨機值來判斷這個鎖是否是本身加的)lua
若是是Redis是單機,會有問題。由於若是是普通的redis單實例,那就是單點故障。單節點掛了會致使鎖失效。
若是是redis普通主從,那redis主從異步複製,若是主節點掛了,key還沒同步到從節點,此時從節點切換爲主節點,別人就會拿到鎖。
RedLock算法
這個場景是假設有一個redis cluster,有5個redis master實例。而後執行以下步驟獲取一把鎖:
獲取當前時間戳,單位是毫秒
跟上面相似,輪流嘗試在每一個master節點上建立鎖,過時時間較短,通常就幾十毫秒
嘗試在大多數節點上創建一個鎖,好比5個節點就要求是3個節點(n / 2 +1)
客戶端計算創建好鎖的時間,若是創建鎖的時間小於超時時間,就算創建成功了
要是鎖創建失敗了,那麼就依次刪除這個鎖
只要別人創建了一把分佈式鎖,你就得不斷輪詢去嘗試獲取鎖
基於臨時順序節點:
1.客戶端調用create()方法建立名爲「locknode/guid-lock-」的節點,須要注意的是,這裏節點的建立類型須要設置爲EPHEMERAL_SEQUENTIAL。
2.客戶端調用getChildren(「locknode」)方法來獲取全部已經建立的子節點。
3.客戶端獲取到全部子節點path以後,若是發現本身在步驟1中建立的節點是全部節點中序號最小的,那麼就認爲這個客戶端得到了鎖。
4.若是建立的節點不是全部節點中序號最小的,那麼則監視比本身建立節點的序列號小的最大的節點,進入等待。直到下次監視的子節點變動的時候,再進行子節點的獲取,判斷是否獲取鎖。
釋放鎖的過程相對比較簡單,就是刪除本身建立的那個子節點便可。
不太嚴謹的代碼:
1 public class ZooKeeperDistributedLock implements Watcher{ 2 3 private ZooKeeper zk; 4 private String locksRoot= "/locks"; 5 private String productId; 6 private String waitNode; 7 private String lockNode; 8 private CountDownLatch latch; 9 private CountDownLatch connectedLatch = new CountDownLatch(1); 10 private int sessionTimeout = 30000; 11 12 public ZooKeeperDistributedLock(String productId){ 13 this.productId = productId; 14 try { 15 String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181"; 16 zk = new ZooKeeper(address, sessionTimeout, this); 17 connectedLatch.await(); 18 } catch (IOException e) { 19 throw new LockException(e); 20 } catch (KeeperException e) { 21 throw new LockException(e); 22 } catch (InterruptedException e) { 23 throw new LockException(e); 24 } 25 } 26 27 public void process(WatchedEvent event) { 28 if(event.getState()==KeeperState.SyncConnected){ 29 connectedLatch.countDown(); 30 return; 31 } 32 33 if(this.latch != null) { 34 this.latch.countDown(); 35 } 36 } 37 38 public void acquireDistributedLock() { 39 try { 40 if(this.tryLock()){ 41 return; 42 } 43 else{ 44 waitForLock(waitNode, sessionTimeout); 45 } 46 } catch (KeeperException e) { 47 throw new LockException(e); 48 } catch (InterruptedException e) { 49 throw new LockException(e); 50 } 51 } 52 53 public boolean tryLock() { 54 try { 55 // 傳入進去的locksRoot + 「/」 + productId 56 // 假設productId表明了一個商品id,好比說1 57 // locksRoot = locks 58 // /locks/10000000000,/locks/10000000001,/locks/10000000002 59 lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 60 61 // 看看剛建立的節點是否是最小的節點 62 // locks:10000000000,10000000001,10000000002 63 List<String> locks = zk.getChildren(locksRoot, false); 64 Collections.sort(locks); 65 66 if(lockNode.equals(locksRoot+"/"+ locks.get(0))){ 67 //若是是最小的節點,則表示取得鎖 68 return true; 69 } 70 71 //若是不是最小的節點,找到比本身小1的節點 72 int previousLockIndex = -1; 73 for(int i = 0; i < locks.size(); i++) { 74 if(lockNode.equals(locksRoot + 「/」 + locks.get(i))) { 75 previousLockIndex = i - 1; 76 break; 77 } 78 } 79 80 this.waitNode = locks.get(previousLockIndex); 81 } catch (KeeperException e) { 82 throw new LockException(e); 83 } catch (InterruptedException e) { 84 throw new LockException(e); 85 } 86 return false; 87 } 88 89 private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException { 90 Stat stat = zk.exists(locksRoot + "/" + waitNode, true); 91 if(stat != null){ 92 this.latch = new CountDownLatch(1); 93 this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; 94 } 95 return true; 96 } 97 98 public void unlock() { 99 try { 100 // 刪除/locks/10000000000節點 101 // 刪除/locks/10000000001節點 102 System.out.println("unlock " + lockNode); 103 zk.delete(lockNode,-1); 104 lockNode = null; 105 zk.close(); 106 } catch (InterruptedException e) { 107 e.printStackTrace(); 108 } catch (KeeperException e) { 109 e.printStackTrace(); 110 } 111 } 112 113 public class LockException extends RuntimeException { 114 private static final long serialVersionUID = 1L; 115 public LockException(String e){ 116 super(e); 117 } 118 public LockException(Exception e){ 119 super(e); 120 } 121 } 122 123 // 若是有一把鎖,被多我的給競爭,此時多我的會排隊,第一個拿到鎖的人會執行,而後釋放鎖,後面的每一個人都會去監聽排在本身前面的那我的建立的node上,一旦某我的釋放了鎖,排在本身後面的人就會被zookeeper給通知,一旦被通知了以後,就ok了,本身就獲取到了鎖,就能夠執行代碼了 124 125 }
另外一個版本:
zk分佈式鎖,就是某個節點嘗試建立臨時znode,此時建立成功了就獲取了這個鎖;這個時候別的客戶端來建立鎖會失敗,只能註冊個監聽器監聽這個鎖。
釋放鎖就是刪除這個znode,一旦釋放掉就會通知客戶端,而後有一個等待着的客戶端就能夠再次從新加鎖。
redis分佈式鎖,其實須要本身不斷去嘗試獲取鎖,比較消耗性能
zk分佈式鎖,獲取不到鎖,註冊個監聽器便可,不須要不斷主動嘗試獲取鎖,性能開銷較小
另一點就是,若是是redis獲取鎖的那個客戶端bug了或者掛了,那麼只能等待超時時間以後才能釋放鎖;而zk的話,由於建立的是臨時znode,只要客戶端掛了,znode就沒了,此時就自動釋放鎖