跟着小白學zookeeper: 分佈式鎖的實現

前言

最近小白在作一個系統功能時,發現有個方法是須要作同步的,but,生產環境中項目的部署是多個tomcat作集羣的,而簡單的使用synchronized加鎖只是針對同一個JVM進程中的多線程實現同步,對於跨進程的同步沒法達到統一加鎖的目的。因而,小白便想到了分佈式鎖。前段時間恰好看到一幅有意思的漫畫,其中就提到Zookeeper被設計的初衷,就是利用臨時順序節點,能夠輕鬆實現分佈式鎖,便研究了下利用zk實現分佈式鎖。本文只研究了zk的基本特性以及使用java實現一個簡單的分佈式鎖,若有錯誤,歡迎拍磚,另外稍微白話,不喜勿噴。java

假設背景

假設小白的系統生產環境上部署了2臺tomcat(t1 和 t2),而同一時間用戶A、B的請求恰好分別由t1和t2進行響應處理,用戶A、B的請求都須要調用方法m做相關處理(對共享數據的處理),爲了保證數據的準確性,小白但願一個時間點只有一個線程能夠執行方法m,也就是說t1中有線程執行m時,t一、t2的其餘線程都不能執行m,直至那個線程對m調用結束。node

思考方案

單機環境下如何實現同步的?可使用synchronized或是ReentrantLock實現,究其原理也是存在一個鎖標誌變量,線程每次要執行同步代碼時先去查看該標誌是否已經被其餘線程佔有,如果則阻塞等待其餘線程釋放鎖,若不是則設置標誌後執行(此處只是簡單描述,具體原理博大精深)。apache

爲什麼跨進程就不行了呢?由於同一個進程內,鎖是全部這個進程內全部線程均可以訪問的,可是其餘進程中的線程時訪問不了的。OK,那隻要提供一個全部進程內線程均可見的鎖標誌,問題就解決咯。so,zookeeper就能夠充當第三方進程,對須要管理的進程開放訪問權限,全部須要跨進程同步的代碼在被執行前,都須要先來我大zk這裏查看是否能夠執行。tomcat

1、動手前多問幾個問題

爲何zookeeper能夠實現分佈式鎖?

多個進程內同一時間都有線程在執行方法m,鎖就一把,你得到了鎖得以執行,我就得被阻塞,那你執行完了誰來喚醒我呢?你並不知道我被阻塞了,你也就不能通知我「嗨,小白,我用完了,你用吧」。你能作的只有用的時候設置鎖標誌,用完了再取消你設置的標誌。我就必須在阻塞的時候隔一段時間主動去看看,但這樣總歸是有點麻煩的,最好有人來通知我能夠執行了。zookeeper對於自身節點的監聽者提供事件通知功能,是否是有點雪中送炭的感受呢。bash

節點是什麼? 節點是zookeeper中數據存儲的基礎結構,zk中萬物皆節點,就比如java中萬物皆對象是同樣的。zk的數據模型就是基於好多個節點的樹結構,但zk規定每一個節點的引用規則是路徑引用。每一個節點中包含子節點引用、存儲數據、訪問權限以及節點元數據等四部分。session

zk中節點有類型區分嗎? 有。zk中提供了四種類型的節點,各類類型節點及其區別以下:多線程

  • 持久節點(PERSISTENT):節點建立後,就一直存在,直到有刪除操做來主動清除這個節點
  • 持久順序節點(PERSISTENT_SEQUENTIAL):保留持久節點的特性,額外的特性是,每一個節點會爲其第一層子節點維護一個順序,記錄每一個子節點建立的前後順序,ZK會自動爲給定節點名加上一個數字後綴(自增的),做爲新的節點名。
  • 臨時節點(EPHEMERAL):和持久節點不一樣的是,臨時節點的生命週期和客戶端會話綁定,固然也能夠主動刪除。
  • 臨時順序節點(EPHEMERAL_SEQUENTIAL):保留臨時節點的特性,額外的特性如持久順序節點的額外特性。

如何操做節點? 節點的增刪改查分別是creat\delete\setData\getData,exists判斷節點是否存在,getChildren獲取全部子節點的引用。分佈式

上面提到了節點的監聽者,咱們能夠在對zk的節點進行查詢操做時,設置當前線程是否監聽所查詢的節點。getData、getChildren、exists都屬於對節點的查詢操做,這些方法都有一個boolean類型的watch參數,用來設置是否監聽該節點。一旦某個線程監聽了某個節點,那麼這個節點發生的creat(在該節點下新建子節點)、setData、delete(刪除節點自己或是刪除其某個子節點)都會觸發zk去通知監聽該節點的線程。但須要注意的是,線程對節點設置的監聽是一次性的,也就是說zk通知監聽線程後須要改線程再次設置監聽節點,不然該節點再次的修改zk不會再次通知。ide

zookeeper具有了實現分佈式鎖的基礎條件:多進程共享、能夠存儲鎖信息、有主動通知的機制。工具

怎麼使用zookeeper實現分佈式鎖呢?

分佈式鎖也是鎖,沒什麼牛的,它也須要一個名字來告訴別人本身管理的是哪塊同步資源,也一樣須要一個標識告訴別人本身如今是空閒仍是被使用。zk中,須要建立一個專門的放鎖的節點,而後各類鎖節點都做爲該節點的子節點方便管理,節點名稱用來代表本身管理的同步資源。那麼鎖標識呢?

方案一:使用節點中的存儲數據區域,zk中節點存儲數據的大小不能超過1M,可是隻是存放一個標識是足夠的。線程得到鎖時,先檢查該標識是不是無鎖標識,如果可修改成佔用標識,使用完再恢復爲無鎖標識。

方案二:使用子節點,每當有線程來請求鎖的時候,便在鎖的節點下建立一個子節點,子節點類型必須維護一個順序,對子節點的自增序號進行排序,默認老是最小的子節點對應的線程得到鎖,釋放鎖時刪除對應子節點即可。

死鎖風險

兩種方案其實都是可行的,可是使用鎖的時候必定要去規避死鎖。方案一看上去是沒問題的,用的時候設置標識,用完清除標識,可是要是持有鎖的線程發生了意外,釋放鎖的代碼沒法執行,鎖就沒法釋放,其餘線程就會一直等待鎖,相關同步代碼便沒法執行。方案二也存在這個問題,但方案二能夠利用zk的臨時順序節點來解決這個問題,只要線程發生了異常致使程序中斷,就會丟失與zk的鏈接,zk檢測到該連接斷開,就會自動刪除該連接建立的臨時節點,這樣就能夠達到即便佔用鎖的線程程序發生意外,也能保證鎖正常釋放的目的。

那要是zk掛了怎麼辦?sad,zk要是掛了就沒轍了,由於線程都沒法連接到zk,更何談獲取鎖執行同步代碼呢。不過,通常部署的時候,爲了保證zk的高可用,都會使用多個zk部署爲集羣,集羣內部一主多從,主zk一旦掛掉,會馬上經過選舉機制有新的主zk補上。zk集羣掛了怎麼辦?很差意思,除非全部zk同時掛掉,zk集羣纔會掛,機率超級小。

2、開始動手搞一搞

要什麼東西

  1. 須要一個鎖對象,每次建立這個鎖對象的時候須要鏈接zk(也可將鏈接操做放在加鎖的時候);
  2. 鎖對象須要提供一個加鎖的方法;
  3. 鎖對象須要提供一個釋放鎖的方法;
  4. 鎖對象須要監聽zk節點,提供接收zk通知的回調方法。

實現分析

  1. 構造器中,建立zk鏈接,建立鎖的根節點,相關API以下:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    建立zk鏈接。該構造器要求傳入三個參數分別是:ip:端口(String)、會話超時時間、本次鏈接的監聽器。
    public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 建立節點。參數:節點路徑、節點數據、權限策略、節點類型

  2. 加鎖時,首先須要在鎖的根節點下建立一個臨時順序節點(該節點名稱規則統一,由zk拼接自增序號),而後獲取根節點下全部子節點,將節點根據自增序號進行排序,判斷最小的節點是否爲本次加鎖建立的節點,如果,加鎖成功,若否,阻塞當前線程,等待鎖釋放(阻塞線程可使用)。相關API以下:

    public List<String> getChildren(String path, boolean watch)
    獲取某節點的全部子節點。參數:節點路徑、是否監控該節點

  3. 釋放鎖時,刪除線程建立的子節點,同時關閉zk鏈接。相關API以下:

    public void delete(String path, int version)
    刪除指定節點。參數:節點路徑、數據版本號
    public synchronized void close()
    斷開zk連接

  4. 監聽節點。首先須要明確監聽哪一個節點,咱們能夠監聽鎖的根節點,這樣每當有線程釋放鎖刪除對應子節點時,zk就會通知監聽線程,有鎖被釋放了,這個時候只須要獲取根節點的全部子節點,根據自增序號判斷本身對應的節點是否爲最小,即可知道本身可否獲取鎖。可是上述作法很明顯有一點不太好,只要有子節點被移除,zk就會從新通知全部等待鎖的線程。得到不到鎖的線程接收到通知後發現本身還需等待,又得從新設置監聽再次等待。因爲咱們要採用臨時有序節點,該類型節點的特性就是有序,那麼就能夠只監聽上一個節點,也就是等待被移除的節點,這樣能夠保證接到通知時,就是對應子節點時最小,能夠得到鎖的時候。在實現分佈式鎖的時候,線程加鎖時若是不能立馬得到鎖,便會被經過特定方式阻塞,那麼既然接到通知時即是能夠得到鎖的時候,那麼對應的操做就應該是恢復線程的執行,取消阻塞

    zk提供了Watcher接口,鎖對象須要監聽zk中上一個節點,便須要實現該接口。Watcher接口內部包含封裝了事件類型和鏈接類型的Event接口,還提供了惟一一個須要實現的方法。
    void process(WatchedEvent var1)
    該方法即是用來接收zk通知的回調方法。參數爲監聽節點發生的事件。當監聽器監聽的節點發生變化時,zk會通知監聽者,同時該方法被執行,參數即是zk通知的信息。

開寫代碼

雖然是一個簡單的分佈式鎖的實現,代碼也有點略長。建議跟小白同樣從零開始瞭解分佈式鎖實現的朋友,先從上面的大步驟分析簡單思考下每一個方法內部的具體實現再看代碼,印象更爲深入,理解也更容易。若有不一樣思路,歡迎留言討論。代碼中判斷加鎖的方法中,使用分隔符字符串是爲了區分各個資源的鎖。項目中有臨界資源A和B,那麼管理A的鎖釋放與否,跟線程要持有管理B的鎖是沒有關係的。固然,也能夠每一類鎖單獨創建獨立的根節點。

public class ZooKeeperLock implements Watcher {

    private ZooKeeper zk = null;
    private String rootLockNode;            // 鎖的根節點
    private String lockName;                // 競爭資源,用來生成子節點名稱
    private String currentLock;             // 當前鎖
    private String waitLock;                // 等待的鎖(前一個鎖)
    private CountDownLatch countDownLatch;  // 計數器(用來在加鎖失敗時阻塞加鎖線程)
    private int sessionTimeout = 30000;     // 超時時間
    
    // 1. 構造器中建立ZK連接,建立鎖的根節點
    public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
        this.rootLockNode = rootLockNode;
        this.lockName = lockName;
        try {
            // 建立鏈接,zkAddress格式爲:IP:PORT
            zk = new ZooKeeper(zkAddress,this.sessionTimeout,this);
            // 檢測鎖的根節點是否存在,不存在則建立
            Stat stat = zk.exists(rootLockNode,false);
            if (null == stat) {
                zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    // 2. 加鎖方法,先嚐試加鎖,不能加鎖則等待上一個鎖的釋放
    public boolean lock() {
        if (this.tryLock()) {
            System.out.println("線程【" + Thread.currentThread().getName() + "】加鎖(" + this.currentLock + ")成功!");
            return true;
        } else {
            return waitOtherLock(this.waitLock, this.sessionTimeout);
        }
    }
    
    public boolean tryLock() {
        // 分隔符
        String split = "_lock_";
        if (this.lockName.contains("_lock_")) {
            throw new RuntimeException("lockName can't contains '_lock_' ");
        }
        try {
            // 建立鎖節點(臨時有序節點)
            this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("線程【" + Thread.currentThread().getName() 
                        + "】建立鎖節點(" + this.currentLock + ")成功,開始競爭...");
            // 取全部子節點
            List<String> nodes = zk.getChildren(this.rootLockNode, false);
            // 取全部競爭lockName的鎖
            List<String> lockNodes = new ArrayList<String>();
            for (String nodeName : nodes) {
                if (nodeName.split(split)[0].equals(this.lockName)) {
                    lockNodes.add(nodeName);
                }
            }
            Collections.sort(lockNodes);
            // 取最小節點與當前鎖節點比對加鎖
            String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
            if (this.currentLock.equals(currentLockPath)) {
                return true;
            }
            // 加鎖失敗,設置前一節點爲等待鎖節點
            String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
            int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
            this.waitLock = lockNodes.get(preNodeIndex);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitOtherLock(String waitLock, int sessionTimeout) {
        boolean islock = false;
        try {
            // 監聽等待鎖節點
            String waitLockNode = this.rootLockNode + "/" + waitLock;
            Stat stat = zk.exists(waitLockNode,true);
            if (null != stat) {
                System.out.println("線程【" + Thread.currentThread().getName() 
                            + "】鎖(" + this.currentLock + ")加鎖失敗,等待鎖(" + waitLockNode + ")釋放...");
                // 設置計數器,使用計數器阻塞線程
                this.countDownLatch = new CountDownLatch(1);
                islock = this.countDownLatch.await(sessionTimeout,TimeUnit.MILLISECONDS);
                this.countDownLatch = null;
                if (islock) {
                    System.out.println("線程【" + Thread.currentThread().getName() + "】鎖(" 
                                + this.currentLock + ")加鎖成功,鎖(" + waitLockNode + ")已經釋放");
                } else {
                    System.out.println("線程【" + Thread.currentThread().getName() + "】鎖(" 
                                + this.currentLock + ")加鎖失敗...");
                }
            } else {
                islock = true;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return islock;
    }
    
    // 3. 釋放鎖
    public void unlock() throws InterruptedException {
        try {
            Stat stat = zk.exists(this.currentLock,false);
            if (null != stat) {
                System.out.println("線程【" + Thread.currentThread().getName() + "】釋放鎖 " + this.currentLock);
                zk.delete(this.currentLock, -1);
                this.currentLock = null;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } finally {
            zk.close();
        }
    }
    
    // 4. 監聽器回調
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
            // 計數器減一,恢復線程操做
            this.countDownLatch.countDown();
        }
    }
}
複製代碼

測試類以下:

public class Test {
    public static void doSomething() {
        System.out.println("線程【" + Thread.currentThread().getName() + "】正在運行...");
    }

    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            public void run() {
                ZooKeeperLock lock = null;
                lock = new ZooKeeperLock("10.150.27.51:2181","/locks", "test1");
                if (lock.lock()) {
                    doSomething();
                    try {
                        Thread.sleep(1000);
                        lock.unlock();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}
複製代碼

這裏啓動了5個線程來進行驗證,輸出結果以下。須要注意的是,子節點的建立順序必定是從小到大的,可是下面輸出結果中顯示建立順序的隨機是因爲建立節點和輸出語句不是原子操做致使的。重點是鎖的獲取和釋放,從輸出結果中能夠看出,每一個線程只有在上一個節點被刪除後才能執行。ok,一個基於zk的簡單的分佈式鎖就實現了。

線程【Thread-3】建立鎖節點(/locks/test1_lock_0000000238)成功,開始競爭...
線程【Thread-2】建立鎖節點(/locks/test1_lock_0000000237)成功,開始競爭...
線程【Thread-1】建立鎖節點(/locks/test1_lock_0000000236)成功,開始競爭...
線程【Thread-0】建立鎖節點(/locks/test1_lock_0000000240)成功,開始競爭...
線程【Thread-4】建立鎖節點(/locks/test1_lock_0000000239)成功,開始競爭...
線程【Thread-1】加鎖(/locks/test1_lock_0000000236)成功!
線程【Thread-1】正在運行...
線程【Thread-3】鎖(/locks/test1_lock_0000000238)加鎖失敗,等待鎖(/locks/test1_lock_0000000237)釋放...
線程【Thread-2】鎖(/locks/test1_lock_0000000237)加鎖失敗,等待鎖(/locks/test1_lock_0000000236)釋放...
線程【Thread-0】鎖(/locks/test1_lock_0000000240)加鎖失敗,等待鎖(/locks/test1_lock_0000000239)釋放...
線程【Thread-4】鎖(/locks/test1_lock_0000000239)加鎖失敗,等待鎖(/locks/test1_lock_0000000238)釋放...
線程【Thread-1】釋放鎖 /locks/test1_lock_0000000236
線程【Thread-2】鎖(/locks/test1_lock_0000000237)加鎖成功,鎖(/locks/test1_lock_0000000236)已經釋放
線程【Thread-2】正在運行...
線程【Thread-2】釋放鎖 /locks/test1_lock_0000000237
線程【Thread-3】鎖(/locks/test1_lock_0000000238)加鎖成功,鎖(/locks/test1_lock_0000000237)已經釋放
線程【Thread-3】正在運行...
線程【Thread-3】釋放鎖 /locks/test1_lock_0000000238
線程【Thread-4】鎖(/locks/test1_lock_0000000239)加鎖成功,鎖(/locks/test1_lock_0000000238)已經釋放
線程【Thread-4】正在運行...
線程【Thread-4】釋放鎖 /locks/test1_lock_0000000239
線程【Thread-0】鎖(/locks/test1_lock_0000000240)加鎖成功,鎖(/locks/test1_lock_0000000239)已經釋放
線程【Thread-0】正在運行...
線程【Thread-0】釋放鎖 /locks/test1_lock_0000000240
複製代碼

3、別人造好的輪子

話說zookeeper紅火了這麼久,就沒有幾個牛逼的人物去開源一些好用的工具,還須要本身這麼費勁去寫分佈式鎖的實現?是的,有的,上面小白也只是爲了加深本身對zk實現分佈式鎖的理解去嘗試作一個簡單實現。有個叫Jordan Zimmerman的牛人提供了Curator來更好地操做zookeeper。

curator的分佈式鎖

curator提供了四種分佈式鎖,分別是:

curator的四種鎖方案

  • InterProcessMutex:分佈式可重入排它鎖
  • InterProcessSemaphoreMutex:分佈式排它鎖
  • InterProcessReadWriteLock:分佈式讀寫鎖
  • InterProcessMultiLock:將多個鎖做爲單個實體管理的容器

pom依賴:

<dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.0</version>
    </dependency>
複製代碼

這裏使用InterProcessMutex,即分佈式可重入排他鎖,用法以下:

// 設置重試策略,建立zk客戶端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.150.27.51:2181",retryPolicy);
// 啓動客戶端
client.start();
// 建立分佈式可重入排他鎖,監聽客戶端爲client,鎖的根節點爲/locks
InterProcessMutex mutex = new InterProcessMutex(client,"/locks");
try {
    // 加鎖
    if (mutex.acquire(3,TimeUnit.SECONDS)) {
        // TODO-同步操做
        //釋放鎖
        mutex.release();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    client.close();
}
複製代碼

InterProcessMutex源碼解讀

InterProcessMutex改造器較多,這裏就不展現改造器源碼了,建議感興趣的朋友本身看看。InterProcessMutex內部有個ConcurrentMap類型的threadData屬性,該屬性會以線程對象爲鍵,線程對應的LcokData對象爲值,記錄每一個鎖的相關信息。在new一個InterProcessMutex實例時,其構造器主要是爲threadData進行Map初始化,校驗鎖的根節點的合法性並使用basePath屬性記錄,此外還會實例化一個LockInternals對象由屬性internals引用,LockInternalsInterProcessMutex加鎖的核心。

加鎖

// InterProcessMutex.class
    public void acquire() throws Exception {
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }
    
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return this.internalLock(time, unit);
    }
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData != null) {
            // 鎖的可重入性
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            // 加鎖並返回鎖節點
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            if (lockPath != null) {
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }
複製代碼

加鎖提供了兩個接口,分別爲不設置超時和設置超時。不設置超時的話,線程等待鎖時會一直阻塞,直到獲取到鎖。無論哪一個加鎖接口,都調用了internalLock()方法。這個方法裏的代碼體現了鎖的可重入性。InterProcessMutex會直接從threadData中根據當前線程獲取其LockData,若LockData不爲空,則意味着當前線程擁有此,在鎖的次數上加一就直接返回true。若爲空,則經過internals屬性的attemptLock()方法去競爭鎖,該方法返回一個鎖對應節點的路徑。若該路徑不爲空,表明當前線程得到到了鎖,而後爲當前線程建立對應的LcokData並記錄進threadData中。

競爭鎖

// LockInternals.class
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while(!isDone) {
            isDone = true;
            try {
                // 建立鎖節點
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                // 競爭鎖
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,  
                        System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }
複製代碼

一看這個方法,一大堆的變量定義,所有先忽略掉。最終的返回值由hasTheLock決定,爲true時返回ourPathourPath初始化爲null,後經this.driver.createsTheLock(this.client, this.path, localLockNodeBytes)賦值,這個方法點擊去可看到默認的鎖驅動類的建立鎖節點方法,可知這裏只是建立了鎖節點。再看hasTheLock,爲internalLockLoop()方法的返回值,只有該方法返回true時,attemptLock()纔會返回鎖節點路徑,纔會加鎖成功。那OK,鎖的競爭實現是由internalLockLoop進行。上面循環中的異常捕捉中是根據客戶端的重試策略進行重試。

// LockInternals.class
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                // 獲取全部子節點
                List<String> children = this.getSortedChildren();
                // 獲取當前鎖節點
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                // 使用鎖驅動加鎖
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, 
                            sequenceNodeName, this.maxLeases);
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                    // 阻塞等待上一個鎖釋放
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
                                // 未設置超時一直阻塞
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                // 根據時間設置阻塞時間
                                if (millisToWait > 0L) {
                                    this.wait(millisToWait);
                                } else {
                                    // 已經超時,設置刪除節點標識
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                            ;
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                // 刪除已超時的鎖節點
                this.deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
複製代碼

好吧,又是一大堆代碼。仍是先挑着看,返回值是haveTheLock,布爾型,看名字就知道這個變量表明競爭鎖的成功與否。該變量的賦值發生在循環內,ok,看循環。先是獲取全部子節點以及當前節點名稱,再由驅動類進行鎖競爭,競爭結果封裝在PredicateResults類中,該類中包含一個布爾型的結果標識getsTheLock和一個監聽節點路徑pathToWatch。最後根據所競爭結果決定是否阻塞線程等待監聽鎖節點的釋放。須要注意的是,這裏阻塞使用的是對象的wait()機制,同時根據是否設置超時時間,是否已經超時決定線程阻塞時間或是刪除超時節點。but,鎖競爭的具體實現仍是不在這裏,這裏只是有詳細的鎖等待實現。Curator默認的鎖驅動類是StandardLockInternalsDriver

// StandardLockInternalsDriver.class
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, 
            int maxLeases) throws Exception {
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = ourIndex < maxLeases;
        String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
複製代碼

首先獲取全部子節點中當前節點所在的位置索引,而後校驗該索引,內部實現爲判斷是否小於0,成立則拋出一個NoNodeException。那確定不是0啦。最終可否得到鎖取決於該位置索引是否爲0,也就是當前節點是否最小(maxLeases在InterProcessMutex構造器中初始化LockInternals設定的是1)。

總結

本文基於ZK實現分佈式鎖的思路、實現以及Curator的分佈式可重入排他鎖的原理剖析,算是小白研究ZK實現分佈式鎖的全部收穫了。我的覺的關鍵點仍是在於如下幾點:

  • 利用臨時節點避免客戶端程序異常致使的死鎖;
  • 利用有序節點設定鎖的獲取規則;
  • 利用進程內的線程同步機制實現跨進程的分佈式鎖等待。

嗯,應該就這些了,要是小白有哪裏遺漏的,後續再補。

參考資料

漫畫:什麼是ZooKeeper?

Curator官方文檔

相關文章
相關標籤/搜索