zookeeper分佈式鎖

/**
 * Contract for a distributed lock.
 */
public interface Lock {

    /** Acquires the lock. */
    void getLock();

    /** Releases the lock. */
    void unLock();
}
/**
 * Template for a ZooKeeper-backed {@link Lock}: {@link #getLock()} alternates
 * between {@link #tryLock()} and {@link #waitLock()} until the lock is obtained.
 * Subclasses supply the actual acquire and wait strategies.
 */
public abstract class ZookeeperAbstractLock implements Lock{

	// ZooKeeper connection string
	private static final String address = "127.0.0.1:2181";

	// znode path that represents the lock
	protected static final String path = "/lock";
	// ZooKeeper client; created once per lock instance and closed by unLock()
	protected ZkClient zkClient = new ZkClient(address);

	// Latch that subclasses may use to park the caller while the lock is held elsewhere
	protected CountDownLatch countDownLatch = null;

	/**
	 * Blocks until the lock is acquired.
	 *
	 * <p>Retries in a loop rather than by recursion, so the call stack does not
	 * grow with the number of failed attempts under long contention.
	 */
	public void getLock() {
		while(!tryLock()){
			// lock not available: wait, then try again
			waitLock();
		}
		System.out.println("###獲取鎖成功###");
	}

	/**
	 * Releases the lock by closing the ZooKeeper client.
	 *
	 * <p>This relies on ZooKeeper removing the session's ephemeral node when the
	 * session ends. Because the client is created only at field initialisation,
	 * this instance cannot be used to acquire the lock again after this call.
	 */
	public void unLock() {
		if(zkClient != null){
			zkClient.close();
		}
	}

	/**
	 * Attempts to acquire the lock once.
	 *
	 * @return {@code true} if the lock was acquired, {@code false} otherwise
	 */
	abstract Boolean tryLock();

	/**
	 * Waits before the next acquisition attempt; called by {@link #getLock()}
	 * each time {@link #tryLock()} fails.
	 */
	abstract void waitLock();
}
/**
 * Lock implementation that uses a single ephemeral znode at {@code path}:
 * whoever manages to create the node holds the lock, everyone else waits
 * for a delete notification on that node.
 */
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock{

	/**
	 * Tries to create the ephemeral lock node.
	 *
	 * @return {@code Boolean.TRUE} if the node was created, {@code Boolean.FALSE}
	 *         if creation threw any exception
	 */
	@Override
	Boolean tryLock() {
		try{
			zkClient.createEphemeral(path);
			return Boolean.TRUE;
		}catch(Exception e){
			// NOTE(review): every failure is reported as "lock not acquired",
			// so a connection problem is indistinguishable from the lock being held.
			return Boolean.FALSE;
		}
	}

	/**
	 * Blocks while the lock node exists, returning once a delete notification
	 * for it arrives (or immediately if the node is already gone).
	 *
	 * <p>The latch is created <em>before</em> the listener is subscribed and is
	 * captured directly by the listener. Creating it after the existence check,
	 * as was done previously, left a window in which a delete event could fire
	 * while no latch existed, leaving the caller waiting on a latch nobody
	 * would release.
	 */
	@Override
	void waitLock() {
		final CountDownLatch latch = new CountDownLatch(1);
		// keep the inherited field pointing at the latch currently in use
		countDownLatch = latch;

		IZkDataListener iZkDataListener = new IZkDataListener() {
			// the lock node was deleted: wake the waiting caller
			public void handleDataDeleted(String dataPath) throws Exception {
				latch.countDown();
			}
			// data changes on the lock node are irrelevant for locking
			public void handleDataChange(String dataPath, Object data) throws Exception {
			}
		};
		// subscribe first so a delete happening after the check below is not missed
		zkClient.subscribeDataChanges(path, iZkDataListener);
		try{
			// only wait if somebody currently holds the lock
			if(zkClient.exists(path)){
				latch.await();
			}
		}catch(InterruptedException ignored){
			// Deliberately not propagated and the interrupt flag is not restored:
			// the caller retries unconditionally, and a still-set flag would make
			// every subsequent await() fail immediately, turning the retry into a spin.
		}finally{
			// always remove the listener, even if exists() threw
			zkClient.unsubscribeDataChanges(path, iZkDataListener);
		}
	}

}

<dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.7.0</version>
        </dependency>

public class CuratorDistrLockTest {分佈式

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "127.0.0.1:2181";
    private static final String ZK_LOCK_PATH = "/zktest";ide

    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");ui

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");code

        t1.start();
        t2.start();
    }事件

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }ip

    }ci

}get

相關文章
相關標籤/搜索