ZooKeeper分佈式鎖應用

ZooKeeper分佈式鎖應用

1、Zookeeper是什麼

Zookeeper是一個高性能的分佈式系統的協調服務。它在一個簡單的接口裏暴露公共服務:像命名、配置管理、同步、和羣組服務,因此你沒有必要從頭開始實現它們。你可使用現成的Zookeeper去實現共識、羣組管理、領導人選舉和業務協議。而且你能夠在它的基礎之上創建本身特定的需求。java

2、Zookeeper分佈式鎖的實現原理

利用臨時順序節點實現Zookeeper分佈式鎖。node

一、首先爲一個lock場景,在zookeeper中指定對應的一個根節點,用於記錄資源競爭的內容,稱之爲/lock_nodeapache

二、每一個lock建立後,會lazy在zookeeper中建立一個node節點,代表對應的資源競爭標識。 (小技巧:node節點爲EPHEMERAL_SEQUENTIAL,自增加的臨時節點)。好比有兩個客戶端建立znode,那分別爲/lock_node/lock-一、/lock_node/lock-2session

三、進行lock操做時,獲取對應lock根節點下的全部字節點,也即處於競爭中的資源標識maven

四、按照Fair競爭的原則,按照對應的自增內容作排序,取出編號最小的一個節點作爲lock的owner,判斷本身的節點id是否就爲owner id,若是是則返回,lock成功。分佈式

五、若是本身非owner id,按照排序的結果找到序號比本身前一位的id,關注它鎖釋放的操做(也就是exist watcher),造成一個鏈式的觸發過程。
unlock過程ide

六、將本身id對應的節點刪除便可,對應的下一個排隊的節點就能夠收到Watcher事件,從而被喚醒獲得鎖後退出性能

ZooKeeper的幾個特性讓它很是合適做爲分佈式鎖服務測試

zookeeper支持watcher機制,這樣實現阻塞鎖,能夠watch鎖數據,等到數據被刪除,zookeeper會通知客戶端去從新競爭鎖。
zookeeper的數據能夠支持臨時節點的概念,即客戶端寫入的數據是臨時數據,在客戶端宕機後,臨時數據會被刪除,這樣就實現了鎖的異常釋放。使用這樣的方式,就不須要給鎖增長超時自動釋放的特性了。ui

3、Zookeeper分佈式鎖應用

Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。

使用場景

一、建立ZooKeeperConnector.java

package com.robot.zookeeper.components;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/6/21 18:07
 * @description:ZooKeeper鏈接器
 */
public class ZooKeeperConnector {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 鏈接IP端口信息,格式爲:10.1.74.75:2281,10.1.74.75:2282,10.1.74.75:2283
     */
    private String hosts;

    private CuratorFramework client;

    private static final int DEFAULT_SESSION_TIMEOUT_MS = 30 * 1000;
    private static final int DEFAULT_CONNECTION_TIMEOUT_MS = 10 * 1000;

    private int sessionTimeout = DEFAULT_SESSION_TIMEOUT_MS;
    private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT_MS;

    /**
     * 鏈接ZooKeeper
     */
    public void connect() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(hosts, sessionTimeout, connectionTimeout, retryPolicy);
        client.start();
        logger.info("Successfully connected to Zookeeper [{}] ", hosts);
    }

    /**
     * 關閉ZooKeeper的鏈接
     */
    public void close() {
        CloseableUtils.closeQuietly(client);
    }

    /**
     * 重連
     *
     * @return
     */
    public CuratorFramework reConnect() {
        connect();
        return client;
    }

    public String getHosts() {
        return hosts;
    }

    public void setHosts(String hosts) {
        this.hosts = hosts;
    }

    public CuratorFramework getClient() {
        if (client == null) {
            connect();
        }
        return client;
    }

    public void setClient(CuratorFramework client) {
        this.client = client;
    }

    public int getSessionTimeout() {
        return sessionTimeout;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }
}

二、建立AccountLock.java

package com.robot.zookeeper.utils;

import com.robot.zookeeper.components.ZooKeeperConnector;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/6/22 10:16
 * @description:帳戶分佈式鎖
 */
public class AccountLock {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 共享鎖
     */
    InterProcessMutex lock;

    /**
     * 是否獲取到鎖
     */
    boolean wasAcquired = false;

    /**
     * 構造器
     *
     * @param path
     * @param zooKeeperConnector
     */
    public AccountLock(String path, ZooKeeperConnector zooKeeperConnector) {
        lock = new InterProcessMutex(zooKeeperConnector.getClient(), path);
    }

    /**
     * 嘗試加鎖並等待
     *
     * @param timeOut 超時時間(秒),-1表示不等待
     * @return true表示獲取鎖成功,false表示獲取鎖失敗
     */
    public boolean acquire(int timeOut) {
        try {
            if (timeOut == -1) {
                wasAcquired = lock.acquire(-1, null);
            } else {
                wasAcquired = lock.acquire(timeOut, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            logger.error("Get lock time out error", e.getMessage());
            wasAcquired = false;
        }
        return wasAcquired;
    }

    /**
     * 嘗試加鎖並等待
     *
     * @param timeOut  timeOut 超時時間,-1表示不等待
     * @param timeUnit 超時時間單位
     * @return true表示獲取鎖成功,false表示獲取鎖失敗
     */
    public boolean acquire(int timeOut, TimeUnit timeUnit) {
        try {
            wasAcquired = lock.acquire(timeOut, timeUnit);
        } catch (Exception e) {
            logger.error("Get lock time out error", e.getMessage());
            wasAcquired = false;
        }
        return wasAcquired;
    }

    /**
     * 釋放鎖
     */
    public void release() {
        if (wasAcquired) {
            try {
                lock.release();
            } catch (Exception e) {
                logger.error("release lock error", e.getMessage());
            }
        }
    }

}

三、測試類

package com.robot.zookeeper;

import com.robot.zookeeper.components.ZooKeeperConnector;
import com.robot.zookeeper.utils.AccountLock;

/**
 * @author: 會跳舞的機器人
 * @date: 2017/6/22 10:34
 * @description:
 */
public class Test {
    public static void main(String[] args) {
        final ZooKeeperConnector zooKeeperConnector = new ZooKeeperConnector();
        zooKeeperConnector.setHosts("192.168.133.128:2181,192.168.133.129:2182,192.168.133.130:2183");
        zooKeeperConnector.connect();

        /**
         * 建立4個線程去獲取鎖
         */
        for (int i = 1; i < 5; i++) {
            Thread thread = new Thread() {
                @Override
                public void run() {
                    AccountLock accountLock = new AccountLock("/ACCOUNT/221890", zooKeeperConnector);
                    boolean wasAcquired = accountLock.acquire(10);
                    if (wasAcquired) {
                        System.out.println("線程" + Thread.currentThread().getName() + "獲取到鎖");
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        accountLock.release();
                        System.out.println("線程" + Thread.currentThread().getName() + "釋放鎖");
                    }
                }
            };
            thread.start();
        }
    }
}

輸出結果:

線程Thread-3獲取到鎖
線程Thread-3釋放鎖
線程Thread-4獲取到鎖
線程Thread-4釋放鎖
線程Thread-2獲取到鎖
線程Thread-2釋放鎖
線程Thread-1獲取到鎖
線程Thread-1釋放鎖

帳戶加鎖的時候,咱們針對用戶的ID進行加鎖,在測試類中,咱們建立了4個線程去獲取鎖,從輸出結果能夠看出每次只有一個線程能獲取到鎖,而且在該線程釋放鎖以後,其餘的線程才能獲取到鎖。

固然,測試類中的ZooKeeperConnector的初始化通常都是經過Spring進行管理

<beans>
    <bean id="zkConnector" class="com.baibei.component.zk.ZooKeeperConnector"
        init-method="connect" lazy-init="false">
        <property name="hosts" value="#{environment['ZOOKEEPER.CONNECTION.HOSTS']}" />
    </bean>
</beans>

Demo中所須要的maven配置以下:

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.10.0</version>
        </dependency>

        <!-- curator的內嵌包版本存在問題,因此用這個版原本替代-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>

原文轉自:https://www.jianshu.com/p/11b81630990b做者:會跳舞的機器人

相關文章
相關標籤/搜索