public class MasterSelect { CuratorFramework client = CuratorConnect.getCuratorClient2(); private static final String path = "/master"; public void masterSelect() { LeaderSelector leaderSelector = new LeaderSelector(client, path, new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println("Get master"); TimeUnit.SECONDS.sleep(3); System.out.println("Release master"); } }); // 自動加入隊列 leaderSelector.autoRequeue(); leaderSelector.start(); try { TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new MasterSelect().masterSelect(); } }
public void start() { //cas操做,狀態不是從LATENT到STARTED的,拋異常 Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); // 啓動後,不能中止線程 Preconditions.checkState(!executorService.isShutdown(), "Already started"); // 判斷當前是否Leader節點 Preconditions.checkState(!hasLeadership, "Already has leadership"); // 增長監聽 client.getConnectionStateListenable().addListener(listener); // 入隊列 requeue(); } public boolean requeue() { // 當前狀態是不是STARTED Preconditions.checkState(state.get() == State.STARTED, "close() has already been called"); // 入隊列 return internalRequeue(); } private synchronized boolean internalRequeue() { // 若是沒有在隊列,而且是STARTED狀態 if ( !isQueued && (state.get() == State.STARTED) ) { // 設置已經在隊列 isQueued = true; Future<Void> task = executorService.submit(new Callable<Void>() { @Override public Void call() throws Exception { try { doWorkLoop(); } finally { // 退出隊列 clearIsQueued(); // 自動入隊列 if ( autoRequeue.get() ) { internalRequeue(); } } return null; } }); ourTask.set(task); return true; } return false; }
private void doWorkLoop() throws Exception { KeeperException exception = null; try { doWork(); } catch ( KeeperException.ConnectionLossException e ) { exception = e; } catch ( KeeperException.SessionExpiredException e ) { exception = e; } catch ( InterruptedException ignore ) { // 線程中斷,則忽略 Thread.currentThread().interrupt(); } if ( (exception != null) && !autoRequeue.get() ) // autoRequeue should ignore connection loss or session expired and just keep trying { throw exception; } } void doWork() throws Exception { // 不是Leader節點 hasLeadership = false; try { // 獲取到鎖 mutex.acquire(); // 是leader節點 hasLeadership = true; try { if ( debugLeadershipLatch != null ) { debugLeadershipLatch.countDown(); } if ( debugLeadershipWaitLatch != null ) { debugLeadershipWaitLatch.await(); } // 調用takeLeadership方法 listener.takeLeadership(client); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); throw e; } catch ( Throwable e ) { ThreadUtils.checkInterrupted(e); } finally { // 退出隊列 clearIsQueued(); } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); throw e; } finally { // 若是搶到leader if ( hasLeadership ) { // 設置爲非leader hasLeadership = false; // 是否中斷 boolean wasInterrupted = Thread.interrupted(); // clear any interrupted tatus so that mutex.release() works immediately try { // 釋放資源 mutex.release(); } catch ( Exception e ) { if ( failedMutexReleaseCount != null ) { failedMutexReleaseCount.incrementAndGet(); } ThreadUtils.checkInterrupted(e); log.error("The leader threw an exception", e); // ignore errors - this is just a safety } finally { if ( wasInterrupted ) { Thread.currentThread().interrupt(); } } } } }
public void acquire() throws Exception { if ( !internalLock(-1, null) ) { throw new IOException("Lost connection while trying to acquire lock: " + basePath); } } private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } // 獲取節點 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { // 放入緩存 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; } String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { // 獲取臨時節點,建立失敗,在catch中會經過重試機制建立 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; } private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { // 獲取排序後的子節點 List<String> children = getSortedChildren(); // 獲取當前的節點 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash // 獲取到predicateResults,predicateResults有兩個值,是否獲取鎖,監聽節點 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); // 獲取到鎖 if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { // 沒有獲取到鎖,則監聽 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak // 監聽,等到喚醒,watcher中會調用notifyAll client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; } public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { // 獲取節點的位置 int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); // maxLeases爲1,說明若是不是第一個位置,則沒獲取到鎖 boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
public void release() throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } final void releaseLock(String lockPath) throws Exception { // 移除監聽 client.removeWatchers(); revocable.set(null); // 刪除節點 deleteOurPath(lockPath); }