實戰案例丨分佈式系統中如何用python實現Paxos

摘要:提到分佈式算法,就不得不提 Paxos 算法,在過去幾十年裏,它基本上是分佈式共識的代 名詞,由於當前最經常使用的一批共識算法都是基於它改進的。好比,Fast Paxos 算法、 Cheap Paxos 算法、Raft 算法、ZAB 協議等等。

本文分享自華爲雲社區《實戰分佈式系統-python實現Paxos》,原文做者:Leo Xiao 。html

一致性算法背景:Paxos

  1. 一致性算法解決的問題:分佈式系統中數據不能存在單個節點(主機)上,不然可能出現單點故障;多個節點(主機)須要保證具備相同的數據。
  2. 什麼是一致性:一致性就是數據保持一致,在分佈式系統中,能夠理解爲多個節點中數據的值是一致的。
  3. 一致性模型分類:通常分爲強一致性和弱一致性,強一致性保證系統改變提交之後當即改變集羣的狀態。常見模型包括:Paxos,Raft(muti-paxos),ZAB(muti-paxos); 弱一致性也叫最終一致性,系統不保證改變提交之後當即改變集羣的狀態,可是隨着時間的推移最終狀態一致的。常見模型包括:DNS系統,Gossip協議
  4. 一致性算法使用案例: Google的Chubby分佈式鎖服務,採用了Paxos算法;etcd分佈式鍵值數據庫,採用了Raft算法;ZooKeeper分佈式應用協調服務以及Chubby的開源實現,採用ZAB算法
  • simple-paxos就單個靜態值達一致性自己並不實用,咱們須要實現的集羣系統(銀行帳戶服務)但願就隨時間變化的特定狀態(帳戶餘額)達成一致。因此須要使用Paxos就每一個操做達成一致,將每一個修改視爲狀態機轉換。
  • Multi-Paxos其實是simple Paxos實例(插槽)的序列,每一個實例都按順序編號。每一個狀態轉換都被賦予一個「插槽編號」,集羣的每一個成員都以嚴格的數字順序執行轉換。爲了更改羣集的狀態(例如,處理傳輸操做),咱們嘗試在下一個插槽中就該操做達成一致性。具體來講,這意味着向每一個消息添加一個插槽編號,並在每一個插槽的基礎上跟蹤全部協議狀態。
  • 爲每一個插槽運行Paxos,至少兩次往返會太慢。Multi-Paxos經過對全部插槽使用相同的選票編號集進行優化,並同時對全部插槽執行Prepare/Promise。
Client   Proposer      Acceptor     Learner
   |         |          |  |  |       |  | --- First Request ---
   X-------->|          |  |  |       |  |  Request
   |         X--------->|->|->|       |  |  Prepare(N)
   |         |<---------X--X--X       |  |  Promise(N,I,{Va,Vb,Vc})
   |         X--------->|->|->|       |  |  Accept!(N,I,V)
   |         |<---------X--X--X------>|->|  Accepted(N,I,V)
   |<---------------------------------X--X  Response
   |         |          |  |  |       |  |

Paxos實現

在實用軟件中實現Multi-Paxos是出了名的困難,催生了許多論文如"Paxos Made Simple",「Paxos Made Practical」node

  • 首先,multi-poposer在繁忙的環境中可能會成爲問題,由於每一個羣集成員都試圖在每一個插槽中決定其狀態機操做。解決方法是選舉一名「leader」,負責爲每一個時段提交選票。全部其餘羣集節點將新操做發送到領導者執行。所以,在只有一名領導人的正常運做中,不會發生投票衝突。

Prepare/Promise階段能夠做爲一種leader選舉:不管哪一個集羣成員擁有最近承諾的選票號碼,都被視爲leader。leader後續能夠自由地直接執行Accept/Accepted階段,而不重複第一階段。咱們將在下文看到的,leader選舉其實是至關複雜的。python

雖然simple Paxos保證集羣不會達成衝突的決定,但它不能保證會作出任何決定。例如,若是初始的Prepare消息丟失,而且沒有到達接受者,則提議者將等待永遠不會到達的Promise消息。解決這個問題須要精心設計的從新傳輸:足以最終取得進展,但不會羣集產生數據包風暴。web

  • 另外一個問題是決定的傳播。在正常狀況下,簡單地廣播Decision信息就能夠解決這個問題。可是,若是消息丟失,節點可能會永遠不知道該決定,而且沒法爲之後的插槽應用狀態機轉換。因此實現須要一些機制來共享有關已決定提案的信息。

使用分佈式狀態機帶來了另外一個挑戰:當新節點啓動時,它須要獲取羣集的現有狀態。
雖然能夠經過遇上第一個插槽以來的全部插槽的決策來作到這一點,但在一個大的集羣中,這可能涉及數百萬個插槽。此外,咱們須要一些方法來初始化一個新的羣集。算法

集羣庫介紹

前面都是理論介紹,下面咱們使用python來實現一個簡化的Multi-Paxos數據庫

業務場景和痛點

咱們以簡單的銀行帳戶管理服務的場景做爲案例。在這個服務中,每個帳戶都有一個當前餘額,同時每一個帳戶都有本身的帳號。用戶能夠對帳戶進行「存款」、「轉帳」、「查詢當前餘額」等操做。「轉帳」操做同時涉及了兩個帳戶:轉出帳戶和轉入帳戶,若是帳戶餘額不足,轉帳操做必須被駁回。bootstrap

  • 若是這個服務僅僅在一個服務器上部署,很容易就可以實現:使用一個操做鎖來確保「轉帳」操做不會同時進行,同時對轉出帳戶的進行校驗。然而,銀行不可能僅僅依賴於一個服務器來儲存帳戶餘額這樣的關鍵信息,一般,這些服務都是被分佈在多個服務器上的,每個服務器各自運行着相同代碼的實例。用戶能夠經過任何一個服務器來操做帳戶。
  • 在一個簡單的分佈式處理系統的實現中,每一個服務器都會保存一份帳戶餘額的副本。它會處理任何收到的操做,而且將帳戶餘額的更新發送給其餘的服務器。可是這種方法有一個嚴重的問題:若是兩個服務器同時對一個帳戶進行操做,哪個新的帳戶餘額是正確的?即便服務器不共享餘額而是共享操做,對一個帳戶同時進行轉帳操做也可能形成透支。
  • 從根本上來講,這些錯誤的發生都是因爲服務器使用它們本地狀態來響應操做,而不是首先確保本地狀態與其餘服務器相匹配。好比,想象服務器A接到了從帳號101向帳號202轉帳的操做指令,而此時服務器B已經處理了另外一個把帳號101的錢都轉到帳號202的請求,卻沒有通知服務器A。這樣,服務器A的本地狀態與服務器B不同,即便會形成帳戶101透支,服務器A依然容許從帳號101進行轉帳操做。

分佈式狀態機

爲了防止上述狀況發生咱們採用了一種叫作「分佈式狀態機」的工具。它的思路是對每一個一樣的輸入,每一個服務器都運行一樣的對應的狀態機。因爲狀態機的特性,對於一樣的輸入每一個服務器的輸出都是同樣的。對於像「轉帳」、「查詢當前餘額」等操做,帳號和餘額也都是狀態機的輸入。promise

這個應用的狀態機比較簡單:服務器

def execute_operation(state, operation):
     if operation.name == 'deposit':
         if not verify_signature(operation.deposit_signature):
         return state, False
         state.accounts[operation.destination_account] += operation.amount
         return state, True
     elif operation.name == 'transfer':
         if state.accounts[operation.source_account] < operation.amount:
             return state, False
             state.accounts[operation.source_account] -= operation.amount
         state.accounts[operation.destination_account] += operation.amount
         return state, True
     elif operation.name == 'get-balance':
     return state, state.accounts[operation.account]

值得注意的是,運行「查詢當前餘額」操做時雖然並不會改變當前狀態,可是咱們依然把它當作一個狀態變化操做來實現。這確保了返回的餘額是分佈式系統中的最新信息,而且不是基於一個服務器上的本地狀態來進行返回的。網絡

這可能跟你在計算機課程中學習到的典型的狀態機不太同樣。傳統的狀態機是一系列有限個狀態的集合,每一個狀態都與一個標記的轉移行爲相對應,而在本文中,狀態機的狀態是帳戶餘額的集合,所以存在無窮多個可能的狀態。可是,狀態機的基本規則一樣適用於本文的狀態機:對於一樣的初始狀態,一樣的輸入老是有一樣的輸出。

所以,分佈式狀態機確保了對於一樣的操做,每一個主機都會有一樣的相應。可是,爲了確保每一個服務器都容許狀態機的輸入,前文中提到的問題依然存在。這是一個一致性問題,爲了解決它咱們採用了一種派生的Paxos算法。

核心需求

  1. 能夠爲較大的應用程序提供一致性服務: 咱們用一個Cluster庫來實現簡化的Multi-Paxos
  2. 正確性是這個庫最重要的能力,所以結構化代碼是很重要的,以便咱們能夠看到並測試它與規範的對應關係。
  3. 複雜的協議可能會出現複雜的故障,所以咱們將構建對復現和調試不常見的故障的支持。
  4. 咱們會實現POC代碼:足以證實核心概念是實用的,代碼的結構化是爲了後續添加此功能對核心實現的更改最小
    咱們開始coding吧。

類型和常量

cluster中的協議須要使用15不一樣的消息類型,每種消息類型使用collection中的namedturple定義:

Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])
    Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
    Decision = namedtuple('Decision', ['slot', 'proposal'])
    Invoked = namedtuple('Invoked', ['client_id', 'output'])
    Invoke = namedtuple('Invoke', ['caller', 'client_id', 'input_value'])
    Join = namedtuple('Join', [])
    Active = namedtuple('Active', [])
    Prepare = namedtuple('Prepare', ['ballot_num'])
    Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals'])
    Propose = namedtuple('Propose', ['slot', 'proposal'])
    Welcome = namedtuple('Welcome', ['state', 'slot', 'decisions'])
    Decided = namedtuple('Decided', ['slot'])
    Preempted = namedtuple('Preempted', ['slot', 'preempted_by'])
    Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals'])
    Accepting = namedtuple('Accepting', ['leader'])

使用命名元組描述每種消息類型能夠保持代碼的clean,並有助於避免一些簡單的錯誤。若是命名元組構造函數沒有被賦予正確的屬性,則它將引起異常,從而使錯誤變得明顯。元組在日誌消息中k能夠很好地格式化,不會像字典那樣使用那麼多的內存。
建立消息:

msg = Accepted(slot=10, ballot_num=30)

訪問消息:

got_ballot_num = msg.ballot_num

後面咱們會了解這些消息的含義。
代碼還引入了一些常量,其中大多數常量定義了各類消息的超時:

JOIN_RETRANSMIT = 0.7
    CATCHUP_INTERVAL = 0.6
    ACCEPT_RETRANSMIT = 1.0
    PREPARE_RETRANSMIT = 1.0
    INVOKE_RETRANSMIT = 0.5
    LEADER_TIMEOUT = 1.0
    NULL_BALLOT = Ballot(-1, -1)  # sorts before all real ballots
    NOOP_PROPOSAL = Proposal(None, None, None)  # no-op to fill otherwise empty slots

最後咱們須要定義協議中的Proposal和Ballot

Proposal = namedtuple('Proposal', ['caller', 'client_id', 'input'])
    Ballot = namedtuple('Ballot', ['n', 'leader'])

組件模型

實現multi-paxos的核心組件包括Role和Node。

  • 爲了保證可測試性並保持代碼的可讀性,咱們將Cluster分解爲與協議中描述的角色相對應的幾個類。每一個都是Role的子類。
class Role(object):

    def __init__(self, node):
        self.node = node
        self.node.register(self)
        self.running = True
        self.logger = node.logger.getChild(type(self).__name__)

    def set_timer(self, seconds, callback):
        return self.node.network.set_timer(self.node.address, seconds,
                                           lambda: self.running and callback())

    def stop(self):
        self.running = False
        self.node.unregister(self)

羣集節點的角色由Node類粘在一塊兒,該類表明網絡上的單個節點。在程序過程當中角色將添加到節點中,並從節點中刪除。

到達節點的消息將中繼到全部活動角色,調用以消息類型命名的方法,前綴爲do_。 這些do_方法接收消息的屬性做爲關鍵字參數,以便於訪問。Node``類還提供了``send方法做爲方便,使用functools.partial爲Network類的相同方法提供一些參數。

class Node(object):
    unique_ids = itertools.count()

    def __init__(self, network, address):
        self.network = network
        self.address = address or 'N%d' % self.unique_ids.next()
        self.logger = SimTimeLogger(
            logging.getLogger(self.address), {'network': self.network})
        self.logger.info('starting')
        self.roles = []
        self.send = functools.partial(self.network.send, self)

    def register(self, roles):
        self.roles.append(roles)

    def unregister(self, roles):
        self.roles.remove(roles)

    def receive(self, sender, message):
        handler_name = 'do_%s' % type(message).__name__

        for comp in self.roles[:]:
            if not hasattr(comp, handler_name):
                continue
            comp.logger.debug("received %s from %s", message, sender)
            fn = getattr(comp, handler_name)
            fn(sender=sender, **message._asdict())

應用接口

每一個集羣成員上都會建立並啓動一個Member對象,提供特定於應用程序的狀態機和對等項列表。若是成員對象正在加入現有集羣,則該成員對象向該節點添加bootstrap角色,若是正在建立新集羣,則該成員對象添加seed。再用Network.run在單獨的線程中運行協議。

應用程序經過該invoke方法與集羣進行交互,從而啓動了狀態轉換, 肯定該提議並運行狀態機後,invoke將返回狀態機的輸出。該方法使用簡單的同步Queue來等待協議線程的結果。

class Member(object):

    def __init__(self, state_machine, network, peers, seed=None,
                 seed_cls=Seed, bootstrap_cls=Bootstrap):
        self.network = network
        self.node = network.new_node()
        if seed is not None:
            self.startup_role = seed_cls(self.node, initial_state=seed, peers=peers,
                                      execute_fn=state_machine)
        else:
            self.startup_role = bootstrap_cls(self.node,
                                      execute_fn=state_machine, peers=peers)
        self.requester = None

    def start(self):
        self.startup_role.start()
        self.thread = threading.Thread(target=self.network.run)
        self.thread.start()

    def invoke(self, input_value, request_cls=Requester):
        assert self.requester is None
        q = Queue.Queue()
        self.requester = request_cls(self.node, input_value, q.put)
        self.requester.start()
        output = q.get()
        self.requester = None
        return output

Role 類

Paxos協議中的角色包括:client, acceptor, proposer, learner, and leader。在典型的實現中,單個processor能夠同時扮演一個或多個角色。這不會影響協議的正確性,一般會合並角色以改善協議中的延遲和/或消息數量。

下面逐一實現每一個角色類

Acceptor

Acceptor 類實現的是Paxos中的 acceptor角色,因此必須存儲最近promise的選票編號,以及每一個時段接受的各個slot的proposal,同時須要相應Prepare和Accept消息。 這裏的POC實現是一個和協議能夠直接對應的短類,對於acceptor來講Multi-paxos看起來像是簡單的Paxos,只是在message中添加了slot number。

class Acceptor(Role):

    def __init__(self, node):
        super(Acceptor, self).__init__(node)
        self.ballot_num = NULL_BALLOT
        self.accepted_proposals = {}  # {slot: (ballot_num, proposal)}

    def do_Prepare(self, sender, ballot_num):
        if ballot_num > self.ballot_num:
            self.ballot_num = ballot_num
            # we've heard from a scout, so it might be the next leader
            self.node.send([self.node.address], Accepting(leader=sender))

        self.node.send([sender], Promise(
            ballot_num=self.ballot_num, 
            accepted_proposals=self.accepted_proposals
        ))

    def do_Accept(self, sender, ballot_num, slot, proposal):
        if ballot_num >= self.ballot_num:
            self.ballot_num = ballot_num
            acc = self.accepted_proposals
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

        self.node.send([sender], Accepted(
            slot=slot, ballot_num=self.ballot_num))

Replica

Replica類是Role類最複雜的子類,對應協議中的Learner和Proposal角色,它的主要職責是:提出新的proposal;在決定proposal時調用本地狀態機;跟蹤當前Leader;以及將新啓動的節點添加到集羣中。

Replica建立新的proposal以響應來自客戶端的「invoke」消息,選擇它認爲是未使用的插槽,並向當前leader發送「Propose」消息。若是選定插槽的共識是針對不一樣proposal,則replica必須使用新插槽re-propose。

下圖顯示Replica的角色控制流程:

Requester    Local Rep   Current Leader
   X---------->|             |    Invoke
   |           X------------>|    Propose
   |           |<------------X    Decision
   |<----------X             |    Decision
   |           |             |

Decision消息表示集羣已達成共識的插槽, Replica類存儲新的決定並運行狀態機,直到到達未肯定的插槽。Replica從本地狀態機已處理的提交的slot識別出集羣已贊成的已決定的slot。若是slot出現亂序,提交的提案可能會滯後,等待下一個空位被決定。提交slot後,每一個replica會將操做結果發送回一條Invoked消息給請求者。

在某些狀況下slot可能沒有有效的提案,也沒有決策,須要狀態機一個接一個地執行slot,所以羣集必須就填充slot的內容達成共識。爲了不這種可能性,Replica在遇到插槽時會提出「no-op」的proposal。若是最終決定了這樣的proposal,則狀態機對該slot不執行任何操做。

一樣,同一proposal有可能被Decision兩次。對於任何此類重複的proposal,Replica將跳過調用狀態機,而不會對該slot執行任何狀態轉換。

Replicas須要知道哪一個節點是active leader才能向其發送Propose消息, 要實現這一目標,每一個副本都使用三個信息源跟蹤active leader。

當leader 的角色轉換爲active時,它會向同一節點上的副本發送一條Adopted消息(下圖):

Leader    Local Repplica   
   X----------->|          Admopted

當acceptor角色向Promise新的leader發送Accepting消息時,它將消息發送到其本地副本(下圖)。

Acceptor    Local Repplica   
   X----------->|          Accepting

active leader將以心跳的形式發送Active消息。若是在LEADER_TIMEOUT到期以前沒有此類消息到達,則Replica將假定該Leader已死,並轉向下一個Leader。在這種狀況下,重要的是全部副本都選擇相同的新領導者,咱們能夠經過對成員進行排序並在列表中選擇下一個leader。

當節點加入網絡時,Bootstrap將發送一條Join消息(下圖)。Replica以一條Welcome包含其最新狀態的消息做爲響應,從而使新節點可以快速啓用。

BootStrap     Replica        Replica       Replica
     X---------->|             |             |    Join
     |<----------X             X             |    Welcome
     X------------------------>|             |    Join
     |<------------------------X             |    Welcome
     X-------------------------------------->|    Join
     |<--------------------------------------X    Welcome      
class Replica(Role):

    def __init__(self, node, execute_fn, state, slot, decisions, peers):
        super(Replica, self).__init__(node)
        self.execute_fn = execute_fn
        self.state = state
        self.slot = slot
        self.decisions = decisions
        self.peers = peers
        self.proposals = {}
        # next slot num for a proposal (may lead slot)
        self.next_slot = slot
        self.latest_leader = None
        self.latest_leader_timeout = None

    # making proposals

    def do_Invoke(self, sender, caller, client_id, input_value):
        proposal = Proposal(caller, client_id, input_value)
        slot = next((s for s, p in self.proposals.iteritems() if p == proposal), None)
        # propose, or re-propose if this proposal already has a slot
        self.propose(proposal, slot)

    def propose(self, proposal, slot=None):
        """Send (or resend, if slot is specified) a proposal to the leader"""
        if not slot:
            slot, self.next_slot = self.next_slot, self.next_slot + 1
        self.proposals[slot] = proposal
        # find a leader we think is working - either the latest we know of, or
        # ourselves (which may trigger a scout to make us the leader)
        leader = self.latest_leader or self.node.address
        self.logger.info(
            "proposing %s at slot %d to leader %s" % (proposal, slot, leader))
        self.node.send([leader], Propose(slot=slot, proposal=proposal))

    # handling decided proposals

    def do_Decision(self, sender, slot, proposal):
        assert not self.decisions.get(self.slot, None), \
                "next slot to commit is already decided"
        if slot in self.decisions:
            assert self.decisions[slot] == proposal, \
                "slot %d already decided with %r!" % (slot, self.decisions[slot])
            return
        self.decisions[slot] = proposal
        self.next_slot = max(self.next_slot, slot + 1)

        # re-propose our proposal in a new slot if it lost its slot and wasn't a no-op
        our_proposal = self.proposals.get(slot)
        if (our_proposal is not None and 
            our_proposal != proposal and our_proposal.caller):
            self.propose(our_proposal)

        # execute any pending, decided proposals
        while True:
            commit_proposal = self.decisions.get(self.slot)
            if not commit_proposal:
                break  # not decided yet
            commit_slot, self.slot = self.slot, self.slot + 1

            self.commit(commit_slot, commit_proposal)

    def commit(self, slot, proposal):
        """Actually commit a proposal that is decided and in sequence"""
        decided_proposals = [p for s, p in self.decisions.iteritems() if s < slot]
        if proposal in decided_proposals:
            self.logger.info(
                "not committing duplicate proposal %r, slot %d", proposal, slot)
            return  # duplicate

        self.logger.info("committing %r at slot %d" % (proposal, slot))
        if proposal.caller is not None:
            # perform a client operation
            self.state, output = self.execute_fn(self.state, proposal.input)
            self.node.send([proposal.caller], 
                Invoked(client_id=proposal.client_id, output=output))

    # tracking the leader

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.latest_leader = self.node.address
        self.leader_alive()

    def do_Accepting(self, sender, leader):
        self.latest_leader = leader
        self.leader_alive()

    def do_Active(self, sender):
        if sender != self.latest_leader:
            return
        self.leader_alive()

    def leader_alive(self):
        if self.latest_leader_timeout:
            self.latest_leader_timeout.cancel()

        def reset_leader():
            idx = self.peers.index(self.latest_leader)
            self.latest_leader = self.peers[(idx + 1) % len(self.peers)]
            self.logger.debug("leader timed out; tring the next one, %s", 
                self.latest_leader)
        self.latest_leader_timeout = self.set_timer(LEADER_TIMEOUT, reset_leader)

    # adding new cluster members

    def do_Join(self, sender):
        if sender in self.peers:
            self.node.send([sender], Welcome(
                state=self.state, slot=self.slot, decisions=self.decisions))

Leader Scout Commander

Leader的主要任務是接受Propose要求新投票的消息並作出決定。成功完成協議的Prepare/Promise部分後Leader將處於「Active狀態」 。活躍的Leader能夠當即發送Accept消息以響應Propose。

與按角色分類的模型保持一致,Leader會委派scout和Commander角色來執行協議的每一個部分。

class Leader(Role):

    def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
        super(Leader, self).__init__(node)
        self.ballot_num = Ballot(0, node.address)
        self.active = False
        self.proposals = {}
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls
        self.scouting = False
        self.peers = peers

    def start(self):
        # reminder others we're active before LEADER_TIMEOUT expires
        def active():
            if self.active:
                self.node.send(self.peers, Active())
            self.set_timer(LEADER_TIMEOUT / 2.0, active)
        active()

    def spawn_scout(self):
        assert not self.scouting
        self.scouting = True
        self.scout_cls(self.node, self.ballot_num, self.peers).start()

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.scouting = False
        self.proposals.update(accepted_proposals)
        # note that we don't re-spawn commanders here; if there are undecided
        # proposals, the replicas will re-propose
        self.logger.info("leader becoming active")
        self.active = True

    def spawn_commander(self, ballot_num, slot):
        proposal = self.proposals[slot]
        self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()

    def do_Preempted(self, sender, slot, preempted_by):
        if not slot:  # from the scout
            self.scouting = False
        self.logger.info("leader preempted by %s", preempted_by.leader)
        self.active = False
        self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1, 
                                 self.ballot_num.leader)

    def do_Propose(self, sender, slot, proposal):
        if slot not in self.proposals:
            if self.active:
                self.proposals[slot] = proposal
                self.logger.info("spawning commander for slot %d" % (slot,))
                self.spawn_commander(self.ballot_num, slot)
            else:
                if not self.scouting:
                    self.logger.info("got PROPOSE when not active - scouting")
                    self.spawn_scout()
                else:
                    self.logger.info("got PROPOSE while scouting; ignored")
        else:
            self.logger.info("got PROPOSE for a slot already being proposed")

Leader想要變爲活動狀態時會建立一個Scout角色,以響應Propose在其處於非活動狀態時收到消息(下圖),Scout發送(並在必要時從新發送)Prepare消息,並收集Promise響應,直到聽到消息爲止。多數同行或直到被搶佔爲止。在經過Adopted或Preempted回覆給Leader。

Leader    Scout      Acceptor     Acceptor    Acceptor
   |          |          |            |           |   
   |          X--------->|            |           |    Prepare
   |          |<---------X            |           |    Promise
   |          X---------------------->|           |    Prepare
   |          |<----------------------X           |    Promise
   |          X---------------------------------->|    Prepare
   |          |<----------------------------------X    Promise
   |<---------X          |            |           |    Adopted
class Scout(Role):

 

def __init__(self, node, ballot_num, peers):
        super(Scout, self).__init__(node)
        self.ballot_num = ballot_num
        self.accepted_proposals = {}
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1
        self.retransmit_timer = None

    def start(self):
        self.logger.info("scout starting")
        self.send_prepare()

    def send_prepare(self):
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)

    def update_accepted(self, accepted_proposals):
        acc = self.accepted_proposals
        for slot, (ballot_num, proposal) in accepted_proposals.iteritems():
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

    def do_Promise(self, sender, ballot_num, accepted_proposals):
        if ballot_num == self.ballot_num:
            self.logger.info("got matching promise; need %d" % self.quorum)
            self.update_accepted(accepted_proposals)
            self.acceptors.add(sender)
            if len(self.acceptors) >= self.quorum:
                # strip the ballot numbers from self.accepted_proposals, now that it
                # represents a majority
                accepted_proposals = \ 
                    dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems())
                # We're adopted; note that this does *not* mean that no other
                # leader is active.  # Any such conflicts will be handled by the
                # commanders.
                self.node.send([self.node.address],
                    Adopted(ballot_num=ballot_num, 
                            accepted_proposals=accepted_proposals))
                self.stop()
        else:
            # this acceptor has promised another leader a higher ballot number,
            # so we've lost
            self.node.send([self.node.address], 
                Preempted(slot=None, preempted_by=ballot_num))
            self.stop()

Leader爲每一個有active proposal的slot建立一個Commander角色(下圖)。像Scout同樣,Commander發送和從新發送Accept消息,並等待大多數接受者的回覆Accepted或搶佔消息。接受建議後,Commander將Decision消息廣播到全部節點。它用Decided或Preempted響應Leader。

Leader    Commander   Acceptor     Acceptor    Acceptor
   |          |          |            |           |   
   |          X--------->|            |           |    Accept
   |          |<---------X            |           |    Accepted
   |          X---------------------->|           |    Accept
   |          |<----------------------X           |    Accepted
   |          X---------------------------------->|    Accept
   |          |<----------------------------------X    Accepted
   |<---------X          |            |           |    Decided
class Commander(Role):

 

def __init__(self, node, ballot_num, slot, proposal, peers):
        super(Commander, self).__init__(node)
        self.ballot_num = ballot_num
        self.slot = slot
        self.proposal = proposal
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1

    def start(self):
        self.node.send(set(self.peers) - self.acceptors, Accept(
            slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
        self.set_timer(ACCEPT_RETRANSMIT, self.start)

    def finished(self, ballot_num, preempted):
        if preempted:
            self.node.send([self.node.address], 
                           Preempted(slot=self.slot, preempted_by=ballot_num))
        else:
            self.node.send([self.node.address], 
                           Decided(slot=self.slot))
        self.stop()

    def do_Accepted(self, sender, slot, ballot_num):
        if slot != self.slot:
            return
        if ballot_num == self.ballot_num:
            self.acceptors.add(sender)
            if len(self.acceptors) < self.quorum:
                return
            self.node.send(self.peers, Decision(
                           slot=self.slot, proposal=self.proposal))
            self.finished(ballot_num, False)
        else:
            self.finished(ballot_num, True)

有一個問題是後續會介紹的網絡模擬器甚至在節點內的消息上也引入了數據包丟失。當全部 Decision消息丟失時,該協議沒法繼續進行。Replica繼續從新傳輸Propose消息,可是Leader忽略了這些消息,由於它已經對該slot提出了proposal,因爲沒有Replica收到Decision因此Replica的catch過程找不到結果,解決方案是像實際網絡堆棧以西洋確保本地消息始終傳遞成功。

Bootstrap

node加入cluster時必須獲取當前的cluster狀態, Bootstrap role循環每一個節點發送join消息,知道收到Welcome, Bootstrap的時序圖以下所示:

若是在每一個role(replica,leader,acceptor)中實現啓動過程,並等待welcome消息,會把初始化邏輯分散到每一個role,測試起來會很是麻煩,最終,咱們決定添加bootstrap role,一旦啓動完成,就給node添加每一個role,而且將初始狀態傳遞給他們的構造函數。

class Bootstrap(Role):

    def __init__(self, node, peers, execute_fn,
                 replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
                 commander_cls=Commander, scout_cls=Scout):
        super(Bootstrap, self).__init__(node)
        self.execute_fn = execute_fn
        self.peers = peers
        self.peers_cycle = itertools.cycle(peers)
        self.replica_cls = replica_cls
        self.acceptor_cls = acceptor_cls
        self.leader_cls = leader_cls
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls

    def start(self):
        self.join()

    def join(self):
        self.node.send([next(self.peers_cycle)], Join())
        self.set_timer(JOIN_RETRANSMIT, self.join)

    def do_Welcome(self, sender, state, slot, decisions):
        self.acceptor_cls(self.node)
        self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
                         state=state, slot=slot, decisions=decisions)
        self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls,
                        scout_cls=self.scout_cls).start()
        self.stop()

參考:

  • http://aosabook.org/en/500L/clustering-by-consensus.html
  • https://lamport.azurewebsites.net/pubs/lamport-paxos.pdf
  • https://lamport.azurewebsites.net/pubs/paxos-simple.pdf
  • https://www.scs.stanford.edu/~dm/home/papers/paxos.pdf
  • https://www.researchgate.net/publication/221234235_Revisiting_the_Paxos_Algorithm
  • https://www.paxos.com/
  • https://www.cs.cornell.edu/courses/cs6410/2017fa/slides/20-p2p-gossip.pdf
  • https://en.wikipedia.org/wiki/Paxos_(computer_science)
  • https://ongardie.net/static/raft/userstudy/quizzes.html
  • https://zhuanlan.zhihu.com/p/130332285

 

點擊關注,第一時間瞭解華爲雲新鮮技術~

相關文章
相關標籤/搜索