ZooKeeper實現分佈式鎖

1. 基於ZooKeeper的分佈式鎖流程

在zookeeper指定節點(locks)下建立臨時順序節點node_n
獲取locks下全部子節點children
對子節點按節點自增序號從小到大排序
判斷本節點是否是第一個子節點:若是,則獲取鎖;若不是,則監聽排序後緊鄰在本節點之前的那個節點的刪除事件
若監聽事件觸發,則回到第二步重新進行判斷,直到獲取到鎖

2. 實現

參考:zookeeper系列(五)實戰分佈式鎖

3. 簡單實現

package zookeeper;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class TestLock implements Watcher {

	private ZooKeeper zk=null;
	private String config;
	private String root="/locks";
	private String lock="/lock_";
	private String currentPath;
	// 計數器
    private CountDownLatch countDownLatch;
    
	@Override
	public void process(WatchedEvent event) {
		System.out.println(event.getPath()+"-"+event.getType()+"-"+event.getState());
	}
	
	/**
	 * @param config 路徑
	 * @param root  根目錄
	 * @throws IOException
	 * @throws InterruptedException 
	 * @throws KeeperException 
	 */
	public TestLock(String config) throws IOException, KeeperException, InterruptedException {
		if(null==config) {
			return;
		}
		this.config=config;
		zk=new ZooKeeper(config,5000,this);
		Stat st=zk.exists(root, false);
		if(null==st) {
			zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
		}
	}
	
	/**
	 * 獲取鎖
	 * @return
	 * @throws InterruptedException 
	 * @throws KeeperException 
	 */
	public synchronized boolean  getLock() throws KeeperException, InterruptedException {
		currentPath=zk.create(root+lock,new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
		System.out.println("當前路徑:"+currentPath.substring(currentPath.lastIndexOf('/')+1));
		while(true) {
        List<String>children=zk.getChildren(root, false);
        //排序
        Collections.sort(children);
        //獲取位置
        System.out.println(children.toString());
        int index=children.indexOf(currentPath.substring(currentPath.lastIndexOf("/")+1));
        //不是在開頭位置
        if(index!=0) {
        	System.out.println(root+"/"+children.get(0));
        	new ZkClient(this.config).subscribeDataChanges(root+"/"+children.get(0),new IZkDataListener(){
        		// 當修改當前節點的數據
        		public void handleDataChange(String arg0, Object arg1)
        		throws Exception {
        			System.out.println("---");
        		}
				@Override
				public void handleDataDeleted(String arg0) throws Exception {
					System.out.println("線程:"+Thread.currentThread().getName()+"釋放鎖:"+currentPath);
					 countDownLatch.countDown();
				}	
        	});
        	this.countDownLatch=new CountDownLatch(1);
        	this.countDownLatch.await();
        	this.countDownLatch=null;        	
        }else {
        	System.out.println("線程:"+Thread.currentThread().getName()+"獲取鎖:"+currentPath);
        	break;
         }
        }
		return false;
	}

	public void unlock() throws InterruptedException, KeeperException {
		this.zk.delete(currentPath, -1);
		System.out.println("線程:"+Thread.currentThread().getName()+"釋放鎖:"+currentPath);
	}
	
	public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
		Runnable runnable=new Runnable() {
			@Override
			public void run() {
				TestLock tl;
				try {
					tl = new TestLock("127.0.0.1:2181");
					tl.getLock();
					tl.unlock();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (KeeperException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		};
		
//		 for (int i = 0; i < 1; i++) {
//	            Thread t = new Thread(runnable);
//	            t.start();
//	        }
		
		  Thread t = new Thread(runnable);
          t.start();
	}
	
}
相關文章
相關標籤/搜索