原創轉載請註明出處:http://www.javashuo.com/article/p-tzhhwqih-hp.htmlhtml
Zookeeper是一種提供「分佈式服務協調「的中心化服務,分佈式應用程序才能夠基於Zookeeper的如下兩個特性實現分佈式鎖功能。java
熟悉了Zookeeper的這兩個特性以後,就能夠看看Zookeeper是如何實現分佈式鎖的了。node
首先,須要創建一個父節點,節點類型爲持久節點(PERSISTENT) ,每當須要訪問共享資源時,就會在父節點下創建相應的順序子節點,節點類型爲臨時節點(EPHEMERAL),且標記爲有序性(SEQUENTIAL),而且以臨時節點名稱+父節點名稱+順序號組成特定的名字。spring
在創建子節點後,對父節點下面的全部以臨時節點名稱name開頭的子節點進行排序,判斷剛剛創建的子節點順序號是不是最小的節點,若是是最小節點,則得到鎖。apache
若是不是最小節點,則阻塞等待鎖,而且得到該節點的上一順序節點,爲其註冊監聽事件,等待節點對應的操做得到鎖。session
當調用完共享資源後,刪除該節點,關閉zk,進而能夠觸發監聽事件,釋放該鎖。併發
以上實現的分佈式鎖是嚴格按照順序訪問的併發鎖。通常還能夠直接引用Curator框架來實現Zookeeper分佈式鎖,代碼以下:框架
Maven Dependency分佈式
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
DistributedLock.javaide
1 package org.fool.spring.util; 2 3 import java.util.concurrent.TimeUnit; 4 5 public interface DistributedLock { 6 7 void lock() throws Exception; 8 9 boolean tryLock(long time, TimeUnit unit) throws Exception; 10 11 void unlock() throws Exception; 12 13 boolean isAcquiredInThisProcess(); 14 }
ZkDistributedLock.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex; 5 6 import java.util.concurrent.TimeUnit; 7 8 public class ZkDistributedLock implements DistributedLock { 9 10 private InterProcessMutex mutex; 11 12 ZkDistributedLock(ZkClient zkClient, String lockPath) { 13 CuratorFramework client = zkClient.getClient(); 14 this.mutex = new InterProcessMutex(client, lockPath); 15 } 16 17 @Override 18 public void lock() throws Exception { 19 this.mutex.acquire(); 20 } 21 22 @Override 23 public boolean tryLock(long time, TimeUnit unit) throws Exception { 24 return this.mutex.acquire(time, unit); 25 } 26 27 @Override 28 public void unlock() throws Exception { 29 this.mutex.release(); 30 } 31 32 @Override 33 public boolean isAcquiredInThisProcess() { 34 return this.mutex.isAcquiredInThisProcess(); 35 } 36 37 }
ZkClient.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.framework.CuratorFramework; 4 5 public class ZkClient { 6 7 private final CuratorFramework client; 8 9 ZkClient(CuratorFramework client) { 10 this.client = client; 11 } 12 13 CuratorFramework getClient() { 14 return this.client; 15 } 16 17 /** 18 * start the client 19 */ 20 public void start() { 21 this.client.start(); 22 } 23 24 /** 25 * close the client 26 */ 27 public void close() { 28 this.client.close(); 29 } 30 31 }
DistributedLocks.java
1 package org.fool.spring.util; 2 3 import org.apache.curator.RetryPolicy; 4 import org.apache.curator.framework.CuratorFramework; 5 import org.apache.curator.framework.CuratorFrameworkFactory; 6 import org.apache.curator.retry.ExponentialBackoffRetry; 7 8 public final class DistributedLocks { 9 10 private DistributedLocks() { 11 } 12 13 private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000; 14 private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000; 15 private static final int BASE_SLEEP_TIME_MS = 1000; 16 private static final int MAX_RETRIES = 3; 17 18 /** 19 * Define the default retry policy 20 */ 21 private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES); 22 23 /** 24 * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout. 25 */ 26 public static ZkClient newZkClient(String connectString) { 27 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS, 28 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY); 29 return new ZkClient(client); 30 } 31 32 /** 33 * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout 34 */ 35 public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) { 36 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, 37 connectionTimeoutMs, DEFAULT_RETRY_POLICY); 38 return new ZkClient(client); 39 } 40 41 /** 42 * Create a new DistributedLock with ZkClient and lock path. 43 */ 44 public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) { 45 return new ZkDistributedLock(zkClient, lockPath); 46 } 47 }
TestZkDistributedLock.java
1 package org.fool.spring.test; 2 3 import org.fool.spring.util.DistributedLock; 4 import org.fool.spring.util.DistributedLocks; 5 import org.fool.spring.util.ZkClient; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import java.util.concurrent.TimeUnit; 10 11 public class TestZkDistributedLock { 12 13 private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class); 14 15 private static final String lockPath = "/curator/test"; 16 17 public static void main(String[] args) throws Exception { 18 ZkClient zkClient = DistributedLocks.newZkClient("127.0.0.1:2181"); 19 zkClient.start(); 20 21 DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath); 22 23 boolean isAcquired = lock.isAcquiredInThisProcess(); 24 logger.info("==========lock acquired: " + isAcquired + "=========="); 25 26 if (lock.tryLock(3, TimeUnit.SECONDS)) { 27 try { 28 isAcquired = lock.isAcquiredInThisProcess(); 29 logger.info("==========lock acquired: " + isAcquired + "=========="); 30 31 // mock to do business logic 32 Thread.sleep(60000); 33 } finally { 34 lock.unlock(); 35 logger.info("==========release the lock !!!=========="); 36 } 37 } else { 38 logger.info("==========failed to get the lock !!!=========="); 39 } 40 41 zkClient.close(); 42 } 43 }
執行TestZkDistributedLock,模擬業務執行佔用60s時間
在60s內,再次執行TestZkDistributedLock,能夠看到嘗試獲取鎖失敗
打開zk client,查看執行期間內的順序臨時節點的變化狀況
Zookeeper實現的分佈式鎖
優勢
缺點