zookeeper分佈式鎖實現

前段時間遇到一個問題,每一個應用啓了一個定時任務,可是要求每一個定時任務不能同時操做同一條數據庫記錄,當時想到的是對錶加鎖。後面瞭解到能夠經過zookeeper實現分佈式鎖,查了一些資料,本身總結了一下,寫了一個分佈式鎖。java

實現步驟:node

1. ZooKeeper 調用 create (「lock/sub_lock」)方法來建立一個路徑格式爲「 lock/sub_lock」的節點,此節點類型爲EPHEMERAL_SEQUENTIAL 。順序自動編號的節點,這種節點會根據當前已近存在的節點數自動加 1。且建立的節點爲臨時節點,即客戶端與服務器端 session 超時,或者鏈接主動斷開時,該節點會自動刪除,建立的節點格式爲「lock/sub_lock0000000001」。git

2. 在建立的鎖節點上調用 getChildren(「lock/sub_lock」)方法,來獲取鎖目錄下的最小編號節點,而且不設置 watch 。github

3. 步驟 2 中獲取的節點剛好是步驟 1 中客戶端建立的節點,那麼此客戶端得到此種類型的鎖,繼續進行後續的操做,當操做完成後,刪除當前節點。數據庫

4. 客戶端在鎖目錄上調用 exists ()方法,而且設置 watch 來監聽鎖目錄下比本身小一個的連續臨時節點是否被刪除。apache

5. 若是監聽節點已經被刪除,重複步驟2。(不能直接得到鎖,前一個節點被刪除,多是因爲客戶端斷開鏈接,而不是得到鎖以後主動刪除的)。服務器

實現代碼:session

package com.peach.zk.peachLock;

import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.peach.zk.apacheLock.LockListener;
import com.peach.zk.constant.Constant;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Created by zengtao on 2016/8/19.
 */
public class DistributedLock {

    private ZooKeeper zk;
    private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);
    private String lockPath = null;
    private LockListener callBack;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void init(ZooKeeper zk, LockListener lockListener) {
        this.zk = zk;
        this.callBack = lockListener;
    }

    public void getLock() {
        createLock();
        lock();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("", e);
        }
    }

    public void unLock() {
        try {
            Stat stat = zk.exists(lockPath, false);
            if (stat != null) {
                zk.delete(lockPath, -1);
                callBack.lockReleased();
            }
        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
    }

    private synchronized void lock() {
        logger.info("|| in lock");
        List<String> sortedChildren = getSortedChildren();
        try {
            int index = sortedChildren.indexOf(lockPath);
            switch (index) {
                case 0:
                    logger.info("||so happy, I get my lock");
                    callBack.lockAcquired();
                    countDownLatch.countDown();
                    break;
                default:
                    String preChildren = sortedChildren.get(index - 1);
                    Stat stat = zk.exists(preChildren, new NodeChangeWatcher());
                    if (stat == null) {
                        logger.warn("node {} has bean deleted", preChildren);
//                        Thread.sleep(3000);
                        lock();
                    }
            }
        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
        logger.info("|| out lock");
    }

    private void createLock() {
        try {
            Stat stat = zk.exists(Constant.ZK_LOCK_PATH, false);

            if (stat == null) {
                zk.create(Constant.ZK_LOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            lockPath = zk.create(Constant.ZK_SUBLOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException | InterruptedException e) {
            logger.error("|| create znode error", e);
        }
    }

    private List<String> getSortedChildren() {
        //獲取按照字母排序的鏈表
        Ordering<String> order = Ordering.natural();
        List<String> fullpathChildrenList = Lists.newArrayList();
        List<String> sortedChildren = Lists.newArrayList();
        try {
            List<String> childrenList = zk.getChildren(Constant.ZK_LOCK_PATH, false);
            //須要獲得children node 的全路徑,後面能夠匹配該node
            for (String children : childrenList) {
                fullpathChildrenList.add(Constant.ZK_LOCK_PATH + "/" + children);
            }
            sortedChildren = order.sortedCopy(fullpathChildrenList);
        } catch (KeeperException | InterruptedException e) {
            logger.error("||", e);
        }
        return sortedChildren;
    }

    private class NodeChangeWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                logger.info("|| pre node changed");
                lock();
            }
        }
    }
}
package com.peach.zk.constant;

/**
 * Created by zengtao on 2016/8/16.
 */
public class Constant {
    public static final String ZK_CONNECTION_STRING = "localhost:2181,localhost:2182,localhost:2183";
    public static final int ZK_SESSION_TIMEOUT = 50000;
    public static final String ZK_REGISTRY_PATH = "/registry";
    public static final String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
    public static final String ZK_LOCK_PATH = "/lock";
    public static final String ZK_SUBLOCK_PATH = ZK_LOCK_PATH + "/sub_lock";

}
package com.peach.zk.apacheLock;

public interface LockListener {

    public void lockAcquired() throws InterruptedException;
    
    public void lockReleased();
}
package com.peach.zk.util;

import com.peach.zk.constant.Constant;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

/**
 * Created by zengtao on 2016/8/16.
 */
public class ZkUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkUtil.class);

    private static final CountDownLatch latch = new CountDownLatch(1);
    public static synchronized ZooKeeper connectServer( ) {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (Exception e) {
            logger.error("", e);
        }
        return zk;
    }
}

驅動程序:app

package com.peach.zk.peachLock;

import com.peach.zk.apacheLock.LockListener;
import com.peach.zk.util.ZkUtil;
import org.apache.zookeeper.ZooKeeper;

/**
 * Created by zengtao on 2016/8/19.
 */
public class Driver {

    public static void main(String[] args) throws InterruptedException {
        ZooKeeper zk = ZkUtil.connectServer();
        while (true) {
            System.out.println("while");
            final DistributedLock distributedLock = new DistributedLock();
            distributedLock.init(zk, new LockListener() {
                @Override
                public void lockAcquired() throws InterruptedException {
                    System.out.println("|| get lock, do something");
                    Thread.sleep(10000);
                    distributedLock.unLock();
                }

                @Override
                public void lockReleased() {
                    System.out.println("|| lock released");
                }
            });
            distributedLock.getLock();
        }

    }
}

執行五個進程,結果:分佈式

進程1

進程2

進程3

進程4

進程5

執行結果分析:

5個進程都沒有在同一時間得到鎖,從進程1到進程5都是按照順序執行,且間隔時間都是固定10s,對於每一個進程自己,都是間隔50s執行。

完整工程代碼:https://github.com/zengtaotao3390/zookeeper.git

裏面還有RMI+zookeeper 簡單的分佈式應用實現和apache提供的分佈式鎖實現。

相關文章
相關標籤/搜索