其實現代碼如下:
java
public class DistributedLock implements Lock { private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class); private ZooKeeper zooKeeper = null; /** * 鎖根節點目錄 */ private final String LOCK_ROOT = "/locks"; /** * 當前節點 */ private String currentNode; /** * 獲取須要監控的前一個節點 */ private String prevNode; /** * 當前已得到鎖節點 */ private String lockedBy; /** * 節點名稱 */ private String nodeName; /** * 線程阻塞器 */ private CountDownLatch latch; public DistributedLock() { this("/lock"); } public DistributedLock(String nodeName) { this("192.168.52.128:2181", 60000, nodeName); } public DistributedLock(String connectString, int sessionTimeout, String nodeName) { this.nodeName = nodeName; initZooKeeper(connectString, sessionTimeout); latch = new CountDownLatch(1); } /** * 初始化zookeeper * @param connectString * @param sessionTimeout */ public void initZooKeeper(String connectString, int sessionTimeout) { try { //建立zookeeper實例,Watcher:監控節點變化的事件 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new ZKWatcher()); //判斷鎖對應的根節點是否存在 Stat stat = zooKeeper.exists(LOCK_ROOT, false); if(null == stat) { zooKeeper.create(LOCK_ROOT, LOCK_ROOT.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } @Override public void lock() { if(!tryLock()) { //阻塞 await(); } logger.info("線程{}獲取鎖{}", Thread.currentThread().getName(), currentNode); } /** * 阻塞線程,直到latch計數countDown=0 */ private void await() { try { Stat stat = zooKeeper.exists(prevNode, true); if(null != stat) { logger.info("線程{}被阻塞,當前鎖{}", Thread.currentThread().getName(), lockedBy); latch.await(); } //前一個節點不存在,則當前節點可獲取鎖 lockedBy = currentNode; } catch (InterruptedException e) { logger.error("thread is interrupted{}", e); } catch (KeeperException e) { logger.error("KeeperException{}", e); e.printStackTrace(); } } @Override public boolean tryLock() { if(hasLocked()) { 
return true; } try { if (null == currentNode) { currentNode = zooKeeper.create(LOCK_ROOT + nodeName, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); logger.info("線程{}建立節點{}完成", Thread.currentThread().getName(), currentNode); } //若是當前最小的節點與當前建立的節點相同,則獲取到鎖 if(isLockNode(currentNode)) { return true; } } catch (InterruptedException e) { logger.error("create zooKeeper node failed...", e); e.printStackTrace(); } catch (KeeperException e) { logger.error("create zooKeeper node failed...", e); e.printStackTrace(); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } /** * 判斷當前節點是不是須要枷鎖的節點(最小的節點) * @param currentNode * @return */ private boolean isLockNode(String currentNode) { List<String> nodes = getChildren(); Collections.sort(nodes, new Comparator<String>() { public int compare(String s1, String s2) { return s1.compareTo(s2); } }); lockedBy = LOCK_ROOT + "/" + nodes.get(0); if(currentNode.equals(lockedBy)) { return true; } //須要監控前一個節點數據的變化 String nodeName = currentNode.substring(LOCK_ROOT.length() + 1); for(int i = 1; i < nodes.size(); i++) { if(nodeName.equals(nodes.get(i))) { prevNode = LOCK_ROOT + "/" + nodes.get(i - 1); break; } } return false; } /** * 獲取全部節點 * @return */ private List<String> getChildren() { try { return zooKeeper.getChildren(LOCK_ROOT, false); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } /** * 判斷當前是否已經獲取到鎖 * @return */ private boolean hasLocked() { return null != currentNode && null != lockedBy && currentNode.equals(lockedBy); } @Override public void unlock() { try { Stat stat = zooKeeper.exists(currentNode, true); if(null != stat) { logger.info("線程{}釋放鎖{}", Thread.currentThread().getName(), currentNode); zooKeeper.delete(currentNode, stat.getVersion()); } } catch (KeeperException e) { e.printStackTrace(); logger.info("線程{}釋放鎖{}失敗", 
Thread.currentThread().getName(), currentNode); } catch (InterruptedException e) { e.printStackTrace(); logger.info("線程{}釋放鎖{}失敗", Thread.currentThread().getName(), currentNode); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public Condition newCondition() { return null; } /** * 監控節點變化 */ private class ZKWatcher implements Watcher { @Override public void process(WatchedEvent event) { //判斷是不是刪除節點事件,而且判斷刪除的節點是不是當前節點的前一個節點 if(event.getType() == Event.EventType.NodeDeleted) { latch.countDown(); } } } }