# ZooKeeper 分佈式鎖 (Java)
public class ZKLock implements Lock{ private static Logger log = Logger.getLogger(ZKLock.class.getSimpleName()); private ZooKeeper zookpeer; private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static final String ROOT = "/lock"; private byte[] b = new byte[]{1}; private String lock; private ReentrantLock reenLock = new ReentrantLock(); private Condition condition = reenLock.newCondition(); //private CountDownLatch c; private int i; public ZKLock(String zookpeer,int i) { try { this.i = i; this.zookpeer = new ZooKeeper(zookpeer,3000,new Watcher(){ @Override public void process(WatchedEvent event) { System.out.println("==啓動回調=="); }}); init(); } catch (IOException e) { } } @Override public boolean tryLock() { reenLock.lock(); try { System.out.println("==================="); System.out.println("開始搶鎖 機器" + i); //c = new CountDownLatch(1); String myLock = zookpeer.create(getLockName(), b, acl, CreateMode.EPHEMERAL); this.lock = getFirstNode(); if(myLock.equals(this.lock)) { //獲取到鎖 System.out.println("我得到鎖,我是 機器" + i + ":" + this.lock + " !!!!!!!!!!!!"); } else { System.out.println("我沒搶到鎖 機器" + i); } reg(zookpeer); condition.await(); //c.await(); System.out.println("釋放鎖 機器:" + i); if(zookpeer.exists(myLock, false) != null) { zookpeer.delete(myLock, -1); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ reenLock.unlock(); } return false; } //初始化鎖目錄 public void init(){ try { if(zookpeer.exists(ROOT, false) == null) { zookpeer.create(ROOT,b, acl, CreateMode.PERSISTENT); } } catch (KeeperException e) { log.log(Level.WARNING,"root already exist",e); } catch (InterruptedException e) { log.log(Level.SEVERE,"connection fail"); } } public void reg(ZooKeeper zk) { try { zk.exists(lock, new M()); } catch (KeeperException e) { } catch (InterruptedException e) { } } class M implements Watcher{ @Override public void process(WatchedEvent event) { reenLock.lock(); try { System.out.println("節點變動 機器i" + i); 
//c.countDown(); condition.signal(); reg(zookpeer); } finally{ reenLock.unlock(); } } } @Override public void lock() { while(true){ tryLock(); } } private String getFirstNode() throws KeeperException, InterruptedException { List<String> children = zookpeer.getChildren(ROOT, false); Collections.sort(children); System.out.println("當前節點:" + Arrays.toString(children.toArray())); return ROOT + "/" + children.get(0); } private String getLockName() { String name = UUID.randomUUID().toString(); return ROOT + "/" + name.substring(0,name.indexOf("-")); } ..... }
//3線程模擬爭奪 手動刪除節點,可看到從新爭鎖 public class LockWatch { public static void main(String[] args) throws Exception { ExecutorService p = Executors.newCachedThreadPool(); for(int i = 0; i < 3; i ++) { p.submit(new T(i)); } } } class T implements Runnable{ private int i; public T(int i){ this.i = i; } @Override public void run() { final ZKLock z = new ZKLock("127.0.0.1",i); z.lock(); } }
==啓動回調== ==啓動回調== ==啓動回調== =================== =================== =================== 開始搶鎖 機器2 開始搶鎖 機器0 開始搶鎖 機器1 當前節點:[a63a788c, c61a117b, decc9d68] 當前節點:[a63a788c, c61a117b, decc9d68] 我沒搶到鎖 機器1 當前節點:[a63a788c, c61a117b, decc9d68] 我沒搶到鎖 機器0 我得到鎖,我是 機器2:/lock/a63a788c !!!!!!!!!!!! 節點變動 機器i2 節點變動 機器i1 節點變動 機器i0 釋放鎖 機器:1 釋放鎖 機器:0 釋放鎖 機器:2