zookeeper 分佈式鎖

利用zookeeper實現

當不少進程須要訪問共享資源時,咱們能夠經過zk來實現分佈式鎖。主要步驟是: 
1.創建一個節點,假如名爲:lock 。節點類型爲持久節點(PERSISTENT) 
2.每當進程須要訪問共享資源時,會調用分佈式鎖的lock()或tryLock()方法得到鎖,這個時候會在第一步建立的lock節點下創建相應的順序子節點,節點類型爲臨時順序節點(EPHEMERAL_SEQUENTIAL),經過組成特定的名字name+lock+順序號。 
3.在創建子節點後,對lock下面的全部以name開頭的子節點進行排序,判斷剛剛創建的子節點順序號是不是最小的節點,假如是最小節點,則得到該鎖對資源進行訪問。 
4.假如不是該節點,就得到該節點的上一順序節點,並給該節點是否存在註冊監聽事件。同時在這裏阻塞。等待監聽事件的發生,得到鎖控制權。 
5.當調用完共享資源後,調用unlock()方法,關閉zk,進而能夠引起監聽事件,釋放該鎖。 
實現的分佈式鎖是嚴格的按照順序訪問的併發鎖。java

 

代碼實現

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DistributedLock implements Watcher {

    ZooKeeper zk = null; // zookeeper原生api去實現一個分佈式鎖

    private String root = "/locks";

    private String myZonode; // 表示當前獲取到的鎖名稱-也就是節點名稱

    private String waitNode; // 表示當前等待的節點

    private CountDownLatch latch;

    private static final int SESSION_TIMEOUT = 10000; // 超時時間

    /**
     * 構造函數初始化
     * 
     * @param config
     *            表示zookeeper鏈接串
     */
    public DistributedLock(String config) {
        try {
            zk = new ZooKeeper(config, SESSION_TIMEOUT, this);
            Stat stat = zk.exists(root, false); // 判斷是否是已經存在locks節點,不須要監聽root節點
            if (stat == null) { // 若是不存在,則建立根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent event) {
        if (this.latch != null) { // 若是計數器不爲空話話,釋放計數器鎖
            this.latch.countDown();
        }
    }

    /**
     * 獲取鎖的方法
     */
    public boolean lock(String name) {
        if (tryLock(name)) {
            return true;
        }
        try {
            return waitLock(waitNode, SESSION_TIMEOUT);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 釋放鎖操做的方法
     */
    public void unlock() {
        try {
            zk.delete(myZonode, -1);
            myZonode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    private boolean tryLock(String name) {
        String splitStr = name; // lock_0000000001
        try {
            // 建立一個有序的臨時節點,賦值給myznode
            myZonode = zk.create(root + "/" + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            List<String> subNodes = zk.getChildren(root, false);
            Collections.sort(subNodes); // 講全部的子節點排序
            if (myZonode.equals(root + "/" + subNodes.get(0))) {
                // 當前客戶端建立的臨時有序節點是locks下節點中的最小的節點,表示當前的客戶端可以獲取到鎖
                return true;
            }
            // 不然的話,監聽比本身小的節點 locks/lock_0000000003
            String subMyZnode = myZonode
                    .substring((myZonode.lastIndexOf("/") + 1));
            waitNode = subNodes.get(Collections.binarySearch(subNodes,
                    subMyZnode) - 1);// 獲取比當前節點小的節點
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitLock(String lower, long waitTime)
            throws KeeperException, InterruptedException {
        Stat stat = zk.exists(root + "/" + lower, true); // 獲取節點狀態,並添加監聽
        if (stat != null) {
            this.latch = new CountDownLatch(1); // 實例化計數器,讓當前的線程等待
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public static int count =10;
    
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {

                public void run() {
                    try {
                        DistributedLock distributeLockDemo = new DistributedLock(
                                "127.0.0.1:2181");
                        boolean lock = distributeLockDemo.lock("test_");
                        if (lock) {
                            System.out.println(count--);
                            distributeLockDemo.unlock();
                        }

                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }
            };

            executorService.execute(runnable);
        }
        executorService.shutdown();

    }

}
相關文章
相關標籤/搜索