前一講中咱們知道,Zookeeper經過維護一個分佈式目錄數據結構,實現分佈式協調服務。本文主要介紹利用Zookeeper有序目錄的建立和刪除,實現分佈式共享鎖。html
舉個例子,性能管理系統中,告警規則只容許最多建立450條,咱們如何保證這個約束呢?java
若是隻有一個web節點,咱們只須要簡單的把規則數量查詢服務,入庫服務加一個鎖便可以解決,代碼以下node
synchronized(this) { if(450 > queryRuleCount()) { insertRule(rule); } }
實際上,性能管理系統至少有兩個以上的web節點,一方面保障服務性能,一方面用於容災備份。這種場景兩個規則建立請求可能在兩個web節點上執行,synchronized就無用武之地了。這種衝突在規則導入場景下更容易發生。因此,使用分佈式共享鎖就勢在必行了。web
咱們知道,zookeeper維護的分佈式目錄數據結構視圖,對於各個zookeeper節點都是相同。zookeeper容許客戶端建立一個有序的目錄——在CreateMode.EPHEMERAL_SEQUENTIAL建立模式下,zookeeper會自動在客戶端建立的目錄名稱後面添加一個自增加的id。關鍵代碼spring
// 關鍵方法,建立包含自增加id名稱的目錄,這個方法支持了分佈式鎖的實現 // 四個參數: // 一、目錄名稱 二、目錄文本信息 // 三、文件夾權限,Ids.OPEN_ACL_UNSAFE表示全部權限 // 四、目錄類型,CreateMode.EPHEMERAL_SEQUENTIAL表示會在目錄名稱後面加一個自增長數字 String lockPath = getZkClient().create( ROOT_LOCK_PATH + '/' + PRE_LOCK_NAME, Thread.currentThread().getName().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
利用zookeeper容許客戶端建立一個有序的目錄的特性,能夠實現一個可靠的分佈式共享鎖。apache
分佈式進程在讀寫一個共享數據時,能夠先在某個公共目錄下建立一個有序子目錄,而後判斷該目錄id是否最小。
目錄id最小則得到鎖並消費共享數據,而後刪除該目錄。不然則等待,直到本身的目錄id成爲最小後,纔得到鎖。數據結構
zookeeper全部目錄操做事件均可以註冊監聽器,因此分佈式進程沒必要循環查詢子目錄判斷本身的目錄id是否最小,能夠註冊一個監聽器在前一個目錄上,監聽前一個目錄是否被刪除。app
下面是一個分佈式進程消費共享消息的例子分佈式
一、 zookeeper共享鎖ide
package com.coshaho.learn.zookeeper; import java.io.IOException; import java.util.Collections; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * zookeeper分佈式共享鎖 * @author coshaho * */ public class ZookeeperLock { private String ROOT_LOCK_PATH = "/Locks"; private String PRE_LOCK_NAME = "mylock_"; private static ZookeeperLock lock; public static ZookeeperLock getInstance() { if(null == lock) { lock = new ZookeeperLock(); } return lock; } /** * 獲取鎖:其實是建立線程目錄,並判斷線程目錄序號是否最小 * @return */ public String getLock() { try { // 關鍵方法,建立包含自增加id名稱的目錄,這個方法支持了分佈式鎖的實現 // 四個參數: // 一、目錄名稱 二、目錄文本信息 // 三、文件夾權限,Ids.OPEN_ACL_UNSAFE表示全部權限 // 四、目錄類型,CreateMode.EPHEMERAL_SEQUENTIAL表示會在目錄名稱後面加一個自增長數字 String lockPath = getZkClient().create( ROOT_LOCK_PATH + '/' + PRE_LOCK_NAME, Thread.currentThread().getName().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " create lock path : " + lockPath); tryLock(lockPath); return lockPath; } catch (Exception e) { e.printStackTrace(); } return null; } private boolean tryLock(String lockPath) throws KeeperException, InterruptedException { // 獲取ROOT_LOCK_PATH下全部的子節點,並按照節點序號排序 List<String> lockPaths = getZkClient().getChildren(ROOT_LOCK_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(ROOT_LOCK_PATH.length() + 1)); if (index == 0) { System.out.println(Thread.currentThread().getName() + " get lock, lock path: " + lockPath); return true; } else { // 建立Watcher,監控lockPath的前一個節點 Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { // 建立的鎖目錄只有刪除事件 System.out.println("Received delete event, node path is " + event.getPath()); synchronized (this) { notifyAll(); } } }; String preLockPath = lockPaths.get(index - 1); // 查詢前一個目錄是否存在,而且註冊目錄事件監聽器,監聽一次事件後即刪除 Stat state = getZkClient().exists(ROOT_LOCK_PATH + "/" + preLockPath, watcher); // 返回值爲目錄詳細信息 if (state == null) { return tryLock(lockPath); } else { System.out.println(Thread.currentThread().getName() + " wait for " + preLockPath); synchronized (watcher) { // 等待目錄刪除事件喚醒 watcher.wait(); } return tryLock(lockPath); } } } /** * 釋放鎖:其實是刪除當前線程目錄 * @param lockPath */ public void releaseLock(String lockPath) { try { getZkClient().delete(lockPath, -1); System.out.println("Release lock, lock path is" + lockPath); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } private String zookeeperIp = "192.168.1.104:12181"; private static ZooKeeper zkClient = null; public ZooKeeper getZkClient() { if(null == zkClient) { try { zkClient = new ZooKeeper(zookeeperIp, 3000, null); } catch (IOException e) { e.printStackTrace(); } } return zkClient; } }
二、 模擬分佈式進程消費共享消息
package com.coshaho.learn.zookeeper; import java.util.ArrayList; import java.util.List; import org.springframework.util.CollectionUtils; /** * 分佈式進程消費共享消息 * @author coshaho * */ public class DistributeCache { private static List<String> msgCache = new ArrayList<String>(); static class MsgConsumer extends Thread { @Override public void run() { while(!CollectionUtils.isEmpty(msgCache)) { String lock = ZookeeperLock.getInstance().getLock(); if(CollectionUtils.isEmpty(msgCache)) { return; } String msg = msgCache.get(0); System.out.println(Thread.currentThread().getName() + " consume msg: " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } msgCache.remove(msg); ZookeeperLock.getInstance().releaseLock(lock); } } } public static void main(String[] args) { for(int i = 0; i < 10; i++) { msgCache.add("msg" + i); } MsgConsumer consumer1 = new MsgConsumer(); MsgConsumer consumer2 = new MsgConsumer(); consumer1.start(); consumer2.start(); } }
三、 測試結果
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Thread-1 create lock path : /Locks/mylock_0000000217 Thread-0 create lock path : /Locks/mylock_0000000216 Thread-0 get lock, lock path: /Locks/mylock_0000000216 Thread-0 consume msg: msg0 Thread-1 wait for mylock_0000000216 Received delete event, node path is /Locks/mylock_0000000216 Release lock, lock path is/Locks/mylock_0000000216 Thread-1 get lock, lock path: /Locks/mylock_0000000217 Thread-1 consume msg: msg1 Thread-0 create lock path : /Locks/mylock_0000000218 Thread-0 wait for mylock_0000000217 Received delete event, node path is /Locks/mylock_0000000217 Release lock, lock path is/Locks/mylock_0000000217 Thread-0 get lock, lock path: /Locks/mylock_0000000218 Thread-0 consume msg: msg2 Thread-1 create lock path : /Locks/mylock_0000000219 Thread-1 wait for mylock_0000000218 Received delete event, node path is /Locks/mylock_0000000218 Release lock, lock path is/Locks/mylock_0000000218 Thread-1 get lock, lock path: /Locks/mylock_0000000219 Thread-1 consume msg: msg3 Thread-0 create lock path : /Locks/mylock_0000000220 Thread-0 wait for mylock_0000000219