Zookeeper分佈式鎖

原創轉載請註明出處:http://www.javashuo.com/article/p-tzhhwqih-hp.htmlhtml

 

Zookeeper是一種提供「分佈式服務協調「的中心化服務,分佈式應用程序才能夠基於Zookeeper的如下兩個特性實現分佈式鎖功能。java

  • 順序臨時節點:Zookeeper提供一個多層級的節點命名空間(節點稱爲Znode),每一個節點都用一個以斜槓(/)分隔的路徑來表示,並且每一個節點都有父節點(根節點除外),很是相似於文件系統。節點類型能夠分爲持久節點(PERSISTENT )、臨時節點(EPHEMERAL),每一個節點還能被標記爲有序性(SEQUENTIAL),一旦節點被標記爲有序性,那麼整個節點就具備順序自增的特色。通常能夠組合這幾類節點來建立所須要的節點,例如,建立一個持久節點做爲父節點,在父節點下面建立臨時節點,並標記該臨時節點爲有序性。
  • Watch機制:Zookeeper還提供了另一個重要的特性,Watcher(事件監聽器)。ZooKeeper容許用戶在指定節點上註冊一些Watcher,而且在一些特定事件觸發的時候,ZooKeeper服務端會將事件通知給用戶。

熟悉了Zookeeper的這兩個特性以後,就能夠看看Zookeeper是如何實現分佈式鎖的了。node

首先,須要創建一個父節點,節點類型爲持久節點(PERSISTENT) ,每當須要訪問共享資源時,就會在父節點下創建相應的順序子節點,節點類型爲臨時節點(EPHEMERAL),且標記爲有序性(SEQUENTIAL),而且以臨時節點名稱+父節點名稱+順序號組成特定的名字。spring

在創建子節點後,對父節點下面的全部以臨時節點名稱name開頭的子節點進行排序,判斷剛剛創建的子節點順序號是不是最小的節點,若是是最小節點,則得到鎖。apache

若是不是最小節點,則阻塞等待鎖,而且得到該節點的上一順序節點,爲其註冊監聽事件,等待節點對應的操做得到鎖。session

當調用完共享資源後,刪除該節點,關閉zk,進而能夠觸發監聽事件,釋放該鎖。併發

以上實現的分佈式鎖是嚴格按照順序訪問的併發鎖。通常還能夠直接引用Curator框架來實現Zookeeper分佈式鎖,代碼以下:框架

Maven Dependency分佈式

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

DistributedLock.javaide

 1 package org.fool.spring.util;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 
 5 public interface DistributedLock {
 6 
 7     void lock() throws Exception;
 8 
 9     boolean tryLock(long time, TimeUnit unit) throws Exception;
10 
11     void unlock() throws Exception;
12 
13     boolean isAcquiredInThisProcess();
14 }

ZkDistributedLock.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 5 
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class ZkDistributedLock implements DistributedLock {
 9 
10     private InterProcessMutex mutex;
11 
12     ZkDistributedLock(ZkClient zkClient, String lockPath) {
13         CuratorFramework client = zkClient.getClient();
14         this.mutex = new InterProcessMutex(client, lockPath);
15     }
16 
17     @Override
18     public void lock() throws Exception {
19         this.mutex.acquire();
20     }
21 
22     @Override
23     public boolean tryLock(long time, TimeUnit unit) throws Exception {
24         return this.mutex.acquire(time, unit);
25     }
26 
27     @Override
28     public void unlock() throws Exception {
29         this.mutex.release();
30     }
31 
32     @Override
33     public boolean isAcquiredInThisProcess() {
34         return this.mutex.isAcquiredInThisProcess();
35     }
36 
37 }

ZkClient.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 
 5 public class ZkClient {
 6 
 7     private final CuratorFramework client;
 8 
 9     ZkClient(CuratorFramework client) {
10         this.client = client;
11     }
12 
13     CuratorFramework getClient() {
14         return this.client;
15     }
16 
17     /**
18      * start the client
19      */
20     public void start() {
21         this.client.start();
22     }
23 
24     /**
25      * close the client
26      */
27     public void close() {
28         this.client.close();
29     }
30 
31 }

DistributedLocks.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.RetryPolicy;
 4 import org.apache.curator.framework.CuratorFramework;
 5 import org.apache.curator.framework.CuratorFrameworkFactory;
 6 import org.apache.curator.retry.ExponentialBackoffRetry;
 7 
 8 public final class DistributedLocks {
 9 
10     private DistributedLocks() {
11     }
12 
13     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
14     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
15     private static final int BASE_SLEEP_TIME_MS = 1000;
16     private static final int MAX_RETRIES = 3;
17 
18     /**
19      * Define the default retry policy
20      */
21     private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
22 
23     /**
24      * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout.
25      */
26     public static ZkClient newZkClient(String connectString) {
27         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS,
28                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY);
29         return new ZkClient(client);
30     }
31 
32     /**
33      * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout
34      */
35     public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
36         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs,
37                 connectionTimeoutMs, DEFAULT_RETRY_POLICY);
38         return new ZkClient(client);
39     }
40 
41     /**
42      * Create a new DistributedLock with ZkClient and lock path.
43      */
44     public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) {
45         return new ZkDistributedLock(zkClient, lockPath);
46     }
47 }

TestZkDistributedLock.java

 1 package org.fool.spring.test;
 2 
 3 import org.fool.spring.util.DistributedLock;
 4 import org.fool.spring.util.DistributedLocks;
 5 import org.fool.spring.util.ZkClient;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import java.util.concurrent.TimeUnit;
10 
11 public class TestZkDistributedLock {
12 
13     private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class);
14 
15     private static final String lockPath = "/curator/test";
16 
17     public static void main(String[] args) throws Exception {
18         ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181");
19         zkClient.start();
20 
21         DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath);
22 
23         boolean isAcquired = lock.isAcquiredInThisProcess();
24         logger.info("==========lock acquired: " + isAcquired + "==========");
25 
26         if (lock.tryLock(3, TimeUnit.SECONDS)) {
27             try {
28                 isAcquired = lock.isAcquiredInThisProcess();
29                 logger.info("==========lock acquired: " + isAcquired + "==========");
30 
31                 // mock to do business logic
32                 Thread.sleep(60000);
33             } finally {
34                 lock.unlock();
35                 logger.info("==========release the lock !!!==========");
36             }
37         } else {
38             logger.info("==========failed to get the lock !!!==========");
39         }
40 
41         zkClient.close();
42     }
43 }

 

Test

執行TestZkDistributedLock,模擬業務執行佔用60s時間

在60s內,再次執行TestZkDistributedLock,能夠看到嘗試獲取鎖失敗

打開zk client,查看執行期間內的順序臨時節點的變化狀況

 

 

Summary

Zookeeper實現的分佈式鎖

優勢

  • Zookeeper是集羣實現,能夠避免單點問題,且能保證每次操做均可以有效地釋放鎖,這是由於一旦應用服務掛掉了,臨時節點會由於session鏈接斷開而自動刪除掉。

缺點

  • 因爲頻繁地建立和刪除結點,加上大量的Watch事件,對Zookeeper集羣來講,壓力很是大。且從性能上來講,與Redis實現的分佈式鎖相比,仍是存在必定的差距。

 

Reference

https://time.geekbang.org/column/article/125983

http://curator.apache.org/

相關文章
相關標籤/搜索