歡迎你們前往騰訊雲社區,獲取更多騰訊海量技術實踐乾貨哦~php
做者:LBD git
瞭解分佈式系統的童鞋確定聽過Paxos算法的大名。Paxos算法以晦澀難懂著稱,其工程實現更難。目前,號稱在工程上實現了Paxos算法的應該只有Google、阿里和騰訊。然而,只有騰訊的微信團隊真正將代碼開源出來,他們將Paxos算法的實現封裝成了一個Paxos庫,你們能夠基於該庫實現本身想要的功能,好比用於master選舉,或者甚至利用它來實現一個分佈式KV數據庫等。github
以前就對Paxos很感興趣,可是一直沒看過實現的代碼,此次微信開源了PhxPaxos後終於有機會深刻地瞭解Paxos的實現細節。在這裏感謝微信團隊。感謝PhxPaxos的做者。讓咱們一塊兒來領略Paxos的魅力吧。算法
本次的源碼分析先從網絡部分開始。由於在分佈式系統中不可避免會涉及到不一樣節點以及相同節點上不一樣進程之間的通訊。所以網絡部分也是相當重要,因此就先把網絡單獨拿出來看,接下來再去看Paxos算法的實現部分。數據庫
源碼的include/phxpaxos目錄下是公共頭文件。include/phpaxos/network.h 是網絡模塊的抽象函數,若是用戶想使用本身的網絡協議,能夠經過重寫這些函數實現網絡模塊的自定義。api
咱們先來看下network.h的內容:數組
namespace phxpaxos { //You can use your own network to make paxos communicate. :) class Node; class NetWork { public: NetWork(); virtual ~NetWork() {} //Network must not send/recieve any message before paxoslib called this funtion. virtual void RunNetWork() = 0; //If paxoslib call this function, network need to stop receive any message. virtual void StopNetWork() = 0; virtual int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0; virtual int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0; //When receive a message, call this funtion. //This funtion is async, just enqueue an return. int OnReceiveMessage(const char * pcMessage, const int iMessageLen); private: friend class Node; Node * m_poNode; }; }
這幾個函數的做用從名字就能夠看出來。並且都是虛函數,即須要重寫這些函數。在PhxPaxos中,提供了一個默認的網絡模塊,就是繼承了NetWork類。該類的名字叫DFNetWork,DF應該就是default的縮寫了。以下:安全
namespace phxpaxos { class DFNetWork : public NetWork { public: DFNetWork(); virtual ~DFNetWork(); int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount); void RunNetWork(); void StopNetWork(); int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage); int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage); private: UDPRecv m_oUDPRecv; UDPSend m_oUDPSend; TcpIOThread m_oTcpIOThread; }; }
該類的私有成員裏有UDPRecv、UDPSend和TcpIOThread三個類的對象,這三個類分別用於接收UDP消息、發送UDP消息以及收發TCP消息。微信
Init方法就是將UDPRecv、UDPSend和TcpIOThread分別初始化:網絡
int DFNetWork :: Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount) { //初始化UDPSend int ret = m_oUDPSend.Init(); if (ret != 0) { return ret; } //初始化UDPRecv ret = m_oUDPRecv.Init(iListenPort); if (ret != 0) { return ret; } //初始化TCP ret = m_oTcpIOThread.Init(sListenIp, iListenPort, iIOThreadCount); if (ret != 0) { PLErr("m_oTcpIOThread Init fail, ret %d", ret); return ret; } return 0; }
具體的初始化過程就是調用socket的api。以UDPRecv爲例,就是建立socket、設定端口、設置socket屬性(如端口可重用)最後綁定端口。以下:
int UDPRecv :: Init(const int iPort) { //建立socket,得到socket fd if ((m_iSockFD = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(iPort); //設定端口 addr.sin_addr.s_addr = htonl(INADDR_ANY); int enable = 1; //設定socket屬性,端口可重用 setsockopt(m_iSockFD, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)); //綁定,用於監聽 if (bind(m_iSockFD, (struct sockaddr *)&addr, sizeof(addr)) < 0) { return -1; } return 0; }
RunNetWork就是將UDPRecv、UDPSend和TcpIOThread分別運行起來:
void DFNetWork :: RunNetWork() { //UDPSend和UDPRecv都是調用Thread的start方法 m_oUDPSend.start(); m_oUDPRecv.start(); //TCP的Start是封裝過的 m_oTcpIOThread.Start(); }
TcpIOThread的Start()實際執行的代碼以下,分別啓動了TcpAcceptor、TcpWrite和TcpRead:
void TcpIOThread :: Start() { m_oTcpAcceptor.start(); for (auto & poTcpWrite : m_vecTcpWrite) { poTcpWrite->start(); } for (auto & poTcpRead : m_vecTcpRead) { poTcpRead->start(); } m_bIsStarted = true; }
StopNetWork就是將UDPRecv、UDPSend和TcpIOThread中止。
SendMessageTCP就是將消息用TCP發送:
int DFNetWork :: SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) { return m_oTcpIOThread.AddMessage(iGroupIdx, sIp, iPort, sMessage); }
SendMessageUDP就是將消息用UDP發送:
int DFNetWork :: SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) { return m_oUDPSend.AddMessage(sIp, iPort, sMessage); }
前面SendMessageUDP調用了m_oUDPSend.AddMessage。這裏的UDPSend維護了一個發送隊列,以下:
Queue<QueueData *> m_oSendQueue;
m_oUDPSend.AddMessage就是將消息加入到UDP的m_oSendQueue中。
而後UDPSend在run方法中一直循環將m_oSendQueue中的消息發送出去:
void UDPSend :: run() { m_bIsStarted = true; while(true) { QueueData * poData = nullptr; //同步,線程安全 m_oSendQueue.lock(); bool bSucc = m_oSendQueue.peek(poData, 1000); if (bSucc) { //取出隊頭消息 m_oSendQueue.pop(); } m_oSendQueue.unlock(); if (poData != nullptr) { //將消息發送出去 SendMessage(poData->m_sIP, poData->m_iPort, poData->m_sMessage); delete poData; } if (m_bIsEnd) { PLHead("UDPSend [END]"); return; } } }
所以UDPSend就是把消息加入到消息隊列,而後循環將消息隊列裏的消息發送出去。
接下來看看UDPRecv。UDPRecv的初始化前面已經看過了,就是簡單的得到socket fd,設定sockaddr_in,設置socket屬性最後將socket fd和sockaddr_in綁定用於監聽。
主要來看看UDPRecv的run方法。這裏主要用了I/O多路複用中的poll,註冊了一個pollfd,該pollfd的fd即以前建立的綁定了端口的socket fd,events爲POLLIN,表示監聽數據可讀事件,若是有數據可讀了,則調用recvfrom讀入數據。最後調用OnReceiveMessage將消息添加到當前instance的IoLoop中:
void UDPRecv :: run() { m_bIsStarted = true; char sBuffer[65536] = {0}; struct sockaddr_in addr; socklen_t addr_len = sizeof(struct sockaddr_in); memset(&addr, 0, sizeof(addr)); while(true) { if (m_bIsEnd) { PLHead("UDPRecv [END]"); return; } struct pollfd fd; int ret; fd.fd = m_iSockFD; //註冊POLLIN事件 fd.events = POLLIN; //調用poll檢查是否有數據可讀 ret = poll(&fd, 1, 500); if (ret == 0 || ret == -1) { continue; } //將接收到的數據放入sBuffer中 int iRecvLen = recvfrom(m_iSockFD, sBuffer, sizeof(sBuffer), 0, (struct sockaddr *)&addr, &addr_len); BP->GetNetworkBP()->UDPReceive(iRecvLen); if (iRecvLen > 0) { //這裏會依次調用Node和Instance的OnReceiveMessage方法,最後將消息加入到Instance的IoLoop中 m_poDFNetWork->OnReceiveMessage(sBuffer, iRecvLen); } } }
接下來看看收發TCP消息的TcpIOThread:
class TcpIOThread { public: TcpIOThread(NetWork * poNetWork); ~TcpIOThread(); //用於初始化TcpAcceptor以及iIOThreadCount個m_vecTcpRead和m_vecTcpWrite int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount); //啓動TcpAcceptor用於監聽以及全部的m_vecTcpRead和m_vecTcpWrite用於讀寫消息 void Start(); //中止TcpAcceptor和全部的m_vecTcpRead及m_vecTcpWrite void Stop(); //將消息加入到特定TcpWrite的消息隊列中 int AddMessage(const int iGroupIdx, const std::string & sIP, const int iPort, const std::string & sMessage); private: NetWork * m_poNetWork; TcpAcceptor m_oTcpAcceptor; std::vector<TcpRead *> m_vecTcpRead; std::vector<TcpWrite *> m_vecTcpWrite; bool m_bIsStarted; };
TcpRead相似於前面講的UDPRecv,TcpWrite相似於於UDPSend。嚴格來說,TcpAcceptor + TcpRead纔是UDPRecv。這裏把TcpAcceptor單獨抽出來,專門用於監聽鏈接請求並創建鏈接。TcpRead只須要負責讀消息就行。
咱們來看看TcpAcceptor:
class TcpAcceptor : public Thread { public: TcpAcceptor(); ~TcpAcceptor(); //監聽端口 void Listen(const std::string & sListenIP, const int iListenPort); //一直while循環,監聽鏈接事件並創建鏈接得到fd,而後添加事件到EventLoop中 void run(); void Stop(); void AddEventLoop(EventLoop * poEventLoop); void AddEvent(int iFD, SocketAddress oAddr); private: //服務端的socket,用於監聽 ServerSocket m_oSocket; std::vector<EventLoop *> m_vecEventLoop; private: bool m_bIsEnd; bool m_bIsStarted; };
這裏主要來看下run方法:
void TcpAcceptor :: run() { m_bIsStarted = true; PLHead("start accept..."); m_oSocket.setAcceptTimeout(500); m_oSocket.setNonBlocking(true); while (true) { struct pollfd pfd; int ret; pfd.fd = m_oSocket.getSocketHandle(); //註冊事件 pfd.events = POLLIN; //等待事件到來 ret = poll(&pfd, 1, 500); if (ret != 0 && ret != -1) { SocketAddress oAddr; int fd = -1; try { //創建鏈接,得到fd。這裏的acceptfd對accept進行了簡單的封裝 fd = m_oSocket.acceptfd(&oAddr); } catch(...) { fd = -1; } if (fd >= 0) { BP->GetNetworkBP()->TcpAcceptFd(); PLImp("accepted!, fd %d ip %s port %d", fd, oAddr.getHost().c_str(), oAddr.getPort()); //添加事件 AddEvent(fd, oAddr); } } if (m_bIsEnd) { PLHead("TCP.Acceptor [END]"); return; } } }
再看看AddEvent方法:
void TcpAcceptor :: AddEvent(int iFD, SocketAddress oAddr) { EventLoop * poMinActiveEventLoop = nullptr; int iMinActiveEventCount = 1 << 30; for (auto & poEventLoop : m_vecEventLoop) { int iActiveCount = poEventLoop->GetActiveEventCount(); if (iActiveCount < iMinActiveEventCount) { iMinActiveEventCount = iActiveCount; poMinActiveEventLoop = poEventLoop; } } oAddr.getPort()); poMinActiveEventLoop->AddEvent(iFD, oAddr); }
即找到活躍數最少的EventLoop,將事件添加到該EventLoop中。這裏應該是爲了負載均衡,防止有些線程工做量很大,有些則很空閒。
具體EventLoop的AddEvent就是將事件加入到FDQueue中,以下:
void EventLoop :: AddEvent(int iFD, SocketAddress oAddr) { std::lock_guard<std::mutex> oLockGuard(m_oMutex); m_oFDQueue.push(make_pair(iFD, oAddr)); }
到這裏TcpAcceptor的做用及實現基本就很清晰了。
先來看看TcpRead類的定義:
class TcpRead : public Thread { public: TcpRead(NetWork * poNetWork); ~TcpRead(); int Init(); void run(); void Stop(); EventLoop * GetEventLoop(); private: EventLoop m_oEventLoop; };
這裏的成員變量是一個EventLoop對象。經過源碼發現,Init、run、Stop方法其實都是調用了m_oEventLoop相應的方法,以下:
int TcpRead :: Init() { return m_oEventLoop.Init(20480); } void TcpRead :: run() { m_oEventLoop.StartLoop(); } void TcpRead :: Stop() { m_oEventLoop.Stop(); join(); PLHead("TcpReadThread [END]"); }
所以主要來看下EventLoop。
首先說下Event。PhxPaxos在TCP這塊主要用了I/O多路複用中的epoll。這裏主要將數據和通知等都封裝成Event,而後由TcpWrite和TcpRead的EventLoop去執行。PhxPaxos中的Event包含兩個子類,分別是MessageEvent和Notify。其中MessageEvent主要用於數據的讀寫;而Notify主要用於通知事件發生。這裏的Notify基於管道pipe和EPOLLIN事件來實現,能夠經過Notify的Init方法看出:
int Notify :: Init() { //m_iPipeFD是一個長度爲2的int數組,用於存放管道兩端的socket fd int ret = pipe(m_iPipeFD); if (ret != 0) { PLErr("create pipe fail, ret %d", ret); return ret; } fcntl(m_iPipeFD[0], F_SETFL, O_NONBLOCK); fcntl(m_iPipeFD[1], F_SETFL, O_NONBLOCK); AddEvent(EPOLLIN); return 0; }
繼續回到EventLoop。首先看下EventLoop的Init方法:
int EventLoop :: Init(const int iEpollLength) { //建立epoll句柄,iEpollLength爲監聽的fd數 m_iEpollFd = epoll_create(iEpollLength); if (m_iEpollFd == -1) { PLErr("epoll_create fail, ret %d", m_iEpollFd); return -1; } m_poNotify = new Notify(this); assert(m_poNotify != nullptr); //初始化Notify:建立pipe,設置m_iPipeFD並添加EPOLLIN事件 int ret = m_poNotify->Init(); if (ret != 0) { return ret; } return 0; }
接着來看下最重要的StartLoop:
void EventLoop :: StartLoop() { m_bIsEnd = false; while(true) { BP->GetNetworkBP()->TcpEpollLoop(); int iNextTimeout = 1000; DealwithTimeout(iNextTimeout); //PLHead("nexttimeout %d", iNextTimeout); OneLoop(iNextTimeout); CreateEvent(); if (m_poTcpClient != nullptr) { m_poTcpClient->DealWithWrite(); } if (m_bIsEnd) { PLHead("TCP.EventLoop [END]"); break; } } }
主循環是OneLoop:
void EventLoop :: OneLoop(const int iTimeoutMs) { //調用epoll_wait等待事件發生 int n = epoll_wait(m_iEpollFd, m_EpollEvents, MAX_EVENTS, 1); if (n == -1) { if (errno != EINTR) { PLErr("epoll_wait fail, errno %d", errno); return; } } //逐一處理髮生的epoll事件 for (int i = 0; i < n; i++) { int iFd = m_EpollEvents[i].data.fd; auto it = m_mapEvent.find(iFd); if (it == end(m_mapEvent)) { continue; } int iEvents = m_EpollEvents[i].events; Event * poEvent = it->second.m_poEvent; int ret = 0; if (iEvents & EPOLLERR) { OnError(iEvents, poEvent); continue; } try { //若是是EPOLLIN事件,代表由數據可讀,則調用poEvent的OnRead方法處理 if (iEvents & EPOLLIN) { ret = poEvent->OnRead(); } //若是是EPOLLOUT事件,代表由數據可寫,則調用poEvent的OnWrite方法處理 if (iEvents & EPOLLOUT) { ret = poEvent->OnWrite(); } } catch (...) { ret = -1; } if (ret != 0) { OnError(iEvents, poEvent); } } }
其餘具體的細節這裏就再也不贅述了,有興趣的能夠本身去看看源碼。
看完了TcpRead,再來看看TcpWrite。首先仍是看它的定義:
class TcpWrite : public Thread { public: TcpWrite(NetWork * poNetWork); ~TcpWrite(); int Init(); void run(); void Stop(); int AddMessage(const std::string & sIP, const int iPort, const std::string & sMessage); private: TcpClient m_oTcpClient; EventLoop m_oEventLoop; };
Init、run、Stop跟TcpRead中對應方法的做用一致。AddMessage則是調用了m_oTcpClient的AddMessage方法。發現TcpWrite的成員變量比TcpRead多了一個TcpClient對象,所以主要來看看這個TcpClient是幹嗎的。
剛剛說TcpWrite的AddMessage調用了m_oTcpClient的AddMessage方法。在m_oTcpClient的AddMessage方法中,則是先建立了一個指向MessageEvent對象的指針poEvent,而後再調用poEvent的AddMessage方法:
int TcpClient :: AddMessage(const std::string & sIP, const int iPort, const std::string & sMessage) { //PLImp("ok"); MessageEvent * poEvent = GetEvent(sIP, iPort); if (poEvent == nullptr) { PLErr("no event created for this ip %s port %d", sIP.c_str(), iPort); return -1; } return poEvent->AddMessage(sMessage); }
所以繼續看看MessageEvent的AddMessage方法:
int MessageEvent :: AddMessage(const std::string & sMessage) { m_llLastActiveTime = Time::GetSteadyClockMS(); std::unique_lock<std::mutex> oLock(m_oMutex); if ((int)m_oInQueue.size() > TCP_QUEUE_MAXLEN) { BP->GetNetworkBP()->TcpQueueFull(); //PLErr("queue length %d too long, can't enqueue", m_oInQueue.size()); return -2; } if (m_iQueueMemSize > MAX_QUEUE_MEM_SIZE) { //PLErr("queue memsize %d too large, can't enqueue", m_iQueueMemSize); return -2; } QueueData tData; //將消息封裝成QueueData後放入隊列 tData.llEnqueueAbsTime = Time::GetSteadyClockMS(); tData.psValue = new string(sMessage); m_oInQueue.push(tData); m_iQueueMemSize += sMessage.size(); oLock.unlock(); //退出EpollWait,實際是調用SendNotify發送了一個通知 JumpoutEpollWait(); return 0; }
能夠看到這裏將消息加上入隊時間後封裝成一個QueueDate,而後放入m_oInQueue隊列中。最後調用EventLoop的SendNotify發送了一個通知(利用以前建立的pipe)退出EpollWait。
說完了消息怎麼入隊,那消息是怎麼發送出去的呢?
這裏主要涉及到MessageEvent的OnWrite函數:
int MessageEvent :: OnWrite() { int ret = 0; //只要發送隊列不爲空或者還有上次未發送完的數據,就調用DoOnWrite執行真正的發送操做 while (!m_oInQueue.empty() || m_iLeftWriteLen > 0) { ret = DoOnWrite(); if (ret != 0 && ret != 1) { return ret; } else if (ret == 1) { //need break, wait next write return 0; } } WriteDone(); return 0; }
DoOnWrite:
int MessageEvent :: DoOnWrite() { //上一次的消息還未發送完畢,將剩下的發送完 if (m_iLeftWriteLen > 0) { return WriteLeft(); } m_oMutex.lock(); if (m_oInQueue.empty()) { m_oMutex.unlock(); return 0; } //從隊列中取出一條新消息,準備發送 QueueData tData = m_oInQueue.front(); m_oInQueue.pop(); m_iQueueMemSize -= tData.psValue->size(); m_oMutex.unlock(); std::string * poMessage = tData.psValue; //若是該消息入隊過久沒有被處理,則拋棄,不發送 uint64_t llNowTime = Time::GetSteadyClockMS(); int iDelayMs = llNowTime > tData.llEnqueueAbsTime ? (int)(llNowTime - tData.llEnqueueAbsTime) : 0; BP->GetNetworkBP()->TcpOutQueue(iDelayMs); if (iDelayMs > TCP_OUTQUEUE_DROP_TIMEMS) { //PLErr("drop request because enqueue timeout, nowtime %lu unqueuetime %lu", //llNowTime, tData.llEnqueueAbsTime); delete poMessage; return 0; } //計算髮送緩衝區長度,須要加上4字節用於表示消息長度 int iBuffLen = poMessage->size(); int niBuffLen = htonl(iBuffLen + 4); int iLen = iBuffLen + 4; //申請緩衝區 m_oWriteCacheBuffer.Ready(iLen); //將消息長度及消息內容拷貝到緩衝區 memcpy(m_oWriteCacheBuffer.GetPtr(), &niBuffLen, 4); memcpy(m_oWriteCacheBuffer.GetPtr() + 4, poMessage->c_str(), iBuffLen); m_iLeftWriteLen = iLen; m_iLastWritePos = 0; delete poMessage; //PLImp("write len %d ip %s port %d", iLen, m_oAddr.getHost().c_str(), m_oAddr.getPort()); //開始發送消息,有可能消息太大一次發送不完 int iWriteLen = m_oSocket.send(m_oWriteCacheBuffer.GetPtr(), iLen); if (iWriteLen < 0) { PLErr("fail, write len %d ip %s port %d", iWriteLen, m_oAddr.getHost().c_str(), m_oAddr.getPort()); return -1; } //須要下次再發送 if (iWriteLen == 0) { //need wait next write AddEvent(EPOLLOUT); return 1; } //PLImp("real write len %d", iWriteLen); //發送成功 if (iWriteLen == iLen) { m_iLeftWriteLen = 0; m_iLastWritePos = 0; //write done } //沒有一次性所有發送完,剩下的須要下次發送 else if (iWriteLen < iLen) { //m_iLastWritePos和m_iLeftWriteLen分別用來表示上次寫的位置以及剩下須要發送的長度 m_iLastWritePos = iWriteLen; m_iLeftWriteLen = iLen - iWriteLen; PLImp("write buflen %d smaller than expectlen %d", iWriteLen, iLen); } else { PLErr("write buflen %d large than expectlen %d", iWriteLen, iLen); } return 0; }
先介紹這麼多吧,接下去會有更多相關的文章,特別是PhxPaxos中實現Paxos算法的那部分,相信看過Paxos相關論文的童鞋會對這塊很感興趣。
最後,附上PhxPaxos源碼的地址:https://github.com/Tencent/phxpaxos
可進入個人博客查看原文
歡迎關注公衆號: FullStackPlan 獲取更多幹貨
基於騰訊開源 Angel 的 LDA* 入選國際頂級學術會議 VLDB
此文已由做者受權騰訊雲技術社區發佈,轉載請註明原文出處
原文連接:https://cloud.tencent.com/community/article/363266