中間件 - ZooKeeper應用場景實踐

注:該文章用做回顧記錄java

1、準備工做

預先下載安裝 ZooKeeper ,簡單配置就能使用了。而後構建 Maven 項目,將下面的代碼粘貼到 pom.xml中:node

<dependency>  
        <groupId>org.apache.zookeeper</groupId>  
        <artifactId>zookeeper</artifactId>  
        <version>3.4.5</version>  
    </dependency>  
    <dependency>  
        <groupId>com.101tec</groupId>  
        <artifactId>zkclient</artifactId>  
        <version>0.5</version>  
    </dependency>

zkclient 是開源的客戶端工具,其中封裝了不少功能,好比:刪除包含子節點的父節點,鏈接重試,異步回調,偏向 Java 寫法的註冊監聽等,極大地方便了用戶使用。apache

下面不過多介紹客戶端操做,只針對應用場景作介紹,該文章會隨着本人的學習持續補充。服務器

2、數據發佈/訂閱

使用 ZooKeeper 節點監聽來實現該功能:網絡

ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000); // 鏈接集羣

zkClient.createPersistent("/xxx/xxx"); // 建立持久節點

// 註冊子節點變動監聽,當子節點改變(好比建立了"/xxx/xxx/1")或當前節點刪除等,會觸發異步回調
zkClient.subscribeChildChanges("/xxx/xxx", new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List<String> currentChilds) 
        throws Exception {
    }
});

下面爲部分源碼:負載均衡

package org.I0Itec.zkclient;

public class ZkClient implements Watcher {

    public List<String> watchForChilds(final String path) {
        return retryUntilConnected(new Callable<List<String>>() {
            @Override
            public List<String> call() throws Exception {
                exists(path, true);
                try {
                    return getChildren(path, true);
                } catch (ZkNoNodeException e) {
                }
                return null;
            }
        });
    }
    
    public <T> T retryUntilConnected(Callable<T> callable) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
        final long operationStartTime = System.currentTimeMillis();
        while (true) {
            if (_closed) {
                throw new IllegalStateException("ZkClient already closed!");
            }
            try {
                return callable.call();
            } catch (Exception e) {
                throw ExceptionUtil.convertToRuntimeException(e);
            }
        }
    }
}

基於 ZooKeeper 實現的數據發佈/訂閱很簡單吧,快動手試試。異步

3、分佈式鎖

這部分是 ZooKeeper 重要功能,在此基礎上實現諸如,分佈式協調/通知,負載均衡,Master選舉等複雜場景。分佈式

一、排它鎖

排它鎖又稱爲寫鎖或獨佔鎖。好比事務 T1 對數據對象 O1 加了排它鎖,那麼在整個加鎖期間,只容許 T1 對 O1 進行讀取或更新操做,其它事務都不能對 O1 操做。ide

1)獲取鎖工具

全部客戶端都建立臨時節點 zkClient.createEphemeral("/xxx/xxx", null);,ZooKeeper 會保證在全部客戶端中,最終只有一個客戶端能建立成功,那麼就認爲該客戶端獲取了鎖。同時,全部沒獲取到鎖的客戶端需在/xxx/xxx 上註冊子節點變動監聽,以便實時監聽節點變化。如節點發生變化,則未獲取到鎖的客戶端再從新獲取鎖。

private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
private static final String lockParentPath = "/zk-book/exclusice_lock";

public static void main(String[] args) throws InterruptedException {
    try {
        zkClient.createEphemeral(lockParentPath + "/lock");
        System.out.println("service3 獲取鎖成功");
    } catch (Exception e) {
        System.out.println("service3獲取鎖失敗");
        zkClient.subscribeChildChanges(lockParentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                System.out.println("service3再次獲取鎖");
                main(null);
            }
        });
    }
    Thread.sleep(Integer.MAX_VALUE);
}

2)釋放鎖

"/xxx/xxx" 是臨時節點時,如下倆種狀況都會釋放鎖。

  1. 當已獲取鎖的客戶機宕機,致使鏈接超時斷開,那麼 ZooKeeper 會將臨時節點刪除。
  2. 正常執行完邏輯後,客戶端主動將臨時節點刪除。

排它鎖流程圖

二、共享鎖

共享鎖又稱爲讀鎖。若是事務 T1 對數據對象 O1 加了共享鎖,那麼 T1 只能對 O1 進行讀取操做,其它事務只能對 O1 加共享鎖,直到 O1 上全部共享鎖都被釋放。

1)獲取鎖

全部客戶端都建立臨時順序節點 zkClient.createEphemeralSequential("/xxx/xxx", null);,ZooKeeper 會生成相似下面的節點,已保證節點的惟一性。

臨時順序節點

2)判斷讀寫順序

  1. 建立完臨時順序節點後,獲取 "/xxx" 下的全部子節點,並對該節點註冊子節點變動監聽。
  2. 肯定建立完的臨時順序節點在全部節點中的順序。
  3. 對於讀節點:
    沒有比本身序號小的節點,或比本身序號小的節點都是讀節點,則成功獲取到共享鎖。
    若是比本身序號小的節點中存在寫節點,則需進入等待。
    對於寫節點:
    若是本身不是序號最小的節點,則需進入等待。
  4. 接受到子節點變動通知後,重複步驟1

共享鎖流程圖

如下爲實現代碼:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分佈式共享鎖
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLock {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static volatile boolean isExecuted = false;
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        String nodeTemp = zkClient.createEphemeralSequential(PARENT_PATH + "/w-", null);
        String node = nodeTemp.substring(nodeTemp.lastIndexOf("/") + 1);
        
        List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        if (currentChilds.size() > 0)
            isExecuted = getLockAndExecute(currentChilds, node);
        
        zkClient.subscribeChildChanges(PARENT_PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                if (currentChilds.size() > 0) {
                    currentChilds = sortNodes(currentChilds);
                    isExecuted = getLockAndExecute(currentChilds, node);
                }
            }
        });
        
        while (!isExecuted) {
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序節點
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List<String> sortNodes(List<String> nodes) {
        Collections.sort(nodes, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號
            }
        });
        return nodes;
    }
    
    /**
     * 獲取節點位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @return
     */
    private static Integer getNodePosition(List<String> nodes, String node) {
        for (int i = 0, size = nodes.size(); i < size; i++) {
            if (nodes.get(i).equals(node))
                return i;
        }
        return null; // 無此數據
    }
    
    /**
     * 是否獲得鎖
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @param node
     * @param nodePosition
     * @return
     */
    private static boolean isGetLock(List<String> nodes, String node, int nodePosition) {
        if (nodePosition == 0) // 沒有比此序號更小的節點 
            return true;
        if (node.indexOf("r-") > -1) { // 讀節點
            for (int i = 0; i < nodePosition; i++) { // 遍歷小於次序號的節點
                String nodeTemp = nodes.get(i);
                if (nodeTemp.indexOf("w-") > -1)  // 存在寫節點,則進入等待鎖
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 獲取鎖並執行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLockAndExecute(List<String> currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null) // 子節點爲空
            return false;
        System.out.println("子節點:" + currentChilds.toString() + ", " + node + " 的位置:" + nodePosition);
        boolean isGetLock = isGetLock(currentChilds, node, nodePosition);
        if (isGetLock) {
            System.out.println(node + " 成功獲取到鎖,開始執行耗時任務");
            doSomething();
            boolean isSuccess = zkClient.delete(PARENT_PATH + "/" + node);
            if (isSuccess)
                System.out.println(node + " 成功執行完任務並刪除節點");
        } else {
            System.out.println(node + " 未獲取到鎖");
        }
        return isGetLock;
    }
    
    private static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

測試以上代碼會發現,當獲取鎖的節點過多時,某一節點變動會通知全部節點,會對 ZooKeeper 服務器形成巨大的性能影響和網絡衝擊,服務器會發送給客戶端大量的事件通知。好比有如下節點,當 w-24 節點變動時,會通知給其他節點。

圖片描述

由於當獲取共享鎖時,要判斷比本身序號小的節點,因此應該只給 r-25 節點發送通知。針對此狀況,改進後判斷讀寫順序爲:

  1. 建立完臨時順序節點後,獲取 "/xxx" 下的全部子節點。
  2. 客戶端調用 getChildren() 來獲取子節點列表,注意,這裏不註冊任何監聽。
  3. 若是未獲取到共享鎖,那麼找到比本身序號小的節點來註冊監聽,分爲如下倆種狀況:
    讀節點:比本身序號小的最後一個寫節點註冊監聽
    寫節點:比本身序號小的最後一個節點註冊監聽
  4. 等待監聽通知,重複步驟2

改進後的共享鎖流程圖

改進後的共享鎖代碼實現:

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.http.client.ClientProtocolException;

/**
 * 分佈式共享鎖最優
 * @author alexnevsky
 * @date 2018年5月23日
 */
public class SharedLockOptimal {
    
    private static ZkClient zkClient = new ZkClient("IP1:Port1,IP2:Port2,IP3:Port3", 5000);
    private static final String PARENT_PATH = "/zk-book/shared_lock";
    private static String nodeFullPath = zkClient.createEphemeralSequential(PARENT_PATH + "/r-", null);
    
    public static void main(String[] args) throws InterruptedException, ClientProtocolException, IOException {
        List<String> currentChilds = sortNodes(zkClient.getChildren(PARENT_PATH));
        String node = nodeFullPath.substring(nodeFullPath.lastIndexOf("/") + 1);
        
        boolean isReadNode = node.indexOf("r-") > -1 ? true : false, isGetLock = getLock(currentChilds, node);
        System.out.println("當前全部節點:" + currentChilds.toString() + ", 該" + (isReadNode ? "讀" : "寫") + "節點:" + node);
        
        if (isGetLock) {
            execute(node);
            System.out.println("退出程序");
            System.exit(1);
        } else {
            String monitorNode = getMonitorNode(currentChilds, node);
            System.out.println(node + " 未獲取到鎖,註冊監聽節點:" + monitorNode);
            if (null != monitorNode) {
                zkClient.subscribeChildChanges(PARENT_PATH + "/" + monitorNode, new IZkChildListener() {
                    @Override
                    public void handleChildChange(String parentPath, List<String> currentChilds)
                            throws Exception {
                        main(null); // 遞歸調用
                    }
                });
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    /**
     * 排序節點
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param nodes
     * @return
     */
    private static List<String> sortNodes(List<String> nodes) {
        Collections.sort(nodes, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                o1 = o1.indexOf("r-") > -1 ? o1.replaceFirst("r-", "") : o1.replaceFirst("w-", "");
                o2 = o2.indexOf("r-") > -1 ? o2.replaceFirst("r-", "") : o2.replaceFirst("w-", "");
                return Integer.parseInt(o1) - Integer.parseInt(o2); // 比較序列號
            }
        });
        return nodes;
    }
    
    /**
     * 獲取節點位置
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static Integer getNodePosition(List<String> currentChilds, String node) {
        for (int i = 0, size = currentChilds.size(); i < size; i++) {
            if (currentChilds.get(i).equals(node))
                return i;
        }
        return null;
    }
    
    /**
     * 獲取監聽節點
     * @author alexnevsky
     * @date 2018年5月25日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static String getMonitorNode(List<String> currentChilds, String node) {
        String monitorNode = null;
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (0 < nodePosition) { // 非首節點
            if (node.indexOf("r-") > -1) { // 讀節點
                // 獲取比當前序號小的最後一個寫節點
                for (int i = nodePosition - 1; i >= 0; i--) {
                    String tempNode = currentChilds.get(i);
                    if (tempNode.indexOf("w-") > -1) 
                        return tempNode;
                }
            } else {
                // 獲取比當前序號小的最後一個節點
                return currentChilds.get(nodePosition - 1);
            }
        }
        return monitorNode;
    }
    
    /**
     * 獲取鎖
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param currentChilds
     * @param node
     * @return
     */
    private static boolean getLock(List<String> currentChilds, String node) {
        Integer nodePosition = getNodePosition(currentChilds, node);
        if (nodePosition == null)
            return false;
        if (nodePosition == 0) // 無序號更小的節點 
            return true;
        if (node.indexOf("r-") > -1) { // 讀節點
            for (int i = 0; i < nodePosition; i++) { // 遍歷前面序號的節點
                String tempNode = currentChilds.get(i);
                if (tempNode.indexOf("w-") > -1)  // 存在寫節點,返回失敗
                    return false;
            }
            return true;
        }
        return false;
    }
    
    /**
     * 執行
     * @author alexnevsky
     * @date 2018年5月24日  
     * @param node
     * @return
     */
    private static void execute(String node) {
        System.out.println(node + " 成功獲取到鎖,開始執行耗時任務");
        doSomething();
        boolean isDeletedLock = zkClient.delete(nodeFullPath);
        System.out.println(node + " 成功執行完任務,刪除節點" + (isDeletedLock ? "成功" : "失敗"));
    }
    
    /**
     * 模擬耗時任務
     * @author alexnevsky
     * @date 2018年5月25日
     */
    public static void doSomething() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}
相關文章
相關標籤/搜索