1.基於ZooKeeper分佈式鎖的流程java
在zookeeper指定節點(locks)下建立臨時順序節點node_n 獲取locks下全部子節點children 對子節點按節點自增序號從小到大排序 判斷本節點是否是第一個子節點,如果,則獲取鎖;若不是,則監聽比該節點小的那個節點的刪除事件 若監聽事件生效,則回到第二步從新進行判斷,直到獲取到鎖
2.實現node
zookeeper系列(五)實戰分佈式鎖apache
3.簡單實現segmentfault
package zookeeper; import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class TestLock implements Watcher { private ZooKeeper zk=null; private String config; private String root="/locks"; private String lock="/lock_"; private String currentPath; // 計數器 private CountDownLatch countDownLatch; @Override public void process(WatchedEvent event) { System.out.println(event.getPath()+"-"+event.getType()+"-"+event.getState()); } /** * @param config 路徑 * @param root 根目錄 * @throws IOException * @throws InterruptedException * @throws KeeperException */ public TestLock(String config) throws IOException, KeeperException, InterruptedException { if(null==config) { return; } this.config=config; zk=new ZooKeeper(config,5000,this); Stat st=zk.exists(root, false); if(null==st) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } /** * 獲取鎖 * @return * @throws InterruptedException * @throws KeeperException */ public synchronized boolean getLock() throws KeeperException, InterruptedException { currentPath=zk.create(root+lock,new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("當前路徑:"+currentPath.substring(currentPath.lastIndexOf('/')+1)); while(true) { List<String>children=zk.getChildren(root, false); //排序 Collections.sort(children); //獲取位置 System.out.println(children.toString()); int index=children.indexOf(currentPath.substring(currentPath.lastIndexOf("/")+1)); //不是在開頭位置 if(index!=0) { System.out.println(root+"/"+children.get(0)); new ZkClient(this.config).subscribeDataChanges(root+"/"+children.get(0),new IZkDataListener(){ // 當修改當前節點的數據 public void handleDataChange(String arg0, Object arg1) throws Exception { System.out.println("---"); } @Override public void handleDataDeleted(String arg0) throws Exception { System.out.println("線程:"+Thread.currentThread().getName()+"釋放鎖:"+currentPath); countDownLatch.countDown(); } }); this.countDownLatch=new CountDownLatch(1); this.countDownLatch.await(); this.countDownLatch=null; }else { System.out.println("線程:"+Thread.currentThread().getName()+"獲取鎖:"+currentPath); break; } } return false; } public void unlock() throws InterruptedException, KeeperException { this.zk.delete(currentPath, -1); System.out.println("線程:"+Thread.currentThread().getName()+"釋放鎖:"+currentPath); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { Runnable runnable=new Runnable() { @Override public void run() { TestLock tl; try { tl = new TestLock("127.0.0.1:2181"); tl.getLock(); tl.unlock(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (KeeperException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; // for (int i = 0; i < 1; i++) { // Thread t = new Thread(runnable); // t.start(); // } Thread t = new Thread(runnable); t.start(); } }