#0 Series Index#
Zookeeper Series
Zookeeper Source Code
Zookeeper Applications
#1 Scenario# In a distributed application, a given service is often provided by multiple processes. These processes may run on the same machine or be spread across different machines. If they share some resources, a distributed lock may be needed to coordinate access to those resources.
#2 Approach# When a process needs to access the shared data, it creates a sequence-type child node under "/locks", referred to here as thisPath. When thisPath is the smallest of all the child nodes, the process has acquired the lock and may access the shared resource. When it is done, it deletes thisPath, and the lock passes to the new smallest child node.
With the overall idea clear, a few details remain. How does a process know that thisPath is the smallest of all the child nodes? After creating its node, it can fetch the child list with getChildren, find the node that sorts immediately before thisPath, call it waitPath, and register a watch on waitPath. When waitPath is deleted, the process is notified, which means it has acquired the lock.
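Before the full algorithm and implementation below, here is a minimal sketch of this create / getChildren / watch-the-previous-node cycle. It assumes a local ZooKeeper server at 127.0.0.1:2181 and an already existing "/locks" parent node; the "seq-" prefix and the class name are made up for illustration, and error handling (session expiry, reconnects) is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class LockSketch {

    public static void main(String[] args) throws Exception {
        // Assumes a ZooKeeper server at 127.0.0.1:2181 and an existing "/locks" node.
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 10000, null);

        // Create thisPath as an EPHEMERAL_SEQUENTIAL child of /locks.
        String thisPath = zk.create("/locks/seq-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String thisNode = thisPath.substring(thisPath.lastIndexOf('/') + 1);

        while (true) {
            List<String> children = new ArrayList<String>(zk.getChildren("/locks", false));
            Collections.sort(children);
            int index = children.indexOf(thisNode);
            if (index == 0) {
                break; // thisPath is the smallest child: the lock is ours
            }

            // waitPath is the node that sorts immediately before thisPath; watch it.
            String waitPath = "/locks/" + children.get(index - 1);
            final CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(waitPath, new Watcher() {
                public void process(WatchedEvent event) {
                    latch.countDown(); // waitPath changed (typically deleted): re-check
                }
            });
            if (stat != null) {
                latch.await(); // block until notified, then loop and re-check
            }
        }

        // ... access the shared resource here ...

        zk.delete(thisPath, -1); // release: the watcher on the next node fires
        zk.close();
    }
}
```

The real DistributedLock in section 4 follows the same shape, but names the node x-&lt;sessionId&gt;-&lt;sequence&gt; and uses a BooleanMutex so the asynchronous watcher can wake the blocked lock() call.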
#3 Algorithm#
1. First, for each lock scenario, designate a corresponding root node in ZooKeeper to record the state of the resource contention.
2. When a lock object is created, it lazily creates a node under that root in ZooKeeper, representing its entry in the contention. (Tip: the node is created as EPHEMERAL_SEQUENTIAL, an auto-numbered ephemeral node.)
3. On a lock operation, fetch all child nodes under the lock's root node, i.e. every entry currently contending for the resource.
4. Following the principle of fair contention, sort the entries by their sequence numbers and take the node with the smallest number as the lock's owner. If the process's own node id equals the owner id, return: the lock has been acquired.
5. If its own id is not the owner id, find from the sorted result the id that ranks immediately before its own and watch for that node's release (an exists watcher), forming a chained trigger sequence.
6. To release, simply delete the node for its own id; the next node in the queue receives the Watcher event, wakes up, acquires the lock, and proceeds.
Choosing EPHEMERAL_SEQUENTIAL for the node is important. The auto-numbering makes it easy to build a fair lock: each node wakes up only its successor, forming a chained trigger sequence. This effectively avoids the "thundering herd" effect (one lock release waking every waiting thread); waking exactly one targeted waiter improves performance.
The EPHEMERAL property matters as well. Talking to ZooKeeper is a network operation with many uncontrollable factors; for example, if the network drops, the previous node's release operation will fail. An ephemeral node is bound to its session, so once the session times out or exits abnormally the node disappears, much like the interrupt handling of a waiting Thread in ReentrantLock's queue.
獲取lock操做是一個阻塞的操做,而對應的Watcher是一個異步事件
,因此須要使用互斥信號共享鎖BooleanMutex進行通知
,能夠比較方便的解決鎖重入的問題。(鎖重入能夠理解爲屢次讀操做,鎖釋放爲寫搶佔操做)
Using EPHEMERAL introduces a risk: under abnormal conditions, large network delays can cause a session timeout; ZooKeeper then considers the client gone and destroys its id node, so the next contending id can acquire the lock. At that point two processes may hold the lock and run the task at the same time, so choosing the session timeout carefully is important.
Using PERSISTENT instead carries a deadlock risk: if a process exits abnormally, its contention node is never deleted, and the next id can never acquire the lock.
#4 Implementation# 1. DistributedLock.java: the distributed lock
```java
package com.king.lock;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

/**
 * ZooKeeper distributed lock
 */
public class DistributedLock {

    private static final int SESSION_TIMEOUT = 10000;
    private static final int DEFAULT_TIMEOUT_PERIOD = 10000;
    private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    private static final byte[] data = {0x12, 0x34};

    private ZooKeeper zookeeper;
    private String root;
    private String id;
    private LockNode idName;
    private String ownerId;
    private String lastChildId;
    private Throwable other = null;
    private KeeperException exception = null;
    private InterruptedException interrupt = null;

    public DistributedLock(String root) {
        try {
            this.zookeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, null);
            this.root = root;
            ensureExists(root);
        } catch (IOException e) {
            e.printStackTrace();
            other = e;
        }
    }

    /**
     * Attempt to acquire the lock; blocking, but interruptible.
     */
    public void lock() throws Exception {
        // Initialization may already have failed
        if (exception != null) {
            throw exception;
        }
        if (interrupt != null) {
            throw interrupt;
        }
        if (other != null) {
            throw new Exception("", other);
        }
        if (isOwner()) { // lock reentrancy
            return;
        }
        BooleanMutex mutex = new BooleanMutex();
        acquireLock(mutex);
        // A timeout-based retry could be used to avoid a deadlock if a ZooKeeper restart loses the watcher
        try {
            // mutex.lockTimeOut(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS); // block until the value becomes true
            mutex.lock();
        } catch (Exception e) {
            e.printStackTrace();
            if (!mutex.state()) {
                lock();
            }
        }
        if (exception != null) {
            throw exception;
        }
        if (interrupt != null) {
            throw interrupt;
        }
        if (other != null) {
            throw new Exception("", other);
        }
    }

    /**
     * Attempt to acquire the lock without blocking.
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public boolean tryLock() throws Exception {
        // Initialization may already have failed
        if (exception != null) {
            throw exception;
        }
        if (isOwner()) { // lock reentrancy
            return true;
        }
        acquireLock(null);
        if (exception != null) {
            throw exception;
        }
        if (interrupt != null) {
            Thread.currentThread().interrupt();
        }
        if (other != null) {
            throw new Exception("", other);
        }
        return isOwner();
    }

    /**
     * Release the lock.
     */
    public void unlock() throws KeeperException {
        if (id != null) {
            try {
                zookeeper.delete(root + "/" + id, -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } finally {
                id = null;
            }
        } else {
            // do nothing
        }
    }

    /**
     * Check whether the node at path exists; create it if it does not.
     *
     * @param path
     */
    private void ensureExists(final String path) {
        try {
            Stat stat = zookeeper.exists(path, false);
            if (stat != null) {
                return;
            }
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            exception = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupt = e;
        }
    }

    /**
     * Return the root path of this lock.
     */
    public String getRoot() {
        return root;
    }

    /**
     * Whether the current process is the owner of the lock.
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    /**
     * Return the current node id.
     */
    public String getId() {
        return this.id;
    }

    // ===================== helper method =============================

    /**
     * Perform the lock operation; the mutex argument controls whether the operation blocks.
     */
    private Boolean acquireLock(final BooleanMutex mutex) {
        try {
            do {
                if (id == null) { // build the unique identifier for this lock
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // first time through: create a node
                    String path = zookeeper.create(root + "/" + prefix, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.EPHEMERAL_SEQUENTIAL);
                    int index = path.lastIndexOf("/");
                    id = StringUtils.substring(path, index + 1);
                    idName = new LockNode(id);
                }
                if (id != null) {
                    List<String> names = zookeeper.getChildren(root, false);
                    if (names.isEmpty()) {
                        id = null; // abnormal case: recreate the node
                    } else {
                        // sort the nodes
                        SortedSet<LockNode> sortedNames = new TreeSet<>();
                        for (String name : names) {
                            sortedNames.add(new LockNode(name));
                        }
                        if (!sortedNames.contains(idName)) {
                            id = null; // reset to null and recreate
                            continue;
                        }
                        // the first node becomes the ownerId
                        ownerId = sortedNames.first().getName();
                        if (mutex != null && isOwner()) {
                            mutex.unlock(); // update the state and return
                            return true;
                        } else if (mutex == null) {
                            return isOwner();
                        }
                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            // watch the nearest node queued just before ours
                            LockNode lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            // asynchronous watcher handling
                            Stat stat = zookeeper.exists(root + "/" + lastChildId, new Watcher() {
                                public void process(WatchedEvent event) {
                                    acquireLock(mutex);
                                }
                            });
                            if (stat == null) {
                                acquireLock(mutex); // the node no longer exists, so the watcher was not registered: re-trigger ourselves
                            }
                        } else {
                            if (isOwner()) {
                                mutex.unlock();
                            } else {
                                id = null; // our node may have timed out and vanished, so id and ownerId differ
                            }
                        }
                    }
                }
            } while (id == null);
        } catch (KeeperException e) {
            exception = e;
            if (mutex != null) {
                mutex.unlock();
            }
        } catch (InterruptedException e) {
            interrupt = e;
            if (mutex != null) {
                mutex.unlock();
            }
        } catch (Throwable e) {
            other = e;
            if (mutex != null) {
                mutex.unlock();
            }
        }
        if (isOwner() && mutex != null) {
            mutex.unlock();
        }
        return Boolean.FALSE;
    }
}
```
2. BooleanMutex.java: the shared mutex signal
```java
package com.king.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Shared mutex signal (boolean latch)
 */
public class BooleanMutex {

    private Sync sync;

    public BooleanMutex() {
        sync = new Sync();
        set(false);
    }

    /**
     * Block until the Boolean becomes true.
     * @throws InterruptedException
     */
    public void lock() throws InterruptedException {
        sync.innerLock();
    }

    /**
     * Block until the Boolean becomes true, with a timeout.
     * @param timeout
     * @param unit
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        sync.innerLock(unit.toNanos(timeout));
    }

    public void unlock() {
        set(true);
    }

    /**
     * Reset the Boolean mutex.
     * @param mutex
     */
    public void set(Boolean mutex) {
        if (mutex) {
            sync.innerSetTrue();
        } else {
            sync.innerSetFalse();
        }
    }

    public boolean state() {
        return sync.innerState();
    }

    /**
     * Shared mutex synchronizer
     */
    private final class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = -7828117401763700385L;
        /**
         * State 1: wake up all threads blocked while the state was FALSE
         */
        private static final int TRUE = 1;
        /**
         * State 0: the current thread blocks and waits to be woken up
         */
        private static final int FALSE = 0;

        /**
         * A return value greater than 0 proceeds; less than 0 blocks
         */
        protected int tryAcquireShared(int arg) {
            return getState() == 1 ? 1 : -1;
        }

        /**
         * AQS hook: decide whether the shared lock may be released
         */
        protected boolean tryReleaseShared(int ignore) {
            // always return true: release is always allowed
            return true;
        }

        private boolean innerState() {
            return getState() == 1;
        }

        private void innerLock() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
        }

        private void innerSetTrue() {
            for (;;) {
                int s = getState();
                if (s == TRUE) {
                    return; // exit directly
                }
                if (compareAndSetState(s, TRUE)) { // CAS update to avoid concurrent updates to true
                    releaseShared(0); // release, waking blocked threads
                }
            }
        }

        private void innerSetFalse() {
            for (;;) {
                int s = getState();
                if (s == FALSE) {
                    return; // exit directly
                }
                if (compareAndSetState(s, FALSE)) { // CAS update to avoid concurrent updates to false
                    setState(FALSE);
                }
            }
        }
    }
}
```
3. Notes: the LockNode class referenced above is a small Comparable wrapper around the child node name (exposing getName()); its source is not shown here.
4. Test class:
```java
package com.king.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

/**
 * Distributed lock test
 * @author taomk
 * @version 1.0
 * @since 15-11-19 11:48 AM
 */
public class DistributedLockTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final DistributedLock node = new DistributedLock("/locks");
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                        // node.tryLock(); // acquire without blocking
                        node.lock(); // acquire, blocking
                        Thread.sleep(100);
                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                        try {
                            node.unlock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}
```
Console output:
```
id: x-239027745716109354-0000000248 is leader: true
id: x-22854963329433645-0000000249 is leader: true
id: x-22854963329433646-0000000250 is leader: true
id: x-166970151413415997-0000000251 is leader: true
id: x-166970151413415998-0000000252 is leader: true
id: x-166970151413415999-0000000253 is leader: true
id: x-166970151413416000-0000000254 is leader: true
id: x-166970151413416001-0000000255 is leader: true
id: x-166970151413416002-0000000256 is leader: true
id: x-22854963329433647-0000000257 is leader: true
id: x-239027745716109355-0000000258 is leader: true
id: x-166970151413416003-0000000259 is leader: true
id: x-94912557367427124-0000000260 is leader: true
id: x-22854963329433648-0000000261 is leader: true
id: x-239027745716109356-0000000262 is leader: true
id: x-239027745716109357-0000000263 is leader: true
id: x-166970151413416004-0000000264 is leader: true
id: x-239027745716109358-0000000265 is leader: true
id: x-239027745716109359-0000000266 is leader: true
id: x-22854963329433649-0000000267 is leader: true
id: x-22854963329433650-0000000268 is leader: true
id: x-94912557367427125-0000000269 is leader: true
id: x-22854963329433651-0000000270 is leader: true
id: x-94912557367427126-0000000271 is leader: true
id: x-239027745716109360-0000000272 is leader: true
id: x-94912557367427127-0000000273 is leader: true
id: x-94912557367427128-0000000274 is leader: true
id: x-166970151413416005-0000000275 is leader: true
id: x-94912557367427129-0000000276 is leader: true
id: x-166970151413416006-0000000277 is leader: true
id: x-94912557367427130-0000000278 is leader: true
id: x-94912557367427131-0000000279 is leader: true
id: x-239027745716109361-0000000280 is leader: true
id: x-239027745716109362-0000000281 is leader: true
id: x-166970151413416007-0000000282 is leader: true
id: x-94912557367427132-0000000283 is leader: true
id: x-22854963329433652-0000000284 is leader: true
id: x-166970151413416008-0000000285 is leader: true
id: x-239027745716109363-0000000286 is leader: true
id: x-239027745716109364-0000000287 is leader: true
id: x-166970151413416009-0000000288 is leader: true
id: x-166970151413416010-0000000289 is leader: true
id: x-239027745716109365-0000000290 is leader: true
id: x-94912557367427133-0000000291 is leader: true
id: x-239027745716109366-0000000292 is leader: true
id: x-94912557367427134-0000000293 is leader: true
id: x-22854963329433653-0000000294 is leader: true
id: x-94912557367427135-0000000295 is leader: true
id: x-239027745716109367-0000000296 is leader: true
id: x-239027745716109368-0000000297 is leader: true
```
#5 Upgraded Version# A distributed lock solves synchronization between processes, but for a multi-thread plus multi-process locking requirement, having every thread in a single JVM interact with ZooKeeper over the network gets expensive. So, on top of DistributedLock, a two-level distributed lock is implemented.
大體原理就是ReentrantLock 和 DistributedLock的一個結合:
When multiple threads inside one JVM compete, a thread must first acquire the first-level ReentrantLock; holding that, it then competes with threads in other JVMs for the distributed lock, and only once it holds both does it process the task; release runs in the reverse direction: release the DistributedLock first, then the ReentrantLock. Consider what would happen if the ReentrantLock were released first: if contention on the ReentrantLock inside this JVM is high, lock contenders in other JVMs could easily be starved.
1. DistributedReentrantLock.java: multi-process + multi-thread distributed lock
```java
package com.king.lock;

import java.text.MessageFormat;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.KeeperException;

/**
 * Multi-process + multi-thread distributed lock
 */
public class DistributedReentrantLock extends DistributedLock {

    private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
    private ReentrantLock reentrantLock = new ReentrantLock();

    public DistributedReentrantLock(String root) {
        super(root);
    }

    public void lock() throws Exception {
        reentrantLock.lock(); // when threads compete, take the first-level lock first
        super.lock();
    }

    public boolean tryLock() throws Exception {
        // when threads compete, take the first-level lock first
        return reentrantLock.tryLock() && super.tryLock();
    }

    public void unlock() throws KeeperException {
        super.unlock();
        reentrantLock.unlock(); // when threads compete, release the outermost lock last
    }

    @Override
    public String getId() {
        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
    }

    @Override
    public boolean isOwner() {
        return reentrantLock.isHeldByCurrentThread() && super.isOwner();
    }
}
```
2. Test code:
```java
package com.king.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.KeeperException;

/**
 * @author taomk
 * @version 1.0
 * @since 15-11-23 12:15 PM
 */
public class DistributedReentrantLockTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
        final DistributedReentrantLock lock = new DistributedReentrantLock("/locks"); // a single shared lock
        for (int i = 0; i < count; i++) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                        lock.lock();
                        Thread.sleep(100);
                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                        try {
                            lock.unlock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
    }
}
```
#6 Closing Thoughts# Taking the idea one step further, a distributed read/write lock can be implemented along much the same lines. The rough approach:
Nodes are named read_&lt;sequence id&gt; and write_&lt;sequence id&gt;.
If the entries at the front of the queue are all read entries, every one of those reads acquires the lock; if the front of the queue is a write entry, only the first write node acquires the lock.
A read watches the existence of the nearest preceding write node; a write watches the nearest preceding node (read or write).
A minimal sketch of this scheme follows.
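The sketch below is illustrative only and not hardened: the "/rwlocks" root path, the "read-"/"write-" node prefixes and the class name are invented for this example, the root node is assumed to already exist, and session-expiry handling is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

/**
 * Sketch of a distributed read/write lock on ZooKeeper sequential nodes.
 * The "/rwlocks" root, the node prefixes and this class name are illustrative only.
 */
public class ReadWriteLockSketch {

    private static final String ROOT = "/rwlocks"; // assumed to already exist

    private final ZooKeeper zk;
    private String myNode; // e.g. "read-0000000007"

    public ReadWriteLockSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    /** Acquire in read or write mode; blocks until the lock is held. */
    public void lock(boolean write) throws KeeperException, InterruptedException {
        String prefix = write ? "write-" : "read-";
        String path = zk.create(ROOT + "/" + prefix, new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        myNode = path.substring(path.lastIndexOf('/') + 1);

        while (true) {
            List<String> children = new ArrayList<String>(zk.getChildren(ROOT, false));
            // Sort by the sequence suffix that EPHEMERAL_SEQUENTIAL appends.
            Collections.sort(children, new Comparator<String>() {
                public int compare(String a, String b) {
                    return a.substring(a.lastIndexOf('-') + 1)
                            .compareTo(b.substring(b.lastIndexOf('-') + 1));
                }
            });
            int me = children.indexOf(myNode);

            // Decide which node, if any, we have to wait for.
            String waitFor = null;
            if (write) {
                if (me > 0) {
                    waitFor = children.get(me - 1);      // write waits for the nearest preceding node
                }
            } else {
                for (int i = me - 1; i >= 0; i--) {
                    if (children.get(i).startsWith("write-")) {
                        waitFor = children.get(i);       // read waits for the nearest preceding write
                        break;
                    }
                }
            }
            if (waitFor == null) {
                return; // no blocking predecessor: the lock is held
            }

            final CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(ROOT + "/" + waitFor, new Watcher() {
                public void process(WatchedEvent event) {
                    latch.countDown(); // predecessor changed (typically deleted): re-check
                }
            });
            if (stat != null) {
                latch.await();
            }
        }
    }

    /** Release by deleting our node; watchers on it fire and waiters re-check. */
    public void unlock() throws KeeperException, InterruptedException {
        zk.delete(ROOT + "/" + myNode, -1);
        myNode = null;
    }
}
```

As in the exclusive lock, each waiter watches only one predecessor, so a release wakes exactly the nodes whose turn may have come rather than the whole queue.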