ZooKeeper.分佈式鎖

#ZooKeeper.分佈式鎖java

  • 看了好多版本的分佈式鎖,大同小異,其中有幾個自旋exists監聽節點,以爲很笨,那就再造個輪子吧
  • 思路無非都是相同的,利用ZooKeeper強一致特性
  1. 進程併發在固定父節點下建立不一樣的臨時節點
  2. 查看建立的子節點,排序,取第一個,等於本身的節點,算獲取到鎖,執行操做,其餘監聽第一個節點,進入等待
  3. 第一個節點變更,全部機器收到事件,喚醒線程,刪除本身的節點,進入步驟1
  • CountDownLatch 比較簡潔,可是仍是lock更直白,更明確。

tips

  • 這個輪子僅僅是個toy,真正生產環境考慮更多,如釋放鎖失敗異常各類狀況
public class ZKLock implements Lock{
	
	private static Logger log = Logger.getLogger(ZKLock.class.getSimpleName());
	
	private ZooKeeper zookpeer; 
	private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
	private static final String ROOT = "/lock";
	private byte[] b = new byte[]{1};
	private String lock;
	private ReentrantLock reenLock = new ReentrantLock();
	private Condition condition = reenLock.newCondition();
	//private CountDownLatch c;
	private int i;
	
	public ZKLock(String zookpeer,int i) {
		try {
			this.i = i;
			this.zookpeer = new ZooKeeper(zookpeer,3000,new Watcher(){
				
				@Override
				public void process(WatchedEvent event) {
					System.out.println("==啓動回調==");
				}});
			
			init();
		} catch (IOException e) {
			
		}
	}
	
	@Override
	public boolean tryLock() {
		reenLock.lock();
		try {
			System.out.println("===================");
			System.out.println("開始搶鎖 機器" + i);
			//c = new CountDownLatch(1);
			String myLock = zookpeer.create(getLockName(), b, acl, CreateMode.EPHEMERAL);
			this.lock = getFirstNode();
			
			if(myLock.equals(this.lock)) {
				//獲取到鎖
				System.out.println("我得到鎖,我是 機器" + i + ":" + this.lock + " !!!!!!!!!!!!");
			}
			else {
				System.out.println("我沒搶到鎖  機器" + i);
			}
			
			reg(zookpeer);
			
			condition.await();
			//c.await();
			System.out.println("釋放鎖 機器:" + i);
			if(zookpeer.exists(myLock, false) != null) {
				zookpeer.delete(myLock, -1);
			}
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}finally{
			reenLock.unlock();
		}
		return false;
	}

	//初始化鎖目錄
	public void init(){
		try {
			if(zookpeer.exists(ROOT, false) == null) {
				zookpeer.create(ROOT,b, acl, CreateMode.PERSISTENT);
			}
		} catch (KeeperException e) {
			log.log(Level.WARNING,"root already exist",e);
		} catch (InterruptedException e) {
			log.log(Level.SEVERE,"connection fail");
		}
	}
	
	public void reg(ZooKeeper zk) {
		try {
			zk.exists(lock, new M());
		} catch (KeeperException e) {
		} catch (InterruptedException e) {
		}
	}
	
	class M implements Watcher{
		
		@Override
		public void process(WatchedEvent event) {
			reenLock.lock();
			try {
				System.out.println("節點變動 機器i" + i);
				//c.countDown();
				condition.signal();
				reg(zookpeer);
			} finally{
				reenLock.unlock();
			}
		}
	}
	
	@Override
	public void lock() {
		while(true){
			tryLock();
		}
	}

	private String getFirstNode() throws KeeperException, InterruptedException {
		List<String> children = zookpeer.getChildren(ROOT, false);
		Collections.sort(children);
		System.out.println("當前節點:" + Arrays.toString(children.toArray()));
		return ROOT + "/" + children.get(0);
	}

	private String getLockName() {
		String name = UUID.randomUUID().toString();
		return ROOT + "/" + name.substring(0,name.indexOf("-"));
	}	
        .....
}
//3線程模擬爭奪 手動刪除節點,可看到從新爭鎖
public class LockWatch {

	public static void main(String[] args) throws Exception {
		ExecutorService p = Executors.newCachedThreadPool();
		for(int i = 0; i < 3; i ++) {
			p.submit(new T(i));
		}
	}
}
class T implements Runnable{
	
	private int i;
	
	public T(int i){
		this.i = i;
	}
	@Override
	public void run() {
		final ZKLock z = new ZKLock("127.0.0.1",i);
		z.lock();
	}
}
==啓動回調==
==啓動回調==
==啓動回調==
===================
===================
===================
開始搶鎖 機器2
開始搶鎖 機器0
開始搶鎖 機器1
當前節點:[a63a788c, c61a117b, decc9d68]
當前節點:[a63a788c, c61a117b, decc9d68]
我沒搶到鎖  機器1
當前節點:[a63a788c, c61a117b, decc9d68]
我沒搶到鎖  機器0
我得到鎖,我是 機器2:/lock/a63a788c !!!!!!!!!!!!
節點變動 機器i2
節點變動 機器i1
節點變動 機器i0
釋放鎖 機器:1
釋放鎖 機器:0
釋放鎖 機器:2
相關文章
相關標籤/搜索