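Distributed systems often need distributed synchronization primitives to coordinate work. The previous article introduced the basic principles of ZooKeeper; this article covers Lock and Queue implementations built on ZooKeeper. Most of the code comes from the official ZooKeeper recipes.
A fully distributed lock is globally synchronous: at any moment, no two clients believe they hold the same lock. ZooKeeper can be used to implement such a lock. First, define a lock node (the lock root node).
A client that wants to acquire the lock follows these steps:
First, create the node that represents this client's lock request: call create() with the node type EPHEMERAL_SEQUENTIAL, meaning the node is both ephemeral and sequential. If the client holding the lock crashes, its node is removed automatically, so other clients can acquire the lock.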
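Call getChildren() on the lock root node without setting the watch flag (this avoids the "herd effect").
Following the fair-competition principle, sort the child nodes returned by getChildren() (the nodes competing for the lock) by sequence number. The node with the smallest number is the lock owner. If this client's node id equals the owner id, return: the lock has been acquired. Otherwise, call exists() with a watch on the node immediately preceding this client's own node, so that the client is notified when that node releases the lock (an exists watch).
If the exists watch set in the previous step fires, apply the same rule again to decide whether this client now holds the lock.
Releasing the lock: a client that wants to release the lock only needs to delete the node it created with create().
Notes:
Deleting a node wakes up only one client, because each node is watched by exactly one client. This avoids the "herd effect".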
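The ownership check in the steps above can be modelled without a ZooKeeper connection. The following is a minimal sketch, assuming child names end in the numeric sequence suffix that ZooKeeper appends to sequential nodes. LockOrderSketch, sequenceOf, and nodeToWatch are hypothetical names used only for illustration; they are not part of the official recipe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the "who owns the lock / whom do I watch"
// decision. It does not talk to ZooKeeper.
final class LockOrderSketch {

    // Parses the numeric suffix of a sequential node name,
    // e.g. "x-42-0000000007" yields 7.
    static int sequenceOf(String childName) {
        return Integer.parseInt(childName.substring(childName.lastIndexOf('-') + 1));
    }

    // Returns null when myNode has the smallest sequence number (it owns the
    // lock). Otherwise returns the node immediately before it, which is the
    // only node this client would watch with exists().
    static String nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(LockOrderSketch::sequenceOf));
        int myIndex = sorted.indexOf(myNode);
        if (myIndex < 0) {
            throw new IllegalArgumentException(myNode + " is not among the children");
        }
        return myIndex == 0 ? null : sorted.get(myIndex - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "x-30-0000000002", "x-10-0000000003", "x-20-0000000001");
        for (String child : children) {
            String watch = nodeToWatch(children, child);
            System.out.println(child + (watch == null ? " owns the lock" : " watches " + watch));
        }
    }
}
```

Because each waiter watches only its direct predecessor, releasing the lock notifies a single client rather than every waiter.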
An implementation of a distributed lock:
package org.apache.zookeeper.recipes.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * A <a href="package.html">protocol to implement an exclusive
 * write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to
 * start the process of grabbing the lock; you may get the lock then or it may be
 * some time later. <p/> You can register a listener so that you are invoked
 * when you get the lock; otherwise you can ask if you have the lock
 * by calling {@link #isOwner()}
 *
 */
public class WriteLock extends ProtocolSupport {
    private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class);

    private final String dir;
    private String id;
    private ZNodeName idName;
    private String ownerId;
    private String lastChildId;
    private byte[] data = {0x12, 0x34};
    private LockListener callback;
    private LockZooKeeperOperation zop;

    /**
     * zookeeper constructor for writelock
     * @param zookeeper zookeeper client instance
     * @param dir the parent path you want to use for locking
     * @param acl the acls that you want to use for all the paths,
     * if null world read/write is used.
     */
    public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
        super(zookeeper);
        this.dir = dir;
        if (acl != null) {
            setAcl(acl);
        }
        this.zop = new LockZooKeeperOperation();
    }

    /**
     * zookeeper constructor for writelock with callback
     * @param zookeeper the zookeeper client instance
     * @param dir the parent path you want to use for locking
     * @param acl the acls that you want to use for all the paths
     * @param callback the call back instance
     */
    public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl,
            LockListener callback) {
        this(zookeeper, dir, acl);
        this.callback = callback;
    }

    /**
     * return the current locklistener
     * @return the locklistener
     */
    public LockListener getLockListener() {
        return this.callback;
    }

    /**
     * register a different call back listener
     * @param callback the call back instance
     */
    public void setLockListener(LockListener callback) {
        this.callback = callback;
    }

    /**
     * Removes the lock or associated znode if
     * you no longer require the lock. this also
     * removes your request in the queue for locking
     * in case you do not already hold the lock.
     * @throws RuntimeException throws a runtime exception
     * if it cannot connect to zookeeper.
     */
    public synchronized void unlock() throws RuntimeException {
        if (!isClosed() && id != null) {
            // we don't need to retry this operation in the case of failure
            // as ZK will remove ephemeral files and we don't wanna hang
            // this process when closing if we cannot reconnect to ZK
            try {
                ZooKeeperOperation zopdel = new ZooKeeperOperation() {
                    public boolean execute() throws KeeperException,
                            InterruptedException {
                        zookeeper.delete(id, -1);
                        return Boolean.TRUE;
                    }
                };
                zopdel.execute();
            } catch (InterruptedException e) {
                LOG.warn("Caught: " + e, e);
                // set that we have been interrupted.
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } catch (KeeperException e) {
                LOG.warn("Caught: " + e, e);
                throw (RuntimeException) new RuntimeException(e.getMessage()).
                        initCause(e);
            } finally {
                if (callback != null) {
                    callback.lockReleased();
                }
                id = null;
            }
        }
    }

    /**
     * the watcher called on
     * getting watch while watching
     * my predecessor
     */
    private class LockWatcher implements Watcher {
        public void process(WatchedEvent event) {
            // lets either become the leader or watch the new/updated node
            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
                    event.getState() + " type " + event.getType());
            try {
                lock();
            } catch (Exception e) {
                LOG.warn("Failed to acquire lock: " + e, e);
            }
        }
    }

    /**
     * a zookeeper operation that is mainly responsible
     * for all the magic required for locking.
     */
    private class LockZooKeeperOperation implements ZooKeeperOperation {

        /** find if we have been created earlier if not create our node
         *
         * @param prefix the prefix node
         * @param zookeeper the zookeeper client
         * @param dir the dir parent
         * @throws KeeperException
         * @throws InterruptedException
         */
        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
                throws KeeperException, InterruptedException {
            List<String> names = zookeeper.getChildren(dir, false);
            for (String name : names) {
                if (name.startsWith(prefix)) {
                    id = dir + "/" + name;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Found id created last time: " + id);
                    }
                    break;
                }
            }
            if (id == null) {
                id = zookeeper.create(dir + "/" + prefix, data,
                        getAcl(), EPHEMERAL_SEQUENTIAL);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Created id: " + id);
                }
            }
        }

        /**
         * the command that is run and retried for actually
         * obtaining the lock
         * @return if the command was successful or not
         */
        public boolean execute() throws KeeperException, InterruptedException {
            do {
                if (id == null) {
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // lets try look up the current ID if we failed
                    // in the middle of creating the znode
                    findPrefixInChildren(prefix, zookeeper, dir);
                    idName = new ZNodeName(id);
                }
                if (id != null) {
                    List<String> names = zookeeper.getChildren(dir, false);
                    if (names.isEmpty()) {
                        LOG.warn("No children in: " + dir + " when we've just " +
                                "created one! Lets recreate it...");
                        // lets force the recreation of the id
                        id = null;
                    } else {
                        // lets sort them explicitly (though they do seem to come back in order usually :)
                        SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
                        for (String name : names) {
                            sortedNames.add(new ZNodeName(dir + "/" + name));
                        }
                        ownerId = sortedNames.first().getName();
                        SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            ZNodeName lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("watching less than me node: " + lastChildId);
                            }
                            Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
                            if (stat != null) {
                                return Boolean.FALSE;
                            } else {
                                LOG.warn("Could not find the" +
                                        " stats for less than me: " + lastChildName.getName());
                            }
                        } else {
                            if (isOwner()) {
                                if (callback != null) {
                                    callback.lockAcquired();
                                }
                                return Boolean.TRUE;
                            }
                        }
                    }
                }
            }
            while (id == null);
            return Boolean.FALSE;
        }
    };

    /**
     * Attempts to acquire the exclusive write lock returning whether or not it was
     * acquired. Note that the exclusive lock may be acquired some time later after
     * this method has been invoked due to the current lock owner going away.
     */
    public synchronized boolean lock() throws KeeperException, InterruptedException {
        if (isClosed()) {
            return false;
        }
        ensurePathExists(dir);

        return (Boolean) retryOperation(zop);
    }

    /**
     * return the parent dir for lock
     * @return the parent dir used for locks.
     */
    public String getDir() {
        return dir;
    }

    /**
     * Returns true if this node is the owner of the
     * lock (or the leader)
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    /**
     * return the id for this lock
     * @return the id for this lock
     */
    public String getId() {
        return this.id;
    }
}
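Note that lock() may fail when the connection to ZooKeeper is lost. In that case retryOperation() retries the operation, up to retryCount times (10 by default), and sleeps for attempt × retryDelay (500 ms by default) after each failed attempt other than the first.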
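The retry behaviour can be illustrated in isolation. The following is a minimal sketch, assuming a linear back-off that skips the delay after the first failure, as the recipe's retryDelay() does. RetrySketch and TransientFailure are hypothetical names; TransientFailure merely stands in for KeeperException.ConnectionLossException so that the example runs without the ZooKeeper library.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-alone model of a "retry on connection loss" loop.
final class RetrySketch {

    // Stand-in for a transient connection error (an assumption for this sketch).
    static final class TransientFailure extends Exception {
        TransientFailure(String message) {
            super(message);
        }
    }

    // Runs the operation up to retryCount times. Only TransientFailure is
    // retried; any other exception propagates immediately. After a failed
    // attempt number n > 0 the thread sleeps n * retryDelayMillis.
    static <T> T retry(Callable<T> operation, int retryCount, long retryDelayMillis)
            throws Exception {
        if (retryCount < 1) {
            throw new IllegalArgumentException("retryCount must be at least 1");
        }
        TransientFailure first = null;
        for (int attempt = 0; attempt < retryCount; attempt++) {
            try {
                return operation.call();
            } catch (TransientFailure e) {
                if (first == null) {
                    first = e; // remember the first failure to rethrow later
                }
                if (attempt > 0) {
                    Thread.sleep(attempt * retryDelayMillis);
                }
            }
        }
        throw first; // every attempt failed
    }
}
```

Rethrowing the first failure rather than the last keeps the original cause visible when all attempts fail.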
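PS: The official code has a small bug. Both id and ownerId should be full paths, i.e. id = dir + "/" + name; the original code gets this wrong in findPrefixInChildren.
A helper class used to sort nodes by sequence number: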
package org.apache.zookeeper.recipes.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Represents an ephemeral znode name which has an ordered sequence number and
 * can be sorted in order
 *
 */
class ZNodeName implements Comparable<ZNodeName> {
    private final String name;
    private String prefix;
    private int sequence = -1;
    private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class);

    public ZNodeName(String name) {
        if (name == null) {
            throw new NullPointerException("id cannot be null");
        }
        this.name = name;
        this.prefix = name;
        int idx = name.lastIndexOf('-');
        if (idx >= 0) {
            this.prefix = name.substring(0, idx);
            try {
                this.sequence = Integer.parseInt(name.substring(idx + 1));
                // If an exception occurred we misdetected a sequence suffix,
                // so return -1.
            } catch (NumberFormatException e) {
                LOG.info("Number format exception for " + idx, e);
            } catch (ArrayIndexOutOfBoundsException e) {
                LOG.info("Array out of bounds for " + idx, e);
            }
        }
    }

    @Override
    public String toString() {
        return name.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ZNodeName sequence = (ZNodeName) o;

        if (!name.equals(sequence.name)) return false;

        return true;
    }

    @Override
    public int hashCode() {
        return name.hashCode() + 37;
    }

    public int compareTo(ZNodeName that) {
        int s1 = this.sequence;
        int s2 = that.sequence;
        if (s1 == -1 && s2 == -1) {
            return this.name.compareTo(that.name);
        }
        if (s1 == -1) {
            return -1;
        } else if (s2 == -1) {
            return 1;
        } else {
            return s1 - s2;
        }
    }

    /**
     * Returns the name of the znode
     */
    public String getName() {
        return name;
    }

    /**
     * Returns the sequence number
     */
    public int getZNodeName() {
        return sequence;
    }

    /**
     * Returns the text prefix before the sequence number
     */
    public String getPrefix() {
        return prefix;
    }
}
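PS: I have modified this ZNodeName class. The official code is problematic: it compares the path prefix first and only then the sequence number. That is wrong, because the client with the smaller session id would always get the lock. The comparison should use the globally ordered sequence number directly: the smallest number gets the lock first, first come, first served.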
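The difference between the two orderings can be shown on plain strings. The following is a minimal sketch, assuming node names of the form x-sessionId-sequence as created by WriteLock. OrderingFairnessSketch and its comparators are hypothetical and only model the two comparison strategies described above; they do not reproduce the official source.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical comparison of "prefix first" versus "sequence only" ordering.
final class OrderingFairnessSketch {

    static String prefixOf(String name) {
        return name.substring(0, name.lastIndexOf('-'));
    }

    static int sequenceOf(String name) {
        return Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
    }

    // Models the ordering the text calls unfair: the session-id prefix decides
    // first, and the sequence number only breaks ties.
    static final Comparator<String> PREFIX_FIRST = (a, b) -> {
        int byPrefix = prefixOf(a).compareTo(prefixOf(b));
        return byPrefix != 0 ? byPrefix : Integer.compare(sequenceOf(a), sequenceOf(b));
    };

    // Models the modified ordering: only the sequence number counts.
    static final Comparator<String> SEQUENCE_ONLY =
            (a, b) -> Integer.compare(sequenceOf(a), sequenceOf(b));

    // The owner is the first element under the given ordering.
    static String owner(List<String> children, Comparator<String> order) {
        TreeSet<String> sorted = new TreeSet<>(order);
        sorted.addAll(children);
        return sorted.first();
    }

    public static void main(String[] args) {
        // Session 200 created its node first (sequence 1), session 100 second.
        List<String> children = Arrays.asList("x-100-0000000002", "x-200-0000000001");
        System.out.println("prefix first:  " + owner(children, PREFIX_FIRST));
        System.out.println("sequence only: " + owner(children, SEQUENCE_ONLY));
    }
}
```

Under the prefix-first ordering the later arrival from session 100 wins, while the sequence-only ordering gives the lock to the node that was created first.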
ZooKeeperOperation, the common interface for ZooKeeper operations:
public interface ZooKeeperOperation {

    /**
     * Performs the operation - which may be involved multiple times if the connection
     * to ZooKeeper closes during this operation
     *
     * @return the result of the operation or null
     * @throws KeeperException
     * @throws InterruptedException
     */
    public boolean execute() throws KeeperException, InterruptedException;
}
Because ZooKeeper operations can fail, this class wraps them in retry logic:
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.zookeeper.recipes.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.recipes.lock.ZooKeeperOperation;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A base class for protocol implementations which provides a number of higher
 * level helper methods for working with ZooKeeper along with retrying synchronous
 * operations if the connection to ZooKeeper closes such as
 * {@link #retryOperation(ZooKeeperOperation)}
 *
 */
class ProtocolSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);

    protected final ZooKeeper zookeeper;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private long retryDelay = 500L;
    private int retryCount = 10;
    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    public ProtocolSupport(ZooKeeper zookeeper) {
        this.zookeeper = zookeeper;
    }

    /**
     * Closes this strategy and releases any ZooKeeper resources; but keeps the
     * ZooKeeper instance open
     */
    public void close() {
        if (closed.compareAndSet(false, true)) {
            doClose();
        }
    }

    /**
     * return zookeeper client instance
     * @return zookeeper client instance
     */
    public ZooKeeper getZookeeper() {
        return zookeeper;
    }

    /**
     * return the acl its using
     * @return the acl.
     */
    public List<ACL> getAcl() {
        return acl;
    }

    /**
     * set the acl
     * @param acl the acl to set to
     */
    public void setAcl(List<ACL> acl) {
        this.acl = acl;
    }

    /**
     * get the retry delay in milliseconds
     * @return the retry delay
     */
    public long getRetryDelay() {
        return retryDelay;
    }

    /**
     * Sets the time waited between retry delays
     * @param retryDelay the retry delay
     */
    public void setRetryDelay(long retryDelay) {
        this.retryDelay = retryDelay;
    }

    /**
     * Allow derived classes to perform
     * some custom closing operations to release resources
     */
    protected void doClose() {
    }

    /**
     * Perform the given operation, retrying if the connection fails
     * @return object. it needs to be cast to the callee's expected
     * return type.
     */
    protected Object retryOperation(ZooKeeperOperation operation)
            throws KeeperException, InterruptedException {
        KeeperException exception = null;
        for (int i = 0; i < retryCount; i++) {
            try {
                return operation.execute();
            } catch (KeeperException.SessionExpiredException e) {
                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
                throw e;
            } catch (KeeperException.ConnectionLossException e) {
                if (exception == null) {
                    exception = e;
                }
                LOG.debug("Attempt " + i + " failed with connection loss so " +
                        "attempting to reconnect: " + e, e);
                retryDelay(i);
            }
        }
        throw exception;
    }

    /**
     * Ensures that the given path exists with no data, the current
     * ACL and no flags
     * @param path
     */
    protected void ensurePathExists(String path) {
        ensureExists(path, null, acl, CreateMode.PERSISTENT);
    }

    /**
     * Ensures that the given path exists with the given data, ACL and flags
     * @param path
     * @param acl
     * @param flags
     */
    protected void ensureExists(final String path, final byte[] data,
            final List<ACL> acl, final CreateMode flags) {
        try {
            retryOperation(new ZooKeeperOperation() {
                public boolean execute() throws KeeperException, InterruptedException {
                    Stat stat = zookeeper.exists(path, false);
                    if (stat != null) {
                        return true;
                    }
                    zookeeper.create(path, data, acl, flags);
                    return true;
                }
            });
        } catch (KeeperException e) {
            LOG.warn("Caught: " + e, e);
        } catch (InterruptedException e) {
            LOG.warn("Caught: " + e, e);
        }
    }

    /**
     * Returns true if this protocol has been closed
     * @return true if this protocol is closed
     */
    protected boolean isClosed() {
        return closed.get();
    }

    /**
     * Performs a retry delay if this is not the first attempt
     * @param attemptCount the number of the attempts performed so far
     */
    protected void retryDelay(int attemptCount) {
        if (attemptCount > 0) {
            try {
                Thread.sleep(attemptCount * retryDelay);
            } catch (InterruptedException e) {
                LOG.debug("Failed to sleep: " + e, e);
            }
        }
    }
}
This interface defines the callbacks fired when this client acquires or releases the lock:
public interface LockListener {
    /**
     * call back called when the lock
     * is acquired
     */
    public void lockAcquired();

    /**
     * call back called when the lock is
     * released.
     */
    public void lockReleased();
}
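A distributed queue is a general-purpose data structure. To implement one in ZooKeeper, first designate a znode as the queue node. Clients put data into the queue by calling create() with a path name ending in "qn-" and the sequence flag set. Because of the sequence flag, new path names have the form "_path-to-queue-node_/qn-X", where X is a unique, monotonically increasing number. A client that wants to read or remove data first calls getChildren(). If there is data, the client takes it (and may or may not delete the node afterwards). If there is none, the client sets watch to true on the queue node, waits for the notification, and then processes the node with the smallest sequence number.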
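Picking the head of the queue from the child names can be sketched without a ZooKeeper connection. The following is a minimal sketch, assuming children are named "qn-" followed by a numeric sequence. QueueOrderSketch, orderedChildren, and head are hypothetical names for illustration; names that do not match the pattern are skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of ordering queue children by sequence number.
final class QueueOrderSketch {
    static final String PREFIX = "qn-";

    // Maps each well-formed child name to its sequence number. The TreeMap
    // keeps the entries sorted by that number, smallest first.
    static TreeMap<Long, String> orderedChildren(List<String> childNames) {
        TreeMap<Long, String> ordered = new TreeMap<>();
        for (String name : childNames) {
            if (!name.startsWith(PREFIX)) {
                continue; // not a queue element
            }
            try {
                ordered.put(Long.parseLong(name.substring(PREFIX.length())), name);
            } catch (NumberFormatException e) {
                // the suffix is not a number, so skip this child
            }
        }
        return ordered;
    }

    // Returns the child with the smallest sequence number, or null if the
    // queue holds no well-formed element.
    static String head(List<String> childNames) {
        TreeMap<Long, String> ordered = orderedChildren(childNames);
        return ordered.isEmpty() ? null : ordered.firstEntry().getValue();
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "qn-0000000012", "other", "qn-0000000003", "qn-abc");
        System.out.println("head = " + head(children));
    }
}
```

Sorting by the parsed number rather than by the raw string keeps the order correct even if the suffixes were not zero-padded.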
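The implementation steps are roughly as follows:
Prerequisite: a queue root node, dir.
Enqueue: create a node with create() and store the shared data in it. The node type is PERSISTENT_SEQUENTIAL, i.e. persistent and sequential (it can also be ephemeral, depending on requirements).
Dequeue: because the queue may be empty, there are two ways to handle it: wait while the queue is empty, or throw an exception.
Waiting: this uses CountDownLatch for waiting together with the Watcher notification mechanism, and a TreeMap to sort the nodes so that the data with the smallest sequence number is taken first (FIFO).
Throwing an exception: when getChildren() is used to fetch the queue contents, an exception is thrown if size == 0.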
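The waiting variant can be modelled in memory to show how the latch and the notification fit together. The following is a minimal sketch, assuming a single JVM: BlockingTakeSketch is a hypothetical class in which a synchronized TreeMap stands in for the queue node and a list of latches stands in for ZooKeeper watches. Unlike a real watch, the latch here is registered only when the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical in-memory model of a blocking take() built on CountDownLatch.
final class BlockingTakeSketch {
    private final TreeMap<Long, byte[]> nodes = new TreeMap<>();
    private final List<CountDownLatch> watchers = new ArrayList<>();
    private long nextSequence = 0;

    // Enqueues data under the next sequence number and fires all pending
    // "watches", mirroring a children-changed notification.
    synchronized void offer(byte[] data) {
        nodes.put(nextSequence++, data);
        for (CountDownLatch watcher : watchers) {
            watcher.countDown();
        }
        watchers.clear();
    }

    // Atomically removes the head, or registers the latch if the queue is
    // empty, so that no offer() can slip in between the check and the wait.
    private synchronized byte[] pollOrWatch(CountDownLatch watcher) {
        if (nodes.isEmpty()) {
            watchers.add(watcher);
            return null;
        }
        return nodes.pollFirstEntry().getValue();
    }

    // Blocks until an element is available, then returns the one with the
    // smallest sequence number (FIFO).
    byte[] take() throws InterruptedException {
        while (true) {
            CountDownLatch watcher = new CountDownLatch(1);
            byte[] data = pollOrWatch(watcher);
            if (data != null) {
                return data;
            }
            watcher.await(); // wait for the next offer(), then retry
        }
    }
}
```

A fresh latch per loop iteration matches the one-shot nature of watches: once fired, a latch cannot be reused, so each retry needs a new one.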
An implementation of a distributed queue, in full:
package org.apache.zookeeper.recipes.queue;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

/**
 *
 * A <a href="package.html">protocol to implement a distributed queue</a>.
 *
 */
public class DistributedQueue {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);

    private final String dir;

    private ZooKeeper zookeeper;
    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;

    private final String prefix = "qn-";

    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
        this.dir = dir;

        if(acl != null){
            this.acl = acl;
        }
        this.zookeeper = zookeeper;

        //Add root dir first if not exists
        if (zookeeper != null) {
            try {
                Stat s = zookeeper.exists(dir, false);
                if (s == null) {
                    // use the field so that a null acl argument falls back to the default ACL
                    zookeeper.create(dir, new byte[0], this.acl, CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                LOG.error(e.toString());
            } catch (InterruptedException e) {
                LOG.error(e.toString());
            }
        }
    }

    /**
     * Returns a Map of the children, ordered by id.
     * @param watcher optional watcher on getChildren() operation.
     * @return map from id to child name for all children
     */
    private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
        TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();

        List<String> childNames = null;
        try{
            childNames = zookeeper.getChildren(dir, watcher);
        }catch (KeeperException.NoNodeException e){
            throw e;
        }

        for(String childName : childNames){
            try{
                //Check format
                if(!childName.regionMatches(0, prefix, 0, prefix.length())){
                    LOG.warn("Found child node with improper name: " + childName);
                    continue;
                }
                String suffix = childName.substring(prefix.length());
                Long childId = Long.valueOf(suffix);
                orderedChildren.put(childId,childName);
            }catch(NumberFormatException e){
                LOG.warn("Found child node with improper format : " + childName + " " + e,e);
            }
        }

        return orderedChildren;
    }

    /**
     * Find the smallest child node.
     * @return The name of the smallest child node.
     */
    private String smallestChildName() throws KeeperException, InterruptedException {
        long minId = Long.MAX_VALUE;
        String minName = "";

        List<String> childNames = null;

        try{
            childNames = zookeeper.getChildren(dir, false);
        }catch(KeeperException.NoNodeException e){
            LOG.warn("Caught: " +e,e);
            return null;
        }

        for(String childName : childNames){
            try{
                //Check format
                if(!childName.regionMatches(0, prefix, 0, prefix.length())){
                    LOG.warn("Found child node with improper name: " + childName);
                    continue;
                }
                String suffix = childName.substring(prefix.length());
                long childId = Long.parseLong(suffix);
                if(childId < minId){
                    minId = childId;
                    minName = childName;
                }
            }catch(NumberFormatException e){
                LOG.warn("Found child node with improper format : " + childName + " " + e,e);
            }
        }

        if(minId < Long.MAX_VALUE){
            return minName;
        }else{
            return null;
        }
    }

    /**
     * Return the head of the queue without modifying the queue.
     * @return the data at the head of the queue.
     * @throws NoSuchElementException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
        TreeMap<Long,String> orderedChildren;

        // element, take, and remove follow the same pattern.
        // We want to return the child node with the smallest sequence number.
        // Since other clients are remove()ing and take()ing nodes concurrently,
        // the child with the smallest sequence number in orderedChildren might be gone by the time we check.
        // We don't call getChildren again until we have tried the rest of the nodes in sequence order.
        while(true){
            try{
                orderedChildren = orderedChildren(null);
            }catch(KeeperException.NoNodeException e){
                throw new NoSuchElementException();
            }
            if(orderedChildren.size() == 0 ) throw new NoSuchElementException();

            for(String headNode : orderedChildren.values()){
                if(headNode != null){
                    try{
                        return zookeeper.getData(dir+"/"+headNode, false, null);
                    }catch(KeeperException.NoNodeException e){
                        //Another client removed the node first, try next
                    }
                }
            }
        }
    }

    /**
     * Attempts to remove the head of the queue and return it.
     * @return The former head of the queue
     * @throws NoSuchElementException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
        TreeMap<Long,String> orderedChildren;
        // Same as for element. Should refactor this.
        while(true){
            try{
                orderedChildren = orderedChildren(null);
            }catch(KeeperException.NoNodeException e){
                throw new NoSuchElementException();
            }
            if(orderedChildren.size() == 0) throw new NoSuchElementException();

            for(String headNode : orderedChildren.values()){
                String path = dir +"/"+headNode;
                try{
                    byte[] data = zookeeper.getData(path, false, null);
                    zookeeper.delete(path, -1);
                    return data;
                }catch(KeeperException.NoNodeException e){
                    // Another client deleted the node first.
                }
            }
        }
    }

    private class LatchChildWatcher implements Watcher {

        CountDownLatch latch;

        public LatchChildWatcher(){
            latch = new CountDownLatch(1);
        }

        public void process(WatchedEvent event){
            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
                    event.getState() + " type " + event.getType());
            latch.countDown();
        }

        public void await() throws InterruptedException {
            latch.await();
        }
    }

    /**
     * Removes the head of the queue and returns it, blocks until it succeeds.
     * @return The former head of the queue
     * @throws NoSuchElementException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public byte[] take() throws KeeperException, InterruptedException {
        TreeMap<Long,String> orderedChildren;
        // Same as for element. Should refactor this.
        while(true){
            LatchChildWatcher childWatcher = new LatchChildWatcher();
            try{
                orderedChildren = orderedChildren(childWatcher);
            }catch(KeeperException.NoNodeException e){
                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
                continue;
            }
            if(orderedChildren.size() == 0){
                childWatcher.await();
                continue;
            }

            for(String headNode : orderedChildren.values()){
                String path = dir +"/"+headNode;
                try{
                    byte[] data = zookeeper.getData(path, false, null);
                    zookeeper.delete(path, -1);
                    return data;
                }catch(KeeperException.NoNodeException e){
                    // Another client deleted the node first.
                }
            }
        }
    }

    /**
     * Inserts data into queue.
     * @param data
     * @return true if data was successfully added
     */
    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
        for(;;){
            try{
                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
                return true;
            }catch(KeeperException.NoNodeException e){
                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
            }
        }
    }

    /**
     * Returns the data at the first element of the queue, or null if the queue is empty.
     * @return data at the first element of the queue, or null.
     * @throws KeeperException
     * @throws InterruptedException
     */
    public byte[] peek() throws KeeperException, InterruptedException{
        try{
            return element();
        }catch(NoSuchElementException e){
            return null;
        }
    }

    /**
     * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.
     * @return Head of the queue or null.
     * @throws KeeperException
     * @throws InterruptedException
     */
    public byte[] poll() throws KeeperException, InterruptedException {
        try{
            return remove();
        }catch(NoSuchElementException e){
            return null;
        }
    }
}
Apache Curator
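Curator is a library that wraps ZooKeeper operations. Its main benefit is that it manages the connection to ZooKeeper for you and automatically retries when the connection has problems.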
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
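Curator already packages some commonly used recipes, such as a distributed lock (InterProcessMutex) and leader election (LeaderSelector):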
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
};

LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();  // not required, but this is behavior that you will probably expect
selector.start();
References:
http://zookeeper.apache.org/doc/trunk/recipes.html
http://curator.apache.org/curator-recipes/index.html