基於zookeeper的分佈式鎖實現

  • 工做中須要寫一個定時任務,因爲是集羣環境,天然而然想到須要經過分佈式鎖來保證單臺執行..相信你們都會想到使用zk來實現對應的分佈式鎖.下面就簡單介紹一下幾種實現java

     

    準備工做

    有幾個幫助類,先把代碼放上來apache

    ZKClient 對zk的操做作了一個簡單的封裝服務器

     

    Java代碼  session

    1. package zk.lock;  
    2.   
    3. import org.apache.zookeeper.*;  
    4. import org.apache.zookeeper.data.Stat;  
    5. import zk.util.ZKUtil;  
    6.   
    7. import java.util.concurrent.CountDownLatch;  
    8. import java.util.concurrent.TimeUnit;  
    9.   
    10. /** 
    11.  * User: zhenghui 
    12.  * Date: 14-3-26 
    13.  * Time: 下午8:50 
    14.  * 封裝一個zookeeper實例. 
    15.  */  
    16. public class ZKClient implements Watcher {  
    17.   
    18.     private ZooKeeper zookeeper;  
    19.   
    20.     private CountDownLatch connectedSemaphore = new CountDownLatch(1);  
    21.   
    22.   
    23.     public ZKClient(String connectString, int sessionTimeout) throws Exception {  
    24.         zookeeper = new ZooKeeper(connectString, sessionTimeout, this);  
    25.         System.out.println("connecting zk server");  
    26.         if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {  
    27.             System.out.println("connect zk server success");  
    28.         } else {  
    29.             System.out.println("connect zk server error.");  
    30.             throw new Exception("connect zk server error.");  
    31.         }  
    32.     }  
    33.   
    34.     public void close() throws InterruptedException {  
    35.         if (zookeeper != null) {  
    36.             zookeeper.close();  
    37.         }  
    38.     }  
    39.   
    40.     public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {  
    41.         CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;  
    42.         path = ZKUtil.normalize(path);  
    43.         if (!this.exists(path)) {  
    44.             zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);  
    45.         }  
    46.     }  
    47.   
    48.     public boolean exists(String path) throws Exception {  
    49.         path = ZKUtil.normalize(path);  
    50.         Stat stat = zookeeper.exists(path, null);  
    51.         return stat != null;  
    52.     }  
    53.   
    54.     public String getData(String path) throws Exception {  
    55.         path = ZKUtil.normalize(path);  
    56.         try {  
    57.             byte[] data = zookeeper.getData(path, null, null);  
    58.             return new String(data);  
    59.         } catch (KeeperException e) {  
    60.             if (e instanceof KeeperException.NoNodeException) {  
    61.                 throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);  
    62.             } else {  
    63.                 throw new Exception(e);  
    64.             }  
    65.         } catch (InterruptedException e) {  
    66.             Thread.currentThread().interrupt();  
    67.             throw new Exception(e);  
    68.         }  
    69.     }  
    70.   
    71.     @Override  
    72.     public void process(WatchedEvent event) {  
    73.         if (event == null) return;  
    74.   
    75.         // 鏈接狀態  
    76.         Watcher.Event.KeeperState keeperState = event.getState();  
    77.         // 事件類型  
    78.         Watcher.Event.EventType eventType = event.getType();  
    79.         // 受影響的path  
    80. //        String path = event.getPath();  
    81.         if (Watcher.Event.KeeperState.SyncConnected == keeperState) {  
    82.             // 成功鏈接上ZK服務器  
    83.             if (Watcher.Event.EventType.None == eventType) {  
    84.                 System.out.println("zookeeper connect success");  
    85.                 connectedSemaphore.countDown();  
    86.             }  
    87.         }  
    88.         //下面能夠作一些重連的工做.  
    89.         else if (Watcher.Event.KeeperState.Disconnected == keeperState) {  
    90.             System.out.println("zookeeper Disconnected");  
    91.         } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {  
    92.             System.out.println("zookeeper AuthFailed");  
    93.         } else if (Watcher.Event.KeeperState.Expired == keeperState) {  
    94.             System.out.println("zookeeper Expired");  
    95.         }  
    96.     }  
    97. }  

     ZKUtil 針對zk路徑的一個工具類mybatis

    Java代碼  mvc

    1. package zk.util;  
    2.   
    3. /** 
    4.  * User: zhenghui 
    5.  * Date: 14-3-26 
    6.  * Time: 下午9:56 
    7.  */  
    8. public class ZKUtil {  
    9.   
    10.     public static final String SEPARATOR = "/";  
    11.   
    12.     /** 
    13.      * 轉換path爲zk的標準路徑 以/開頭,最後不帶/ 
    14.      */  
    15.     public static String normalize(String path) {  
    16.         String temp = path;  
    17.  
    18.         if(!path.startsWith(SEPARATOR)) {  
    19.             temp = SEPARATOR + path;  
    20.         }  
    21.         if(path.endsWith(SEPARATOR)) {  
    22.             temp = temp.substring(0, temp.length()-1);  
    23.             return normalize(temp);  
    24.         }else {  
    25.             return temp;  
    26.         }  
    27.     }  
    28.   
    29.     /** 
    30.      * 連接兩個path,並轉化爲zk的標準路徑 
    31.      */  
    32.     public static String contact(String path1,String path2){  
    33.         if(path2.startsWith(SEPARATOR)) {  
    34.             path2 = path2.substring(1);  
    35.         }  
    36.         if(path1.endsWith(SEPARATOR)) {  
    37.             return normalize(path1 + path2);  
    38.         } else {  
    39.             return normalize(path1 + SEPARATOR + path2);  
    40.         }  
    41.     }  
    42.   
    43.     /** 
    44.      * 字符串轉化成byte類型 
    45.      */  
    46.     public static byte[] toBytes(String data) {  
    47.         if(data == null || data.trim().equals("")) return null;  
    48.         return data.getBytes();  
    49.     }  
    50. }  

     NetworkUtil 獲取本機IP的工具方法框架

    Java代碼  分佈式

    1. package zk.util;  
    2.   
    3. import java.net.InetAddress;  
    4. import java.net.NetworkInterface;  
    5. import java.util.Enumeration;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-4-1 
    10.  * Time: 下午4:47 
    11.  */  
    12. public class NetworkUtil {  
    13.   
    14.     static private final char COLON = ':';  
    15.   
    16.     /** 
    17.      * 獲取當前機器ip地址 
    18.      * 聽說多網卡的時候會有問題. 
    19.      */  
    20.     public static String getNetworkAddress() {  
    21.         Enumeration<NetworkInterface> netInterfaces;  
    22.         try {  
    23.             netInterfaces = NetworkInterface.getNetworkInterfaces();  
    24.             InetAddress ip;  
    25.             while (netInterfaces.hasMoreElements()) {  
    26.                 NetworkInterface ni = netInterfaces  
    27.                         .nextElement();  
    28.                 Enumeration<InetAddress> addresses=ni.getInetAddresses();  
    29.                 while(addresses.hasMoreElements()){  
    30.                     ip = addresses.nextElement();  
    31.                     if (!ip.isLoopbackAddress()  
    32.                             && ip.getHostAddress().indexOf(COLON) == -1) {  
    33.                         return ip.getHostAddress();  
    34.                     }  
    35.                 }  
    36.             }  
    37.             return "";  
    38.         } catch (Exception e) {  
    39.             return "";  
    40.         }  
    41.     }  
    42. }  

     

    --------------------------- 正文開始  -----------------------------------ide

    這種實現很是簡單,具體的流程以下工具



     對應的實現以下

    Java代碼  

    1. package zk.lock;  
    2.   
    3.   
    4. import zk.util.NetworkUtil;  
    5. import zk.util.ZKUtil;  
    6.   
    7. /** 
    8.  * User: zhenghui 
    9.  * Date: 14-3-26 
    10.  * Time: 下午8:37 
    11.  * 分佈式鎖實現. 
    12.  * 
    13.  * 這種實現的原理是,建立某一個任務的節點,好比 /lock/tasckname 而後獲取對應的值,若是是當前的Ip,那麼得到鎖,若是不是,則沒得到 
    14.  * .若是該節點不存在,則建立該節點,並把改節點的值設置成當前的IP 
    15.  */  
    16. public class DistributedLock01 {  
    17.   
    18.     private ZKClient zkClient;  
    19.   
    20.   
    21.     public static final String LOCK_ROOT = "/lock";  
    22.     private String lockName;  
    23.   
    24.   
    25.     public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {  
    26.         //先建立zk連接.  
    27.         this.createConnection(connectString,sessionTimeout);  
    28.   
    29.         this.lockName = lockName;  
    30.     }  
    31.   
    32.     public boolean tryLock(){  
    33.         String path = ZKUtil.contact(LOCK_ROOT,lockName);  
    34.         String localIp = NetworkUtil.getNetworkAddress();  
    35.         try {  
    36.             if(zkClient.exists(path)){  
    37.                 String ownnerIp = zkClient.getData(path);  
    38.                 if(localIp.equals(ownnerIp)){  
    39.                     return true;  
    40.                 }  
    41.             } else {  
    42.                 zkClient.createPathIfAbsent(path,false);  
    43.                 if(zkClient.exists(path)){  
    44.                     String ownnerIp = zkClient.getData(path);  
    45.                     if(localIp.equals(ownnerIp)){  
    46.                         return true;  
    47.                     }  
    48.                 }  
    49.             }  
    50.         } catch (Exception e) {  
    51.             e.printStackTrace();  
    52.         }  
    53.         return false;  
    54.     }  
    55.   
    56.   
    57.     /** 
    58.      * 建立zk鏈接 
    59.      * 
    60.      */  
    61.     protected void createConnection(String connectString, int sessionTimeout) throws Exception {  
    62.         if(zkClient != null){  
    63.             releaseConnection();  
    64.         }  
    65.         zkClient = new ZKClient(connectString,sessionTimeout);  
    66.         zkClient.createPathIfAbsent(LOCK_ROOT,true);  
    67.     }  
    68.     /** 
    69.      * 關閉ZK鏈接 
    70.      */  
    71.     protected void releaseConnection() throws InterruptedException {  
    72.         if (zkClient != null) {  
    73.             zkClient.close();  
    74.         }  
    75.     }  
    76.   
    77. }  

     

    總結

    網上有不少文章,你們的方法大多數都是建立一個root根節點,每個trylock的客戶端都會在root下建立一個 EPHEMERAL_SEQUENTIAL 的子節點,同時設置root的child 變動watcher(爲了不羊羣效應,能夠只添加前一個節點的變動通知) .若是建立的節點的序號是最小,則獲取到鎖,不然繼續等待root的child 變動

  • 核心技術:Maven,Springmvc mybatis shiro, Druid, Restful, Dubbo, ZooKeeper,Redis,FastDFS,ActiveMQ,Nginx 
    1.     項目核心代碼結構截圖

    分佈式框架介紹 - kafkaee - kafkaee的博客

       項目模塊依賴分佈式框架介紹 - kafkaee - kafkaee的博客

    特別提醒:開發人員在開發的時候能夠將本身的業務REST服務化或者Dubbo服務化

    2.    項目依賴介紹

       2.1 後臺管理系統、Rest服務系統、Scheculer定時調度系統依賴以下圖:
     

    分佈式框架介紹 - kafkaee - kafkaee的博客

           2.2 Dubbo獨立服務項目依賴以下圖:

     分佈式框架介紹 - kafkaee - kafkaee的博客

    3.  項目功能部分截圖:

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客
     

    zookeeper、dubbo服務啓動 

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客
     

    dubbo管控臺 

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     REST服務平臺

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

     

    分佈式框架介紹 - kafkaee - kafkaee的博客

相關文章
相關標籤/搜索