死磕 java同步系列之zookeeper分佈式鎖

問題

(1)zookeeper如何實現分佈式鎖?java

(2)zookeeper分佈式鎖有哪些優勢?node

(3)zookeeper分佈式鎖有哪些缺點?mysql

簡介

zooKeeper是一個分佈式的,開放源碼的分佈式應用程序協調服務,它能夠爲分佈式應用提供一致性服務,它是Hadoop和Hbase的重要組件,同時也能夠做爲配置中心、註冊中心運用在微服務體系中。redis

本章咱們將介紹zookeeper如何實現分佈式鎖運用在分佈式系統中。sql

基礎知識

什麼是znode?

zooKeeper操做和維護的爲一個個數據節點,稱爲 znode,採用相似文件系統的層級樹狀結構進行管理,若是 znode 節點包含數據則存儲爲字節數組(byte array)。apache

並且,同一個節點多個客戶同時建立【本篇文章由公衆號「彤哥讀源碼」原創】,只有一個客戶端會成功,其它客戶端建立時將失敗。數組

zooKeeper

節點類型

znode 共有四種類型:安全

  • 持久(無序)
  • 持久有序
  • 臨時(無序)
  • 臨時有序

其中,持久節點若是不手動刪除會一直存在,臨時節點當客戶端session失效就會自動刪除節點。服務器

什麼是watcher?

watcher(事件監聽器),是zookeeper中的一個很重要的特性。session

zookeeper容許用戶在指定節點上註冊一些watcher,而且在一些特定事件觸發的時候,zooKeeper服務端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現分佈式協調服務的重要特性

KeeperState EventType 觸發條件 說明 操做
SyncConnected(3) None(-1) 客戶端與服務端成功創建鏈接 此時客戶端和服務器處於鏈接狀態 -
同上 NodeCreated(1) Watcher監聽的對應數據節點被建立 同上 Create
同上 NodeDeleted(2) Watcher監聽的對應數據節點被刪除 同上 Delete/znode
同上 NodeDataChanged(3) Watcher監聽的對應數據節點的數據內容發生變動 同上 setDate/znode
同上 NodeChildChanged(4) Wather監聽的對應數據節點的子節點列表發生變動 同上 Create/child
Disconnected(0) None(-1) 客戶端與ZooKeeper服務器斷開鏈接 此時客戶端和服務器處於斷開鏈接狀態 -
Expired(-112) None(-1) 會話超時 此時客戶端會話失效,一般同時也會受到SessionExpiredException異常 -
AuthFailed(4) None(-1) 一般有兩種狀況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 一般同時也會收到AuthFailedException異常 -

原理解析

方案一

既然,同一個節點只能建立一次,那麼,加鎖時檢測節點是否存在,不存在則建立之,存在或者建立失敗則監聽這個節點的刪除事件,這樣,當釋放鎖的時候監聽的客戶端再次競爭去建立這個節點,成功的則獲取到鎖,不成功的則再次監聽該節點。

zooKeeper

好比,有三個客戶端client一、client二、client3同時獲取/locker/user_1這把鎖,它們將按照以下步驟運行:

(1)三者同時嘗試建立/locker/user_1節點;

(2)client1建立成功,它獲取到鎖;

(3)client2和client3建立失敗,它們監聽/locker/user_1的刪除事件;

(4)client1執行鎖內業務邏輯;

(5)client1釋放鎖,刪除節點/locker/user_1;

(6)client2和client3都捕獲到節點/locker/user_1被刪除的事件,兩者皆被喚醒;

(7)client2和client3同時去建立/locker/user_1節點;

(8)回到第二步,依次類推【本篇文章由公衆號「彤哥讀源碼」原創】;

不過,這種方案有個很嚴重的弊端——驚羣效應。

若是併發量很高,多個客戶端同時監聽同一個節點,釋放鎖時同時喚醒這麼多個客戶端,而後再競爭,最後仍是隻有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統資源,那麼有沒有更好的方案呢?答案是固然有,請看方案二。

方案二

爲了解決方案一中的驚羣效應,咱們可使用有序子節點的形式來實現分佈式鎖,並且爲了規避客戶端獲取鎖後忽然斷線的風險,咱們有必要使用臨時有序節點。

zooKeeper

好比,有三個客戶端client一、client二、client3同時獲取/locker/user_1這把鎖,它們將按照以下步驟運行:

(1)三者同時在/locker/user_1/下面建立臨時有序子節點;

(2)三者皆建立成功,分別爲/locker/user1/000000000一、/locker/user1/000000000三、/locker/user_1/0000000002;

(3)檢查本身建立的節點是否是子節點中最小的;

(4)client1發現本身是最小的節點,它獲取到鎖;

(5)client2和client3發現本身不是最小的節點,它們沒法獲取到鎖;

(6)client2建立的節點爲/locker/user1/0000000003,它監聽其上一個節點/locker/user1/0000000002的刪除事件;

(7)client3建立的節點爲/locker/user1/0000000002,它監聽其上一個節點/locker/user1/0000000001的刪除事件;

(8)client1執行鎖內業務邏輯;

(9)client1釋放鎖,刪除節點/locker/user_1/0000000001;

(10)client3監聽到節點/locker/user_1/0000000001的刪除事件,被喚醒;

(11)client3再次檢查本身是否是最小的節點,發現是,則獲取到鎖;

(12)client3執行鎖內業務邏輯【本篇文章由公衆號「彤哥讀源碼」原創】;

(13)client3釋放鎖,刪除節點/locker/user_1/0000000002;

(14)client2監聽到節點/locker/user_1/0000000002的刪除事件,被喚醒;

(15)client2執行鎖內業務邏輯;

(16)client2釋放鎖,刪除節點/locker/user_1/0000000003;

(17)client2檢查/locker/user1/下是否還有子節點,沒有了則刪除/locker/user1節點;

(18)流程結束;

這種方案相對於方案一來講,每次釋放鎖時只喚醒一個客戶端,減小了線程喚醒的代價,提升了效率。

zookeeper原生API實現

pom文件

pom中引入如下jar包:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>複製代碼

Locker接口

定義一個Locker接口,與上一章mysql分佈式鎖使用同一個接口。

public interface Locker {
    void lock(String key, Runnable command);
}複製代碼

zookeeper分佈式鎖實現

這裏經過內部類ZkLockerWatcher處理zookeeper的相關操做,須要注意如下幾點:

(1)zk鏈接創建完畢以前不要進行相關操做,不然會報ConnectionLoss異常,這裏經過LockSupport.park();阻塞鏈接線程並在監聽線程中喚醒處理;

(2)客戶端線程與監聽線程不是同一個線程,因此能夠經過LockSupport.park();及LockSupport.unpark(thread);來處理;

(3)中間不少步驟不是原子的(坑),因此須要再次檢測,詳見代碼中註釋;

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // 阻塞等待鏈接創建完畢
                LockSupport.park();
                // 根節點若是不存在,就建立一個(併發問題,若是兩個線程同時檢測不存在,兩個同時去建立必須有一個會失敗)
                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 若是節點已存在,則建立失敗,這裏捕獲異常,並不阻擋程序正常運行
                        log.info("建立節點 {} 失敗", LOCKER_ROOT);
                    }
                }
                // 當前加鎖的節點是否存在
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 若是節點已存在,則建立失敗,這裏捕獲異常,並不阻擋程序正常運行
                        log.info("建立節點 {} 失敗", watcher.parentLockPath);
                    }
                }

            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // 建立子節點【本篇文章由公衆號「彤哥讀源碼」原創】
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // 檢查本身是否是最小的節點,是則獲取成功,不是則監聽上一個節點
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
//                System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // 釋放鎖,刪除節點
                    zooKeeper.delete(childLockPath, -1);
                }
                // 最後一個釋放的刪除鎖節點
                List<String> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // 若是刪除以前又新加了一個子節點,會刪除失敗
                        log.info("刪除節點 {} 失敗", parentLockPath);
                    }
                }
                // 關閉zk鏈接
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
//                System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<String> children = zooKeeper.getChildren(parentLockPath, false);
            // 必需要排序一下,這裏取出來的順序多是亂的
            Collections.sort(children);
            // 若是當前節點是第一個子節點,則獲取鎖成功
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // 若是不是第一個子節點,就監聽前一個節點
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }

            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // 阻塞當前線程
                LockSupport.park();
                // 喚醒以後從新檢測本身是否是最小的節點,由於有可能上一個節點斷線了
                return getLockOrWatchLast();
            } else {
                // 若是上一個節點不存在,說明還沒來得及監聽就釋放了,從新檢查一次
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // 喚醒阻塞的線程(這是在監聽線程,跟獲取鎖的線程不是同一個線程)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}複製代碼

測試代碼

咱們這裏起兩批線程,一批獲取user1這個鎖,一批獲取user2這個鎖。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("user_1", ()-> {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }
        for (int i = 1000; i < 2000; i++) {
            new Thread(()->{
                locker.lock("user_2", ()-> {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}複製代碼

運行結果:

能夠看到穩定在500ms左右打印兩個鎖的結果。

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621複製代碼

curator實現

上面的原生API實現更易於理解zookeeper實現分佈式鎖的邏輯,可是不免保證沒有什麼問題,好比不是重入鎖,不支持讀寫鎖等。

下面咱們一塊兒看看現有的輪子curator是怎麼實現的。

pom文件

pom文件中引入如下jar包:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>複製代碼

代碼實現

下面是互斥鎖的一種實現方案:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            // 【本篇文章由公衆號「彤哥讀源碼」原創】
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}複製代碼

除了互斥鎖,curator還提供了讀寫鎖、多重鎖、信號量等實現方式,並且他們是可重入的鎖。

總結

(1)zookeeper中的節點有四種類型:持久、持久有序、臨時、臨時有序;

(2)zookeeper提供了一種很是重要的特性——監聽機制,它能夠用來監聽節點的變化;

(3)zookeeper分佈式鎖是基於 臨時有序節點 + 監聽機制 實現的;

(4)zookeeper分佈式鎖加鎖時在鎖路徑下建立臨時有序節點;

(5)若是本身是第一個節點,則得到鎖;

(6)若是本身不是第一個節點,則監聽前一個節點,並阻塞當前線程;

(7)當監聽到前一個節點的刪除事件時,喚醒當前節點的線程,並再次檢查本身是否是第一個節點;

(8)使用臨時有序節點而不是持久有序節點是爲了讓客戶端無端斷線時可以自動釋放鎖;

彩蛋

zookeeper分佈式鎖有哪些優勢?

答:1)zookeeper自己能夠集羣部署,相對於mysql的單點更可靠;

2)不會佔用mysql的鏈接數,不會增長mysql的壓力;

3)使用監聽機制,減小線程上下文切換的次數;

4)客戶端斷線可以自動釋放鎖,很是安全;

5)有現有的輪子curator可使用;

6)curator實現方式是可重入的,對現有代碼改形成本小;

zookeeper分佈式鎖有哪些缺點?

答:1)加鎖會頻繁地「寫」zookeeper,增長zookeeper的壓力;

2)寫zookeeper的時候會在集羣進行同步,節點數越多,同步越慢,獲取鎖的過程越慢;

3)須要另外依賴zookeeper,而大部分服務是不會使用zookeeper的,增長了系統的複雜性;

4)相對於redis分佈式鎖,性能要稍微略差一些;

推薦閱讀

一、死磕 java同步系列之開篇

二、死磕 java魔法類之Unsafe解析

三、死磕 java同步系列之JMM(Java Memory Model)

四、死磕 java同步系列之volatile解析

五、死磕 java同步系列之synchronized解析

六、死磕 java同步系列之本身動手寫一個鎖Lock

七、死磕 java同步系列之AQS起篇

八、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

九、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

十、死磕 java同步系列之ReentrantLock VS synchronized

十一、死磕 java同步系列之ReentrantReadWriteLock源碼解析

十二、死磕 java同步系列之Semaphore源碼解析

1三、死磕 java同步系列之CountDownLatch源碼解析

1四、死磕 java同步系列之AQS終篇

1五、死磕 java同步系列之StampedLock源碼解析

1六、死磕 java同步系列之CyclicBarrier源碼解析

1七、死磕 java同步系列之Phaser源碼解析

1八、死磕 java同步系列之mysql分佈式鎖

歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode

相關文章
相關標籤/搜索