Zookeeper分佈式鎖

💛分佈式解決方案源碼,請幫我點個star哦!
💛原文地址爲http://www.javashuo.com/article/p-gquycsdz-c.html,轉載請註明出處!html

zookeeper客戶端選型

  • 原生zookeeper客戶端,有watcher一次性、無超時重連機制等一系列問題
  • ZkClient,解決了原生客戶端一些問題,一些存量老系統中還在使用
  • curator,提供了各類應用場景(封裝了分佈式鎖,計數器等),新項目首選

分佈式鎖使用場景

在單體項目中jvm中的鎖便可完成須要,可是微服務、分佈式環境下,同一個服務可能部署在多臺服務器上,多個jvm之間沒法經過經常使用的jvm鎖來完成同步操做,須要借用分佈式鎖來完成上鎖、釋放鎖。例如在訂單服務中,咱們須要根據日期來生成訂單號流水,就有可能產生相同的時間日期,從而出現重複訂單號。(jdk8使用LocalDateTime線程安全,不會存在這樣的問題)java

zookeeper分佈式鎖實現原理

  • zookeeper中規定,在同一時刻,不能有多個客戶端建立同一個節點,咱們能夠利用這個特性實現分佈式鎖。zookeeper臨時節點只在session生命週期存在,session一結束會自動銷燬。
  • watcher機制,在表明鎖資源的節點被刪除,便可以觸發watcher解除阻塞從新去獲取鎖,這也是zookeeper分佈式鎖較其餘分佈式鎖方案的一大優點。

基於臨時節點方案

第一種方案實現較爲簡單,邏輯就是誰建立成功該節點,誰就持有鎖,建立失敗的本身進行阻塞,A線程先持有鎖,B線程獲取失敗就會阻塞,同時對/lockPath設置監聽,A線程執行完操做後刪除節點,觸發監聽器,B線程此時解除阻塞,從新去獲取鎖。node

咱們模仿原生jdk的lock接口設計,採用模板方法設計模式來編寫分佈式鎖,這樣的好處是擴展性強,咱們能夠快速切換到redis分佈式鎖、數據庫分佈式鎖等實現方式。git

建立Lock接口github

public interface Lock {
    /**
     * 獲取鎖
     */
    void getLock() throws Exception;

    /**
     * 釋放鎖
     */
    void unlock() throws Exception;
}

AbstractTemplateLock抽象類redis

public abstract class AbstractTemplateLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + "獲取鎖成功");
        } else {
            //等待
            waitLock();//事件監聽 若是節點被刪除則能夠從新獲取
            //從新獲取
            getLock();
        }
    }
    protected abstract void waitLock();
    protected abstract boolean tryLock();
    protected abstract void releaseLock();
    @Override
    public void unlock() {
        releaseLock();
    }
}

zookeeper分佈式鎖邏輯數據庫

@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;

    private static final String lockPath = "/lockPath";


    private ZkClient client;

    public ZkTemplateLock() {
        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
        log.info("zk client 鏈接成功:{}",zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);

        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("監聽到節點被刪除");
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        //完成 watcher 註冊
        client.subscribeDataChanges(lockPath, listener);

        //阻塞本身
        if (client.exists(lockPath)) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //取消watcher註冊
        client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    protected boolean tryLock() {
        try {
            client.createEphemeral(lockPath);
            System.out.println(Thread.currentThread().getName()+"獲取到鎖");
        } catch (Exception e) {
            log.error("建立失敗");
            return false;
        }
        return true;
    }

    @Override
    public void releaseLock() {
       client.delete(this.lockPath);
    }
}

缺點設計模式

每次去競爭鎖,都只會有一個線程拿到鎖,當線程數龐大時會發生「驚羣」現象,zookeeper節點可能會運行緩慢甚至宕機。這是由於其餘線程沒獲取到鎖時都會監聽/lockPath節點,當A線程釋放完畢,海量的線程都同時中止阻塞,去爭搶鎖,這種操做十分耗費資源,且性能大打折扣。安全

基於臨時順序節點方案

臨時順序節點與臨時節點不一樣的是產生的節點是有序的,咱們能夠利用這一特色,只讓當前線程監聽上一序號的線程,每次獲取鎖的時候判斷本身的序號是否爲最小,最小即獲取到鎖,執行完畢就刪除當前節點繼續判斷誰爲最小序號的節點。
服務器

臨時順序節點操做源碼

@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";
    private String beforePath;
    private String currentPath;
    private ZkClient client;

    public ZkSequenTemplateLock() {
        client = new ZkClient(zkServers);
        if (!client.exists(lockPath)) {
            client.createPersistent(lockPath);

        }
        log.info("zk client 鏈接成功:{}",zkServers);

    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("監聽到節點被刪除");
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        //給排在前面的節點增長數據刪除的watcher,本質是啓動另外一個線程去監聽上一個節點
        client.subscribeDataChanges(beforePath, listener);

        //阻塞本身
        if (client.exists(beforePath)) {
            try {
                System.out.println("阻塞"+currentPath);
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //取消watcher註冊
        client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    protected boolean tryLock() {
        if (currentPath == null) {
            //建立一個臨時順序節點
            currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
            System.out.println("current:" + currentPath);
        }

        //得到全部的子節點並排序。臨時節點名稱爲自增加的字符串
        List<String> childrens = client.getChildren(lockPath);
        //排序list,按天然順序排序
        Collections.sort(childrens);
        if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
            return true;
        } else {
            //若是當前節點不是排第一,則獲取前面一個節點信息,賦值給beforePath
            int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
            beforePath = lockPath + "/" + childrens.get(curIndex - 1);
        }
        System.out.println("beforePath"+beforePath);
        return false;
    }

    @Override
    public void releaseLock() {
        System.out.println("delete:" + currentPath);
        client.delete(currentPath);
    }
}

Curator分佈式鎖工具

curator提供瞭如下種類的鎖:

  • 共享可重入鎖(Shared Reentrant Lock):全局同步鎖,同一時間不會有兩個客戶端持有一個鎖
  • 共享鎖:與共享可重入鎖相似,可是不可重入(有時候會由於這個緣由形成死鎖)
  • 共享可重入讀寫鎖
  • 共享信號量
  • Multi Shared Lock:管理多種鎖的容器實體

咱們採用第一種Shared Reentrant Lock中的InterProcessMutex來完成上鎖、釋放鎖的的操做

public class ZkLockWithCuratorTemplate implements Lock {
    // zk host地址
    private String host = "localhost";

    // zk自增存儲node
    private String lockPath = "/curatorLock";

    // 重試休眠時間
    private static final int SLEEP_TIME_MS = 1000;
    // 最大重試1000次
    private static final int MAX_RETRIES = 1000;
    //會話超時時間
    private static final int SESSION_TIMEOUT = 30 * 1000;
    //鏈接超時時間
    private static final int CONNECTION_TIMEOUT = 3 * 1000;
		//curator核心操做類
    private CuratorFramework curatorFramework;

    InterProcessMutex lock;

   public ZkLockWithCuratorTemplate() {
       curatorFramework = CuratorFrameworkFactory.builder()
               .connectString(host)
               .connectionTimeoutMs(CONNECTION_TIMEOUT)
               .sessionTimeoutMs(SESSION_TIMEOUT)
               .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
               .build();
       curatorFramework.start();
       lock = new InterProcessMutex (curatorFramework, lockPath);
    }

    @Override
    public void getLock() throws Exception {
        //5s後超時釋放鎖
         lock.acquire(5, TimeUnit.SECONDS);
    }

    @Override
    public void unlock() throws Exception {
        lock.release();
    }
}

源碼以及測試類地址

https://github.com/Motianshi/distribute-tool
相關文章
相關標籤/搜索