zookeeper — 實現分佈式鎖

一.前言

在以前的文章中介紹過度布式鎖的特色和利用Redis實現簡單的分佈式鎖。可是分佈式鎖的實現還有不少其餘方式,可是萬變不離其宗,始終遵循一個特色:同一時刻只能有一個操做獲取。這篇文章主要介紹如何基於zookeeper實現分佈式鎖。html

  • zookeeper可以做爲分佈式鎖實現的基礎
  • 算法流程
  • 實現

關於分佈式鎖的相關特性,這裏再也不贅述,請參考分佈式鎖node


二.zookeeper可以做爲分佈式鎖實現的基礎

這裏回顧下分佈式鎖的特色:算法

  • 每次只能一個佔用鎖;
  • 能夠重複進入鎖;
  • 只有佔用者才能夠解鎖;
  • 獲取鎖和釋放鎖都須要原子
  • 不能產生死鎖
  • 儘可能知足性能

zookeeper中有一種臨時順序節點,它具備如下特徵:分佈式

  • 時效性,當會話結束,節點將自動被刪除
  • 順序性,當多個應用向其註冊順序節點時,每一個順序號將只能被一個應用獲取

利用以上的特色能夠知足分佈式鎖實現的基本要求:ide

  1. 由於順序性,可讓最小順序號的應用獲取到鎖,從而知足分佈式鎖的每次只能一個佔用鎖,由於只有它一個獲取到,因此能夠實現重複進入,只要設置標識便可。鎖的釋放,即刪除應用在zookeeper上註冊的節點,由於每一個節點只被本身註冊擁有,因此只有本身才能刪除,這樣就知足只有佔用者才能夠解鎖性能

  2. zookeeper的序號分配是原子的,分配後即不會再改變,讓最小序號者獲取鎖,因此獲取鎖是原子的fetch

  3. 由於註冊的是臨時節點,在會話期間內有效,因此不會產生死鎖ui

  4. zookeeper註冊節點的性能能知足幾千,並且支持集羣,可以知足大部分狀況下的性能this

三.算法流程

1.獲取鎖

須要獲取分佈式鎖的應用都向zookeeper的/lock/{resouce}目錄下注冊sequence-前綴的節點,序號最小者獲取到操做資源的權限:線程

Note:
這裏的resource須要依據競爭的具體資源肯定,如競爭帳戶則能夠使用帳戶號做爲resource。

從圖中能夠看出,clientA的順序號最小,由它獲取到鎖,操做資源。

算法步驟

  1. client判斷/lock目錄是否存在,若是不存在則向其註冊/lock的持久節點
  2. client判斷/lock目錄下是否存在競爭的資源resouce目錄,若是不存在則向其註冊/lock/resource的持久節點
  3. client向/lock/resource目錄下注冊/lock/resource/sequence-前綴的臨時順序節點,並獲得順序號
  4. client獲取/lock/resource目錄下的全部臨時順序子節點
  5. client判斷臨時子節點序號中是否存在比自身的序號小的節點。若是不存在,則獲取到鎖;若是存在,則對象該臨時節點作watch監控
  6. 若是收到監控的臨時節點被刪除的通知,則再重複四、5步驟,直到獲取到鎖

流程圖

2.釋放鎖

由於最小的節點只被獲取到鎖的client持有,因此該鎖不可能被其餘client釋放。同時釋放鎖只須要將臨時順序節點刪除,也是原子性操做。


三.實現

/**
 * 基於Zookeeper實現分佈式鎖
 *
 * @author huaijin
 */
public class DistributedLockBaseZookeeper implements DistributedLock {

    private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);

    /**
     * 利用空串做爲各個節點存儲的數據
     */
    private static final String EMPTY_DATA = "";

    /**
     * 分佈式鎖的根目錄
     */
    private static final String LOCK_ROOT = "/lock";

    /**
     * zookeeper目錄分隔符
     */
    private static final String PATH_SEPARATOR = "/";

    /**
     * 臨時順序節點前綴
     */
    private static final String LOCK_NODE_PREFIX = "sequence-";

    /**
     * 利用Lock和Condition實現等待通知
     */
    private Lock waitNotifierLock = new ReentrantLock();
    private Condition waitNotifier = waitNotifierLock.newCondition();

    /**
     * 操做zookeeper的client
     */
    private ZkClient zkClient;

    /**
     * 分佈式資源的路徑
     */
    private String resourcePath;

    /**
     * 鎖節點完整前綴
     */
    private String lockNodePrefix;

    /**
     * 當前註冊的臨時順序節點路徑
     */
    private String currentLockNodePath;

    public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {
        Objects.requireNonNull(zkClient, "zkClient must not be null!");
        if (resource == null || resource.isEmpty()) {
            throw new IllegalArgumentException("resource must not be null!");
        }
        this.zkClient = zkClient;
        this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;
        this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;

        // 建立分佈式鎖根目錄
        if (!this.zkClient.exists(LOCK_ROOT)) {
            try {
                this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);
            } catch (ZkNodeExistsException e) {
                // ignore, logging
                log.warn("The root path for lock already exists.");
            }
        }

        // 建立資源目錄
        if (!this.zkClient.exists(resourcePath)) {
            try {
                this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);
            } catch (ZkNodeExistsException e) {
                // ignore, logging
                log.warn("The resource path for [" + resourcePath + "] already exists.");
            }
        }
    }


    @Override
    public void lock() throws DistributedLockException {
        if (!acquireLock()) {
            // 若是獲取鎖不成功,則等待
            waitNotifierLock.lock();
            try {
                waitNotifier.await();
            } catch (Exception e) {
                throw new DistributedLockException("Interrupt when waiting notification.");
            } finally {
                waitNotifierLock.unlock();
            }
        }
    }

    @Override
    public void unlock() {
        // 刪除自身節點,釋放鎖
        zkClient.delete(currentLockNodePath);
    }

    private boolean acquireLock() throws DistributedLockException {
        // 若是當前未註冊臨時順序節點,則註冊
        if (this.currentLockNodePath == null) {
            this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
        // 獲取順序號
        long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);
        // 獲取全部子節點
        List<String> childNodePaths = zkClient.getChildren(resourcePath);
        if (childNodePaths == null || childNodePaths.isEmpty()) {
            throw new DistributedLockException("Not exists child nodes.");
        }
        // 從全部子節點中獲取最小子節點的順序號
        long minSeq = 1000000L;
        int minIndex = -1;
        for (int i = 0; i < childNodePaths.size(); i++) {
            long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i));
            if (nodeSeq < minSeq) {
                minSeq = nodeSeq;
                minIndex = i;
            }
        }
        // 比較自身順序號與最小序號
        if (lockNodeSeq > minSeq) {
            // 若是存在更小序號,則監控最小序號的子節點
            String minLockNodePath = childNodePaths.get(minIndex);
            zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath,
                    new ListenerForLockRelease());
            return false;
        }
        // 成功獲取鎖,返回
        return true;
    }

    private long fetchSeqFromNodePath(String nodePath) {
        String seq = nodePath.substring(lockNodePrefix.length());
        return Long.valueOf(seq);
    }

    private class ListenerForLockRelease implements IZkDataListener {

        @Override
        public void handleDataChange(String dataPath, Object data) throws Exception {
        }

        @Override
        public void handleDataDeleted(String dataPath) throws Exception {
            // 若是成功獲取鎖,則通知,讓主線程返回
            if (acquireLock()) {
                waitNotifierLock.lock();
                try {
                    waitNotifier.signal();
                } finally {
                    waitNotifierLock.unlock();
                }
            }
        }
    }
}
相關文章
相關標籤/搜索