分佈式協議學習筆記(三) Raft 選舉自編寫代碼練習

因爲時間安排上的緣由,此次的代碼寫得稍微有些簡略,只能算是自己對 Raft 協議的一個鞏固。

實現中定義 2 個節點,通過讀取配置文件來獲取 IP、端口以及節點 ID。

網絡使用 boost 同步流程:一個線程收,一個線程發送。

1 收的線程根據接收到的數據,判斷是心跳包、選舉請求還是選舉請求回覆,來更新自己的時間邏輯編號 term、是否已投票 isVote,以及最新 term 中哪些節點投了自己的選舉票 map<int,int> // nodeid, term

2 發送的線程每隔 200ms 輪詢一次,根據節點當前狀態減小等待時間(等待時間根據節點狀態調節爲 1000ms 心跳間隔,或者 5001-9000ms 的隨機選舉超時,與下文代碼一致)

 

現運行看看效果

咱們須要兩個節點,因此須要將 exe 和配置文件放入不同的文件夾,如圖

啓動程序

1 初始兩個節點都是 follower 狀態,等待 leader 發送心跳

2 因爲目前無 leader,因此兩個節點其中之一在隨機的超時時間後,發起選舉投票

 

3 以後節點1 成爲leader,以1秒(1000ms)的間隔發送心跳包 進入正常狀態

 

4 在狀態3 的狀況下,關閉 follower 狀態的節點2 對狀況並無影響。leader 節點1 會持續嘗試鏈接 follower 節點2

5 節點2 再次鏈接 因爲leader節點1 發送的心跳term爲1  大於新啓動節點2的初始化term 0 。因此節點2 會立刻成爲follower ,接受leader節點1的心跳

 

6 在狀態3 的狀況下,若是關閉的是 leader 節點1,節點2 在一段時間未接受到心跳後,就會廣播選舉請求,請求自己成爲 leader,可是因爲沒有節點與節點2的投票一致,也沒有其餘的節點選舉投票,節點2將持續嘗試選舉自己成爲 leader

 

7 節點1上線後,贊成節點2的選舉請求,節點2接收超過半數以上的投票,成爲leader。開始以1秒間隔發送心跳包。

代碼以下

基礎結構體:

 /// Role a node currently holds in the election state machine.
 ///
 /// Enumerators start at 1 and increase by one in declaration order.
 /// (The previous `const enum STATUS { ... };` form is not valid standard C++:
 /// a cv-qualifier needs an object or function to apply to.)
 enum STATUS {
     LEADER_STATUS = 1,
     FOLLOWER_STATUS,
     CANDIDATE_STATUS,
     PRE_VOTE_STAUS,  // spelling kept so existing references keep compiling
 };
 7 
 /// Kind of message carried by a netInfo record.
 ///
 /// Enumerators start at 0 and increase by one in declaration order.
 /// (The previous `const enum INFOTYPE { ... };` form is not valid standard
 /// C++: a cv-qualifier needs an object or function to apply to.)
 enum INFOTYPE {
     DEFAULT_TYPE = 0,
     HEART_BREAT_TYPE,       // heartbeat (spelling kept for compatibility)
     VOTE_LEADER_TYPE,       // vote request
     VOTE_LEADER_RESP_TYPE,  // reply to a vote request
 };
15 
16 typedef struct netInfo {
17     int fromID;
18     int toID;
19     INFOTYPE infotype;
20     int    term;
21     int voteId;    //選舉ID infotype爲votetype纔有效
22 }NetInfo;
23 
24 typedef struct locaInfo {
25     int    id;
26     int leaderID;
27     STATUS status;
28     int term;
29     int isVote;
30     int IsRecvHeartbeat;
31     int electionTimeout;
32     std::map<int, int> voteRecord;// id term有此記錄表示該term收到該id投取本身一票
33 }LocalInfo;
34 
35 typedef struct localInfoWithLock {
36     LocalInfo    locInfo;
37     std::mutex m;
38 }LocalInfoWithLock;
基本數據結構
  1 #include "RaftManager.h"
  2 #include "NetInfoHandler.h"
  3 #include "StatusHandler.h"
  4 #include <random>
  5 #include <functional>
  6 
  7 using namespace std;
  8 
  9 std::shared_ptr<RaftManager> RaftManager::p = nullptr;
 10 
 11 bool RaftManager::Init() {
 12     //能夠使用json 讀取配置
 13     ReadConfig cfg("nodeCfg");
 14     map<string, string> kv = cfg.Do();
 15 
 16     if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
 17         assert(0);
 18         return false;
 19     }
 20     ip = kv["ip"];  portStart = stoi(kv["portStart"]);  nodeID = stoi(kv["nodeID"]);
 21     heartbeatTime = 5000;
 22     if (kv.find("heartbeatTime") != kv.end())
 23         heartbeatTime = stoi(kv["heartbeatTime"]);
 24     
 25     locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0;
 26     locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0;
 27     locInfolock.locInfo.electionTimeout = 4000;
 28     locInfolock.locInfo.status = FOLLOWER_STATUS;
 29     locInfolock.locInfo.voteRecord.clear();
 30     
 31     std::random_device rd;
 32     std::default_random_engine engine(rd());
 33     std::uniform_int_distribution<> dis(5001, 9000);
 34     dice = std::bind(dis, engine);
 35     
 36     return true;
 37 }
 38 
 39 void RaftManager::SendFunc(int sendId) {
 40     std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service));
 41     tcp::resolver resolver(io_service);
 42     while (1) {
 43         int port = portStart+ sendId;
 44         
 45         try {
 46             boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", to_string(port) }));
 47         }
 48         catch (std::exception& e) {
 49             //std::cerr << e.what() << std::endl;
 50             continue;
 51             std::this_thread::sleep_for(std::chrono::milliseconds(2000));
 52         }
 53         //============================================================
 54         netInfo netinfo;
 55         while (1) {
 56             q.Take(netinfo);
 57             boost::system::error_code ignored_error;
 58             boost::asio::write(*s, boost::asio::buffer(&netinfo, sizeof(netinfo)), ignored_error);
 59             if (ignored_error) {
 60                 std::cerr << boost::system::system_error(ignored_error).what() << std::endl;
 61                 break;
 62             }
 63             
 64             std::cout << "\n==========================================================>" << std::endl;
 65             std::cout << "Send netinfo" << std::endl;
 66             std::cout << "netinf.fromID = " << netinfo.fromID << std::endl;
 67             std::cout << "netinf.toID = " << netinfo.toID << std::endl;
 68             std::cout << "netinf.infotype = " << netinfo.infotype << std::endl;
 69             std::cout << "netinf.term = " << netinfo.term << std::endl;
 70             std::cout << "netinf.voteId = " << netinfo.voteId << std::endl << std::endl;
 71             std::cout << "<==========================================================" << std::endl;
 72         }
 73     }
 74     
 75 }
 76 
 77 
 78 void RaftManager::LoopCheck(LocalInfoWithLock& locInfolock) {
 79     int looptime = 200;
 80     StatusHandler handler;
 81     while (1) {
 82         handler.DiapatchByStatus(locInfolock,q);
 83         std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
 84     }
 85 
 86     return;
 87 }
 88 
 89 void RaftManager::RecvNetInfo(tcp::socket sock) {
 90     BYTE data[1024] = { 0 };
 91     boost::system::error_code error;
 92     NetInfo netinf;
 93 
 94     for (;;) {
 95         size_t length = sock.read_some(boost::asio::buffer(&netinf, sizeof(netinf)), error);
 96         if (error == boost::asio::error::eof)
 97             return; // Connection closed cleanly by peer.
 98         else if (error) {
 99             std::cerr << boost::system::system_error(error).what() << std::endl;// Some other error.
100             return;
101         }
102         if (length != sizeof(netinf)) {
103             std::cerr << __FUNCTION__ << " recv wrong lenth:" << length << std::endl;// Some other error.
104             continue;
105         }
106         
107         std::cout << "\n==========================================================>" << std::endl;
108         std::cout << "recv netinfo" << std::endl;
109         std::cout << "netinf.fromID = " << netinf.fromID << std::endl;
110         std::cout << "netinf.toID = " << netinf.toID << std::endl;
111         std::cout << "netinf.infotype = " << netinf.infotype << std::endl;
112         std::cout << "netinf.term = " << netinf.term << std::endl;
113         std::cout << "netinf.voteId = " << netinf.voteId << std::endl << std::endl;
114         std::cout << "<==========================================================" << std::endl;
115         
116         NetInfoHandler handler;
117         handler.DispatchByinfoType(netinf,q, locInfolock);
118     }
119 
120 }
121 
122 bool RaftManager::Go() {
123     if (ip == "" || portStart == 0 || nodeID == 0)
124         return false;
125     try {
126         for (int i = 1; i <= NODE_COUNT; i++) {
127             if (i != nodeID) {
128                 std::thread tsend = std::thread(&RaftManager::SendFunc, shared_from_this(),i);
129                 tsend.detach();
130             }
131         }
132 
133         std::thread tloop = std::thread(&RaftManager::LoopCheck, shared_from_this(), std::ref(locInfolock));
134         tloop.detach();
135 
136         int port = portStart + nodeID;
137         tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
138     
139         for (;;)
140         {
141             tcp::socket sock(io_service);
142             a.accept(sock);
143             std::cout << "accept\n";
144             std::thread(&RaftManager::RecvNetInfo, shared_from_this(), std::move(sock)).detach();
145         }
146 
147     }
148     catch (std::exception& e) {
149         std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
150         return false;
151     }
152 
153     return true;
154 }
讀取配置文件和開啓網絡鏈接的代碼
  1 #include "StatusHandler.h"
  2 #include "RaftManager.h"
  3 #include <iostream>
  4 
  5 
  6 void StatusHandler::DiapatchByStatus(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
  7     LocalInfo localInfo;
  8     //加鎖獲取當前狀態 決定是否進行發送操做
  9     {
 10         //加鎖獲取本地當前狀態
 11         std::lock_guard<std::mutex> lck(locInfolock.m);
 12         localInfo = locInfolock.locInfo;
 13     }
 14     
 15     switch (localInfo.status) {
 16     case LEADER_STATUS:
 17         HandleLeaderSend(locInfolock,q);
 18         break;
 19     case FOLLOWER_STATUS:
 20         HandleFollowerSend(locInfolock,q);
 21         break;
 22     case CANDIDATE_STATUS:
 23         HandleCandidateSend(locInfolock,q);
 24         break;
 25     default:
 26         std::cerr << "Unknown status!" << std::endl;
 27     }
 28 }
 29 
 30 void StatusHandler::HandleLeaderSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
 31     bool isSendheartbeat = false;
 32     int nodeid = 0;
 33     int term = 0;
 34 
 35     {
 36         std::lock_guard<std::mutex> lck(locInfolock.m);
 37         if (locInfolock.locInfo.electionTimeout > 0) {
 38             locInfolock.locInfo.electionTimeout -= 200;
 39         }
 40         //超過期間限制
 41         if (locInfolock.locInfo.electionTimeout <= 0 && locInfolock.locInfo.status == LEADER_STATUS) {
 42             isSendheartbeat = true;
 43             nodeid = locInfolock.locInfo.id;
 44             term = locInfolock.locInfo.term;
 45             locInfolock.locInfo.electionTimeout = 1000;
 46         }
 47     }
 48     if (isSendheartbeat) {
 49         for (int i = 1; i <= NODE_COUNT; i++) {
 50             if (i != nodeid) {
 51                 netInfo netinfo{ nodeid ,i,HEART_BREAT_TYPE ,term,0 };
 52                 q.Put(netinfo);
 53             }
 54         }
 55     }
 56 }
 57 
 58 
 59 void StatusHandler::HandleFollowerSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
 60     bool isSendVoteNetInfo = false;
 61     int nodeid = 0;
 62     int term = 0;
 63     //加鎖獲取本地當前狀態
 64     {
 65         //std::cout << "Enter " << __FUNCTION__ << std::endl;
 66         std::lock_guard<std::mutex> lck(locInfolock.m);
 67         if (locInfolock.locInfo.electionTimeout > 0) {
 68             locInfolock.locInfo.electionTimeout -= 200;
 69         }
 70         //超過期間限制
 71         if (locInfolock.locInfo.electionTimeout <= 0) {
 72             std::cout << "electionTimeout .change to CANDIDATE_STATUS" << std::endl;
 73             if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
 74                 //心跳超時  切換到選舉模式
 75                 locInfolock.locInfo.term++;
 76                 locInfolock.locInfo.status = CANDIDATE_STATUS;
 77                 locInfolock.locInfo.voteRecord.clear();
 78                 locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] =
 79                     locInfolock.locInfo.term;
 80                 isSendVoteNetInfo = true;
 81                 term = locInfolock.locInfo.term;
 82                 nodeid = locInfolock.locInfo.id;
 83                 locInfolock.locInfo.electionTimeout = dice();
 84             }
 85             else {
 86                 locInfolock.locInfo.IsRecvHeartbeat = 0;
 87             }
 88         }
 89         else if ( (locInfolock.locInfo.electionTimeout > 0) && 
 90                 (locInfolock.locInfo.IsRecvHeartbeat == 1) && 
 91                 (locInfolock.locInfo.status == FOLLOWER_STATUS) )
 92         {
 93             std::cout << "Check hearbeat OK!!! Clear electionTimeout" << std::endl;
 94             locInfolock.locInfo.IsRecvHeartbeat = 0;
 95             locInfolock.locInfo.electionTimeout = dice();
 96         }
 97     }
 98 
 99     if (isSendVoteNetInfo) {
100         for (int i = 1; i <= NODE_COUNT; i++) {
101             if (i != nodeid) {
102                 netInfo netinfo{ nodeid ,i,VOTE_LEADER_TYPE ,term,nodeid };
103                 q.Put(netinfo);
104             }
105         }
106     }
107     
108 }
109 
110 void StatusHandler::HandleCandidateSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
111     bool isSendVoteNetInfo = false;
112     int nodeid = 0;
113     int term = 0;
114     {
115         std::lock_guard<std::mutex> lck(locInfolock.m);
116         if (locInfolock.locInfo.electionTimeout > 0) {
117             locInfolock.locInfo.electionTimeout -= 200;
118         }
119         //超過期間限制
120         if (locInfolock.locInfo.electionTimeout <= 0) {
121             std::cout << "electionTimeout .CANDIDATE_STATUS too" << std::endl;
122             if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
123                 //心跳超時  切換到選舉模式
124                 locInfolock.locInfo.term++;
125                 locInfolock.locInfo.status = CANDIDATE_STATUS;
126                 locInfolock.locInfo.voteRecord.clear();
127                 locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] =
128                     locInfolock.locInfo.term;
129             }
130             isSendVoteNetInfo = true;
131             term = locInfolock.locInfo.term;
132             nodeid = locInfolock.locInfo.id;
133             locInfolock.locInfo.electionTimeout = dice();
134         }
135     }
136 
137     if (isSendVoteNetInfo) {
138         for (int i = 1; i <= NODE_COUNT; i++) {
139             if (i != nodeid) {
140                 netInfo netinfo{ nodeid ,i,VOTE_LEADER_TYPE ,term,nodeid };
141                 q.Put(netinfo);
142             }
143         }
144     }
145 }
每間隔200毫秒就進行狀態檢測切換,和超時發送回覆代碼
  1 #include "NetInfoHandler.h"
  2 #include "RaftManager.h"
  3 
  4 void NetInfoHandler::DispatchByinfoType(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
  5     {
  6         std::lock_guard<std::mutex> lck(locInfolock.m);
  7         if (netinf.term < locInfolock.locInfo.term)
  8             return;
  9         if (netinf.term > locInfolock.locInfo.term) {
 10             locInfolock.locInfo.term = netinf.term;
 11             locInfolock.locInfo.status = FOLLOWER_STATUS;
 12             locInfolock.locInfo.isVote = 0;
 13             locInfolock.locInfo.IsRecvHeartbeat = 0;
 14             locInfolock.locInfo.electionTimeout = dice();
 15             locInfolock.locInfo.voteRecord.clear();
 16         }
 17     }
 18     switch (netinf.infotype) {
 19     case HEART_BREAT_TYPE:
 20         HandleHeartBeatTypeRecv(netinf,q, locInfolock);
 21         break;
 22     case VOTE_LEADER_TYPE:
 23         HandleVoteTypeRecv(netinf,q, locInfolock);
 24         break;
 25     case VOTE_LEADER_RESP_TYPE:
 26         HandleVoteRespTypeRecv(netinf,q, locInfolock);
 27         break;
 28     default:
 29         std::cerr << "Recv Unknown info type." << std::endl;
 30     }
 31 }
 32 
 33 void NetInfoHandler::HandleVoteRespTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q,LocalInfoWithLock& locInfolock) {
 34     
 35     {
 36         std::lock_guard<std::mutex> lck(locInfolock.m);
 37         if (netinf.term < locInfolock.locInfo.term)
 38             return;
 39         if (netinf.term > locInfolock.locInfo.term) {
 40             locInfolock.locInfo.term = netinf.term;
 41             locInfolock.locInfo.status = FOLLOWER_STATUS;
 42             locInfolock.locInfo.isVote = 0;
 43             locInfolock.locInfo.IsRecvHeartbeat = 0;
 44             locInfolock.locInfo.voteRecord.clear();
 45         }
 46         if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == locInfolock.locInfo.id && netinf.voteId == locInfolock.locInfo.id) {
 47             //更新本地map記錄
 48             locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
 49         }
 50         int count = 0;
 51         std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin();
 52             //查看本term的投票是否達半數以上
 53         while (it != locInfolock.locInfo.voteRecord.end()) {
 54             if (it->second == locInfolock.locInfo.term)
 55                 count++;
 56                 it++;
 57         }
 58         if (count > NODE_COUNT / 2) {
 59             //達到半數以上 轉化爲leader模式 不然繼續選舉
 60             locInfolock.locInfo.leaderID = locInfolock.locInfo.id;
 61             locInfolock.locInfo.IsRecvHeartbeat = 0;
 62             locInfolock.locInfo.status = LEADER_STATUS;
 63             locInfolock.locInfo.electionTimeout = 1000;
 64             std::cout << "I am the leader term = " << 
 65                 locInfolock.locInfo.term << std::endl;
 66         }
 67     }
 68 
 69 }
 70 
 71 void NetInfoHandler::HandleVoteTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
 72 
 73     NetInfo respNetInfo;
 74     bool isSend = false;
 75     {
 76         std::lock_guard<std::mutex> lck(locInfolock.m);
 77         if (netinf.term < locInfolock.locInfo.term)
 78             return;
 79         if (netinf.term > locInfolock.locInfo.term) {
 80             locInfolock.locInfo.term = netinf.term;
 81             locInfolock.locInfo.status = FOLLOWER_STATUS;
 82             locInfolock.locInfo.isVote = 0;
 83             locInfolock.locInfo.IsRecvHeartbeat = 0;
 84             locInfolock.locInfo.voteRecord.clear();
 85         }
 86         if (locInfolock.locInfo.isVote == 0 && locInfolock.locInfo.status == FOLLOWER_STATUS) {
 87             respNetInfo.fromID = locInfolock.locInfo.id;
 88             respNetInfo.toID = netinf.fromID;
 89             respNetInfo.term = netinf.term;
 90             respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
 91             respNetInfo.voteId = netinf.voteId;
 92             locInfolock.locInfo.isVote = 1;
 93             isSend = true;
 94         }
 95         else if(locInfolock.locInfo.status == FOLLOWER_STATUS){
 96             respNetInfo.fromID = locInfolock.locInfo.id;
 97             respNetInfo.toID = netinf.fromID;
 98             respNetInfo.term = netinf.term;
 99             respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
100             respNetInfo.voteId = 0;
101             isSend = true;
102         }
103     }
104     if(isSend == true)
105         q.Put(respNetInfo);
106 }
107 
108 
109 void NetInfoHandler::HandleHeartBeatTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
110 
111     {
112         std::lock_guard<std::mutex> lck(locInfolock.m);
113         if (netinf.term < locInfolock.locInfo.term)
114             return;
115         if (netinf.term > locInfolock.locInfo.term) {
116             locInfolock.locInfo.term = netinf.term;
117             locInfolock.locInfo.status = FOLLOWER_STATUS;
118             locInfolock.locInfo.isVote = 0;
119             locInfolock.locInfo.IsRecvHeartbeat = 0;
120             locInfolock.locInfo.voteRecord.clear();
121         }
122 
123         locInfolock.locInfo.IsRecvHeartbeat = 1;    
124     }
125 }
收到信息,進行處理以及發送告知本身狀態改變的代碼

 

相關文章
相關標籤/搜索