在分佈式應用, 每每存在多個進程提供同一服務. 這些進程有可能在相同的機器上, 也有可能分佈在不一樣的機器上. 若是這些進程共享了一些資源, 可能就須要分佈式鎖來鎖定對這些資源的訪問.
本文將介紹如何利用zookeeper實現分佈式鎖.git
進程須要訪問共享數據時, 就在"/locks"節點下建立一個sequence類型的子節點, 稱爲thisPath. 當thisPath在全部子節點中最小時, 說明該進程得到了鎖. 進程得到鎖以後, 就能夠訪問共享資源了. 訪問完成後, 須要將thisPath刪除. 鎖由新的最小的子節點得到.
有了清晰的思路以後, 還須要補充一些細節. 進程如何知道thisPath是全部子節點中最小的呢? 能夠在建立的時候, 經過getChildren方法獲取子節點列表, 而後在列表中找到排名比thisPath前1位的節點, 稱爲waitPath, 而後在waitPath上註冊監聽, 當waitPath被刪除後, 進程得到通知, 此時說明該進程得到了鎖.github
以一個DistributedClient對象模擬一個進程的形式, 演示zookeeper分佈式鎖的實現.web
public class DistributedClient { 安全
// 超時時間 網絡
private static final int SESSION_TIMEOUT = 5000; app
// zookeeper server列表 分佈式
private String hosts = "localhost:4180,localhost:4181,localhost:4182"; this
private String groupNode = "locks"; spa
private String subNode = "sub"; 線程
private ZooKeeper zk;
// 當前client建立的子節點
private String thisPath;
// 當前client等待的子節點
private String waitPath;
private CountDownLatch latch = new CountDownLatch(1);
/**
* 鏈接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 鏈接創建時, 打開latch, 喚醒wait在該latch上的線程
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();
}
// 發生了waitPath的刪除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
doSomething();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待鏈接創建
latch.await();
// 建立子節點
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會, 讓結果更清晰一些
Thread.sleep(10);
// 注意, 沒有必要監聽"/locks"的子節點的變化狀況
List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
// 列表中只有一個子節點, 那確定就是thisPath, 說明client得到鎖
if (childrenNodes.size() == 1) {
doSomething();
} else {
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
// never happened
} else if (index == 0) {
// inddx == 0, 說明thisNode在列表中最小, 當前client得到鎖
doSomething();
} else {
// 得到排名比thisPath前1位的節點
this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
// 在waitPath上註冊監聽器, 當waitPath被刪除時, zookeeper會回調監聽器的process方法
zk.getData(waitPath, true, new Stat());
}
}
}
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 將thisPath刪除, 監聽thisPath的client將得到通知
// 至關於釋放鎖
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
DistributedClient dl = new DistributedClient();
dl.connectZookeeper();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
Thread.sleep(Long.MAX_VALUE);
}
}
思惟縝密的朋友可能會想到, 上述的方案並不安全. 假設某個client在得到鎖以前掛掉了, 因爲client建立的節點是ephemeral類型的, 所以這個節點也會被刪除, 從而致使排在這個client以後的client提早得到了鎖. 此時會存在多個client同時訪問共享資源.
如何解決這個問題呢? 能夠在接到waitPath的刪除通知的時候, 進行一次確認, 確認當前的thisPath是否真的是列表中最小的節點.
// 發生了waitPath的刪除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
// 確認thisPath是否真的是列表中的最小節點
List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if (index == 0) {
// 確實是最小節點
doSomething();
} else {
// 說明waitPath是因爲出現異常而掛掉的
// 更新waitPath
waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
// 從新註冊監聽, 並判斷此時waitPath是否已刪除
if (zk.exists(waitPath, true) == null) {
doSomething();
}
}
}
另外, 因爲thisPath和waitPath這2個成員變量會在多個線程中訪問, 最好將他們聲明爲volatile, 以防止出現線程可見性問題.
下面介紹一種更簡單, 可是不怎麼推薦的解決方案.
每一個client在getChildren的時候, 註冊監聽子節點的變化. 當子節點的變化通知到來時, 再一次經過getChildren獲取子節點列表, 判斷thisPath是不是列表中的最小節點, 若是是, 則執行資源訪問邏輯.
public class DistributedClient2 {
// 超時時間
private static final int SESSION_TIMEOUT = 5000;
// zookeeper server列表
private String hosts = "localhost:4180,localhost:4181,localhost:4182";
private String groupNode = "locks";
private String subNode = "sub";
private ZooKeeper zk;
// 當前client建立的子節點
private volatile String thisPath;
private CountDownLatch latch = new CountDownLatch(1);
/**
* 鏈接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 鏈接創建時, 打開latch, 喚醒wait在該latch上的線程
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();
}
// 子節點發生變化
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
// thisPath是不是列表中的最小節點
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
doSomething();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待鏈接創建
latch.await();
// 建立子節點
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會, 讓結果更清晰一些
Thread.sleep(10);
// 監聽子節點的變化
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
// 列表中只有一個子節點, 那確定就是thisPath, 說明client得到鎖
if (childrenNodes.size() == 1) {
doSomething();
}
}
/**
* 共享資源的訪問邏輯寫在這個方法中
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 將thisPath刪除, 監聽thisPath的client將得到通知
// 至關於釋放鎖
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
DistributedClient2 dl = new DistributedClient2();
dl.connectZookeeper();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
Thread.sleep(Long.MAX_VALUE);
}
}
爲何不推薦這個方案呢? 是由於每次子節點的增長和刪除都要廣播給全部client, client數量很少時還看不出問題. 若是存在不少client, 那麼就可能致使廣播風暴--過多的廣播通知阻塞了網絡. 使用第一個方案, 會使得通知的數量大大降低. 固然第一個方案更復雜一些, 複雜的方案同時也意味着更容易引進bug.