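In the article on Redis distributed locks, the author showed how to build a distributed lock with Redis.
A Redis-based distributed lock is lightweight and offers high throughput, but its consistency guarantees are weak. When stronger consistency is required, we can build the distributed lock on ZooKeeper instead.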
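ZooKeeper nodes have several properties that help in building a distributed lock. An ephemeral node is deleted automatically when the session that created it ends, so a crashed client cannot hold a lock forever. A sequential node has a monotonically increasing sequence number appended to its name, and numbers are never reused. A client can also set a watch on a node to be notified when that node changes or is deleted.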
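To make the ephemeral and sequential properties concrete, here is a tiny in-memory model. It is hypothetical (`FakeLockParent` and its methods are not part of any ZooKeeper API) and simplified: its counter advances only on creation, whereas real ZooKeeper may skip numbers. The lock only relies on the guarantee that numbers increase and are never reused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model, NOT the ZooKeeper API: it only mimics
// EPHEMERAL_SEQUENTIAL children under a single parent node.
class FakeLockParent {
    private int nextSequence = 0;                                 // per-parent counter, never reused
    private final Map<String, Long> children = new TreeMap<>();  // child name -> owning session id

    // Mimics creating parent + "/" + prefix with EPHEMERAL_SEQUENTIAL:
    // a zero-padded 10-digit sequence number is appended to the prefix.
    String createEphemeralSequential(String prefix, long sessionId) {
        String name = prefix + String.format("%010d", nextSequence++);
        children.put(name, sessionId);
        return name;
    }

    // Mimics session close/expiry: every ephemeral node owned by the session disappears.
    void closeSession(long sessionId) {
        children.values().removeIf(owner -> owner == sessionId);
    }

    List<String> getChildren() {
        return new ArrayList<>(children.keySet()); // TreeMap keeps the names sorted
    }
}
```

After `closeSession`, the next node still gets a larger number than any node ever created, which is why a waiter can never be confused by a recycled name.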
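A fairly obvious scheme is the following: all clients try to create the same ephemeral node. The client that succeeds holds the lock, and the others watch that node. When the node is deleted (the holder releases the lock or its session ends), every watching client is notified and they all race to create it again.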
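The problem with this scheme is that whenever the lock is released, ZooKeeper must notify every client that subscribed to the event. This is known as the "thundering herd" (or "herd effect").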
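The thundering herd seriously hurts ZooKeeper's ability to serve normal requests, so in practice a different scheme is usually adopted: each client creates an ephemeral sequential child under the lock node. The client whose node has the smallest sequence number holds the lock; every other client watches only the node immediately before its own, and when that node is deleted it checks again whether its own node is now the smallest.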
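With this scheme, each release notifies only one client, so the thundering herd is avoided.
A characteristic of this scheme is that clients that started waiting earlier acquire the lock earlier; such a lock is called a fair lock. A scheme in which all clients compete for the lock again after it is released is called an unfair lock.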
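The difference between the two schemes can be quantified with a small simulation that involves no ZooKeeper at all. `LockSchemeSimulation` is a hypothetical name; the sketch assumes one client holds the lock first while all others are already waiting, and in the first scheme it picks the winner of each race at random.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical simulation (no ZooKeeper involved) comparing the two lock schemes.
class LockSchemeSimulation {
    // Scheme 1: all waiters watch the same lock node. Every release wakes every
    // remaining waiter and one of them (picked at random here) wins: an unfair lock.
    static long sharedWatch(int clients, Random random, List<Integer> grantOrder) {
        List<Integer> waiting = new ArrayList<>();
        for (int i = 1; i < clients; i++) waiting.add(i); // client 0 holds the lock first
        grantOrder.add(0);
        long notifications = 0;
        while (!waiting.isEmpty()) {
            notifications += waiting.size();              // the herd: everyone is notified
            int winner = waiting.remove(random.nextInt(waiting.size()));
            grantOrder.add(winner);
        }
        return notifications;
    }

    // Scheme 2: client i watches only node i-1, so every release wakes exactly one
    // client and the lock is granted in creation (queue) order: a fair lock.
    static long chainedWatch(int clients, List<Integer> grantOrder) {
        long notifications = 0;
        for (int i = 0; i < clients; i++) {
            if (i > 0) notifications++;                   // only the successor is notified
            grantOrder.add(i);
        }
        return notifications;
    }
}
```

With 10 clients the shared-watch scheme sends 45 notifications (n(n-1)/2 in general) while the chained scheme sends 9 (n-1), and only the chained scheme grants the lock in queue order.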
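In this section the author implements a simple fair lock with ZooKeeper's official Java API.
Dependencies are managed with Maven; the project depends on the official ZooKeeper Java SDK and the Apache commons-lang3 utility library: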
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.6</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
</dependency>
The complete code is as follows:
package zk;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;

/**
 * @author finley
 */
public class ZKLock {

    private ZooKeeper zk;
    private String basePath;
    private String lockPath;
    private static final byte[] LOCK_DATA = "".getBytes();

    // zk is the client connection; basePath is the lock node path, under which we create sequential children
    public ZKLock(ZooKeeper zk, String basePath) {
        // Per ZooKeeper path rules, the path must start with '/' and must not end with '/'
        if (basePath.endsWith("/") || !basePath.startsWith("/")) {
            throw new IllegalArgumentException("base path must start with '/', and must not end with '/'");
        }
        this.zk = zk;
        this.basePath = basePath;
    }

    // Check whether basePath exists and create it (with any missing ancestors) if it does not
    private void ensureBasePath() throws KeeperException, InterruptedException {
        if (zk.exists(basePath, false) == null) {
            // basePath does not exist: split it into its path segments
            List<String> pathParts = new ArrayList<>(Arrays.asList(basePath.split("/")));
            pathParts.remove(0); // basePath starts with '/', so pathParts[0] is always the empty string
            // Walk bottom-up to find the deepest ancestor that already exists
            int last = 0;
            for (int i = pathParts.size() - 1; i >= 0; i--) {
                String path = "/" + StringUtils.join(pathParts.subList(0, i), '/');
                if (zk.exists(path, false) != null) {
                    last = i;
                    break;
                }
            }
            // Create the missing nodes one by one, starting just below that ancestor
            for (int i = last; i < pathParts.size(); i++) {
                String path = "/" + StringUtils.join(pathParts.subList(0, i + 1), '/');
                try {
                    zk.create(path, LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException ignore) {
                    // may have been created concurrently by another client
                }
            }
        }
    }

    // Block until the lock is acquired
    public void lock() throws KeeperException, InterruptedException {
        ensureBasePath();
        // Create an ephemeral sequential child under basePath
        String lockPath = zk.create(basePath + "/lock_", LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(Thread.currentThread().getName() + " create: " + lockPath);
        // Loop until we hold the lock
        while (true) {
            // Fetch all children of basePath and find the smallest one.
            // Sequence numbers only increase, so any node created later is larger than ours;
            // that is why create() and getChildren() need not be atomic.
            List<String> children = zk.getChildren(basePath, false);
            Collections.sort(children);
            String minNode = children.get(0);
            // Our node is the smallest: lock acquired
            if (StringUtils.isNotBlank(lockPath) && StringUtils.isNotBlank(minNode)
                    && StringUtils.equals(lockPath, basePath + "/" + minNode)) {
                this.lockPath = lockPath; // record the lock path for unlock()
                return;
            }
            // Not the smallest: find the node immediately before ours and watch it
            String watchNode = null;
            String node = lockPath.substring(lockPath.lastIndexOf("/") + 1);
            for (int i = children.size() - 1; i >= 0; i--) {
                String child = children.get(i);
                if (child.compareTo(node) < 0) {
                    watchNode = child;
                    break;
                }
            }
            if (watchNode != null) {
                System.out.println(Thread.currentThread().getName() + " watch: " + watchNode);
                String watchPath = basePath + "/" + watchNode;
                // Set the watch and wait while holding the monitor: the watcher must take the same
                // monitor to call notifyAll(), so a notification cannot slip in between getData()
                // and wait() and be lost.
                synchronized (this) {
                    try {
                        // Why watch with getData rather than exists: between getChildren() and setting the
                        // watch, the watched node may already be gone (lock released / holder crashed).
                        // exists() would still set the watch, but NodeDeleted would never fire (sequence
                        // numbers are never reused), so this method would wait forever.
                        // getData() instead throws if the node is gone, so the thread does not wait in vain.
                        // The watch callback runs on ZooKeeper's event thread when the event fires.
                        zk.getData(watchPath, event -> {
                            // Wake up on any event type; the loop re-checks the children anyway
                            synchronized (this) {
                                notifyAll();
                            }
                        }, null);
                    } catch (KeeperException.NoNodeException e) {
                        // The predecessor was deleted before the watch was set: loop and re-check
                        continue;
                    }
                    wait(); // woken by the watcher; loop and re-check that we now hold the lock
                    System.out.println(Thread.currentThread().getName() + " notified");
                }
            }
        }
    }

    // Release the lock
    public void unlock() throws KeeperException, InterruptedException {
        // lock() records the lock path on success
        if (StringUtils.isNotBlank(lockPath)) {
            zk.delete(lockPath, -1); // deleting our node releases the lock
            lockPath = null;         // allow this instance to lock again
        } else {
            // no recorded lock path: this instance does not hold the lock
            throw new IllegalStateException("lock is not held");
        }
    }

    public static void main(String[] args) {
        int concurrent = 10;
        ExecutorService service = Executors.newFixedThreadPool(concurrent);
        for (int i = 0; i < concurrent; i++) {
            service.execute(() -> {
                // Each thread needs its own ZooKeeper session so that threads hold the lock independently
                ZooKeeper zk = null;
                try {
                    zk = new ZooKeeper("localhost:2181", 6000, watchedEvent -> {
                        if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState())
                            System.out.println("connection is established...");
                    });
                    ZKLock lock = new ZKLock(zk, "/test/node1");
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + " acquire success");
                    Thread.sleep(1000);
                    System.out.println("do sth, thread: " + Thread.currentThread().getName());
                    lock.unlock();
                    System.out.println(Thread.currentThread().getName() + " release success");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (zk != null) {
                        try {
                            zk.close(); // end the session; any leftover ephemeral node is removed
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
            });
        }
        service.shutdown();
    }
}
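The decision made on each pass of `lock()` can be isolated into a pure function, which makes it testable without a ZooKeeper server. `LockDecision.nodeToWatch` is a hypothetical helper, not part of the code above; unlike `lock()`, it throws if our own node is missing from the children (for example after a session expiry) instead of looping on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper isolating the per-iteration decision of ZKLock.lock():
// either our node is the smallest (we hold the lock) or we must watch the
// closest node that sorts before ours.
class LockDecision {
    /** Returns null if ourNode holds the lock, otherwise the child name to watch. */
    static String nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        // Plain string order works here because all names share the "lock_" prefix
        // and a zero-padded sequence number.
        Collections.sort(sorted);
        int index = sorted.indexOf(ourNode);
        if (index < 0) {
            throw new IllegalStateException("our node is gone (session expired?): " + ourNode);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }
}
```

For children `lock_0000000000`, `lock_0000000002` and `lock_0000000005`, the owner of `lock_0000000005` watches `lock_0000000002` rather than the current holder, which is what limits each release to a single notification.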
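Curator is a ZooKeeper toolkit that provides ready-made implementations of common recipes, including distributed locks. This article uses the source of Curator's distributed lock as the example for analysis.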
Add the dependency with Maven:
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>
Write the locking code:
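import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkLock {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        client.start();
        try {
            // The lock node is /curator/mutex
            InterProcessMutex mutex = new InterProcessMutex(client, "/curator/mutex");
            // Acquire the lock before entering the try block, so that release()
            // is only called when the lock is actually held
            mutex.acquire();
            try {
                // Do the work
                System.out.println("foo bar");
            } finally {
                // Release the lock
                mutex.release();
            }
        } finally {
            client.close();
        }
    }
}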
Next, let's analyze the implementation of InterProcessMutex.acquire():
/**
 * Acquire the mutex - blocking until it's available. Note: the same thread
 * can call acquire re-entrantly. Each call to acquire must be balanced by a call
 * to {@link #release()}
 *
 * @throws Exception ZK errors, connection interruptions
 */
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}
Next, the internalLock method:
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    Thread currentThread = Thread.currentThread();
    // threadData is a ConcurrentMap recording the lock state of each thread
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null ) // non-null lockData means this thread already holds the lock
    {
        // re-entrant acquisition: increment the hold count
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // internals.attemptLock does the actual work of acquiring the lock through ZooKeeper
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}
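The re-entrancy handling in `internalLock()` never contacts ZooKeeper for a thread that already holds the lock. The sketch below reproduces that bookkeeping with plain JDK types. `ReentrantBookkeeping`, `acquire(Supplier)` and `release(Consumer)` are hypothetical, and the release side (decrement, delete the node only when the count reaches zero) is an assumption about the matching `release()`, consistent with the Javadoc's rule that every `acquire` must be balanced by a `release`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of per-thread re-entrancy bookkeeping. The ZooKeeper work is
// abstracted as an "attempt" returning a lock path (or null on failure) and a
// "releaser" that deletes that path.
class ReentrantBookkeeping {
    private static final class LockData {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);
        LockData(String lockPath) { this.lockPath = lockPath; }
    }

    private final ConcurrentMap<Thread, LockData> threadData = new ConcurrentHashMap<>();

    boolean acquire(Supplier<String> attempt) {
        LockData data = threadData.get(Thread.currentThread());
        if (data != null) {                 // already held by this thread: count the re-entry
            data.lockCount.incrementAndGet();
            return true;
        }
        String lockPath = attempt.get();    // ZooKeeper round trips happen only here
        if (lockPath == null) return false;
        threadData.put(Thread.currentThread(), new LockData(lockPath));
        return true;
    }

    void release(Consumer<String> releaser) {
        LockData data = threadData.get(Thread.currentThread());
        if (data == null) throw new IllegalMonitorStateException("lock not held by this thread");
        if (data.lockCount.decrementAndGet() > 0) return; // still held re-entrantly
        try {
            releaser.accept(data.lockPath);                // last release: delete our node
        } finally {
            threadData.remove(Thread.currentThread());
        }
    }
}
```

Because the map is keyed by `Thread`, re-entrancy is per thread: a second thread in the same process still goes through `attempt` and queues in ZooKeeper like any other client.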
Now let's analyze internals.attemptLock, which performs the actual locking:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    // Keep trying to acquire the lock
    while ( !isDone )
    {
        isDone = true;
        try
        {
            // Create an ephemeral sequential node under the lock node
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // Wait until our node is the smallest, i.e. until the lock is acquired
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // Thrown e.g. when the session expires; retry directly if the retry policy allows it
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }
    if ( hasTheLock )
    {
        return ourPath;
    }
    return null;
}
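`attemptLock()` is essentially a retry loop: the whole "create node, then wait" step is rerun on `NoNodeException` for as long as the retry policy allows. A generic sketch of that structure follows. `RetryingAttempt` and `RetryableException` are hypothetical (the latter stands in for `KeeperException.NoNodeException`), and the bounded doubling backoff is a simplified policy, not Curator's `ExponentialBackoffRetry`, which also randomizes the sleep.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the retry structure of attemptLock(): rerun the whole step
// on a retryable failure while a simple bounded-backoff policy allows it.
class RetryingAttempt {
    // Stand-in for KeeperException.NoNodeException
    static class RetryableException extends Exception {
        RetryableException(String message) { super(message); }
    }

    static <T> T callWithRetry(Callable<T> step, int maxRetries, long baseSleepMs) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return step.call();                          // e.g. create node + wait for the lock
            } catch (RetryableException e) {
                if (retryCount >= maxRetries) throw e;       // policy refuses: propagate
                long sleepMs = baseSleepMs * (1L << retryCount); // base, 2x base, 4x base, ...
                retryCount++;
                Thread.sleep(sleepMs);
            }
        }
    }
}
```

Other exceptions are not caught, mirroring how `attemptLock()` only retries on `NoNodeException` and lets everything else propagate.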
First, the source of StandardLockInternalsDriver.createsTheLock():
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}
It simply creates an ephemeral sequential node, so we won't go into it further.
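One detail matters when these nodes are later sorted: because of `withProtection()`, the created name is not simply `lock-<sequence>`. As far as I understand Curator's protected mode, it is prefixed with `_c_<GUID>-`, and the GUID is random, so sorting children by their full names would order them by GUID rather than by sequence number. The sketch below assumes the lock name `lock-` and uses a sort key modeled on Curator's `fixForSorting` helper; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of sorting lock children whose names carry a protection prefix.
// Assumption: names look like "_c_<GUID>-lock-<10-digit sequence>" and the lock name is "lock-".
class ProtectedNameSorting {
    // Sort key: everything after the last occurrence of the lock name, i.e. the sequence number.
    static String fixForSorting(String child, String lockName) {
        int index = child.lastIndexOf(lockName);
        return index >= 0 ? child.substring(index + lockName.length()) : child;
    }

    static List<String> sortedChildren(List<String> children, String lockName) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing((String c) -> fixForSorting(c, lockName)));
        return sorted;
    }
}
```

With names `_c_9f1e-lock-0000000002`, `_c_0a2b-lock-0000000010` and `_c_ffff-lock-0000000001`, a plain string sort would put sequence 10 first; sorting on the suffix yields 1, 2, 10.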
Next, the core loop of internalLockLoop (excerpt):
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
    // Get all children, sorted by sequence number in ascending order
    List<String> children = getSortedChildren();
    // Check whether our node has the smallest sequence number
    String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    if ( predicateResults.getsTheLock() )
    {
        haveTheLock = true;
    }
    else
    {
        // Path of the node just before ours
        String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
        // Watch the previous node and wait(); when the lock is released we are woken by notifyAll()
        synchronized(this)
        {
            try
            {
                // Why getData() rather than exists():
                // if the previous node has already been deleted, exists() still sets its watch, but that
                // watch can never fire (sequential nodes never reuse the previous node's number). The method
                // would wait for nothing, and the dangling watch also wastes ZooKeeper resources.
                // getData() instead throws an exception if the previous node has been deleted.
                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                // A maximum wait time was specified
                if ( millisToWait != null )
                {
                    millisToWait -= (System.currentTimeMillis() - startMillis);
                    startMillis = System.currentTimeMillis();
                    if ( millisToWait <= 0 )
                    {
                        doDelete = true;    // timed out - delete our node
                        break;
                    }
                    // Wait for the remaining time
                    wait(millisToWait);
                }
                else
                {
                    // Wait indefinitely
                    wait();
                }
            }
            catch ( KeeperException.NoNodeException e )
            {
                // getData() throws this when the previous node has been deleted; just try to acquire the lock again
            }
        }
    }
}
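The waiting part of this loop has two subtleties: `wait(millisToWait)` can return early (on a notification or spuriously), so the remaining time must be recomputed on every pass, and a timeout must never turn into `wait(0)`, which waits forever. The sketch below captures both with a hypothetical `PredecessorGate` class. Unlike Curator, which re-reads the children after each wakeup, it uses a boolean flag as the condition, and it measures time with `System.nanoTime()` instead of `currentTimeMillis()`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a deadline-bounded wait on an object monitor. The flag guards
// against spurious wakeups and against a notification that arrives before waiting starts.
class PredecessorGate {
    private boolean released = false;

    // Called from the watcher thread when the watched node is deleted.
    synchronized void onPredecessorDeleted() {
        released = true;
        notifyAll();
    }

    // millisToWait == null means wait forever, like acquire(); returns false on timeout.
    synchronized boolean await(Long millisToWait) throws InterruptedException {
        if (millisToWait == null) {
            while (!released) wait();
            return true;
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millisToWait);
        while (!released) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) return false; // timed out: the caller should delete its own node
            wait(remainingMs);                  // never wait(0), which would mean "forever"
        }
        return true;
    }
}
```

A `false` result corresponds to the `doDelete = true` branch above: the waiter gives up and must remove its own node so that its successor is not left watching a client that is no longer waiting.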