樸素Paxos(Basic Paxos)算法java簡易實現

Paxos算法是大名鼎鼎的Zookeeper中採用的選舉Leader的算法,事實上,在涉及到分佈式系統的一致性的時候,就只有一種算法,那就是Paxos.java

首先來看,Paxos是爲了解決什麼問題:算法

Paxos 算法解決的問題是一個分佈式系統如何就某個值(決議)達成一致。一個典型的場景是,在一個分佈式數據庫存儲中,若是各節點的初始狀態一致,每一個節點執行相同的操做序列,那麼他們最後能獲得一個一致的狀態。爲保證每一個節點執行相同的命令序列,須要在每一條指令上執行一個「一致性算法」以保證每一個節點看到的指令一致。一個通用的一致性算法能夠應用在許多場景中,是分佈式計算中的重要問題。所以從20世紀80年代起對於一致性算法的研究就沒有中止過。節點通訊存在兩種模型:共享內存(Shared memory)和消息傳遞(Messages passing)。Paxos 算法就是一種基於消息傳遞模型的一致性算法。數據庫

本文的實現參考了LynnCui在知乎上的文章:https://zhuanlan.zhihu.com/p/21438357promise

有關Paxos的詳細介紹及推理過程都可參考上述文章,若是還有不明白,能夠查閱相關文章,這一類的文章網上仍是比較多的。服務器

關於個人實現,我這裏有幾點須要說明:網絡

1.關於maxVote(即比本次表決編號小的最大表決編號),若是該編號存在,則應該用改編號對應表決的提案做爲下一次的提案,不然,則能夠隨機指定提案,具體的提案的指定方式依賴於需求邏輯,本程序中採用隨機提案;app

2.Paxos假設消息的傳遞過程是不可靠的,這主要是由於實際環境中,網絡通訊是沒有辦法保障的。網絡延遲以及服務器自己也有可能宕機,這些條件其實都在Paxos的前提假設之中,本程序當中作了一個假設,假設網絡通訊有50%的機率失敗,其實是我隨便指定的;dom

3.Java自己是一門繁瑣的語言,因此不免會有很大冗餘代碼,這主要是爲了保證程序的穩定性,還有一部分是出於習慣。Paxos其實並非一個複雜的算法,至少基本的不是。程序中所採用的的面向對象的設計也不必定是合理的;分佈式

4.本程序依賴了Google的guava包以及common-lang3的包,實際中徹底能夠去掉。guava主要是在重載hashCode()方法的時候用到,但程序中並無使用hashCode的地方,實際只是由於在重載equals()方法的時候老是應該重載hashCode()方法。lang3主要作了字符串比較,應對字符串爲空的狀況。除此以外沒有任何依賴。ide

Java實現參考:

public final class PaxosDemo {

    private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
    private static final Random RANDOM = new Random();
    private static final String[] PROPOSALS = {"ProjectA", "ProjectB", "ProjectC"};

    public static void main(String[] args) {
        List<Acceptor> acceptors = new ArrayList<Acceptor>();
        Arrays.asList("A", "B", "C", "D", "E")
                .forEach(name -> acceptors.add(new Acceptor(name)));
        Proposer.vote(new Proposal(1L, null), acceptors);
    }

    private static void printInfo(String subject, String operation, String result) {
        System.out.println(subject + ":" + operation + "<" + result + ">");
    }

    /**
     * 對於提案的約束,第三條約束要求:
     * 若是maxVote不存在,那麼沒有限制,下一次表決可使用任意提案;
     * 不然,下一次表決要沿用maxVote的提案
     *
     * @param currentVoteNumber
     * @param proposals
     * @return
     */
    private static Proposal nextProposal(long currentVoteNumber, List<Proposal> proposals) {
        long voteNumber = currentVoteNumber + 1;
        if (proposals.isEmpty())
            return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]);
        Collections.sort(proposals);
        Proposal maxVote = proposals.get(proposals.size() - 1);
        long maxVoteNumber = maxVote.getVoteNumber();
        String content = maxVote.getContent();
        if (maxVoteNumber >= currentVoteNumber)
            throw new IllegalStateException("illegal state maxVoteNumber");
        if (content != null)
            return new Proposal(voteNumber, content);
        else return new Proposal(voteNumber, PROPOSALS[RANDOM.nextInt(PROPOSALS.length)]);
    }


    private static class Proposer {

        /**
         * @param proposal
         * @param acceptors
         */
        public static void vote(Proposal proposal, Collection<Acceptor> acceptors) {
            int quorum = Math.floorDiv(acceptors.size(), 2) + 1;
            int count = 0;
            while (true) {
                printInfo("VOTE_ROUND", "START", ++count + "");
                List<Proposal> proposals = new ArrayList<Proposal>();
                for (Acceptor acceptor : acceptors) {
                    Promise promise = acceptor.onPrepare(proposal);
                    if (promise != null && promise.isAck())
                        proposals.add(promise.getProposal());
                }
                if (proposals.size() < quorum) {
                    printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT PREPARED");
                    proposal = nextProposal(proposal.getVoteNumber(), proposals);
                    continue;
                }
                int acceptCount = 0;
                for (Acceptor acceptor : acceptors) {
                    if (acceptor.onAccept(proposal))
                        acceptCount++;
                }
                if (acceptCount < quorum) {
                    printInfo("PROPOSER[" + proposal + "]", "VOTE", "NOT ACCEPTED");
                    proposal = nextProposal(proposal.getVoteNumber(), proposals);
                    continue;
                }
                break;
            }
            printInfo("PROPOSER[" + proposal + "]", "VOTE", "SUCCESS");
        }


    }

    private static class Acceptor {

        //上次表決結果
        private Proposal last = new Proposal();
        private String name;

        public Acceptor(String name) {
            this.name = name;
        }

        public Promise onPrepare(Proposal proposal) {
            //假設這個過程有50%的概率失敗
            if (Math.random() - 0.5 > 0) {
                printInfo("ACCEPTER_" + name, "PREPARE", "NO RESPONSE");
                return null;
            }
            if (proposal == null)
                throw new IllegalArgumentException("null proposal");
            if (proposal.getVoteNumber() > last.getVoteNumber()) {
                Promise response = new Promise(true, last);
                last = proposal;
                printInfo("ACCEPTER_" + name, "PREPARE", "OK");
                return response;
            } else {
                printInfo("ACCEPTER_" + name, "PREPARE", "REJECTED");
                return new Promise(false, null);
            }
        }

        public boolean onAccept(Proposal proposal) {
            //假設這個過程有50%的概率失敗
            if (Math.random() - 0.5 > 0) {
                printInfo("ACCEPTER_" + name, "ACCEPT", "NO RESPONSE");
                return false;
            }
            printInfo("ACCEPTER_" + name, "ACCEPT", "OK");
            return last.equals(proposal);
        }
    }

    private static class Promise {

        private final boolean ack;
        private final Proposal proposal;

        public Promise(boolean ack, Proposal proposal) {
            this.ack = ack;
            this.proposal = proposal;
        }

        public boolean isAck() {
            return ack;
        }

        public Proposal getProposal() {
            return proposal;
        }
    }

    private static class Proposal implements Comparable<Proposal> {

        private final long voteNumber;
        private final String content;

        public Proposal(long voteNumber, String content) {
            this.voteNumber = voteNumber;
            this.content = content;
        }

        public Proposal() {
            this(0, null);
        }

        public long getVoteNumber() {
            return voteNumber;
        }

        public String getContent() {
            return content;
        }

        @Override
        public int compareTo(Proposal o) {
            return Long.compare(voteNumber, o.voteNumber);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == null)
                return false;
            if (!(obj instanceof Proposal))
                return false;
            Proposal proposal = (Proposal) obj;
            return voteNumber == proposal.voteNumber && StringUtils.equals(content, proposal.content);
        }

        @Override
        public int hashCode() {
            return HASH_FUNCTION
                    .newHasher()
                    .putLong(voteNumber)
                    .putString(content, Charsets.UTF_8)
                    .hash()
                    .asInt();
        }

        @Override
        public String toString() {
            return new StringBuilder()
                    .append(voteNumber)
                    .append(':')
                    .append(content)
                    .toString();
        }
    }

}

如下是我試着運行了一下程序的結果:

VOTE_ROUND:START<1>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<NO RESPONSE>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<NO RESPONSE>
ACCEPTER_E:PREPARE<NO RESPONSE>
PROPOSER[1:null]:VOTE<NOT PREPARED>
VOTE_ROUND:START<2>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<NO RESPONSE>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<NO RESPONSE>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<NO RESPONSE>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<OK>
ACCEPTER_E:ACCEPT<OK>
PROPOSER[2:ProjectC]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<3>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<NO RESPONSE>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[3:ProjectB]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<4>
ACCEPTER_A:PREPARE<NO RESPONSE>
ACCEPTER_B:PREPARE<NO RESPONSE>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<OK>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<OK>
ACCEPTER_E:ACCEPT<OK>
PROPOSER[4:ProjectC]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<5>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<NO RESPONSE>
ACCEPTER_D:PREPARE<NO RESPONSE>
ACCEPTER_E:PREPARE<OK>
ACCEPTER_A:ACCEPT<NO RESPONSE>
ACCEPTER_B:ACCEPT<OK>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[5:ProjectB]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<6>
ACCEPTER_A:PREPARE<NO RESPONSE>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<NO RESPONSE>
ACCEPTER_A:ACCEPT<NO RESPONSE>
ACCEPTER_B:ACCEPT<NO RESPONSE>
ACCEPTER_C:ACCEPT<NO RESPONSE>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[6:ProjectC]:VOTE<NOT ACCEPTED>
VOTE_ROUND:START<7>
ACCEPTER_A:PREPARE<OK>
ACCEPTER_B:PREPARE<OK>
ACCEPTER_C:PREPARE<OK>
ACCEPTER_D:PREPARE<OK>
ACCEPTER_E:PREPARE<NO RESPONSE>
ACCEPTER_A:ACCEPT<OK>
ACCEPTER_B:ACCEPT<OK>
ACCEPTER_C:ACCEPT<OK>
ACCEPTER_D:ACCEPT<NO RESPONSE>
ACCEPTER_E:ACCEPT<NO RESPONSE>
PROPOSER[7:ProjectB]:VOTE<SUCCESS>

通過7輪表決終於達成一致.

相關文章
相關標籤/搜索