ZooKeeper(八)-- Curator實現分佈式鎖

 1.pom.xmljava

  <dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
   </dependency>
   <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
   </dependency>
   <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
       <version>2.5.0</version>
   </dependency>
  </dependencies>

2.JAVA代碼apache

 1 package com.xbq.zookeeper.curator;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Semaphore;
 6 import java.util.concurrent.TimeUnit;
 7 import org.apache.curator.framework.CuratorFramework;
 8 import org.apache.curator.framework.CuratorFrameworkFactory;
 9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
10 import org.apache.curator.retry.RetryNTimes;
11 
12 /**
13  * 使用Curator來實現分佈式鎖
14  * @author xbq
15  */
16 public class LockByCurator {
17 
18     // 此demo使用的集羣,因此有多個ip和端口
19     private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";
20     // session過時時間
21     private static int SESSION_TIMEOUT = 3000;
22     // 鏈接超時時間
23     private static int CONNECTION_TIMEOUT = 3000; 
24     
25     // 鎖節點
26     private static final String CURATOR_LOCK = "/curatorLock";
27     
28     /**
29      * 獲取鎖操做
30      * @param cf
31      */
32     public static void doLock(CuratorFramework cf){
33         System.out.println(Thread.currentThread().getName() + " 嘗試獲取鎖!");
34         // 實例化 zk分佈式鎖  
35         InterProcessMutex mutex = new InterProcessMutex(cf, CURATOR_LOCK);
36         try {
37             // 判斷是否獲取到了zk分佈式鎖
38             if(mutex.acquire(5, TimeUnit.SECONDS)){
39                 System.out.println(Thread.currentThread().getName() + " 獲取到了鎖!-------");
40                 // 業務操做
41                 Thread.sleep(5000);
42             }
43         } catch (Exception e) {
44             e.printStackTrace();
45         } finally {
46             try {
47                 // 釋放鎖
48                 mutex.release();
49             } catch (Exception e) {
50                 e.printStackTrace();
51             }
52         }
53     }
54     
55     /**
56      * 測試
57      * @param args
58      */
59     public static void main(String[] args) {
60         // 定義線程池
61         ExecutorService service = Executors.newCachedThreadPool();
62         // 定義信號燈,只能容許10個線程併發操做
63         final Semaphore semaphore = new Semaphore(10);
64         // 模擬10個客戶端
65         for(int i=0; i < 10 ;i++){
66             Runnable runnable = new Runnable() {
67                 @Override
68                 public void run() {
69                     try {
70                         semaphore.acquire();
71                          // 鏈接 ZooKeeper 
72                         CuratorFramework framework = CuratorFrameworkFactory.
73                                 newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new RetryNTimes(10,5000));
74                         // 啓動
75                         framework.start();
76                         doLock(framework);
77                         
78                         semaphore.release();
79                     } catch (Exception e) {
80                     
81                     }
82                 }
83             };
84             service.execute(runnable);
85         }
86         service.shutdown();
87     }
88 }
相關文章
相關標籤/搜索