注:該文章用做回顧記錄java
預先下載安裝 ZooKeeper ,簡單配置就能使用了。而後構建 Maven 項目,將下面的代碼粘貼到 pom.xml中:node
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.5</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.5</version> </dependency>
zkclient 是開源的客戶端工具,其中封裝了不少功能,好比:刪除包含子節點的父節點,鏈接重試,異步回調,偏向 Java 寫法的註冊監聽等,極大地方便了用戶使用。apache
下面不過多介紹客戶端操做,只針對應用場景作介紹,該文章會隨着本人的學習持續補充。服務器
使用 ZooKeeper 節點監聽來實現該功能:網絡
ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 鏈接集羣 zkClient.createPersistent("/xxx/xxx"); // 建立持久節點 // 註冊子節點變動監聽,當子節點改變(好比建立了"/xxx/xxx/1")或當前節點刪除等,會觸發異步回調 zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { } });
下面爲部分源碼:負載均衡
package org.I0Itec.zkclient; public class ZkClient implements Watcher { public List<String> watchForChilds(final String path) { return retryUntilConnected(new Callable<List<String>>() { @Override public List<String> call() throws Exception { exists(path, true); try { return getChildren(path, true); } catch (ZkNoNodeException e) { } return null; } }); } public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { final long operationStartTime = System.currentTimeMillis(); while (true) { if (_closed) { throw new IllegalStateException("ZkClient already closed!"); } try { return callable.call(); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } } }
基於 ZooKeeper 實現的數據發佈/訂閱很簡單吧,快動手試試。異步
這部分是 ZooKeeper 重要功能,在此基礎上實現諸如,分佈式協調/通知,負載均衡,Master選舉等複雜場景。分佈式
排它鎖又稱爲寫鎖或獨佔鎖。好比事務 T1 對數據對象 O1 加了排它鎖,那麼在整個加鎖期間,只容許 T1 對 O1 進行讀取或更新操做,其它事務都不能對 O1 操做。ide
1)獲取鎖工具
全部客戶端都建立臨時節點 zkClient.createEphemeral("/xxx/xxx", null);
,ZooKeeper 會保證在全部客戶端中,最終只有一個客戶端能建立成功,那麼就認爲該客戶端獲取了鎖。同時,全部沒獲取到鎖的客戶端需在/xxx/xxx
上註冊子節點變動監聽,以便實時監聽節點變化。如節點發生變化,則未獲取到鎖的客戶端再從新獲取鎖。
private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String lockParentPath = "/zk-book/exclusice_lock"; public static void main(String[] args) throws InterruptedException { try { zkClient.createEphemeral(lockParentPath + "/lock"); System.out.println("service3 獲取鎖成功"); } catch (Exception e) { System.out.println("service3獲取鎖失敗"); zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println("service3再次獲取鎖"); main(null); } }); } Thread.sleep(Integer.MAX_VALUE); }
2)釋放鎖
當 "/xxx/xxx"
是臨時節點時,如下倆種狀況都會釋放鎖。
共享鎖又稱爲讀鎖。若是事務 T1 對數據對象 O1 加了共享鎖,那麼 T1 只能對 O1 進行讀取操做,其它事務只能對 O1 加共享鎖,直到 O1 上全部共享鎖都被釋放。
1)獲取鎖
全部客戶端都建立臨時順序節點 zkClient.createEphemeralSequential("/xxx/xxx", null);
,ZooKeeper 會生成相似下面的節點,已保證節點的惟一性。
2)判斷讀寫順序
"/xxx"
下的全部子節點,並對該節點註冊子節點變動監聽。
如下爲實現代碼:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分佈式共享鎖 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLock { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static volatile boolean isExecuted = false; public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null); String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1); List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); if (currentChilds.size() > 0) isExecuted = getLockAndExecute(currentChilds, node); zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if (currentChilds.size() > 0) { currentChilds = sortNodes(currentChilds); isExecuted = getLockAndExecute(currentChilds, node); } } }); while (!isExecuted) { Thread.sleep(Integer.MAX_VALUE); } } /** * 排序節點 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List<String> sortNodes(List<String> nodes) { Collections.sort(nodes, new Comparator<String>() { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號 } }); return nodes; } /** * 獲取節點位置 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @return */ private static Integer getNodePosition(List<String> nodes, String node) { for (int i = 0, size = nodes.size(); i < size; i++) { if (nodes.get(i).equals(node)) return i; } return null; // 無此數據 } /** * 是否獲得鎖 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @param node * @param nodePosition * @return */ private static boolean isGetLock(List<String> nodes, String node, int nodePosition) { if (nodePosition == 0) // 沒有比此序號更小的節點 return true; if (node.indexOf("r-") > -1) { // 讀節點 for (int i = 0; i < nodePosition; i++) { // 遍歷小於次序號的節點 String nodeTemp = nodes.get(i); if (nodeTemp.indexOf("w-") > -1) // 存在寫節點,則進入等待鎖 return false; } return true; } return false; } /** * 獲取鎖並執行 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLockAndExecute(List<String> currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) // 子節點爲空 return false; System.out.println("子節點:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition); boolean isGetLock = isGetLock(currentChilds, node, nodePosition); if (isGetLock) { System.out.println(node + " 成功獲取到鎖,開始執行耗時任務"); doSomething(); boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node); if (isSuccess) System.out.println(node + " 成功執行完任務並刪除節點"); } else { System.out.println(node + " 未獲取到鎖"); } return isGetLock; } private static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
測試以上代碼會發現,當獲取鎖的節點過多時,某一節點變動會通知全部節點,會對 ZooKeeper 服務器形成巨大的性能影響和網絡衝擊,服務器會發送給客戶端大量的事件通知。好比有如下節點,當 w-24 節點變動時,會通知給其他節點。
由於當獲取共享鎖時,要判斷比本身序號小的節點,因此應該只給 r-25 節點發送通知。針對此狀況,改進後判斷讀寫順序爲:
"/xxx"
下的全部子節點。
改進後的共享鎖代碼實現:
import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.http.client.ClientProtocolException; /** * 分佈式共享鎖最優 * @author alexnevsky * @date 2018年5月23日 */ public class SharedLockOptimal { private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); private static final String PARENT_PATH = "/zk-book/shared_lock"; private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null); public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException { List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH)); String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1); boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node); System.out.println("當前全部節點:" + currentChilds.toString() + ", 該" + (isReadNode ? "讀" : "寫") + "節點:" + node); if (isGetLock) { execute(node); System.out.println("退出程序"); System.exit(1); } else { String monitorNode = getMonitorNode(currentChilds, node); System.out.println(node + " 未獲取到鎖,註冊監聽節點:" + monitorNode); if (null != monitorNode) { zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { main(null); // 遞歸調用 } }); } Thread.sleep(Integer.MAX_VALUE); } } /** * 排序節點 * @author alexnevsky * @date 2018年5月24日 * @param nodes * @return */ private static List<String> sortNodes(List<String> nodes) { Collections.sort(nodes, new Comparator<String>() { @Override public int compare(String o1, String o2) { o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", ""); o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", ""); return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號 } }); return nodes; } /** * 獲取節點位置 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static Integer getNodePosition(List<String> currentChilds, String node) { for (int i = 0, size = currentChilds.size(); i < size; i++) { if (currentChilds.get(i).equals(node)) return i; } return null; } /** * 獲取監聽節點 * @author alexnevsky * @date 2018年5月25日 * @param currentChilds * @param node * @return */ private static String getMonitorNode(List<String> currentChilds, String node) { String monitorNode = null; Integer nodePosition = getNodePosition(currentChilds, node); if (0 < nodePosition) { // 非首節點 if (node.indexOf("r-") > -1) { // 讀節點 // 獲取比當前序號小的最後一個寫節點 for (int i = nodePosition - 1; i >= 0; i--) { String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) return tempNode; } } else { // 獲取比當前序號小的最後一個節點 return currentChilds.get(nodePosition - 1); } } return monitorNode; } /** * 獲取鎖 * @author alexnevsky * @date 2018年5月24日 * @param currentChilds * @param node * @return */ private static boolean getLock(List<String> currentChilds, String node) { Integer nodePosition = getNodePosition(currentChilds, node); if (nodePosition == null) return false; if (nodePosition == 0) // 無序號更小的節點 return true; if (node.indexOf("r-") > -1) { // 讀節點 for (int i = 0; i < nodePosition; i++) { // 遍歷前面序號的節點 String tempNode = currentChilds.get(i); if (tempNode.indexOf("w-") > -1) // 存在寫節點,返回失敗 return false; } return true; } return false; } /** * 執行 * @author alexnevsky * @date 2018年5月24日 * @param node * @return */ private static void execute(String node) { System.out.println(node + " 成功獲取到鎖,開始執行耗時任務"); doSomething(); boolean isDeletedLock = zkClient.delete(nodeFullPath); System.out.println(node + " 成功執行完任務,刪除節點" + (isDeletedLock ? "成功" : "失敗")); } /** * 模擬耗時任務 * @author alexnevsky * @date 2018年5月25日 */ public static void doSomething() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }