基於zookeeper的分佈式鎖

其實現以下代碼:
java

public class DistributedLock implements Lock {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);

    private ZooKeeper zooKeeper = null;
    /**
     * 鎖根節點目錄
     */
    private final String LOCK_ROOT = "/locks";
    /**
     * 當前節點
     */
    private String currentNode;
    /**
     * 獲取須要監控的前一個節點
     */
    private String prevNode;
    /**
     * 當前已得到鎖節點
     */
    private String lockedBy;
    /**
     * 節點名稱
     */
    private String nodeName;
    /**
     * 線程阻塞器
     */
    private CountDownLatch latch;

    public DistributedLock() {
        this("/lock");
    }

    public DistributedLock(String nodeName) {
        this("192.168.52.128:2181", 60000, nodeName);
    }

    public DistributedLock(String connectString, int sessionTimeout, String nodeName) {
        this.nodeName = nodeName;
        initZooKeeper(connectString, sessionTimeout);
        latch = new CountDownLatch(1);
    }

    /**
     * 初始化zookeeper
     * @param connectString
     * @param sessionTimeout
     */
    public void initZooKeeper(String connectString, int sessionTimeout) {
        try {
            //建立zookeeper實例,Watcher:監控節點變化的事件
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, new ZKWatcher());
            //判斷鎖對應的根節點是否存在
            Stat stat = zooKeeper.exists(LOCK_ROOT, false);
            if(null == stat) {
                zooKeeper.create(LOCK_ROOT, LOCK_ROOT.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void lock() {
        if(!tryLock()) {
            //阻塞
            await();
        }
        logger.info("線程{}獲取鎖{}", Thread.currentThread().getName(), currentNode);
    }

    /**
     * 阻塞線程,直到latch計數countDown=0
     */
    private void await() {
        try {
            Stat stat = zooKeeper.exists(prevNode, true);
            if(null != stat) {
                logger.info("線程{}被阻塞,當前鎖{}", Thread.currentThread().getName(), lockedBy);
                latch.await();
            }
            //前一個節點不存在,則當前節點可獲取鎖
            lockedBy = currentNode;
        } catch (InterruptedException e) {
            logger.error("thread is interrupted{}", e);
        } catch (KeeperException e) {
            logger.error("KeeperException{}", e);
            e.printStackTrace();
        }
    }

    @Override
    public boolean tryLock() {
        if(hasLocked()) {
            return true;
        }

        try {
            if (null == currentNode) {
                currentNode = zooKeeper.create(LOCK_ROOT + nodeName, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                logger.info("線程{}建立節點{}完成", Thread.currentThread().getName(), currentNode);
            }
            //若是當前最小的節點與當前建立的節點相同,則獲取到鎖
            if(isLockNode(currentNode)) {
                return true;
            }
        } catch (InterruptedException e) {
            logger.error("create zooKeeper node failed...", e);
            e.printStackTrace();
        } catch (KeeperException e) {
            logger.error("create zooKeeper node failed...", e);
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    /**
     * 判斷當前節點是不是須要枷鎖的節點(最小的節點)
     * @param currentNode
     * @return
     */
    private boolean isLockNode(String currentNode) {
        List<String> nodes = getChildren();
        Collections.sort(nodes, new Comparator<String>() {
            public int compare(String s1, String s2) {
                return s1.compareTo(s2);
            }
        });

        lockedBy = LOCK_ROOT + "/" + nodes.get(0);

        if(currentNode.equals(lockedBy)) {
            return true;
        }

        //須要監控前一個節點數據的變化
        String nodeName = currentNode.substring(LOCK_ROOT.length() + 1);
        for(int i = 1; i < nodes.size(); i++) {
            if(nodeName.equals(nodes.get(i))) {
                prevNode = LOCK_ROOT + "/" + nodes.get(i - 1);
                break;
            }
        }
        return false;
    }

    /**
     * 獲取全部節點
     * @return
     */
    private List<String> getChildren() {
        try {
            return zooKeeper.getChildren(LOCK_ROOT, false);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 判斷當前是否已經獲取到鎖
     * @return
     */
    private boolean hasLocked() {
        return null != currentNode && null != lockedBy && currentNode.equals(lockedBy);
    }

    @Override
    public void unlock() {
        try {
            Stat stat = zooKeeper.exists(currentNode, true);
            if(null != stat) {
                logger.info("線程{}釋放鎖{}", Thread.currentThread().getName(), currentNode);
                zooKeeper.delete(currentNode, stat.getVersion());
            }
        } catch (KeeperException e) {
            e.printStackTrace();
            logger.info("線程{}釋放鎖{}失敗", Thread.currentThread().getName(), currentNode);
        } catch (InterruptedException e) {
            e.printStackTrace();
            logger.info("線程{}釋放鎖{}失敗", Thread.currentThread().getName(), currentNode);
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public Condition newCondition() {
        return null;
    }


    /**
     * 監控節點變化
     */
    private class ZKWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            //判斷是不是刪除節點事件,而且判斷刪除的節點是不是當前節點的前一個節點
            if(event.getType() == Event.EventType.NodeDeleted) {
                latch.countDown();
            }
        }
    }
}
相關文章
相關標籤/搜索