一篇文章完全理解ZooKeeper分佈式鎖實現原理

許多場景中,數據一致性是一個比較重要的話題,在單機環境中,咱們能夠經過Java提供的併發API來解決;而在分佈式環境(會遇到網絡故障、消息重複、消息丟失等各類問題)下要複雜得多,常見的解決方案是分佈式事務分佈式鎖等。redis

本文主要探討如何利用Zookeeper來實現分佈式鎖。數據庫

關於分佈式鎖

分佈式鎖是控制分佈式系統之間同步訪問共享資源的一種方式。apache

實現分佈式鎖的過程當中須要注意的:緩存

  • 鎖的可重入性(遞歸調用不該該被阻塞、避免死鎖)服務器

  • 鎖的超時(避免死鎖、死循環等意外狀況)網絡

  • 鎖的阻塞(保證原子性等)session

  • 鎖的特性支持(阻塞鎖、可重入鎖、公平鎖、聯鎖、信號量、讀寫鎖)

使用分佈式鎖時須要注意:併發

  • 分佈式鎖的開銷(分佈式鎖通常能不用就不用,有些場景能夠用樂觀鎖代替)dom

  • 加鎖的粒度(控制加鎖的粒度,能夠優化系統的性能)分佈式

  • 加鎖的方式

如下是幾種常見的實現分佈式鎖的方案及其優缺點。

基於數據庫

1. 基於數據庫表

最簡單的方式可能就是直接建立一張鎖表,當咱們要鎖住某個方法或資源時,咱們就在該表中增長一條記錄,想要釋放鎖的時候就刪除這條記錄。給某字段添加惟一性約束,若是有多個請求同時提交到數據庫的話,數據庫會保證只有一個操做能夠成功,那麼咱們就能夠認爲操做成功的那個線程得到了該方法的鎖,能夠執行方法體內容。

會引入數據庫單點、無失效時間、不阻塞、不可重入等問題。

2. 基於數據庫排他鎖

若是使用的是MySql的InnoDB引擎,在查詢語句後面增長for update,數據庫會在查詢過程當中(須經過惟一索引查詢)給數據庫表增長排他鎖,咱們能夠認爲得到排它鎖的線程便可得到分佈式鎖,經過 connection.commit() 操做來釋放鎖。

會引入數據庫單點、不可重入、沒法保證必定使用行鎖(部分狀況下MySQL自動使用表鎖而不是行鎖)、排他鎖長時間不提交致使佔用數據庫鏈接等問題。

3. 數據庫實現分佈式鎖總結

優勢:

  • 直接藉助數據庫,容易理解。

缺點:

  • 會引入更多的問題,使整個方案變得愈來愈複雜

  • 操做數據庫須要必定的開銷,有必定的性能問題

  • 使用數據庫的行級鎖並不必定靠譜,尤爲是當咱們的鎖表並不大的時候

基於緩存

相比較於基於數據庫實現分佈式鎖的方案來講,基於緩存來實如今性能方面會表現的更好一點。目前有不少成熟的緩存產品,包括Redis、memcached、tair等。

這裏以Redis爲例舉出幾種實現方法:

1. 基於 redis 的 setnx()、expire() 方法作分佈式鎖

setnx 的含義就是 SET if Not Exists,其主要有兩個參數 setnx(key, value)。該方法是原子的,若是 key 不存在,則設置當前 key 成功,返回 1;若是當前 key 已經存在,則設置當前 key 失敗,返回 0。

expire 設置過時時間,要注意的是 setnx 命令不能設置 key 的超時時間,只能經過 expire() 來對 key 設置。

2. 基於 redis 的 setnx()、get()、getset()方法作分佈式鎖

getset 這個命令主要有兩個參數 getset(key,newValue),該方法是原子的,對 key 設置 newValue 這個值,而且返回 key 原來的舊值。

3. 基於 Redlock 作分佈式鎖

Redlock 是 Redis 的做者 antirez 給出的集羣模式的 Redis 分佈式鎖,它基於 N 個徹底獨立的 Redis 節點(一般狀況下 N 能夠設置成 5)

4. 基於 redisson 作分佈式鎖

redisson 是 redis 官方的分佈式鎖組件

基於緩存實現分佈式鎖總結

優勢:

  • 性能好

缺點:

  • 實現中須要考慮的因素太多

  • 經過超時時間來控制鎖的失效時間並非十分的靠譜

基於Zookeeper

大體思想爲:每一個客戶端對某個方法加鎖時,在 Zookeeper 上與該方法對應的指定節點的目錄下,生成一個惟一的臨時有序節點。判斷是否獲取鎖的方式很簡單,只須要判斷有序節點中序號最小的一個。當釋放鎖的時候,只需將這個臨時節點刪除便可。同時,其能夠避免服務宕機致使的鎖沒法釋放,而產生的死鎖問題

Zookeeper實現分佈式鎖總結

優勢:

  • 有效的解決單點問題,不可重入問題,非阻塞問題以及鎖沒法釋放的問題

  • 實現較爲簡單

缺點:

  • 性能上不如使用緩存實現的分佈式鎖,由於每次在建立鎖和釋放鎖的過程當中,都要動態建立、銷燬臨時節點來實現鎖功能

  • 須要對Zookeeper的原理有所瞭解

Zookeeper 如何實現分佈式鎖?

下面講如何實現排他鎖和共享鎖,以及如何解決羊羣效應。

排他鎖

排他鎖,又稱寫鎖或獨佔鎖。若是事務T1對數據對象O1加上了排他鎖,那麼在整個加鎖期間,只容許事務T1對O1進行讀取或更新操做,其餘任務事務都不能對這個數據對象進行任何操做,直到T1釋放了排他鎖。

排他鎖核心是保證當前有且僅有一個事務得到鎖,而且鎖釋放以後,全部正在等待獲取鎖的事務都可以被通知到

Zookeeper 的強一致性特性,可以很好地保證在分佈式高併發狀況下節點的建立必定可以保證全局惟一性,即Zookeeper將會保證客戶端沒法重複建立一個已經存在的數據節點。能夠利用Zookeeper這個特性,實現排他鎖。

  • 定義鎖:經過Zookeeper上的數據節點來表示一個鎖

  • 獲取鎖:客戶端經過調用 create 方法建立表示鎖的臨時節點,能夠認爲建立成功的客戶端得到了鎖,同時可讓沒有得到鎖的節點在該節點上註冊Watcher監聽,以便實時監聽到lock節點的變動狀況

  • 釋放鎖:如下兩種狀況均可以讓鎖釋放

  • 當前得到鎖的客戶端發生宕機或異常,那麼Zookeeper上這個臨時節點就會被刪除

  • 正常執行完業務邏輯,客戶端主動刪除本身建立的臨時節點

基於Zookeeper實現排他鎖流程:

一篇文章完全理解ZooKeeper分佈式鎖實現原理

共享鎖

共享鎖,又稱讀鎖。若是事務T1對數據對象O1加上了共享鎖,那麼當前事務只能對O1進行讀取操做,其餘事務也只能對這個數據對象加共享鎖,直到該數據對象上的全部共享鎖都被釋放。

共享鎖與排他鎖的區別在於,加了排他鎖以後,數據對象只對當前事務可見,而加了共享鎖以後,數據對象對全部事務均可見。

  • 定義鎖:經過Zookeeper上的數據節點來表示一個鎖,是一個相似於 /lockpath/[hostname]-請求類型-序號的臨時順序節點

  • 獲取鎖:客戶端經過調用 create 方法建立表示鎖的臨時順序節點,若是是讀請求,則建立 /lockpath/[hostname]-R-序號 節點,若是是寫請求則建立 /lockpath/[hostname]-W-序號 節點

  • 判斷讀寫順序:大概分爲4個步驟

  • 1)建立完節點後,獲取 /lockpath 節點下的全部子節點,並對該節點註冊子節點變動的Watcher監聽

  • 2)肯定本身的節點序號在全部子節點中的順序

  • 3.1)對於讀請求:1. 若是沒有比本身序號更小的子節點,或者比本身序號小的子節點都是讀請求,那麼代表本身已經成功獲取到了共享鎖,同時開始執行讀取邏輯 2. 若是有比本身序號小的子節點有寫請求,那麼等待 3.

  • 3.2)對於寫請求,若是本身不是序號最小的節點,那麼等待

  • 4)接收到Watcher通知後,重複步驟1)

  • 釋放鎖:與排他鎖邏輯一致

一篇文章完全理解ZooKeeper分佈式鎖實現原理

基於Zookeeper實現共享鎖流程:

一篇文章完全理解ZooKeeper分佈式鎖實現原理

羊羣效應

在實現共享鎖的 「判斷讀寫順序」 的第1個步驟是:建立完節點後,獲取 /lockpath 節點下的全部子節點,並對該節點註冊子節點變動的Watcher監聽。這樣的話,任何一次客戶端移除共享鎖以後,Zookeeper將會發送子節點變動的Watcher通知給全部機器,系統中將有大量的 「Watcher通知」 和 「子節點列表獲取」 這個操做重複執行,而後全部節點再判斷本身是不是序號最小的節點(寫請求)或者判斷比本身序號小的子節點是否都是讀請求(讀請求),從而繼續等待下一次通知。

然而,這些重複操做不少都是 「無用的」,實際上每一個鎖競爭者只須要關注序號比本身小的那個節點是否存在便可

當集羣規模比較大時,這些 「無用的」 操做不只會對Zookeeper形成巨大的性能影響和網絡衝擊,更爲嚴重的是,若是同一時間有多個客戶端釋放了共享鎖,Zookeeper服務器就會在短期內向其他客戶端發送大量的事件通知–這就是所謂的 「羊羣效應「。

改進後的分佈式鎖實現

具體實現以下:

  1. 客戶端調用 create 方法建立一個相似於 /lockpath/[hostname]-請求類型-序號 的臨時順序節點

  2. 客戶端調用 getChildren 方法獲取全部已經建立的子節點列表(這裏不註冊任何Watcher)
  • 讀請求:向比本身序號小的最後一個寫請求節點註冊Watcher監聽

  • 寫請求:向比本身序號小的最後一個節點註冊Watcher監聽
  1. 若是沒法獲取任何共享鎖,那麼調用 exist 來對比本身小的那個節點註冊Watcher

  2. 等待Watcher監聽,繼續進入步驟2

Zookeeper羊羣效應改進先後Watcher監聽圖

一篇文章完全理解ZooKeeper分佈式鎖實現原理

基於Curator客戶端實現分佈式鎖

Apache Curator是一個Zookeeper的開源客戶端,它提供了Zookeeper各類應用場景(Recipe,如共享鎖服務、master選舉、分佈式計數器等)的抽象封裝,接下來將利用Curator提供的類來實現分佈式鎖。

Curator提供的跟分佈式鎖相關的類有5個,分別是:

  • Shared Reentrant Lock 可重入鎖

  • Shared Lock 共享不可重入鎖

  • Shared Reentrant Read Write Lock 可重入讀寫鎖

  • Shared Semaphore 信號量

  • Multi Shared Lock 多鎖

關於錯誤處理:仍是強烈推薦使用ConnectionStateListener處理鏈接狀態的改變。當鏈接LOST時你再也不擁有鎖。

可重入鎖

Shared Reentrant Lock,全局可重入鎖,全部客戶端均可以請求,同一個客戶端在擁有鎖的同時,能夠屢次獲取,不會被阻塞。它是由類 InterProcessMutex 來實現,它的主要方法:

// 構造方法
public InterProcessMutex(CuratorFramework client, String path)
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
// 經過acquire得到鎖,並提供超時機制:
public void acquire() throws Exception
public boolean acquire(long time, TimeUnit unit) throws Exception
// 撤銷鎖
public void makeRevocable(RevocationListener<InterProcessMutex> listener)
public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)

定義一個 FakeLimitedResource 類來模擬一個共享資源,該資源一次只能被一個線程使用,直到使用結束,下一個線程才能使用,不然會拋出異常

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    // 模擬只能單線程操做的資源
    public void use() throws InterruptedException {
        if (!inUse.compareAndSet(false, true)) {
            // 在正確使用鎖的狀況下,此異常不可能拋出
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (100 * Math.random()));
        } finally {
            inUse.set(false);
        }

下面的代碼將建立 N 個線程來模擬分佈式系統中的節點,系統將經過 InterProcessMutex 來控制對資源的同步使用;每一個節點都將發起10次請求,完成 請求鎖--訪問資源--再次請求鎖--釋放鎖--釋放鎖 的過程;客戶端經過 acquire 請求鎖,經過 release 釋放鎖,得到幾把鎖就要釋放幾把鎖;這個共享資源一次只能被一個線程使用,若是控制同步失敗,將拋異常。

public class SharedReentrantLockTest {
    private static final String lockPath = "/testZK/sharedreentrantlock";
    private static final Integer clientNums = 5;
    final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的資源
    private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < clientNums; i++) {
            String clientName = "client#" + i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = ZKUtils.getClient();
                    client.start();
                    Random random = new Random();
                    try {
                        final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                        // 每一個客戶端請求10次共享資源
                        for (int j = 0; j < 10; j++) {
                            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                throw new IllegalStateException(j + ". " + clientName + " 不能獲得互斥鎖");
                            }
                            try {
                                System.out.println(j + ". " + clientName + " 已獲取到互斥鎖");
                                resource.use(); // 使用資源
                                if (!lock.acquire(10, TimeUnit.SECONDS)) {
                                    throw new IllegalStateException(j + ". " + clientName + " 不能再次獲得互斥鎖");
                                }
                                System.out.println(j + ". " + clientName + " 已再次獲取到互斥鎖");
                                lock.release(); // 申請幾回鎖就要釋放幾回鎖
                            } finally {
                                System.out.println(j + ". " + clientName + " 釋放互斥鎖");
                                lock.release(); // 老是在finally中釋放
                            }
                            Thread.sleep(random.nextInt(100));
                        }
                    } catch (Throwable e) {
                        System.out.println(e.getMessage());
                    } finally {
                        CloseableUtils.closeQuietly(client);
                        System.out.println(clientName + " 客戶端關閉!");
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        System.out.println("結束!");
    }
}

控制檯打印日誌,能夠看到對資源的同步訪問控制成功,而且鎖是可重入的

0. client#3 已獲取到互斥鎖
0. client#3 已再次獲取到互斥鎖
0. client#3 釋放互斥鎖
0. client#1 已獲取到互斥鎖
0. client#1 已再次獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 已再次獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 已再次獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 已再次獲取到互斥鎖
0. client#4 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 已再次獲取到互斥鎖
1. client#1 釋放互斥鎖
2. client#1 已獲取到互斥鎖
2. client#1 已再次獲取到互斥鎖
2. client#1 釋放互斥鎖
1. client#4 已獲取到互斥鎖
1. client#4 已再次獲取到互斥鎖
1. client#4 釋放互斥鎖
1. client#3 已獲取到互斥鎖
1. client#3 已再次獲取到互斥鎖
1. client#3 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 已再次獲取到互斥鎖
1. client#2 釋放互斥鎖
2. client#4 已獲取到互斥鎖
2. client#4 已再次獲取到互斥鎖
2. client#4 釋放互斥鎖
....
....
client#2 客戶端關閉!
9. client#0 已獲取到互斥鎖
9. client#0 已再次獲取到互斥鎖
9. client#0 釋放互斥鎖
9. client#3 已獲取到互斥鎖
9. client#3 已再次獲取到互斥鎖
9. client#3 釋放互斥鎖
client#0 客戶端關閉!
8. client#4 已獲取到互斥鎖
8. client#4 已再次獲取到互斥鎖
8. client#4 釋放互斥鎖
9. client#4 已獲取到互斥鎖
9. client#4 已再次獲取到互斥鎖
9. client#4 釋放互斥鎖
client#3 客戶端關閉!
client#4 客戶端關閉!
結束!

同時在程序運行期間查看Zookeeper節點樹,能夠發現每一次請求的鎖實際上對應一個臨時順序節點

[zk: localhost:2181(CONNECTED) 42] ls /testZK/sharedreentrantlock
[leases, _c_208d461b-716d-43ea-ac94-1d2be1206db3-lock-0000001659, locks, _c_64b19dba-3efa-46a6-9344-19a52e9e424f-lock-0000001658, _c_cee02916-d7d5-4186-8867-f921210b8815-lock-0000001657]

不可重入鎖

Shared Lock 與 Shared Reentrant Lock 類似,可是不可重入。這個不可重入鎖由類 InterProcessSemaphoreMutex 來實現,使用方法和上面的類相似。

將上面程序中的 InterProcessMutex 換成不可重入鎖 InterProcessSemaphoreMutex,若是再運行上面的代碼,結果就會發現線程被阻塞在第二個 acquire 上,直到超時,也就是此鎖不是可重入的。

控制檯輸出日誌

0. client#2 已獲取到互斥鎖
0. client#1 不能獲得互斥鎖
0. client#4 不能獲得互斥鎖
0. client#0 不能獲得互斥鎖
0. client#3 不能獲得互斥鎖
client#1 客戶端關閉!
client#4 客戶端關閉!
client#3 客戶端關閉!
client#0 客戶端關閉!
0. client#2 釋放互斥鎖
0. client#2 不能再次獲得互斥鎖
client#2 客戶端關閉!
結束!

把第二個獲取鎖的代碼註釋,程序才能正常執行

0. client#1 已獲取到互斥鎖
0. client#1 釋放互斥鎖
0. client#2 已獲取到互斥鎖
0. client#2 釋放互斥鎖
0. client#0 已獲取到互斥鎖
0. client#0 釋放互斥鎖
0. client#4 已獲取到互斥鎖
0. client#4 釋放互斥鎖
0. client#3 已獲取到互斥鎖
0. client#3 釋放互斥鎖
1. client#1 已獲取到互斥鎖
1. client#1 釋放互斥鎖
1. client#2 已獲取到互斥鎖
1. client#2 釋放互斥鎖
....
....
9. client#4 已獲取到互斥鎖
9. client#4 釋放互斥鎖
9. client#0 已獲取到互斥鎖
client#2 客戶端關閉!
9. client#0 釋放互斥鎖
9. client#1 已獲取到互斥鎖
client#0 客戶端關閉!
client#4 客戶端關閉!
9. client#1 釋放互斥鎖
9. client#3 已獲取到互斥鎖
client#1 客戶端關閉!
9. client#3 釋放互斥鎖
client#3 客戶端關閉!
結束!

可重入讀寫鎖

Shared Reentrant Read Write Lock,可重入讀寫鎖,一個讀寫鎖管理一對相關的鎖,一個負責讀操做,另一個負責寫操做;讀操做在寫鎖沒被使用時可同時由多個進程使用,而寫鎖在使用時不容許讀(阻塞);此鎖是可重入的;一個擁有寫鎖的線程可重入讀鎖,可是讀鎖卻不能進入寫鎖,這也意味着寫鎖能夠降級成讀鎖, 好比 請求寫鎖 ---&gt;讀鎖 ----&gt;釋放寫鎖;從讀鎖升級成寫鎖是不行的。

可重入讀寫鎖主要由兩個類實現:InterProce***eadWriteLock、InterProcessMutex,使用時首先建立一個 InterProce***eadWriteLock 實例,而後再根據你的需求獲得讀鎖或者寫鎖,讀寫鎖的類型是 InterProcessMutex。

public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < clientNums; i++) {
            final String clientName = "client#" + i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = ZKUtils.getClient();
                    client.start();
                    final InterProce***eadWriteLock lock = new InterProce***eadWriteLock(client, lockPath);
                    final InterProcessMutex readLock = lock.readLock();
                    final InterProcessMutex writeLock = lock.writeLock();

                    try {
                        // 注意只能先獲得寫鎖再獲得讀鎖,不能反過來!!!
                        if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
                            throw new IllegalStateException(clientName + " 不能獲得寫鎖");
                        }
                        System.out.println(clientName + " 已獲得寫鎖");
                        if (!readLock.acquire(10, TimeUnit.SECONDS)) {
                            throw new IllegalStateException(clientName + " 不能獲得讀鎖");
                        }
                        System.out.println(clientName + " 已獲得讀鎖");
                        try {
                            resource.use(); // 使用資源
                        } finally {
                            System.out.println(clientName + " 釋放讀寫鎖");
                            readLock.release();
                            writeLock.release();
                        }
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    } finally {
                        CloseableUtils.closeQuietly(client);
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        System.out.println("結束!");
    }
}

控制檯打印日誌

client#1 已獲得寫鎖
client#1 已獲得讀鎖
client#1 釋放讀寫鎖
client#2 已獲得寫鎖
client#2 已獲得讀鎖
client#2 釋放讀寫鎖
client#0 已獲得寫鎖
client#0 已獲得讀鎖
client#0 釋放讀寫鎖
client#4 已獲得寫鎖
client#4 已獲得讀鎖
client#4 釋放讀寫鎖
client#3 已獲得寫鎖
client#3 已獲得讀鎖
client#3 釋放讀寫鎖
結束!

信號量

Shared Semaphore,一個計數的信號量相似JDK的 Semaphore,JDK中 Semaphore 維護的一組許可(permits),而Cubator中稱之爲租約(Lease)。有兩種方式能夠決定 semaphore 的最大租約數,第一種方式是由用戶給定的 path 決定,第二種方式使用 SharedCountReader 類。若是不使用 SharedCountReader,沒有內部代碼檢查進程是否假定有10個租約而進程B假定有20個租約。因此全部的實例必須使用相同的 numberOfLeases 值.

信號量主要實現類有:

InterProcessSemaphoreV2 - 信號量實現類
Lease - 租約(單個信號)
SharedCountReader - 計數器,用於計算最大租約數量

調用 acquire 會返回一個租約對象,客戶端必須在 finally 中 close 這些租約對象,不然這些租約會丟失掉。可是,若是客戶端session因爲某種緣由好比crash丟掉,那麼這些客戶端持有的租約會自動close,這樣其它客戶端能夠繼續使用這些租約。租約還能夠經過下面的方式返還:

public void returnLease(Lease lease)
public void returnAll(Collection<Lease> leases)

注意一次你能夠請求多個租約,若是 Semaphore 當前的租約不夠,則請求線程會被阻塞。同時還提供了超時的重載方法。

public Lease acquire() throws Exception
public Collection<Lease> acquire(int qty) throws Exception
public Lease acquire(long time, TimeUnit unit) throws Exception
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception

一個Demo程序以下

public class SharedSemaphoreTest {
    private static final int MAX_LEASE = 10;
    private static final String PATH = "/testZK/semaphore";
    private static final FakeLimitedResource resource = new FakeLimitedResource();

    public static void main(String[] args) throws Exception {
        CuratorFramework client = ZKUtils.getClient();
        client.start();
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
        Collection<Lease> leases = semaphore.acquire(5);
        System.out.println("獲取租約數量:" + leases.size());
        Lease lease = semaphore.acquire();
        System.out.println("獲取單個租約");
        resource.use(); // 使用資源
        // 再次申請獲取5個leases,此時leases數量只剩4個,不夠,將超時
        Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
        System.out.println("獲取租約,若是超時將爲null:" + leases2);
        System.out.println("釋放租約");
        semaphore.returnLease(lease);
        // 再次申請獲取5個,此次恰好夠
        leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
        System.out.println("獲取租約,若是超時將爲null:" + leases2);
        System.out.println("釋放集合中的全部租約");
        semaphore.returnAll(leases);
        semaphore.returnAll(leases2);
        client.close();
        System.out.println("結束!");
    }
}

控制檯打印日誌

獲取租約數量:5
獲取單個租約
獲取租約,若是超時將爲null:null
釋放租約
獲取租約,若是超時將爲null:[org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@3108bc, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@370736d9, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@5f9d02cb, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@63753b6d, org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2$3@6b09bb57]
釋放集合中的全部租約
結束!

注意:上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看,每一個客戶端都按照請求的順序得到鎖,至關公平。

多鎖

Multi Shared Lock 是一個鎖的容器。當調用 acquire,全部的鎖都會被 acquire,若是請求失敗,全部的鎖都會被 release。一樣調用 release 時全部的鎖都被 release(失敗被忽略)。基本上,它就是組鎖的表明,在它上面的請求釋放操做都會傳遞給它包含的全部的鎖。

主要涉及兩個類:

InterProcessMultiLock - 對所對象實現類
InterProcessLock - 分佈式鎖接口類

它的構造函數須要包含的鎖的集合,或者一組 ZooKeeper 的 path,用法和 Shared Lock 相同

public InterProcessMultiLock(CuratorFramework client, List<String> paths)
public InterProcessMultiLock(List<InterProcessLock> locks)

一個Demo程序以下

public class MultiSharedLockTest {
    private static final String lockPath1 = "/testZK/MSLock1";
    private static final String lockPath2 = "/testZK/MSLock2";
    private static final FakeLimitedResource resource = new FakeLimitedResource();

    public static void main(String[] args) throws Exception {
        CuratorFramework client = ZKUtils.getClient();
        client.start();

        InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入鎖
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入鎖
        // 組鎖,多鎖
        InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
        if (!lock.acquire(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("不能獲取多鎖");
        }
        System.out.println("已獲取多鎖");
        System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
        System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
        try {
            resource.use(); // 資源操做
        } finally {
            System.out.println("釋放多個鎖");
            lock.release(); // 釋放多鎖
        }
        System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
        System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
        client.close();
        System.out.println("結束!");
    }
}

寫在最後

歡迎你們關注個人公衆號【風平浪靜如碼】,海量Java相關文章,學習資料都會在裏面更新,整理的資料也會放在裏面。

以爲寫的還不錯的就點個贊,加個關注唄!點關注,不迷路,持續更新!!!

相關文章
相關標籤/搜索