本文主要研究一下curator recipes的LeaderLatch
/**
 * Demonstrates a LeaderLatch election against a ZooKeeper server at localhost:2181.
 *
 * <p>Ten latches are created on the same path and started concurrently. After a pause the
 * latch that reports leadership is closed and dropped from the list; after a second pause the
 * test prints whichever remaining latch reports leadership, plus the leader as seen by the
 * first remaining latch. It then sleeps for 15 minutes (long enough to inspect the znodes by
 * hand) before closing everything.
 *
 * <p>The remaining latches and the client are closed in {@code finally} blocks so that an
 * interrupted sleep or a failed ZooKeeper call does not leave them open.
 *
 * @throws Exception if the client cannot be used or a sleep is interrupted
 */
@Test
public void testCuratorLeaderLatch() throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();
    try {
        String leaderLockPath = "/leader-lock2";
        // The leader is removed through Iterator.remove() below, so the list must be mutable;
        // Collectors.toList() gives no such guarantee, hence the explicit ArrayList.
        List<LeaderLatch> latchList = IntStream.rangeClosed(1, 10)
                .mapToObj(i -> new LeaderLatch(client, leaderLockPath, "client" + i))
                .collect(Collectors.toCollection(java.util.ArrayList::new));
        try {
            // Start in parallel on purpose: the latches should contend for leadership.
            latchList.parallelStream()
                    .forEach(latch -> {
                        try {
                            latch.start();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });

            TimeUnit.SECONDS.sleep(5);

            // Close the current leader and take it out of the list so it is not closed twice.
            Iterator<LeaderLatch> iterator = latchList.iterator();
            while (iterator.hasNext()) {
                LeaderLatch latch = iterator.next();
                if (latch.hasLeadership()) {
                    System.out.println(latch.getId() + " hasLeadership");
                    try {
                        latch.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    iterator.remove();
                }
            }

            TimeUnit.SECONDS.sleep(5);

            latchList.stream()
                    .filter(latch -> latch.hasLeadership())
                    .forEach(latch -> System.out.println(latch.getId() + " hasLeadership"));

            Participant participant = latchList.get(0).getLeader();
            System.out.println(participant);

            TimeUnit.MINUTES.sleep(15);
        } finally {
            latchList.forEach(latch -> {
                try {
                    latch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    } finally {
        client.close();
    }
}
[zk: localhost:2181(CONNECTED) 17] ls / [leader-lock1, leader-lock2, zookeeper, leader-lock] [zk: localhost:2181(CONNECTED) 18] ls /leader-lock2 [_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Add this instance to the leadership election and attempt to acquire leadership.
 * <p>
 * The state is moved from LATENT to STARTED with a compare-and-set, so a second call fails
 * the precondition check. The actual start-up work ({@code internalStart()}) is wrapped in a
 * Runnable and handed to {@code AfterConnectionEstablished.execute}; the value it returns is
 * kept in {@code startTask} and cleared again once the Runnable has finished.
 *
 * @throws Exception errors
 */
public void start() throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                internalStart();
            }
            finally
            {
                // Clear the task reference whether or not internalStart() succeeded.
                startTask.set(null);
            }
        }
    }));
}

/**
 * Registers the connection-state listener and performs the first {@code reset()}, but only
 * if the latch is still in the STARTED state when this runs. A failure in {@code reset()} is
 * logged rather than propagated.
 */
private synchronized void internalStart()
{
    if ( state.get() == State.STARTED )
    {
        client.getConnectionStateListenable().addListener(listener);
        try
        {
            reset();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("An error occurred checking resetting leadership.", e);
        }
    }
}

/**
 * Drops leadership, clears the current node and asks ZooKeeper (in the background) to create
 * a new protected EPHEMERAL_SEQUENTIAL node under the latch path, carrying this latch's id
 * bytes as data.
 * <p>
 * When the create completes successfully the new node name is recorded; if the latch was
 * closed in the meantime the node is cleared again, otherwise {@code getChildren()} is called
 * to continue the election.
 *
 * @throws Exception errors issuing the background create
 */
@VisibleForTesting
void reset() throws Exception
{
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            // Test hook: when set, block here until the latch is released, then disarm it.
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }

            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                setNode(event.getName());
                if ( state.get() == State.CLOSED )
                {
                    // Closed while the create was in flight: give the node up again.
                    setNode(null);
                }
                else
                {
                    getChildren();
                }
            }
            else
            {
                // NOTE(review): this callback belongs to the create() below, yet the message
                // names getChildren() - the wording looks misleading; confirm before relying on it.
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java
/** Fixed marker placed at the start of the name of every node created in protected mode. */
@VisibleForTesting
static final String PROTECTED_PREFIX = "_c_";

/**
 * Creates the node at {@code givenPath} with the supplied data.
 * <p>
 * The data is compressed first when compression is enabled. The path is namespace-fixed and,
 * in protected mode, rewritten by {@link #adjustPath(String)}; the create request is then
 * validated against the schema registered for {@code givenPath}.
 *
 * @param givenPath path requested by the caller
 * @param data      node payload (replaced by its compressed form when compression is on)
 * @return the created path with the namespace removed for a foreground create, or
 *         {@code null} when the create is performed in the background
 * @throws Exception errors
 */
@Override
public String forPath(final String givenPath, byte[] data) throws Exception
{
    if ( compress )
    {
        data = client.getCompressionProvider().compress(givenPath, data);
    }

    final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
    List<ACL> aclList = acling.getAclList(adjustedPath);
    client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

    String returnPath = null;
    if ( backgrounding.inBackground() )
    {
        // Background create: nothing to return here, returnPath stays null.
        pathInBackground(adjustedPath, data, givenPath);
    }
    else
    {
        String path = protectedPathInForeground(adjustedPath, data, aclList);
        returnPath = client.unfixForNamespace(path);
    }
    return returnPath;
}

/**
 * In protected mode, prepends the protected prefix to the last segment of {@code path};
 * otherwise returns {@code path} unchanged.
 *
 * @param path full node path
 * @return the path, with its node name prefixed when {@code doProtected} is set
 * @throws Exception errors
 */
@VisibleForTesting
String adjustPath(String path) throws Exception
{
    if ( doProtected )
    {
        ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
        String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
        path = ZKPaths.makePath(pathAndNode.getPath(), name);
    }
    return path;
}

/**
 * Builds the protected-mode name prefix: {@code PROTECTED_PREFIX + protectedId + "-"}.
 *
 * @param protectedId identifier embedded in the prefix
 * @return the prefix to place in front of the node name
 */
private static String getProtectedPrefix(String protectedId)
{
    return PROTECTED_PREFIX + protectedId + "-";
}
withProtection會在原來的節點名前面加上PROTECTED_PREFIX(_c_)以及protectedId(UUID)還有-,例如原來是latch-,處理以後變爲_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
private void getChildren() throws Exception { BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { checkLeadership(event.getChildren()); } } }; client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null)); } private void checkLeadership(List<String> children) throws Exception { final String localOurPath = ourPath.get(); List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children); int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1; if ( ourIndex < 0 ) { log.error("Can't find our node. Resetting. Index: " + ourIndex); reset(); } else if ( ourIndex == 0 ) { setLeadership(true); } else { String watchPath = sortedChildren.get(ourIndex - 1); Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ) { try { getChildren(); } catch ( Exception ex ) { ThreadUtils.checkInterrupted(ex); log.error("An error occurred checking the leadership.", ex); } } } }; BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) { // previous node is gone - reset reset(); } } }; // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath)); } }
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/** * Remove this instance from the leadership election. If this instance is the leader, leadership * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch * instances must eventually be closed. * * @throws IOException errors */ @Override public void close() throws IOException { close(closeMode); } /** * Remove this instance from the leadership election. If this instance is the leader, leadership * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch * instances must eventually be closed. * * @param closeMode allows the default close mode to be overridden at the time the latch is closed. * @throws IOException errors */ public synchronized void close(CloseMode closeMode) throws IOException { Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started"); Preconditions.checkNotNull(closeMode, "closeMode cannot be null"); cancelStartTask(); try { setNode(null); client.removeWatchers(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); throw new IOException(e); } finally { client.getConnectionStateListenable().removeListener(listener); switch ( closeMode ) { case NOTIFY_LEADER: { setLeadership(false); listeners.clear(); break; } default: { listeners.clear(); setLeadership(false); break; } } } } private synchronized void setLeadership(boolean newValue) { boolean oldValue = hasLeadership.getAndSet(newValue); if ( oldValue && !newValue ) { // Lost leadership, was true, now false listeners.forEach(new Function<LeaderLatchListener, Void>() { @Override public Void apply(LeaderLatchListener listener) { listener.notLeader(); return null; } }); } else if ( !oldValue && newValue ) { // Gained leadership, was false, now true listeners.forEach(new Function<LeaderLatchListener, Void>() { @Override public Void apply(LeaderLatchListener input) { input.isLeader(); return null; } }); } notifyAll(); }
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
private final ConnectionStateListener listener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { handleStateChange(newState); } }; private void handleStateChange(ConnectionState newState) { switch ( newState ) { default: { // NOP break; } case RECONNECTED: { try { if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() ) { reset(); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Could not reset leader latch", e); setLeadership(false); } break; } case SUSPENDED: { if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) ) { setLeadership(false); } break; } case LOST: { setLeadership(false); break; } } }
withProtection會在節點名前面加上PROTECTED_PREFIX(_c_)以及protectedId(UUID),所以最後的節點名的格式爲PROTECTED_PREFIX+UUID+LOCK_NAME+編號,例如_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045