本文記錄筆者學習和理解區塊鏈共識算法Paxos的點滴,文章比較長,須要耐心來細細琢磨,筆者也是苦戰了一個週末纔對此有那麼一點初步瞭解,有問題的地方請不吝斧正!html
1.初始是閱讀本文後續內容的基礎,概念性的東西敘述很少,乾貨乾貨乾貨在後面的代碼實戰。但有提供我認爲優秀的帖子以供參考理解。前面這些Paxos基本的理解是2.代碼設計和3.實戰流程的基礎!git
Paxos 問題是指分佈式的系統中存在故障(fault),但不存在惡意(corrupt)節點場景(即 可能消息丟失或重複,但無錯誤消息)下的共識達成(Consensus)問題。github
Paxos最先是 Leslie Lamport 用 Paxon 島的故事模型來進行描述而命名。故事背景是古希臘 Paxon 島上的多個法官在一個大廳內對一個議案進行表決,如何達成統一 的結果。他們之間經過服務人員來傳遞紙條,但法官可能離開或進入大廳,服務人員可能偷 懶去睡覺。算法
Paxos 是第一個被證實的共識算法,其原理基於兩階段提交併進行擴展。 做爲如今共識算法設計的鼻祖,以最初論文的難懂(算法自己並不複雜)出名。算法中將節點分爲三種類型:編程
proposer:提出一個提案,等待你們批准爲結案。每每是客戶端擔任該角色;數組
acceptor:負責對提案進行投票。每每是服務端擔任該角色;通常須要至少3個且節點個數爲奇數,由於Paxos算法最終要產生一個大多數決策者都贊成的提議。bash
learner:被告知結案結果,並與之統一,不參與投票過程。可能爲客戶端或服務端。網絡
prepare 階段: 1. Proposer 選擇一個提案編號 n,而後向acceptor的某個超過半數的子成員發送編號爲n的 prepare 請求; 2. Acceptor 收到 prepare 消息後,若是提案的編號n大於該acceptor已經回覆的全部 prepare 請求的編號,則 Acceptor 將本身上次已經批准的最大編號提案回覆給 Proposer,並承諾再也不回覆小於 n 的提案;架構
commit階段: 1. 當一個 Proposer 收到了半數以上的 Acceptors 對 prepare 的回覆後,就進入批准階段。它要向回覆 prepare 請求的 Acceptors 發送 accept 請求,包括編號 n 和根據 prepare階段 決定的 value。這裏的value是全部響應中編號最大的提案的value(若是根據 prepare 沒有已經接受的 value,那麼它能夠自由決定 value)。 2. 在不違背本身向其餘 Proposer 的承諾的前提下,Acceptor 收到 accept 請求後即接受這個請求。即若是acceptor收到這個針對n提案的accept請求,只要該acceptor還沒有對編號大於n的prepare請求作出過響應,它就能夠經過這個提案。併發
Paxos算法初次接觸聽上去確實有點晦澀難懂,這裏有一篇貼子我以爲不錯。貼出來能夠參考:
另外,wiki對Paxos的描述也是比較不錯和權威的參考資料。
有了以上對Paxos算法的理解,咱們才能進行下一步:本身編程實現Paxos算法。
Paxos算法核心的兩個角色即是Proposer(提議者)和Acceptor(決策者),所以也必須圍繞這兩個對象進行算法架構的設計。
1.0 向全部Acceptor發出一個提議(proposal);
2.0 若是收到一個拒絕信息(reject),嘗試從新發送被拒絕的提議;
2.1 若是收到一個Acceptor的承諾迴應(agree),用一個標誌(agreeCount)來計數給了本身承諾的Acceptor個數。當agreeCount超過Acceptor總數的一半時,表示有大多數Acceptor承諾將接受這個提議,須要將本身的提議狀態置爲承諾接受狀態(agreed)。同時,還要通知其餘Proposer我這個提議已經獲得大多數Acceptor承諾會接受。
3.0 提議爲承諾接受狀態(agreed)時,Proposer須要再向Acceptor集合發送一個接受提議的確認請求,咱們稱該請求爲Accept請求。
3.1 發出Accept請求後會收到Acceptor的回覆,若是收到接受信息(Accept),用一個標誌(acceptCount)來計數接受本身提議的Acceptor個數。一樣當acceptCount超過半數時,表示大多數Acceptor接受了這個提議,須要將提議狀態由承諾接受狀態(agreed)置爲接受狀態(acceptd)。 同時,還要通知其餘Proposer我這個提議已經獲得大多數Acceptor接受。
以上1,2屬於Paxos算法的Prepare階段,3屬於Accept階段。
1.0 當Acceptor收到一個提議後,判斷提議版本號是否大於自身保存的提議版本。
1.0 若是小於自身表示曾經已經給過別的Proposer承諾,發送一個拒絕消息(reject),表示本身拒絕給當前Proposer任何承諾。
1.1 反之,則替換自身保存的提議版本號並給當前Proposer發送一個承諾迴應(agree),表示將承諾接受他的提議。同時,將自身狀態置爲已經給了某個Proposer承諾(agree)。
2.0 Acceptor收到一個Proposer的編號爲N的Accept請求,只要該Acceptor以前未曾承諾編號M(M>N)的其餘Proposer提議,那麼他就接受該提案。同時,將自身狀態置爲已接受某個Proposer提議,並通知全部Proposer這個消息。
以上1屬於Paxos算法的Prepare階段,2屬於Accept階段。
以上行爲分析針對本次Paxos算法編程實戰!!!
Paxos算法解決的是分佈式系統一致性的問題,咱們經過端口號在一臺計算機上模擬多個節點。
毋庸置疑,咱們分別須要一個Proposer類和Acceptor類。
Proposer的做用是提出一個提議併發送給Acceptor,因此他自己必須知道全部的Acceptor,同時有些時候要跟其餘Proposer通信,因此也須要知道全部的Proposer(見init方法)。
基本的開始結束接口(start, stop)
在判斷提議是否被大多數Acceptor承諾接受或最終接受,咱們須要設定一個斷定條件(getQuorumCount)
當提議被承諾接受或最終接受時須要通知其餘Proposer(notifyProposer)
發送消息(提議或Accept請求)給Acceptor(sendMsg);接收來自Acceptor的消息(recvMsg)
爲了方便調試,咱們可能須要知道整個過程請求提議的歷史記錄(getHistory)
本身的提議最終被Acceptor接受的個數(getNumAccepted)
清楚Paxos算法流程後,咱們發現假設有兩個Proposer依次提出編號遞增的提議,最終會陷入死循環使得Paxos算法沒法保證活性。因此,通常的作法是選取一個主Proposer做爲領導,只有領導才能提出提議(setPrimary)。
Proposer類的一個難點在於提議發出後的各類狀態轉變與對應數據的處理。從提議發出到提議被接受整個過程,提議的狀態是在不斷地變化,但最終總會到達一個終止態。對於這種狀況的處理,狀態機__註定是一個不錯的選擇。因爲這裏有點複雜咱們將提議功能單獨拿出來抽象爲一個Proposer的協議類__PaxoProposerProtocol。
-因爲各個節點收發消息是並行的,這裏對消息的檢測須要用到線程。這裏HeartbeatListener來監聽消息,HeartbeatSender用來發送消息。
""" @author: chaors @file: PaxoProposer.py @time: 2018/04/14 10:50 @desc: 提議者 """
class PaxoProposer:
#心跳監聽類
class HeartbeatListener(threading.Thread):
pass
#定時發送類
class HeartbeatSender(threading.Thread):
pass
#初始化
def __init__(self, port, proposers=None, acceptors=None):
pass
#開始
def start(self):
pass
#中止
def stop(self):
pass
#設置是否爲領導者
def setPrimary(self, isPrimary):
pass
#獲取支持全部提議者的決策者
def getGroup(self):
pass
#獲取全部提議者
def getProposers(self):
pass
#獲取全部決策者
def getAcceptors(self):
pass
#提議被承諾接受或最終接受的條件必須知足:得到1/2以上的Acceptor支持
def getQuorumCount(self):
pass
#獲取本地記錄數據
def getInstanceValue(self, instanceID):
pass
#獲取歷史記錄
def getHistory(self):
pass
#獲取提議贊成的數量
def getNumAccepted(self):
pass
#通知其餘提議者
def notifyProposer(self, protocol, msg):
pass
#新的提議
def newProposal(self, value, instance=None):
pass
# 發送消息
def sendMsg(self, msg):
pass
# 接收消息
def recvMsg(self, msg):
pass
複製代碼
用來提交一個提議,並用於提交提議後各類狀態的處理。
定義一些狀態來表示當前Proposoer提議的各類狀態
發起提議(propose)
狀態機處理(doTranition)
""" @author: chaors @file: PaxoProposerProtocol.py @time: 2018/04/14 10:50 @desc: 提議者協議 """
class PaxoProposerProtocol:
#常量
STATE_UNDEFIND = -1 #提議協議未定義
STATE_PROPOSED = 0 #提議類型
STATE_REJECTED = 1 #拒絕狀態 提議被拒絕
STATE_AGREED = 2 #提議被承諾接受 Prepare階段獲取大多數Acceptor承諾後的協議狀態
STATE_ACCEPTED = 3 #提議被接受
STATE_UNACCEPTED = 4 #提議未被拒絕
def __init__(self, proposer):
pass
#發起提議
def propose(self, value, pID, instanceID):
pass
#狀態過渡 根據狀態機運行
def doTranition(self, msg):
pass
複製代碼
決策者,對Proposer提出的提議和Accept請求作出迴應。
和Proposer相似的接口再也不贅述。
須要對比Proposer發來的提議版本(getHighestProposal)
""" @author: chaors @file: PaxoAcceptor.py @time: 2018/04/14 10:50 @desc: 決策者 """
class PaxoAcceptor:
def __init__(self, port, proposers):
pass
#開始
def start(self):
pass
#中止
def stop(self):
pass
#失敗
def fail(self):
pass
#恢復
def recover(self):
pass
#發送消息
def sendMsg(self, msg):
pass
#接收消息
def recvMsg(self, msg):
pass
#通知客戶端
def notifyClient(self, protocol, msg):
pass
#獲取本地記錄數據
def getInstanceValue(self, instanceID):
pass
#獲取最高贊成建議
def getHighestProposal(self, instanceID):
pass
複製代碼
決策者協議,用來處理Proposer提出的提議,並一樣使用狀態機來處理自身各類狀態。
""" @author: chaors @file: PaxoAcceptorProtocol.py @time: 2018/04/14 10:50 @desc: 決策者協議 """
from Message import Message #協議依賴消息
class PaxoAcceptorProtocol:
#常量
STATE_UNDEFIND = -1 #協議未定義
STATE_PROPOSAL_RECEIVED = 0 #收到消息
STATE_PROPOSAL_REJECTED = 1 #拒絕連接,網絡不通可能
STATE_PROPOSAL_AGREED = 2 #承諾將接受該提議 針對Proposer的PROPOSED請求
STATE_PROPOSAL_ACCEPTED = 3 #接受該協議 針對Proposer的Accept請求
STATE_PROPOSAL_UNACCEPTED = 4 #拒絕請求
def __init__(self, client):
pass
#收到提議
def recvProposal(self, msg):
pass
#過渡
def doTranition(self, msg):
pass
#通知客戶端
def notifyClient(self, msg):
pass
複製代碼
Proposer和Acceptor的角色都有了,還差一個他們之間傳遞的消息類。這個消息有如下幾種:
""" @author: chaors @file: Message.py @time: 2018/04/14 09:31 @desc: 消息傳遞類 """
class Message:
#常量
MSG_ACCEPTOR_AGREE = 0 #Acceptor對提議請求的承諾
MSG_ACCEPTOR_ACCEPT = 1 #Acceptor對Accept請求的接受
MSG_ACCEPTOR_REJECT = 2 #Acceptor對提議請求的拒絕
MSG_ACCEPTOR_UNACCEPT = 3 #Acceptor對Accept請求的不接受
MSG_ACCEPT = 4 #Proposer發出的Accept請求
MSG_PROPOSE = 5 #Proposer發出的提議請求
MSG_EXT_PROPOSE = 6 #外部(Client)發給Proposer的提議
MSG_HEARTBEAT = 7 #定時的心跳信息,用來同步提議
def __init__(self, cmd=None): #消息初始化有個狀態
pass
#對某個消息的回覆消息
def copyAsReply(self, msg):
pass
複製代碼
提議被抽象在協議裏,在系統達到一致性以前,Proposer可能嘗試提交屢次協議信息(包含提議)。在Proposer和Acceptor之間都須要保存全部的提議記錄,因此二者都有一個InstanceRecord實例數組。
對於Proposer,InstanceRecord實例數組保存的是提交過的全部提議記錄,而且會隨着提議狀態的改變動新記錄狀態(包括協議和記錄的值)的值。
對於Acceptor,InstanceRecord實例數組保存的是Acceptor接收的Proposer提議請求,並隨着提議版本的改變而更新。Acceptor給出承諾(agree)的條件是提議版本大於當前InstanceRecord裏的協議版本;Acceptor接受提議(accept)的條件是當前Accept請求版本號比以前給出承諾的的提議版本號大。
""" @author: chaors @file: InstanceRecord.py @time: 2018/04/14 10:31 @desc: 本地記錄類,記錄決策者,提議者之間協議 """
import threading, socket, pickle, queue,random
# InstanceRecord本地記錄類,決策者,提議者之間協議
from PaxoProposerProtocol import PaxoProposerProtocol
class InstanceRecord():
def __init__(self):
self.protocols = {} #協議字典
self.highestID = (-1, -1) #最高版本(提議版本,端口號)
self.value = None #提議值
#增長協議
def addProtocol(self, protocol):
self.protocols[protocol.proposalID] = protocol
#取得版本最高的協議 假設端口較大的Proposer爲領導,優先承諾 端口相同時取版本號較大的
if protocol.proposalID[1] > self.highestID[1] or \
(protocol.proposalID[1] == self.highestID[1] \
and protocol.proposalID[0] > self.highestID[0]):
self.highestID = protocol.proposalID
#抓取協議
def getProtocol(self, protocolID):
return self.protocols[protocolID]
#清理協議
def cleanProtocols(self):
keys = self.protocols.keys() #取得全部能夠
#遍歷刪除協議
for key in keys:
protocol = self.protocols[key]
if protocol.state == PaxoProposerProtocol.STATE_ACCEPTED:
print("Deleting protocol")
del self.protocols[key] #刪除協議
複製代碼
消息的結構是有了,可是它是怎麼在節點(Proposer和Acceptor)之間傳遞的呢。這裏咱們封裝一個基於Socket傳遞消息的網絡類。這裏接收消息須要藉助一個線程,咱們在構造一個接收消息的輔助類。
這裏的只是不屬於Paxos算法重點,就不贅述了。直接上代碼。
""" @author: chaors @file: MessagePump.py @time: 2018/04/14 09:46 @desc: 基於Socket傳遞消息,封裝網絡類傳遞消息 """
import threading #線程
import pickle #對象序列化
import socket #網絡信息傳輸
import queue #隊列
class MessagePump(threading.Thread):
# 傳遞消息的輔助類
class MPHelper(threading.Thread):
def __init__(self, owner):
self.owner = owner #傳遞消息的對象的全部者
threading.Thread.__init__(self) # 父類初始化
def run(self): #運行
while not self.owner.abort: #只要全部者線程未結束
try:
#返回二進制數據,地址
(bytes, addr) = self.owner.socket.recvfrom(2048) #收取消息
msg = pickle.loads(bytes) #讀取二進制轉化爲消息
msg.source = addr[1] #取出返回的地址
self.owner.queue.put(msg) #消息存入隊列
except Exception as e: #異常
print(e)
def __init__(self, owner, port, timeout=2):
#基本參數初始化
self.owner = owner #全部者
self.timeout = 2 #超時時間
self.port = port #網絡接口
#網絡通訊初始化
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #UDP通訊
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000) #通訊參數
self.socket.bind(("localhost", port)) #socket綁定
self.socket.settimeout(timeout) #設置超時
self.queue = queue.Queue() #隊列
self.helper = MessagePump.MPHelper(self) #接收消息的工具類
threading.Thread.__init__(self) #父類初始化
self.abort = False #默認不終止狀態
#運行主線程
def run(self):
self.helper.start() #開啓收消息的線程
while not self.abort: #只要不是終止狀態
msg = self.waitForMsg() #阻塞等待消息
self.owner.recvMsg(msg) #收取消息
#等待消息
def waitForMsg(self):
try:
msg = self.queue.get(True, 3) #從隊列中取消息,最多等3s
return msg
except Exception as e:
print(e)
return None
#發送消息
def sendMsg(self, msg):
bytes = pickle.dumps(msg) #把消息轉成二進制
addr = ("localhost", msg.to)
self.socket.sendto(bytes, addr) #發送消息到地址
return True
#設置狀態爲放棄
def doAbort(self):
self.abort = True
複製代碼
""" @author: chaors @file: paxo_testMain.py @time: 2018/04/14 17:50 @desc: Paxos算法測試用例 """
import threading, socket, pickle, queue,random
import time
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxoProposer import PaxoProposer
from PaxoProposerProtocol import PaxoProposerProtocol
from PaxoAcceptorProtocol import PaxoAcceptorProtocol
from PaxoAcceptor import PaxoAcceptor
if __name__ == '__main__':
#Acceptor數量
numclients = 5
#實例化決策者數組,決策者節點端口號爲65520-65525
acceptors = [PaxoAcceptor(port, [56321, 56322]) for port in range(65520, 65520 + numclients)]
#實例化提議者,端口號分別56321,56322 對應的決策者爲acceptors
proposer1 = PaxoProposer(56321, [56321, 56322], [acceptor.port for acceptor in acceptors])
proposer2 = PaxoProposer(56322, [56321, 56322], [acceptor.port for acceptor in acceptors])
#啓動提議者提議程序
proposer1.start()
proposer1.setPrimary(True)
proposer2.setPrimary(True)
proposer2.start()
#啓動決策者決策程序
for acceptor in acceptors:
acceptor.start()
#模擬網絡中兩個節點宕機
acceptors[0].fail()
acceptors[1].fail()
#利用Socket機制發送提議給決策者
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
start = time.time()
for i in range(1000):
m = Message(Message.MSG_EXT_PROPOSE)
m.value = 0 + i
m.to = 56322
bytes = pickle.dumps(m)
s.sendto(bytes, ("localhost", m.to))
# if i == 2 or i == 30:
# print(leader2.getInstanceValue(1))
#當提議被999個決策者接受時結束整個提議程序
while proposer1.getNumAccepted() < 999:
print(u"休眠1秒--被接受: %d" % proposer1.getNumAccepted())
time.sleep(1)
end = time.time()
print(u"休眠10秒")
time.sleep(10)
print(u"結束領導者")
proposer1.stop()
proposer2.stop()
print(u"結束客戶端")
for acceptor in acceptors:
acceptor.stop()
print(u"領導者1 歷史記錄: %s" % proposer1.getHistory())
print(u"領導者 2 歷史記錄: %s " % proposer2.getHistory())
print(u"一共用了%d 秒" % (end - start))
複製代碼
上面已經完成了基本代碼的架構,詳細源碼稍後我會上傳到github。 接下來,咱們經過一個簡單的測試用例來再一次更深刻地從代碼層面理解Paxos算法的處理邏輯。
咱們運行paxo_testMain代碼,事先我在關鍵步驟處都打了斷點。這樣就能夠完整地從代碼角度看一次Paxos算法兩個階段的運行,也能直觀地觀察到各個步驟的代碼處理邏輯。
#####1.x 對應Paxos算法Prepare階段 #####2.x 對應Paxos算法Commit階段
#總結 以上,咱們就從代碼層面對PAxos算法有一個更深刻的瞭解,我想根據代碼再反過來理解PAxos算法,勢必會有一個更深入的印象。
剛開始據說Paxos也是好幾臉懵逼,也是鏖戰一個週末纔有這麼點體悟。還在學習區塊鏈的小小白起步中,寫這篇帖子也是記錄下本身學習的過程。勉之。
碼農:
擇而爲農
請碼好你的碼
一碼不調
何以碼天下
互聯網顛覆世界,區塊鏈顛覆互聯網!
--------------------------------------------------20180416 00:16