zookeeper-分佈式鎖的代碼實現-【每日五分鐘搞定大數據】

本文涉及到幾個zookeeper簡單的知識點,永久節點、有序節點、watch機制。比較基礎,熟悉的就別看了跳過這篇吧
node

  • 每一個線程在/locks節點下建立一個臨時有序節點test_lock_0000000040
  • 得到/locks節點下全部子節點A、B、C,排序得到最小值
  • 若當前節點B爲最小值則得到鎖,執行業務邏輯
  • 若當前節點B不是最小值則watch比本身小1的節點A,節點A存在則await,不然得到鎖

總結:臨時有序節點排序後watch比本身小1的節點。
session

下面看代碼ide

1.線程初始化

建立一個名字爲lockName的永久節點(只有永久節點才能夠建立子節點)this

DistributedLock(String url, String lockName) {
    this.lockName = lockName;
    try {
        zkConn = new ZooKeeper(url, sessionTimeout, this);
        if (zkConn.exists(ROOT_LOCK, false) == null)
            zkConn.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (IOException | InterruptedException | KeeperException e) {
        e.printStackTrace();
    }
}

複習一下:zookeeper的節點有如下幾種類型:永久,永久有序,臨時,臨時有序url

public enum CreateMode {
    PERSISTENT(0, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true),
    EPHEMERAL(1, true, false),
    EPHEMERAL_SEQUENTIAL(3, true, true);

2.線程嘗試得到鎖tryLock

建立臨時有序節點,會在你的lockName後面加上一串編號,例如/locks/test_lock_0000000035線程

CURRENT_LOCK = zkConn.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

獲取當前lockName下的全部子臨時節點3d

List<String> subNodes = zkConn.getChildren(ROOT_LOCK, false);

把名字都取出來放進list排個序code

List<String> lockObjects = new ArrayList<>();
for (String node : subNodes) {
    String _node = node.split(splitStr)[0];
    if (_node.equals(lockName)) lockObjects.add(node);
}

若當前節點爲最小節點,則直接獲取鎖成功blog

if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
    return true;
}

若不是最小節點,開始等待,見下一步。排序

3.開始等待鎖

得到當前線程須要等待的節點名

  • 得到當前節點的節點名即test_lock_0000000035
  • 得到當前幾點在全部排隊等鎖的線程中的排序
  • 根據排序-1,得到排在本身前面的線程的節點名
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
int prevNodePosition = Collections.binarySearch(lockObjects, prevNode);
WAIT_LOCK = lockObjects.get(prevNodePosition - 1);

給exists加上watcher,監控當前線程等待的節點waitLock是否還存在

若存在即stat不爲null,當前線程先開始await

同時watch線程也啓動了開始監控exists操做

如果exists狀態有變化了(即waitLock不存在了)觸發watch線程的countDown操做。

countDown操做使當前線程結束waiting,得到鎖,開始繼續日後執行

private boolean waitForLock(String waitLock, long waitTime) throws KeeperException, InterruptedException {
    Stat stat = zkConn.exists(ROOT_LOCK + "/" + waitLock, new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    });
    if (stat != null) {
        System.out.println(Thread.currentThread().getName() + " is waiting for " + ROOT_LOCK + "/" + waitLock);
        this.countDownLatch = new CountDownLatch(1);
        this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
        this.countDownLatch = null;
        System.out.println(Thread.currentThread().getName() + " get the lock ");
    }
    return true;
}

完整的代碼能夠在個人公衆號後臺回覆9得到,感謝閱讀。

相關文章
相關標籤/搜索