Zookeerper實現分佈式鎖服務

利用Curator客戶端API,實現分佈式事務鎖.java

1.Maven 座標配置

<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-recipes</artifactId>  
    <version>2.5.0</version>  
</dependency>

2.利用API封裝類 CuratorUtil.java

import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class CuratorUtil implements InitializingBean {
    private static final Logger LOG = LoggerFactory.getLogger(CuratorUtil.class);

    //zookeeper.connection.url=172.30.0.177:2181,172.30.0.173:2181,172.30.0.171:2181
    //zookeeper.iread.lock.path=/iread/source/lock

    @Value("${zookeeper.connection.url}")
    private String zookeeperConnectionString;

    @Value("${zookeeper.lockPath.prefix}")
    private String lockPathPrefix;

    private CuratorFramework client;
    
    @Override
    public void afterPropertiesSet() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
        client.start();
    }

    /**
     * 獲取鎖。返回不爲null表示成功獲取到鎖,用完以後須要調用releaseLock方法釋放
     * @param relativePath 鎖的相對路徑,Not start with '/'
     * @param waitSeconds 等待秒數
     * @return 未獲取到鎖返回null
     */
    public InterProcessMutex getLock(String relativePath, int waitSeconds) {
        InterProcessMutex lock = new InterProcessMutex(client, lockPathPrefix + relativePath);
        try {
            if (lock.acquire(waitSeconds, TimeUnit.SECONDS)) {
                return lock;
            }
        } catch (Exception e) {
            LOG.error("get lock error", e);
        }
        releaseLock(lock);
        return null;
    }

    /**
     * 釋放鎖
     */
    public void releaseLock(InterProcessMutex lock) {
        if (lock != null && lock.isAcquiredInThisProcess()) {
            try {
                lock.release();
            } catch (Exception e) {
                LOG.warn("release lock error", e);
            }
        }
    }
}

3.調用方法

import javax.annotation.Resource;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;


/**
 * 抽象分佈式Job(自動獲取和釋放ZK分佈式鎖) <br>
 * 子類實現process()方法進行業務處理
 */
public abstract class AbstractDistributedJob {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /** 至少鎖60秒 */
    private static final long LOCK_MIN_TIME = 60000;

    @Resource
    private CuratorUtil curatorUtil;

    public void run() {
        InterProcessMutex lock = curatorUtil.getLock(getClass().getSimpleName() + "/lock", 1);
        if (lock == null) {
            LOG.info("can not get lock, exit job.");
            return;
        }

        long st = System.currentTimeMillis();
        LOG.info("start job...");

        try {
            process();
        } catch (Exception e) {
            LOG.error("job error", e);
        } finally {
            long cost = System.currentTimeMillis() - st;
            LOG.info("job finished, cost {} ms.", cost);

            if (cost < LOCK_MIN_TIME) {
                try {
                    Thread.sleep(LOCK_MIN_TIME - cost);
                } catch (InterruptedException e) {}
            }
            curatorUtil.releaseLock(lock);
        }
    }

    public abstract void process() throws Exception;
}

4.使用案例

        使用場景:當部署多臺服務器時,有一個任務須要,若是沒有分佈式鎖,則多臺服務器都會執行這個任務,可是咱們每每只想讓其中一臺服務器執行這個任務。git

                  1.    5臺tomcat服務器,部署相同的war包,每一個tomcat服務器都會在凌晨2點執行一次消息推送,爲了防止5臺服務器都推送消息,部署三臺zookeeper 服務器,5臺Tomcat服務器都鏈接上zookeeper服務器,而後在推送消息的時候,獲取鎖的那臺服務器執行任務,從而保證了Tomcat服務器集羣只有一臺服務器獲取鎖,執行任務。github

                 2.    分佈式調度,一臺消息隊列服務器MQ,多個業務邏輯服務器,多個業務邏輯服務器能夠使用一個分佈式鎖去競爭消息隊列數據,獲取到鎖的服務器獲取數據,保證了消息隊列的每條數據只被一臺服務器獲取,從而保證多臺服務器併發執行任務。redis

 

redis實現分佈式鎖  spring

  https://yq.aliyun.com/articles/307547?spm=5176.100239.blogrightarea309637.19.af4dc28ybSYCyapache

 redisson分佈式鎖tomcat

  https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#81-lock服務器

相關文章
相關標籤/搜索