讀寫鎖是計算機程序的併發控制的一種同步機制,用於解決讀寫問題,讀操做可併發重入,寫操做是互斥的。 讀寫鎖有多種讀寫權限的優先級策略,能夠設計爲讀優先、寫優先或不指定優先級。java
不指定優先級的策略,最適合使用ZooKeeper的子節點模式來實現,今天就來嘗試這種策略。併發
同前面介紹的普通分佈式鎖,也使用子節點模式實現。先用容器模式(CreateMode.CONTAINER)建立惟一的鎖節點,每一個鎖客戶端在鎖節點下使用臨時循序模式(CreateMode. SEQUENTIAL)建立子節點。這些子節點會自動在名稱後面追加10位數字。dom
有兩種簡單的方案:在子節點名中標識、在節點的值中標識。若是採用在值中標識,每次子節點列表後,還須要再分別讀一會兒節點的值,才能判斷是讀鎖仍是寫鎖,會比較耗時。若是在子節點名稱中標識,會面臨一個問題:在同一個節點中建立的子節點,若是給定的名稱不一樣,追加的10位數字是否仍然是遞歸的?分佈式
寫個測試用例驗證一下。ide
1函數 2測試 3ui 4this 5spa 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
|
測試用例經過,說明在同一個Container中建立的子節點,不論提供的節點名是什麼,後續追加的10位數字都是順序遞增的。這樣,就可使用節點名來區分讀鎖和寫鎖。
介紹分佈式鎖的時候,已經建立了阻塞鎖 ChildrenBlockingLock,讀寫鎖正好能夠基於這個類作重載。
寫鎖是一個獨佔鎖,邏輯跟普通分佈式鎖相同,只要它以前有鎖就必須等待。因此,徹底沿用阻塞鎖的邏輯便可。
讀鎖容許併發,它以前能夠有任意讀鎖,但不能有寫鎖。因此只須要判斷有沒有寫鎖便可。
這個類,主要是增長了一個獲取排序後子節點列表的方法,這樣方便實現讀寫鎖的代碼。固然,這個操做會增長一些耗時,若是子節點數量太大,可能不適用。
首先定義一個函數,用來返回子節點的前綴
/** 子節點的前綴,缺省是element,子類能夠重載 */ protected String getChildPrefix() { return "element"; }
而後定義一個內部類,子節點排序時會用到
/** 子節點名稱比較 */ private class StringCompare implements Comparator<String> { @Override public int compare(String string1, String string2) { return string1.substring(string1.length() - 10) .compareTo(string2.substring(string2.length() - 10)); } }
最後實現子節點排序方法,用於代替 getChildren 函數
/** 獲取排好序的子節點列表 */ final public List<String> getOrderedChildren(String path, boolean watch) throws KeeperException, InterruptedException { List<String> children = getZooKeeper().getChildren(path, watch); Collections.sort(children, new StringCompare()); return children; }
在多客戶端隨機測試時,常常出現程序卡死的狀況,沒法正常退出。通過添加日誌跟蹤,發現WatchedEvent可能會丟失,也可能會發送給並非註冊事件的ZooKeeper客戶端。在網上搜索,發現不少人也碰到相似問題。
簡單修改了一下ChildrenBlockingLock#isLockSuccess等待信號的代碼,從無參數的死等變成設置必定超時時間等待。關鍵代碼以下
protected boolean isLockSuccess() { boolean lockSuccess; try { while (true) { String prevElementName = getPrevElementName(); if (prevElementName == null) { log.trace("{} 沒有更靠前的子節點,加鎖成功", elementNodeName); lockSuccess = true; break; } else { // 有更小的節點,說明當前節點沒搶到鎖,註冊前一個節點的監聽。 log.trace("{} 監控 {} 的事件", elementNodeName, prevElementName); getZooKeeper().exists(this.guidNodeName + "/" + prevElementName, true); synchronized (mutex) { // 等待最多一秒 mutex.wait(1000); log.trace("{} 監控的 {} 有子節點變化", elementNodeName, guidNodeName); } } } } catch (KeeperException e) { lockSuccess = false; } catch (InterruptedException e) { lockSuccess = false; } return lockSuccess; }
代碼基本是沿用父類,只須要重載getChildPrefix()方法,
/** 返回寫鎖的前綴 */ protected String getChildPrefix() { return "w-lock-"; }
同寫鎖相比,除了重載getChildPrefix()方法,還重載了getPrevElementName()用來查找最近一個寫鎖。
/** 返回讀鎖的前綴 */ protected String getChildPrefix() { return "r-lock-"; } /** 是寫鎖 */ private boolean isWriteLock(String elementName) { return elementName.startsWith(ZooKeeperWriteLock.FLAG); } /** 讀取前一個寫鎖 */ protected String getPrevElementName() throws KeeperException, InterruptedException { List<String> elementNames = super.getOrderedChildren(this.guidNodeName, false); super.traceOrderedChildren(this.guidNodeName, elementNames); String prevWriteElementName = null; for (String oneElementName : elementNames) { if (this.elementNodeFullName.endsWith(oneElementName)) { // 已經到了當前節點 break; } if (isWriteLock(oneElementName)) { prevWriteElementName = oneElementName; } } return prevWriteElementName; }
測試用例沒想到好的判斷方法,很難使用assert判斷結果,所以作了簡化,根據日誌輸出,靠人眼判斷是否正確。
分別爲都鎖和寫鎖構建了兩個內部類
/** 寫鎖線程 */ class WriteLockClient extends Thread { ZooKeeperWriteLock writeLock; public WriteLockClient() { try { this.writeLock = new ZooKeeperWriteLock(address); } catch (IOException e) { } } public void run() { writeLock.lock(guidNodeName, this.getName()); try { Thread.sleep(1000 + random.nextInt(20) * 100); } catch (InterruptedException e) { } writeLock.release(guidNodeName, this.getName()); } } /** 讀鎖線程 */ class ReadLockClient extends Thread { ZooKeeperReadLock readLock; public ReadLockClient() { try { this.readLock = new ZooKeeperReadLock(address); } catch (IOException e) { } } public void run() { readLock.lock(guidNodeName, this.getName()); try { Thread.sleep(1000 + random.nextInt(20) * 100); } catch (InterruptedException e) { } readLock.release(guidNodeName, this.getName()); try { readLock.getZooKeeper().close(); } catch (InterruptedException e) { } } }
代碼
@Test public void testReadRead() throws IOException, InterruptedException { ReadLockClient readLock1 = new ReadLockClient(); ReadLockClient readLock2 = new ReadLockClient(); readLock1.start(); readLock2.start(); readLock1.join(); readLock2.join(); }
測試結果能夠看到,兩個讀鎖併發執行
22:18.861 [Thread-2 INFO] r-lock-0000000000 get read lock : true 22:18.865 [Thread-1 INFO] r-lock-0000000001 get read lock : true 22:20.065 [Thread-2 INFO] r-lock-0000000000 release read lock 22:21.366 [Thread-1 INFO] r-lock-0000000001 release read lock
代碼
@Test public void testReadWrite() throws IOException, InterruptedException { ReadLockClient readLock1 = new ReadLockClient(); WriteLockClient writeLock1 = new WriteLockClient(); readLock1.start(); Thread.sleep(50); writeLock1.start(); readLock1.join(); writeLock1.join(); }
測試結果能夠看到,首先獲取讀鎖,釋放以後才獲取到寫鎖。
27:40.800 [Thread-1 INFO] r-lock-0000000000 get read lock : true 27:43.310 [Thread-1 INFO] r-lock-0000000000 release read lock 27:43.423 [Thread-2 INFO] w-lock-0000000001 get write lock : true 27:44.423 [Thread-2 INFO] w-lock-0000000001 release write lock
代碼
@Test public void testWriteRead() throws IOException, InterruptedException { ReadLockClient readLock1 = new ReadLockClient(); WriteLockClient writeLock1 = new WriteLockClient(); writeLock1.start(); Thread.sleep(50); readLock1.start(); writeLock1.join(); readLock1.join(); }
測試結果能夠看到,首先獲取寫鎖,釋放以後才獲取到讀鎖。
29:17.661 [Thread-2 INFO] w-lock-0000000000 get write lock : true 29:19.966 [Thread-2 INFO] w-lock-0000000000 release write lock 29:19.976 [Thread-1 INFO] r-lock-0000000001 get read lock : true 29:22.476 [Thread-1 INFO] r-lock-0000000001 release read lock
測試代碼
@Test public void testRandomReadWriteLock() throws IOException, InterruptedException { int threadCount = 20; Thread[] lockThreads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { // 必定機率是寫鎖 boolean writeLock = random.nextInt(5) == 0; if (writeLock) { lockThreads[i] = new WriteLockClient(); } else { lockThreads[i] = new ReadLockClient(); } lockThreads[i].start(); } for (int i = 0; i < threadCount; i++) { lockThreads[i].join(); } }
測試結果能夠看出,若是連續多個讀鎖會併發執行。爲了方便查看,我添加了一些橫線分隔。
30:31.317 [Thread-1 INFO] w-lock-0000000000 get write lock : true 30:32.824 [Thread-1 INFO] w-lock-0000000000 release write lock ------------------------------------------------------------------ 30:32.834 [Thread-17 INFO] r-lock-0000000004 get read lock : true 30:32.835 [Thread-19 INFO] r-lock-0000000002 get read lock : true 30:32.835 [Thread-20 INFO] r-lock-0000000001 get read lock : true 30:32.836 [Thread-18 INFO] r-lock-0000000003 get read lock : true 30:34.135 [Thread-20 INFO] r-lock-0000000001 release read lock 30:34.634 [Thread-17 INFO] r-lock-0000000004 release read lock 30:34.935 [Thread-19 INFO] r-lock-0000000002 release read lock 30:35.036 [Thread-18 INFO] r-lock-0000000003 release read lock ------------------------------------------------------------------ 30:35.053 [Thread-16 INFO] w-lock-0000000005 get write lock : true 30:36.154 [Thread-16 INFO] w-lock-0000000005 release write lock ------------------------------------------------------------------ 30:36.160 [Thread-14 INFO] r-lock-0000000007 get read lock : true 30:36.160 [Thread-15 INFO] r-lock-0000000006 get read lock : true 30:38.160 [Thread-14 INFO] r-lock-0000000007 release read lock 30:38.661 [Thread-15 INFO] r-lock-0000000006 release read lock ------------------------------------------------------------------ 30:38.669 [Thread-13 INFO] w-lock-0000000008 get write lock : true 30:39.969 [Thread-13 INFO] w-lock-0000000008 release write lock ------------------------------------------------------------------ 30:39.976 [Thread-12 INFO] r-lock-0000000009 get read lock : true 30:39.977 [Thread-8 INFO] r-lock-0000000014 get read lock : true 30:39.977 [Thread-6 INFO] r-lock-0000000015 get read lock : true 30:39.984 [Thread-10 INFO] r-lock-0000000011 get read lock : true 30:39.985 [Thread-3 INFO] r-lock-0000000018 get read lock : true 30:39.984 [Thread-7 INFO] r-lock-0000000013 get read lock : true 30:39.984 [Thread-11 INFO] r-lock-0000000010 get read lock : true 30:39.983 [Thread-9 INFO] r-lock-0000000012 get read lock : true 30:39.983 [Thread-2 INFO] r-lock-0000000019 get read lock : true 30:39.982 [Thread-5 INFO] r-lock-0000000016 get read lock : true 30:39.986 [Thread-4 INFO] r-lock-0000000017 get read lock : true 30:40.986 [Thread-3 INFO] r-lock-0000000018 release read lock 30:41.086 [Thread-2 INFO] r-lock-0000000019 release read lock 30:41.285 [Thread-6 INFO] r-lock-0000000015 release read lock 30:41.576 [Thread-12 INFO] r-lock-0000000009 release read lock 30:42.185 [Thread-10 INFO] r-lock-0000000011 release read lock 30:42.186 [Thread-5 INFO] r-lock-0000000016 release read lock 30:42.187 [Thread-11 INFO] r-lock-0000000010 release read lock 30:42.286 [Thread-9 INFO] r-lock-0000000012 release read lock 30:42.586 [Thread-7 INFO] r-lock-0000000013 release read lock 30:42.677 [Thread-8 INFO] r-lock-0000000014 release read lock 30:42.887 [Thread-4 INFO] r-lock-0000000017 release read lock