zookeeper分佈式鎖(代碼實現)

前言

這裏是zookeeper響應式編程的第二篇——自定義分佈式鎖,第一篇zookeeper分佈式註冊配置中心見以下連接:
https://segmentfault.com/a/11...

分佈式鎖

因爲在分佈式系統中,任意2個節點的線程須要獲取同一個資源的時候就須要鎖來維持程序的正確運行,可是呢,若是使用JVM提供的鎖只能鎖住本身,由於這是2臺主機,這就引入了分佈式鎖的概念。也便是說須要一把鎖在主機的外面,而不少框架均可以實現分佈式鎖,好比redis,mysql和zookeeper,目前最爲方便的是使用zookeeper,由於就其高可用性和統一視圖而言就比其餘的技術方便不少。java

對於zookeeper作分佈式鎖的分析過程以下:首先對於2臺主機搶佔同一把鎖的時候,只能有一臺主機成功搶佔,那麼有可能出現得到鎖的主機「掛了」,那麼咱們可使用臨時節點解決該問題,那麼在一個主機成功搶佔該鎖以後,若是它釋放了鎖,其餘主機是如何知道它已經成功釋放了呢?第一種方式就是能夠採用主動輪詢的方式不斷檢測該鎖是否已經被釋放,可是這種方式有延遲,而且在主機特別多的時候多臺主機輪詢一把鎖會形成zookeeper很大的壓力。第二種方式就是使用watch機制,這種方式能夠解決延遲的問題,可是在得到鎖的主機釋放鎖的時候,zookeeper會回調哪些全部沒有搶到鎖的線程,而後那些主機又會發起強鎖的操做,會形成較大的通訊壓力。第三種方式就可使用watche機制+序列節點,而後讓每個臨時序列節點都watch前一個節點,這樣只有一個編號最小的才能得到鎖,而且在釋放鎖後會只通知後面的一個主機。mysql

代碼實現

首選咱們須要在編寫配置中心的Utils工具類,而且建立TestLock類實現分佈式鎖。而後咱們開闢10個線程模擬多臺主機搶佔鎖的過程,基本流程就是搶佔鎖,而後執行業務代碼(這裏使用睡眠來代替),最後再釋放鎖。redis

Utils代碼以下:
package org.qzx.config;

import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 1:02 下午
 * @Description: org.qzx.config
 * @version: 1.0
 */
public class Utils {
    // zookeeper對象
    private static ZooKeeper zooKeeper;
    // 鏈接地址
    private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
    private static DefaultWatcher defaultWatcher = new DefaultWatcher();
    // 鎖
    private static CountDownLatch latch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception{
        zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
        defaultWatcher.setLatch(latch);
        latch.await();
        return zooKeeper;
    }
}
TestLock大致框架以下:
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    // 搶佔鎖

                    // 業務代碼
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 釋放鎖
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }
}

咱們在這裏提供另一個工具類能夠爲每個線程實現搶鎖和釋放鎖的過程,同時因爲搶佔的鎖其實是zookeeper的臨時序列節點,因此一定會使用wather和回調機制,這裏就把這個工具類叫作MyWatcherAndCallBack,該類提供搶佔鎖、釋放鎖,節點變化回調方法。其大致框架以下:sql

package org.qzx.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher {
    private ZooKeeper zooKeeper;

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    @Override
    public void process(WatchedEvent event) {

    }

    // 搶佔鎖
    public void tryLock(){
        
    }

    // 釋放鎖
    public void unlock(){

    }
}
TestLock的代碼也能夠進行稍微的修改
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
                myWatcherAndCallBack.setZooKeeper(zooKeeper);
                try {
                    // 搶佔鎖
                    myWatcherAndCallBack.tryLock();
                    // 業務代碼
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 釋放鎖
                    myWatcherAndCallBack.unlock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
這樣框架就已經搭建完畢,接下來就是編寫具體的搶佔鎖和釋放的邏輯代碼了。

首先對於搶佔鎖的過程必定是阻塞的,直到搶佔成功的時候纔會接着往下走,這裏使用CountDownLatch實現。而每個線程都會建立屬於本身的臨時序列節點做爲本身的鎖,不過只有編號最小的那個纔會得到被對應的線程所佔有,其餘的線程在建立節點後都會阻塞。這裏爲了方便看到那些線程建立了哪些鎖,將線程的名字做爲數據寫入到節點中。而後咱們在建立節點的回調函數中輸出當前線程的名字和節點的名字,目的是爲了檢驗代碼寫到如今是否正確。apache

MyWatcherAndCallBack代碼以下:
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
                myWatcherAndCallBack.setThreadName(Thread.currentThread().getName());
                myWatcherAndCallBack.setZooKeeper(zooKeeper);
                try {
                    // 搶佔鎖
                    myWatcherAndCallBack.tryLock();
                    // 業務代碼
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 釋放鎖
                    myWatcherAndCallBack.unlock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        while (true){

        }
    }
}

爲了防止主線程執行太快致使回調函數尚未執行完畢就結束,在TestLock方法最後加上死循環進行阻塞。編程

@Test
public void TestLock(){
    for (int i = 0; i < 10; i++) {
        new Thread(()->{
            MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
            myWatcherAndCallBack.setThreadName(Thread.currentThread().getName());
            myWatcherAndCallBack.setZooKeeper(zooKeeper);
            try {
                // 搶佔鎖
                myWatcherAndCallBack.tryLock();
                // 業務代碼
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                // 釋放鎖
                myWatcherAndCallBack.unlock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
    while (true){

    }
}

啓動測試,在測試前記得建立工做目錄/test,結果以下:
image.png
能夠看到節點建立的是有序的,可是線程是無序的。
接下來在建立節點成功的回調函數中,咱們就須要獲取鎖了,使用getChildren方法得到工做目錄下的孩子節點,也就是建立的臨時序列節點,該方法不須要使用watch機制,由於不須要監測父節點,同時對於其回調對象咱們也是一樣封裝在MyWatcherAndCallBack中。最後因爲建立節點的名字在後面會用到,使用pathName屬性保存當前線程建立的節點名字。segmentfault

MyWatcherAndCallBack修改後代碼以下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {

    }

    // 搶佔鎖
    public void tryLock(){
        try {
            // 建立一個臨時序列節點做爲鎖
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 釋放鎖
    public void unlock(){

    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似於/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        
    }
}

在getChildren的回調方法中,當前線程必定成功建立了節點,而且能夠看到全部在它以前建立的節點。那麼咱們如今遍歷輸出全部的children中的全部節點,目的是爲了看到當前線程所看到的全部節點是無序的,這樣就爲後面須要排序提供了必要性。修改的部分代碼以下:框架

//Children2Callback
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    System.out.println(threadName+"能看到的節點以下:");
    for (String child : children) {
        System.out.println(child);
    }
}

輸出結果爲:
image.png
能夠看到第一個建立節點的線程爲Thread-6,而且看到的節點都是無序的。而且節點的名字少了個/。
接下來就是對於當前線程,得判斷它建立的鎖是否是第一個,因此咱們先對children進行排序,而後再獲取當前鎖在children的位置,若是是第一個說明該線程能夠得到鎖,執行latch.countdown(),這樣該線程就能夠去執行相應的任務了。若是不是第一個,那麼就得判斷前一個鎖是否已經釋放,判斷的方法爲exists,若是前面一個節點不存在了,說明已經釋放,對於exists方法有可能會出現尚未成功監控到前一個節點就出現釋放鎖的狀況,也就是exists執行失敗了,沒能監控前一個節點,那麼說明鎖已經釋放,當前線程所須要進行的操做不在watcher中執行而是在回調函數中中執行,因此在這裏exists的回調函數是必須的。分佈式

MyWatcherAndCallBack修改後的代碼以下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {

    }

    // 搶佔鎖
    public void tryLock(){
        try {
            // 建立一個臨時序列節點做爲鎖
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 釋放鎖
    public void unlock(){

    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似於/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 當前線程建立的鎖是第一個,能夠得到
            latch.countDown();
        }else {
            // 不是第一個,得判斷前一個鎖是否已經釋放
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }
}

接下來須要對前一把鎖的釋放事件作處理,首先是在節點刪除後,會觸發節點刪除時間,在該線程中會作出響應,具體作法就是要麼直接latch.coutdown()得到鎖或者經過getChildren判斷當前鎖是不是第一個了,是第一個就得到鎖。同時對於當前線程得到鎖後釋放鎖進行處理,直接對其建立的節點進行刪除便可。ide

MyWatcherAndCallBack修改後的代碼以下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一把鎖被刪除,當前線程得到鎖
//                zooKeeper.getChildren("/",false,this,"aaa");
                latch.countDown();
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }

    // 搶佔鎖
    public void tryLock(){
        try {
            // 建立一個臨時序列節點做爲鎖
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 釋放鎖
    public void unlock() throws KeeperException, InterruptedException {
        zooKeeper.delete(pathName,-1);// -1表明忽略版本號
    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似於/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 當前線程建立的鎖是第一個,能夠得到
            latch.countDown();
        }else {
            // 不是第一個,得判斷前一個鎖是否已經釋放
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }
}

咱們如今運行程序能夠看到每個線程均可以得到鎖而且順序執行。
image.png
對於上述代碼,存在一個問題,當前去除主線程業務代碼中睡眠的操做,就會出現只有一個線程能夠成功得到鎖而且執行響應的操做,其餘線程會出現相似於死鎖的現象,可是這裏不是死鎖。這裏的緣由是執行速度太快了,很快就把當前線程得到的鎖刪除了,那麼後面的線程在執行完排序,監控前面的鎖就會出現失敗的狀況,這裏的一種解決方法就是在exists的回調函數中針對節點不存在,也就是stat==null的時候,從新調用getChildren方法判斷當前是不是第一把鎖,若是是就會執行。

MyWatcherAndCallBack修改後的代碼以下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一把鎖被刪除,當前線程得到鎖
//                zooKeeper.getChildren("/",false,this,"aaa");
                latch.countDown();
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }

    // 搶佔鎖
    public void tryLock(){
        try {
            // 建立一個臨時序列節點做爲鎖
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 釋放鎖
    public void unlock() throws KeeperException, InterruptedException {
        zooKeeper.delete(pathName,-1);
    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似於/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 當前線程建立的鎖是第一個,能夠得到
            latch.countDown();
        }else {
            // 不是第一個,得判斷前一個鎖是否已經釋放
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat==null){
            // 監控失敗,自動獲取鎖
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }
}

在主線程的睡眠操做去除掉後,程序運行的結果以下:
image.png
能夠看到全部線程又重新正常運行了。
到此,zookeeper自定義分佈式鎖的小demo就編寫完畢。對於zookeeper分佈式鎖的全部代碼整理以下。

Utils類:
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.qzx.config.DefaultWatcher;

import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 1:02 下午
 * @Description: org.qzx.config
 * @version: 1.0
 */
public class Utils {
    // zookeeper對象
    private static ZooKeeper zooKeeper;
    // 鏈接地址
    private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
    private static DefaultWatcher defaultWatcher = new DefaultWatcher();
    // 鎖
    private static CountDownLatch latch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception{
        zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
        defaultWatcher.setLatch(latch);
        latch.await();
        return zooKeeper;
    }
}
TestLock類:
package org.qzx.lock;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
                myWatcherAndCallBack.setThreadName(Thread.currentThread().getName());
                myWatcherAndCallBack.setZooKeeper(zooKeeper);
                try {
                    // 搶佔鎖
                    myWatcherAndCallBack.tryLock();
                    // 業務代碼
//                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 釋放鎖
                    myWatcherAndCallBack.unlock();
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        while (true){

        }
    }
}
MyWatcherAndCallBack類:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一把鎖被刪除,當前線程得到鎖
//                zooKeeper.getChildren("/",false,this,"aaa");
                latch.countDown();
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }

    // 搶佔鎖
    public void tryLock(){
        try {
            // 建立一個臨時序列節點做爲鎖
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 釋放鎖
    public void unlock() throws KeeperException, InterruptedException {
        zooKeeper.delete(pathName,-1);
    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似於/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 當前線程建立的鎖是第一個,能夠得到
            latch.countDown();
        }else {
            // 不是第一個,得判斷前一個鎖是否已經釋放
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat==null){
            // 監控失敗,自動獲取鎖
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }
}
相關文章
相關標籤/搜索