ZooKeeper(七)-- ZK原生API實現分佈式鎖

1、使用場景

在分佈式應用,每每存在多個進程提供同一服務。這些進程有可能在相同的機器上,也有可能分佈在不一樣的機器上。 若是這些進程共享了一些資源,可能就須要分佈式鎖來鎖定對這些資源的訪問。java

2、實現分佈式鎖結構圖

img

3、代碼實現

package com.xbq.zookeeper.javaApi;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/** * 使用Zookeeper原生API實現分佈式鎖 * @author xbq */
public class ZookeeperLock implements Watcher{

    // 聲明zk對象
    private ZooKeeper zk = null;
    // 此demo使用的集羣,因此有多個ip和端口
    private static String CONNECT_SERVER = "192.168.242.130:2181,192.168.242.130:2182,192.168.242.130:2183";
    // session過時時間
    private static int SESSION_TIMEOUT = 3000;
    // 根節點
    private String root = "/locks";
    // 當前等待的節點
    private String waitNode;
    // 等待的時間
    private int waitTime;
    // 鎖節點
    private String myzkNode;
    // 計數器
    private CountDownLatch latch;
    
    /** * 構造函數 初始化 */
    public ZookeeperLock(){
        try {
            zk = new ZooKeeper(CONNECT_SERVER, SESSION_TIMEOUT, this);
            // 判斷是否存在根節點,不須要監聽根節點
            Stat stat = zk.exists(root, false);
            // 爲空,說明不存在
            if(stat == null){
                // 添加一個永久節點,即添加一個根節點
                zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /** * 嘗試獲取鎖 * @return */
    private boolean tryLock(){
        String splitStr = "lock_";  // 格式 lock_000000001
        try {
            // 建立一個臨時有序節點,並賦值給 myzkNode
            myzkNode = zk.create(root + "/" + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 獲取根節點下的全部子節點
            List<String> children = zk.getChildren(root, false);
            // 對子節點 排序
            Collections.sort(children);
            // 若是剛剛建立的節點 等於 獲取最小的一個子節點,則說明 獲取到鎖
            if(myzkNode.equals(root + "/" + children.get(0))){
                return true;
            }
            // 若是剛剛建立的節點 不等於 獲取到的最小的一個子節點,則 監控比本身小的一個節點
            // 獲取剛剛創建的子節點(不包含根節點的子節點)
            String childNode = myzkNode.substring(myzkNode.lastIndexOf("/") + 1);
            // 獲取比本身小的節點
            waitNode = children.get(Collections.binarySearch(children, childNode) - 1);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
    
    /** * 等待鎖釋放 * @param waitNode * @param waidTime * @return * @throws KeeperException * @throws InterruptedException */
    private boolean waitLock(String waitNode, int waidTime) throws KeeperException, InterruptedException{
        // 判斷比本身小的節點是否存在,並添加監聽
        Stat stat = zk.exists(root + "/" + waitNode, true);
        // 若是存在 比本身小的節點
        if(stat != null){
            this.latch = new CountDownLatch(1);
            this.latch.await(waidTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }
    
    /** * 獲取鎖 */
    public void lock(){
        // 若是嘗試獲取鎖成功
        if(tryLock()){
            // 獲取 成功
            System.out.println("Thread" + Thread.currentThread().getName() + " -- hold lock!");
            return;
        }
        // 等待並獲取鎖
        try {
            waitLock(waitNode, waitTime);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    /** * 解鎖 */
    public void unLock(){
        try {
            zk.delete(myzkNode, -1);
            zk.close();
            System.out.println("Thread" + Thread.currentThread().getName() +" unlock success! 鎖節點:" + myzkNode);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /** * 刪除的時候 觸發事件 */
    @Override
    public void process(WatchedEvent event) {
        // 若是計數器不爲空的話,釋放計數器鎖
        if(this.latch != null){
            this.latch.countDown();
        }
    }
    
    /** * 測試 * @param args */
    public static void main(String[] args) {
        // 定義線程池
        ExecutorService service = Executors.newCachedThreadPool();
        // 只能10個線程同時運行,即模擬併發數爲10
        final Semaphore semaphore = new Semaphore(10);
        // 10個客戶端鏈接
        for(int i=0;i<10;i++){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        ZookeeperLock zkLock = new ZookeeperLock();
                        zkLock.lock();
                        // 業務操做代碼
                        Thread.sleep(3000);
                        zkLock.unLock();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}
複製代碼

歡迎關注個人公衆號,第一時間接收最新文章~ 搜索公衆號: 碼咖 或者 掃描下方二維碼:

img
相關文章
相關標籤/搜索