在以前的文章中介紹過度布式鎖的特色和利用Redis實現簡單的分佈式鎖。可是分佈式鎖的實現還有不少其餘方式,可是萬變不離其宗,始終遵循一個特色:同一時刻只能有一個操做獲取。這篇文章主要介紹如何基於zookeeper實現分佈式鎖。html
關於分佈式鎖的相關特性,這裏再也不贅述,請參考分佈式鎖。node
這裏回顧下分佈式鎖的特色:算法
zookeeper中有一種臨時順序節點,它具備如下特徵:分佈式
利用以上的特色能夠知足分佈式鎖實現的基本要求:ide
由於順序性,可讓最小順序號的應用獲取到鎖,從而知足分佈式鎖的每次只能一個佔用鎖,由於只有它一個獲取到,因此能夠實現重複進入,只要設置標識便可。鎖的釋放,即刪除應用在zookeeper上註冊的節點,由於每一個節點只被本身註冊擁有,因此只有本身才能刪除,這樣就知足只有佔用者才能夠解鎖性能
zookeeper的序號分配是原子的,分配後即不會再改變,讓最小序號者獲取鎖,因此獲取鎖是原子的fetch
由於註冊的是臨時節點,在會話期間內有效,因此不會產生死鎖ui
zookeeper註冊節點的性能能知足幾千,並且支持集羣,可以知足大部分狀況下的性能this
須要獲取分佈式鎖的應用都向zookeeper的/lock/{resouce}目錄下注冊sequence-前綴的節點,序號最小者獲取到操做資源的權限:線程
Note:
這裏的resource須要依據競爭的具體資源肯定,如競爭帳戶則能夠使用帳戶號做爲resource。
從圖中能夠看出,clientA的順序號最小,由它獲取到鎖,操做資源。
算法步驟:
流程圖:
由於最小的節點只被獲取到鎖的client持有,因此該鎖不可能被其餘client釋放。同時釋放鎖只須要將臨時順序節點刪除,也是原子性操做。
/** * 基於Zookeeper實現分佈式鎖 * * @author huaijin */ public class DistributedLockBaseZookeeper implements DistributedLock { private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class); /** * 利用空串做爲各個節點存儲的數據 */ private static final String EMPTY_DATA = ""; /** * 分佈式鎖的根目錄 */ private static final String LOCK_ROOT = "/lock"; /** * zookeeper目錄分隔符 */ private static final String PATH_SEPARATOR = "/"; /** * 臨時順序節點前綴 */ private static final String LOCK_NODE_PREFIX = "sequence-"; /** * 利用Lock和Condition實現等待通知 */ private Lock waitNotifierLock = new ReentrantLock(); private Condition waitNotifier = waitNotifierLock.newCondition(); /** * 操做zookeeper的client */ private ZkClient zkClient; /** * 分佈式資源的路徑 */ private String resourcePath; /** * 鎖節點完整前綴 */ private String lockNodePrefix; /** * 當前註冊的臨時順序節點路徑 */ private String currentLockNodePath; public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) { Objects.requireNonNull(zkClient, "zkClient must not be null!"); if (resource == null || resource.isEmpty()) { throw new IllegalArgumentException("resource must not be null!"); } this.zkClient = zkClient; this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource; this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX; // 建立分佈式鎖根目錄 if (!this.zkClient.exists(LOCK_ROOT)) { try { this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT); } catch (ZkNodeExistsException e) { // ignore, logging log.warn("The root path for lock already exists."); } } // 建立資源目錄 if (!this.zkClient.exists(resourcePath)) { try { this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT); } catch (ZkNodeExistsException e) { // ignore, logging log.warn("The resource path for [" + resourcePath + "] already exists."); } } } @Override public void lock() throws DistributedLockException { if (!acquireLock()) { // 若是獲取鎖不成功,則等待 waitNotifierLock.lock(); try { waitNotifier.await(); } catch (Exception e) { throw new DistributedLockException("Interrupt when waiting notification."); } finally { waitNotifierLock.unlock(); } } } @Override public void unlock() { // 刪除自身節點,釋放鎖 zkClient.delete(currentLockNodePath); } private boolean acquireLock() throws DistributedLockException { // 若是當前未註冊臨時順序節點,則註冊 if (this.currentLockNodePath == null) { this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL); } // 獲取順序號 long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath); // 獲取全部子節點 List<String> childNodePaths = zkClient.getChildren(resourcePath); if (childNodePaths == null || childNodePaths.isEmpty()) { throw new DistributedLockException("Not exists child nodes."); } // 從全部子節點中獲取最小子節點的順序號 long minSeq = 1000000L; int minIndex = -1; for (int i = 0; i < childNodePaths.size(); i++) { long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i)); if (nodeSeq < minSeq) { minSeq = nodeSeq; minIndex = i; } } // 比較自身順序號與最小序號 if (lockNodeSeq > minSeq) { // 若是存在更小序號,則監控最小序號的子節點 String minLockNodePath = childNodePaths.get(minIndex); zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath, new ListenerForLockRelease()); return false; } // 成功獲取鎖,返回 return true; } private long fetchSeqFromNodePath(String nodePath) { String seq = nodePath.substring(lockNodePrefix.length()); return Long.valueOf(seq); } private class ListenerForLockRelease implements IZkDataListener { @Override public void handleDataChange(String dataPath, Object data) throws Exception { } @Override public void handleDataDeleted(String dataPath) throws Exception { // 若是成功獲取鎖,則通知,讓主線程返回 if (acquireLock()) { waitNotifierLock.lock(); try { waitNotifier.signal(); } finally { waitNotifierLock.unlock(); } } } } }