前段時間遇到一個問題,每一個應用啓了一個定時任務,可是要求每一個定時任務不能同時操做同一條數據庫記錄,當時想到的是對錶加鎖。後面瞭解到能夠經過zookeeper實現分佈式鎖,查了一些資料,本身總結了一下,寫了一個分佈式鎖。java
實現步驟:node
1. ZooKeeper 調用 create (「lock/sub_lock」)方法來建立一個路徑格式爲「 lock/sub_lock」的節點,此節點類型爲EPHEMERAL_SEQUENTIAL 。順序自動編號的節點,這種節點會根據當前已近存在的節點數自動加 1。且建立的節點爲臨時節點,即客戶端與服務器端 session 超時,或者鏈接主動斷開時,該節點會自動刪除,建立的節點格式爲「lock/sub_lock0000000001」。git
2. 在建立的鎖節點上調用 getChildren(「lock/sub_lock」)方法,來獲取鎖目錄下的最小編號節點,而且不設置 watch 。github
3. 步驟 2 中獲取的節點剛好是步驟 1 中客戶端建立的節點,那麼此客戶端得到此種類型的鎖,繼續進行後續的操做,當操做完成後,刪除當前節點。數據庫
4. 客戶端在鎖目錄上調用 exists ()方法,而且設置 watch 來監聽鎖目錄下比本身小一個的連續臨時節點是否被刪除。apache
5. 若是監聽節點已經被刪除,重複步驟2。(不能直接得到鎖,前一個節點被刪除,多是因爲客戶端斷開鏈接,而不是得到鎖以後主動刪除的)。服務器
實現代碼:session
package com.peach.zk.peachLock; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.peach.zk.apacheLock.LockListener; import com.peach.zk.constant.Constant; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.CountDownLatch; /** * Created by zengtao on 2016/8/19. */ public class DistributedLock { private ZooKeeper zk; private static Logger logger = LoggerFactory.getLogger(DistributedLock.class); private String lockPath = null; private LockListener callBack; private CountDownLatch countDownLatch = new CountDownLatch(1); public void init(ZooKeeper zk, LockListener lockListener) { this.zk = zk; this.callBack = lockListener; } public void getLock() { createLock(); lock(); try { countDownLatch.await(); } catch (InterruptedException e) { logger.error("", e); } } public void unLock() { try { Stat stat = zk.exists(lockPath, false); if (stat != null) { zk.delete(lockPath, -1); callBack.lockReleased(); } } catch (KeeperException | InterruptedException e) { logger.error("", e); } } private synchronized void lock() { logger.info("|| in lock"); List<String> sortedChildren = getSortedChildren(); try { int index = sortedChildren.indexOf(lockPath); switch (index) { case 0: logger.info("||so happy, I get my lock"); callBack.lockAcquired(); countDownLatch.countDown(); break; default: String preChildren = sortedChildren.get(index - 1); Stat stat = zk.exists(preChildren, new NodeChangeWatcher()); if (stat == null) { logger.warn("node {} has bean deleted", preChildren); // Thread.sleep(3000); lock(); } } } catch (KeeperException | InterruptedException e) { logger.error("", e); } logger.info("|| out lock"); } private void createLock() { try { Stat stat = zk.exists(Constant.ZK_LOCK_PATH, false); if (stat == null) { zk.create(Constant.ZK_LOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } lockPath = zk.create(Constant.ZK_SUBLOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (KeeperException | InterruptedException e) { logger.error("|| create znode error", e); } } private List<String> getSortedChildren() { //獲取按照字母排序的鏈表 Ordering<String> order = Ordering.natural(); List<String> fullpathChildrenList = Lists.newArrayList(); List<String> sortedChildren = Lists.newArrayList(); try { List<String> childrenList = zk.getChildren(Constant.ZK_LOCK_PATH, false); //須要獲得children node 的全路徑,後面能夠匹配該node for (String children : childrenList) { fullpathChildrenList.add(Constant.ZK_LOCK_PATH + "/" + children); } sortedChildren = order.sortedCopy(fullpathChildrenList); } catch (KeeperException | InterruptedException e) { logger.error("||", e); } return sortedChildren; } private class NodeChangeWatcher implements Watcher { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted) { logger.info("|| pre node changed"); lock(); } } } }
package com.peach.zk.constant; /** * Created by zengtao on 2016/8/16. */ public class Constant { public static final String ZK_CONNECTION_STRING = "localhost:2181,localhost:2182,localhost:2183"; public static final int ZK_SESSION_TIMEOUT = 50000; public static final String ZK_REGISTRY_PATH = "/registry"; public static final String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider"; public static final String ZK_LOCK_PATH = "/lock"; public static final String ZK_SUBLOCK_PATH = ZK_LOCK_PATH + "/sub_lock"; }
package com.peach.zk.apacheLock; public interface LockListener { public void lockAcquired() throws InterruptedException; public void lockReleased(); }
package com.peach.zk.util; import com.peach.zk.constant.Constant; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; /** * Created by zengtao on 2016/8/16. */ public class ZkUtil { private static final Logger logger = LoggerFactory.getLogger(ZkUtil.class); private static final CountDownLatch latch = new CountDownLatch(1); public static synchronized ZooKeeper connectServer( ) { ZooKeeper zk = null; try { zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (Exception e) { logger.error("", e); } return zk; } }
驅動程序:app
package com.peach.zk.peachLock; import com.peach.zk.apacheLock.LockListener; import com.peach.zk.util.ZkUtil; import org.apache.zookeeper.ZooKeeper; /** * Created by zengtao on 2016/8/19. */ public class Driver { public static void main(String[] args) throws InterruptedException { ZooKeeper zk = ZkUtil.connectServer(); while (true) { System.out.println("while"); final DistributedLock distributedLock = new DistributedLock(); distributedLock.init(zk, new LockListener() { @Override public void lockAcquired() throws InterruptedException { System.out.println("|| get lock, do something"); Thread.sleep(10000); distributedLock.unLock(); } @Override public void lockReleased() { System.out.println("|| lock released"); } }); distributedLock.getLock(); } } }
執行五個進程,結果:分佈式
進程1
進程2
進程3
進程4
進程5
執行結果分析:
5個進程都沒有在同一時間得到鎖,從進程1到進程5都是按照順序執行,且間隔時間都是固定10s,對於每一個進程自己,都是間隔50s執行。
完整工程代碼:https://github.com/zengtaotao3390/zookeeper.git
裏面還有RMI+zookeeper 簡單的分佈式應用實現和apache提供的分佈式鎖實現。