/**
 * Contract for a distributed lock.
 *
 * <p>Implementations decide how the lock is represented and how contention
 * is handled; this interface only exposes acquire and release.
 */
public interface Lock {

    /** Acquires the lock. */
    public void getLock();

    /** Releases the lock. */
    public void unLock();
}
public abstract class ZookeeperAbstractLock implements Lock{ //zk鏈接地址 private static final String address = "127.0.0.1:2181"; protected static final String path = "/lock"; //zk鏈接客戶端 protected ZkClient zkClient = new ZkClient(address); protected CountDownLatch countDownLatch = null; public void getLock() { if(tryLock()){ System.out.println("###獲取鎖成功###"); }else{ //等待 waitLock(); //從新獲取鎖 getLock(); } } public void unLock() { if(zkClient != null){ zkClient.close(); } } // 是否獲取鎖成功,成功返回true,失敗返回false abstract Boolean tryLock(); // 等待鎖 abstract void waitLock(); }
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock{ @Override Boolean tryLock() { try{ zkClient.createEphemeral(path); return Boolean.TRUE; }catch(Exception e){ return Boolean.FALSE; } } @Override void waitLock() { //使用事件監聽,獲取到節點被刪除 IZkDataListener iZkDataListener = new IZkDataListener() { //當節點被刪除的時候 public void handleDataDeleted(String dataPath) throws Exception { if(countDownLatch != null){ //信號量減一,喚醒 countDownLatch.countDown(); } } //當節點發生改變 public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub } }; //註冊節點信息 zkClient.subscribeDataChanges(path, iZkDataListener); //檢測節點是否存在,若是存在則等待 if(zkClient.exists(path)){ //建立信號量 countDownLatch = new CountDownLatch(1); try{ //進行等待 countDownLatch.await(); }catch(Exception e){ } } //刪除事件通知 zkClient.unsubscribeDataChanges(path, iZkDataListener); } }
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.0</version>
</dependency>
public class CuratorDistrLockTest {分佈式
/** Zookeeper info */
private static final String ZK_ADDRESS = "127.0.0.1:2181";
private static final String ZK_LOCK_PATH = "/zktest";ide
public static void main(String[] args) throws InterruptedException {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");ui
Thread t1 = new Thread(() -> {
doWithLock(client);
}, "t1");
Thread t2 = new Thread(() -> {
doWithLock(client);
}, "t2");code
t1.start();
t2.start();
}事件
private static void doWithLock(CuratorFramework client) {
InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}ip
}ci
}get