Paxos算法是大名鼎鼎的Zookeeper中採用的選舉Leader的算法,事實上,在涉及到分佈式系統的一致性的時候,就只有一種算法,那就是Paxos.java
首先來看,Paxos是爲了解決什麼問題:算法
Paxos 算法解決的問題是一個分佈式系統如何就某個值(決議)達成一致。一個典型的場景是,在一個分佈式數據庫存儲中,若是各節點的初始狀態一致,每一個節點執行相同的操做序列,那麼他們最後能獲得一個一致的狀態。爲保證每一個節點執行相同的命令序列,須要在每一條指令上執行一個「一致性算法」以保證每一個節點看到的指令一致。一個通用的一致性算法能夠應用在許多場景中,是分佈式計算中的重要問題。所以從20世紀80年代起對於一致性算法的研究就沒有中止過。節點通訊存在兩種模型:共享內存(Shared memory)和消息傳遞(Messages passing)。Paxos 算法就是一種基於消息傳遞模型的一致性算法。數據庫
本文的實現參考了LynnCui在知乎上的文章:https://zhuanlan.zhihu.com/p/21438357promise
有關Paxos的詳細介紹及推理過程都可參考上述文章,若是還有不明白,能夠查閱相關文章,這一類的文章網上仍是比較多的。服務器
關於個人實現,我這裏有幾點須要說明:網絡
1.關於maxVote(即比本次表決編號小的最大表決編號),若是該編號存在,則應該用改編號對應表決的提案做爲下一次的提案,不然,則能夠隨機指定提案,具體的提案的指定方式依賴於需求邏輯,本程序中採用隨機提案;app
2.Paxos假設消息的傳遞過程是不可靠的,這主要是由於實際環境中,網絡通訊是沒有辦法保障的。網絡延遲以及服務器自己也有可能宕機,這些條件其實都在Paxos的前提假設之中,本程序當中作了一個假設,假設網絡通訊有50%的機率失敗,其實是我隨便指定的;dom
3.Java自己是一門繁瑣的語言,因此不免會有很大冗餘代碼,這主要是爲了保證程序的穩定性,還有一部分是出於習慣。Paxos其實並非一個複雜的算法,至少基本的不是。程序中所採用的的面向對象的設計也不必定是合理的;分佈式
4.本程序依賴了Google的guava包以及common-lang3的包,實際中徹底能夠去掉。guava主要是在重載hashCode()方法的時候用到,但程序中並無使用hashCode的地方,實際只是由於在重載equals()方法的時候老是應該重載hashCode()方法。lang3主要作了字符串比較,應對字符串爲空的狀況。除此以外沒有任何依賴。ide
Java實現參考:
public final class PaxosDemo { private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(); private static final Random RANDOM = new Random(); private static final String[] PROPOSALS = {"ProjectA", "ProjectB", "ProjectC"}; public static void main(String[] args) { List<Acceptor> acceptors = new ArrayList<Acceptor>(); Arrays.asList("A", "B", "C", "D", "E") .forEach(name -> acceptors.add(new Acceptor(name))); Proposer.vote(new Proposal(1L, null), acceptors); } private static void printInfo(String subject, String operation, String result) { System.out.println(subject + ":" + operation + "<" + result + ">"); } /** * 對於提案的約束,第三條約束要求: * 若是maxVote不存在,那麼沒有限制,下一次表決可使用任意提案; * 不然,下一次表決要沿用maxVote的提案 * * @param currentVoteNumber * @param proposals * @return */ private static Proposal nextProposal(long currentVoteNumber, List<Proposal> proposals) { long voteNumber = currentVoteNumber + 1; if (proposals.isEmpty()) return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]); Collections.sort(proposals); Proposal maxVote = proposals.get(proposals.size() - 1); long maxVoteNumber = maxVote.getVoteNumber(); String content = maxVote.getContent(); if (maxVoteNumber >= currentVoteNumber) throw new IllegalStateException("illegal state maxVoteNumber"); if (content != null) return new Proposal(voteNumber, content); else return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]); } private static class Proposer { /** * @param proposal * @param acceptors */ public static void vote(Proposal proposal, Collection<Acceptor> acceptors) { int quorum = Math.floorDiv(acceptors.size(), 2) + 1; int count = 0; while (true) { printInfo("VOTE_ROUND", "START", ++count + ""); List<Proposal> proposals = new ArrayList<Proposal>(); for (Acceptor acceptor : acceptors) { Promise promise = acceptor.onPrepare(proposal); if (promise != null && promise.isAck()) proposals.add(promise.getProposal()); } if (proposals.size() < quorum) { printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT PREPARED"); proposal = nextProposal(proposal.getVoteNumber(), proposals); continue; } int acceptCount = 0; for (Acceptor acceptor : acceptors) { if (acceptor.onAccept(proposal)) acceptCount++; } if (acceptCount < quorum) { printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT ACCEPTED"); proposal = nextProposal(proposal.getVoteNumber(), proposals); continue; } break; } printInfo("PROPOSER[" + proposal + "]", "VOTE", "SUCCESS"); } } private static class Acceptor { //上次表決結果 private Proposal last = new Proposal(); private String name; public Acceptor(String name) { this.name = name; } public Promise onPrepare(Proposal proposal) { //假設這個過程有50%的概率失敗 if (Math.random() - 0.5 > 0) { printInfo("ACCEPTER_" + name, "PREPARE", "NO RESPONSE"); return null; } if (proposal == null) throw new IllegalArgumentException("null proposal"); if (proposal.getVoteNumber() > last.getVoteNumber()) { Promise response = new Promise(true, last); last = proposal; printInfo("ACCEPTER_" + name, "PREPARE", "OK"); return response; } else { printInfo("ACCEPTER_" + name, "PREPARE", "REJECTED"); return new Promise(false, null); } } public boolean onAccept(Proposal proposal) { //假設這個過程有50%的概率失敗 if (Math.random() - 0.5 > 0) { printInfo("ACCEPTER_" + name, "ACCEPT", "NO RESPONSE"); return false; } printInfo("ACCEPTER_" + name, "ACCEPT", "OK"); return last.equals(proposal); } } private static class Promise { private final boolean ack; private final Proposal proposal; public Promise(boolean ack, Proposal proposal) { this.ack = ack; this.proposal = proposal; } public boolean isAck() { return ack; } public Proposal getProposal() { return proposal; } } private static class Proposal implements Comparable<Proposal> { private final long voteNumber; private final String content; public Proposal(long voteNumber, String content) { this.voteNumber = voteNumber; this.content = content; } public Proposal() { this(0, null); } public long getVoteNumber() { return voteNumber; } public String getContent() { return content; } @Override public int compareTo(Proposal o) { return Long.compare(voteNumber, o.voteNumber); } @Override public boolean equals(Object obj) { if (obj == null) return false; if (!(obj instanceof Proposal)) return false; Proposal proposal = (Proposal) obj; return voteNumber == proposal.voteNumber && StringUtils.equals(content, proposal.content); } @Override public int hashCode() { return HASH_FUNCTION .newHasher() .putLong(voteNumber) .putString(content, Charsets.UTF_8) .hash() .asInt(); } @Override public String toString() { return new StringBuilder() .append(voteNumber) .append(':') .append(content) .toString(); } } }
如下是我試着運行了一下程序的結果:
VOTE_ROUND:START<1> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<NO RESPONSE> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<NO RESPONSE> ACCEPTER_E:PREPARE<NO RESPONSE> PROPOSER[1:null]:VOTE<NOT PREPARED> VOTE_ROUND:START<2> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<NO RESPONSE> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<NO RESPONSE> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<NO RESPONSE> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<OK> ACCEPTER_E:ACCEPT<OK> PROPOSER[2:ProjectC]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<3> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<NO RESPONSE> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[3:ProjectB]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<4> ACCEPTER_A:PREPARE<NO RESPONSE> ACCEPTER_B:PREPARE<NO RESPONSE> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<OK> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<OK> ACCEPTER_E:ACCEPT<OK> PROPOSER[4:ProjectC]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<5> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<NO RESPONSE> ACCEPTER_D:PREPARE<NO RESPONSE> ACCEPTER_E:PREPARE<OK> ACCEPTER_A:ACCEPT<NO RESPONSE> ACCEPTER_B:ACCEPT<OK> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[5:ProjectB]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<6> ACCEPTER_A:PREPARE<NO RESPONSE> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<NO RESPONSE> ACCEPTER_A:ACCEPT<NO RESPONSE> ACCEPTER_B:ACCEPT<NO RESPONSE> ACCEPTER_C:ACCEPT<NO RESPONSE> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[6:ProjectC]:VOTE<NOT ACCEPTED> VOTE_ROUND:START<7> ACCEPTER_A:PREPARE<OK> ACCEPTER_B:PREPARE<OK> ACCEPTER_C:PREPARE<OK> ACCEPTER_D:PREPARE<OK> ACCEPTER_E:PREPARE<NO RESPONSE> ACCEPTER_A:ACCEPT<OK> ACCEPTER_B:ACCEPT<OK> ACCEPTER_C:ACCEPT<OK> ACCEPTER_D:ACCEPT<NO RESPONSE> ACCEPTER_E:ACCEPT<NO RESPONSE> PROPOSER[7:ProjectB]:VOTE<SUCCESS>
通過7輪表決終於達成一致.