ZooKeeper 裏實現分佈式鎖的基本邏輯:html
1.zookeeper中建立一個根節點(Locks),用於後續各個客戶端的鎖操做。node
2.想要獲取鎖的client都在Locks中建立一個自增序的子節點,每一個client獲得一個序號,若是本身的序號是最小的則得到鎖。服務器
3.若是沒有獲得鎖,就監控排在本身前面的序號節點,而且設置默認時間,等待它的釋放。session
4.業務操做後釋放鎖,而後監控本身的節點的client就被喚醒獲得鎖。(例如client A須要釋放鎖,只須要把對應的節點1刪除掉,由於client B已經關注了節點1,那麼當節點1被刪除後,zookeeper就會通知client B:你是序號最小的了,能夠獲取鎖了)架構
釋放鎖的過程相對比較簡單,就是刪除本身建立的那個子節點便可。分佈式
解決方案目錄: ide
Demo1 Demo2爲測試場景 測試
ZooKeepr_Lock爲鎖操做代碼this
下面貼一下代碼看看spa
1 public class ZooKeeprDistributedLock : IWatcher 2 { 3 /// <summary> 4 /// zk連接字符串 5 /// </summary> 6 private String connectString = "127.0.0.1:2181"; 7 private ZooKeeper zk; 8 private string root = "/locks"; //根 9 private string lockName; //競爭資源的標誌 10 private string waitNode; //等待前一個鎖 11 private string myZnode; //當前鎖 12 private AutoResetEvent autoevent; 13 private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(50000); 14 private IList<Exception> exception = new List<Exception>(); 15 16 /// <summary> 17 /// 建立分佈式鎖 18 /// </summary> 19 /// <param name="lockName">競爭資源標誌,lockName中不能包含單詞lock</param> 20 public ZooKeeprDistributedLock(string lockName) 21 { 22 this.lockName = lockName; 23 // 建立一個與服務器的鏈接 24 try 25 { 26 zk = new ZooKeeper(connectString, sessionTimeout, this); 27 Stopwatch sw = new Stopwatch(); 28 sw.Start(); 29 while (true) 30 { 31 if (zk.State == States.CONNECTING) { break; } 32 if (zk.State == States.CONNECTED) { break; } 33 } 34 sw.Stop(); 35 TimeSpan ts2 = sw.Elapsed; 36 Console.WriteLine("zoo鏈接總共花費{0}ms.", ts2.TotalMilliseconds); 37 38 var stat = zk.Exists(root, false); 39 if (stat == null) 40 { 41 // 建立根節點 42 zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent); 43 } 44 } 45 catch (KeeperException e) 46 { 47 throw e; 48 } 49 } 50 51 /// <summary> 52 /// zookeeper節點的監視器 53 /// </summary> 54 public virtual void Process(WatchedEvent @event) 55 56 { 57 if (this.autoevent != null) 58 { 59 //將事件狀態設置爲終止狀態,容許一個或多個等待線程繼續;若是該操做成功,則返回true;不然,返回false 60 this.autoevent.Set(); 61 } 62 } 63 64 public virtual bool tryLock() 65 { 66 try 67 { 68 string splitStr = "_lock_"; 69 if (lockName.Contains(splitStr)) 70 { 71 //throw new LockException("lockName can not contains \\u000B"); 72 } 73 //建立臨時子節點 74 myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential); 75 Console.WriteLine(myZnode + " 建立完成! "); 76 //取出全部子節點 77 IList<string> subNodes = zk.GetChildren(root, false).ToList<string>(); 78 //取出全部lockName的鎖 79 IList<string> lockObjNodes = new List<string>(); 80 foreach (string node in subNodes) 81 { 82 if (node.StartsWith(lockName)) 83 { 84 lockObjNodes.Add(node); 85 } 86 } 87 Array alockObjNodes = lockObjNodes.ToArray(); 88 Array.Sort(alockObjNodes); 89 Console.WriteLine(myZnode + "==" + lockObjNodes[0]); 90 if (myZnode.Equals(root + "/" + lockObjNodes[0])) 91 { 92 //若是是最小的節點,則表示取得鎖 93 Console.WriteLine(myZnode + " 獲取鎖成功! "); 94 return true; 95 } 96 //若是不是最小的節點,找到比本身小1的節點 97 string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1); 98 waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1]; 99 } 100 catch (KeeperException e) 101 { 102 throw e; 103 } 104 return false; 105 } 106 107 108 public virtual bool tryLock(TimeSpan time) 109 { 110 try 111 { 112 if (this.tryLock()) 113 { 114 return true; 115 } 116 return waitForLock(waitNode, time); 117 } 118 catch (KeeperException e) 119 { 120 throw e; 121 } 122 } 123 124 /// <summary> 125 /// 等待鎖 126 /// </summary> 127 /// <param name="lower">需等待的鎖節點</param> 128 /// <param name="waitTime">等待時間</param> 129 /// <returns></returns> 130 private bool waitForLock(string lower, TimeSpan waitTime) 131 { 132 var stat = zk.Exists(root + "/" + lower, true); 133 //判斷比本身小一個數的節點是否存在,若是不存在則無需等待鎖,同時註冊監聽 134 if (stat != null) 135 { 136 Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower); 137 autoevent = new AutoResetEvent(false); 138 //阻止當前線程,直到當前實例收到信號,使用 TimeSpan 度量時間間隔並指定是否在等待以前退出同步域 139 bool r = autoevent.WaitOne(waitTime); 140 autoevent.Dispose(); 141 autoevent = null; 142 return r; 143 } 144 else return true; 145 } 146 147 /// <summary> 148 /// 解除鎖 149 /// </summary> 150 public virtual void unlock() 151 { 152 try 153 { 154 Console.WriteLine("unlock " + myZnode); 155 zk.Delete(myZnode, -1); 156 myZnode = null; 157 zk.Dispose(); 158 } 159 catch (KeeperException e) 160 { 161 throw e; 162 } 163 } 164 }
而後先看demo2 : 當前獲取到鎖之後 釋放鎖的操做被阻塞 而後運行demo1 進行測試
int count = 1;//庫存 商品編號1079233 if (count == 1) { ZooKeeprDistributedLock zklock = new ZooKeeprDistributedLock("Getorder_Pid1079233"); //建立鎖 if (zklock.tryLock(TimeSpan.FromMilliseconds(50000))) { Console.WriteLine("Demo2建立訂單成功!"); } else { Console.WriteLine("Demo2建立訂單失敗了!"); } Thread.Sleep(30000);//對操做釋放鎖進行阻塞 Console.WriteLine(DateTime.Now.ToString("yyyyMMdd HH:mm:ss")); //要進行釋放鎖的操做時間 主要測試當前鎖釋放後 Demo1的節點監控是否喚起 zklock.unlock();//釋放鎖 Console.ReadKey(); }
demo1:demo1會對排在前面的節點進行監控 當demo2釋放鎖後 demo1獲取鎖 demo1建立訂單與釋放鎖之間打印了操做時間
能夠跟demo2進行釋放鎖的時間進行對比下
int count = 1;//庫存 商品編號1079233 if (count == 1) { ZooKeeprDistributedLock zklock = new ZooKeeprDistributedLock("Getorder_Pid1079233"); if (zklock.tryLock(TimeSpan.FromMilliseconds(50000))) { Console.WriteLine("Demo1建立訂單成功!"); } else { Console.WriteLine("Demo1建立訂單失敗了!"); } Console.WriteLine(DateTime.Now.ToString("yyyyMMdd HH:mm:ss")); zklock.unlock(); Console.ReadKey(); }
這裏是我運行後的結果,只精確到秒,能夠看到demo2釋放鎖後,demo1的AutoResetEvent當即被阻斷了而後demo1也就得到了鎖!
場景二:將demo1的監聽註釋後,demo2未釋放鎖,demo1建立訂單失敗
//if (zklock.tryLock(TimeSpan.FromMilliseconds(50000))) if (zklock.tryLock())
有關此篇一些圖片及內容借鑑了幾位園友的博文,在此感謝!