本文主要研究一下 artemis 的 QuorumVote。
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVote.java
/**
 * Base class for a named vote whose callbacks are driven by the
 * {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager}.
 * <p>
 * Subclasses supply the vote to use when a quorum node can or cannot be reached, accept incoming votes,
 * and expose the resulting decision.
 *
 * @param <V> the type of vote accepted by {@code vote(V)}
 * @param <T> the type of the decision returned by {@link #getDecision()}
 */
public abstract class QuorumVote<V extends Vote, T> {

   /** Name of this quorum vote; assigned once in the constructor and never changed afterwards. */
   private final SimpleString name;

   /**
    * Creates a quorum vote with the given name.
    *
    * @param name the name later returned by {@link #getName()}
    */
   public QuorumVote(SimpleString name) {
      this.name = name;
   }

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when one of the
    * nodes in the quorum is successfully connected to. The QuorumVote can then decide whether or not a decision
    * can be made with just that information.
    *
    * @return the vote to use
    */
   public abstract Vote connected();

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when it fails to
    * connect to a node in the quorum. The QuorumVote can then decide whether or not a decision can be made with
    * just that information, however the node cannot be asked.
    *
    * @return the vote to use
    */
   public abstract Vote notConnected();

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when a vote can be
    * made, either from the cluster or decided by itself.
    *
    * @param vote the vote to make.
    */
   public abstract void vote(V vote);

   /**
    * get the decision of the vote
    *
    * @return the voting decision
    */
   public abstract T getDecision();

   /**
    * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when all the votes
    * have been cast and received.
    *
    * @param voteTopology the topology of where the votes were sent.
    */
   public abstract void allVotesCast(Topology voteTopology);

   /**
    * the name of this quorum vote, used for identifying the correct
    * {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler}
    *
    * @return the name of the quorum vote
    */
   public SimpleString getName() {
      return name;
   }
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> { public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote"); private final CountDownLatch latch; private final String targetNodeId; private final String liveConnector; private int votesNeeded; private int total = 0; private boolean decision = false; // Is this the live requesting to stay live, or a backup requesting to become live. private boolean requestToStayLive = false; /** * live nodes | remaining nodes | majority | votes needed * 1 | 0 | 0 | 0 * 2 | 1 | 1 | 1 * n | r = n-1 | n/2 + 1 | n/2 + 1 rounded * 3 | 2 | 2.5 | 2 * 4 | 3 | 3 | 3 * 5 | 4 | 3.5 | 3 * 6 | 5 | 4 | 4 */ public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive, String liveConnector) { super(LIVE_FAILOVER_VOTE); this.targetNodeId = targetNodeId; this.liveConnector = liveConnector; double majority; if (size <= 2) { majority = ((double) size) / 2; } else { //even majority = ((double) size) / 2 + 1; } //votes needed could be say 2.5 so we add 1 in this case votesNeeded = (int) majority; latch = new CountDownLatch(votesNeeded); if (votesNeeded == 0) { decision = true; } this.requestToStayLive = requestToStayLive; } public QuorumVoteServerConnect(int size, String targetNodeId) { this(size, targetNodeId, false, null); } /** * if we can connect to a node * * @return */ @Override public Vote connected() { return new ServerConnectVote(targetNodeId, requestToStayLive, null); } /** * if we cant connect to the node * * @return */ @Override public Vote notConnected() { return new BooleanVote(false); } /** * live nodes | remaining nodes | majority | votes needed * 1 | 0 | 0 | 0 * 2 | 1 | 1 | 1 * n | r = n-1 | n/2 + 1 | n/2 + 1 rounded * 3 | 2 | 2.5 | 2 * 4 | 3 | 3 | 3 * 5 | 4 | 3.5 | 3 * 6 | 5 | 4 | 4 * * @param vote the vote to make. 
*/ @Override public synchronized void vote(ServerConnectVote vote) { if (decision) return; if (!requestToStayLive && vote.getVote()) { total++; latch.countDown(); if (total >= votesNeeded) { decision = true; }//do the opposite, if it says there is a node connected it means the backup has come live } else if (requestToStayLive && vote.getVote()) { total++; latch.countDown(); if (liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) { ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector); return; } if (total >= votesNeeded) { decision = true; } } } @Override public void allVotesCast(Topology voteTopology) { while (latch.getCount() > 0) { latch.countDown(); } } @Override public Boolean getDecision() { return decision; } public void await(int latchTimeout, TimeUnit unit) throws InterruptedException { ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase()); if (latch.await(latchTimeout, unit)) ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes(); else ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses(); } public boolean isRequestToStayLive() { return requestToStayLive; } }
QuorumVote 是個抽象類,定義了 connected、notConnected、vote、getDecision、allVotesCast 抽象方法;QuorumVoteServerConnect 繼承了 QuorumVote,其構造器根據 size 初始化 votesNeeded 及 decision(votesNeeded 爲 0 時 decision 直接爲 true);其 connected 方法返回 ServerConnectVote;其 notConnected 方法返回 BooleanVote(false);其 vote 方法在 ServerConnectVote 的 vote 爲 true 時遞增 total,同時執行 latch.countDown(),當 total 大於等於 votesNeeded 時將 decision 更新爲 true;其 allVotesCast 方法則循環執行 latch.countDown(),直到 latch 的計數歸零。