因爲時間安排上的緣由,此次的代碼寫的稍微有些簡略,只能算是本身對RAFT協議的一個鞏固。node
實現定義2個節點,使用讀取配置文件來獲取IP和端口以及節點IDios
網絡使用boost同步流程 一個線程收 一個線程發送json
1 收的線程根據接受的數據 判斷是心跳包仍是選舉請求仍是選舉請求回覆 來更新本身的時間邏輯編號term 更新是否投票isVote 和最新term中那些節點投了本身的選舉票map<int,int> // nodeid, term網絡
2 發送的節點每一個200MS則輪詢一次,根據結點當前狀態減小等待時間(等待時間根據節點狀態調節爲1000ms心跳間隔或者1500-5000的隨機選舉超時)數據結構
現運行看看效果dom
咱們須要兩個節點 因此須要將exe和配置文件放入不一樣的文件夾 如圖socket
啓動程序tcp
1 初始兩個節點都是follower狀態 等待leader發送心跳 ide
2 因爲目前無leader 因此兩個節點其中之一在隨機的超時時間後,發起選舉投票oop
3 以後節點1 成爲leader,以1秒(1000ms)的間隔發送心跳包 進入正常狀態
4 在狀態3 的狀況下 關閉folloer狀態的節點2 對狀況並沒有影響。leader節點1 會持續嘗試鏈接follower節點2
5 節點2 再次鏈接 因爲leader節點1 發送的心跳term爲1 大於新啓動節點2的初始化term 0 。因此節點2 會立刻成爲follower ,接受leader節點1的心跳
6 在狀態3 的狀況下,若是關閉的是leader節點1,節點2 在一段時候未接受到心跳後,就會廣播選舉請求,請求本身成爲leader,可是因爲沒有節點與節點2的投票一致,也沒有其餘的節點選舉投票,節點2將持續嘗試選舉本身成爲leader
7 節點1上線後,贊成節點2的選舉請求,節點2接收超過半數以上的投票,成爲leader。開始以1秒間隔發送心跳包。
代碼以下
基礎結構體:
1 const enum STATUS { 2 LEADER_STATUS = 1, 3 FOLLOWER_STATUS, 4 CANDIDATE_STATUS, 5 PRE_VOTE_STAUS, 6 }; 7 8 const enum INFOTYPE { 9 DEFAULT_TYPE = 0, 10 HEART_BREAT_TYPE, 11 VOTE_LEADER_TYPE, 12 VOTE_LEADER_RESP_TYPE, 13 14 }; 15 16 typedef struct netInfo { 17 int fromID; 18 int toID; 19 INFOTYPE infotype; 20 int term; 21 int voteId; //選舉ID infotype爲votetype纔有效 22 }NetInfo; 23 24 typedef struct locaInfo { 25 int id; 26 int leaderID; 27 STATUS status; 28 int term; 29 int isVote; 30 int IsRecvHeartbeat; 31 int electionTimeout; 32 std::map<int, int> voteRecord;// id term有此記錄表示該term收到該id投取本身一票 33 }LocalInfo; 34 35 typedef struct localInfoWithLock { 36 LocalInfo locInfo; 37 std::mutex m; 38 }LocalInfoWithLock;
1 #include "RaftManager.h" 2 #include "NetInfoHandler.h" 3 #include "StatusHandler.h" 4 #include <random> 5 #include <functional> 6 7 using namespace std; 8 9 std::shared_ptr<RaftManager> RaftManager::p = nullptr; 10 11 bool RaftManager::Init() { 12 //能夠使用json 讀取配置 13 ReadConfig cfg("nodeCfg"); 14 map<string, string> kv = cfg.Do(); 15 16 if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) { 17 assert(0); 18 return false; 19 } 20 ip = kv["ip"]; portStart = stoi(kv["portStart"]); nodeID = stoi(kv["nodeID"]); 21 heartbeatTime = 5000; 22 if (kv.find("heartbeatTime") != kv.end()) 23 heartbeatTime = stoi(kv["heartbeatTime"]); 24 25 locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0; 26 locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0; 27 locInfolock.locInfo.electionTimeout = 4000; 28 locInfolock.locInfo.status = FOLLOWER_STATUS; 29 locInfolock.locInfo.voteRecord.clear(); 30 31 std::random_device rd; 32 std::default_random_engine engine(rd()); 33 std::uniform_int_distribution<> dis(5001, 9000); 34 dice = std::bind(dis, engine); 35 36 return true; 37 } 38 39 void RaftManager::SendFunc(int sendId) { 40 std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service)); 41 tcp::resolver resolver(io_service); 42 while (1) { 43 int port = portStart+ sendId; 44 45 try { 46 boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", to_string(port) })); 47 } 48 catch (std::exception& e) { 49 //std::cerr << e.what() << std::endl; 50 continue; 51 std::this_thread::sleep_for(std::chrono::milliseconds(2000)); 52 } 53 //============================================================ 54 netInfo netinfo; 55 while (1) { 56 q.Take(netinfo); 57 boost::system::error_code ignored_error; 58 boost::asio::write(*s, boost::asio::buffer(&netinfo, sizeof(netinfo)), ignored_error); 59 if (ignored_error) { 60 std::cerr << boost::system::system_error(ignored_error).what() << std::endl; 61 break; 62 } 63 64 std::cout << "\n==========================================================>" << std::endl; 65 std::cout << "Send netinfo" << std::endl; 66 std::cout << "netinf.fromID = " << netinfo.fromID << std::endl; 67 std::cout << "netinf.toID = " << netinfo.toID << std::endl; 68 std::cout << "netinf.infotype = " << netinfo.infotype << std::endl; 69 std::cout << "netinf.term = " << netinfo.term << std::endl; 70 std::cout << "netinf.voteId = " << netinfo.voteId << std::endl << std::endl; 71 std::cout << "<==========================================================" << std::endl; 72 } 73 } 74 75 } 76 77 78 void RaftManager::LoopCheck(LocalInfoWithLock& locInfolock) { 79 int looptime = 200; 80 StatusHandler handler; 81 while (1) { 82 handler.DiapatchByStatus(locInfolock,q); 83 std::this_thread::sleep_for(std::chrono::milliseconds(looptime)); 84 } 85 86 return; 87 } 88 89 void RaftManager::RecvNetInfo(tcp::socket sock) { 90 BYTE data[1024] = { 0 }; 91 boost::system::error_code error; 92 NetInfo netinf; 93 94 for (;;) { 95 size_t length = sock.read_some(boost::asio::buffer(&netinf, sizeof(netinf)), error); 96 if (error == boost::asio::error::eof) 97 return; // Connection closed cleanly by peer. 98 else if (error) { 99 std::cerr << boost::system::system_error(error).what() << std::endl;// Some other error. 100 return; 101 } 102 if (length != sizeof(netinf)) { 103 std::cerr << __FUNCTION__ << " recv wrong lenth:" << length << std::endl;// Some other error. 104 continue; 105 } 106 107 std::cout << "\n==========================================================>" << std::endl; 108 std::cout << "recv netinfo" << std::endl; 109 std::cout << "netinf.fromID = " << netinf.fromID << std::endl; 110 std::cout << "netinf.toID = " << netinf.toID << std::endl; 111 std::cout << "netinf.infotype = " << netinf.infotype << std::endl; 112 std::cout << "netinf.term = " << netinf.term << std::endl; 113 std::cout << "netinf.voteId = " << netinf.voteId << std::endl << std::endl; 114 std::cout << "<==========================================================" << std::endl; 115 116 NetInfoHandler handler; 117 handler.DispatchByinfoType(netinf,q, locInfolock); 118 } 119 120 } 121 122 bool RaftManager::Go() { 123 if (ip == "" || portStart == 0 || nodeID == 0) 124 return false; 125 try { 126 for (int i = 1; i <= NODE_COUNT; i++) { 127 if (i != nodeID) { 128 std::thread tsend = std::thread(&RaftManager::SendFunc, shared_from_this(),i); 129 tsend.detach(); 130 } 131 } 132 133 std::thread tloop = std::thread(&RaftManager::LoopCheck, shared_from_this(), std::ref(locInfolock)); 134 tloop.detach(); 135 136 int port = portStart + nodeID; 137 tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port)); 138 139 for (;;) 140 { 141 tcp::socket sock(io_service); 142 a.accept(sock); 143 std::cout << "accept\n"; 144 std::thread(&RaftManager::RecvNetInfo, shared_from_this(), std::move(sock)).detach(); 145 } 146 147 } 148 catch (std::exception& e) { 149 std::cerr << __FUNCTION__ << " : " << e.what() << std::endl; 150 return false; 151 } 152 153 return true; 154 }
1 #include "StatusHandler.h" 2 #include "RaftManager.h" 3 #include <iostream> 4 5 6 void StatusHandler::DiapatchByStatus(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) { 7 LocalInfo localInfo; 8 //加鎖獲取當前狀態 決定是否進行發送操做 9 { 10 //加鎖獲取本地當前狀態 11 std::lock_guard<std::mutex> lck(locInfolock.m); 12 localInfo = locInfolock.locInfo; 13 } 14 15 switch (localInfo.status) { 16 case LEADER_STATUS: 17 HandleLeaderSend(locInfolock,q); 18 break; 19 case FOLLOWER_STATUS: 20 HandleFollowerSend(locInfolock,q); 21 break; 22 case CANDIDATE_STATUS: 23 HandleCandidateSend(locInfolock,q); 24 break; 25 default: 26 std::cerr << "Unknown status!" << std::endl; 27 } 28 } 29 30 void StatusHandler::HandleLeaderSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) { 31 bool isSendheartbeat = false; 32 int nodeid = 0; 33 int term = 0; 34 35 { 36 std::lock_guard<std::mutex> lck(locInfolock.m); 37 if (locInfolock.locInfo.electionTimeout > 0) { 38 locInfolock.locInfo.electionTimeout -= 200; 39 } 40 //超過期間限制 41 if (locInfolock.locInfo.electionTimeout <= 0 && locInfolock.locInfo.status == LEADER_STATUS) { 42 isSendheartbeat = true; 43 nodeid = locInfolock.locInfo.id; 44 term = locInfolock.locInfo.term; 45 locInfolock.locInfo.electionTimeout = 1000; 46 } 47 } 48 if (isSendheartbeat) { 49 for (int i = 1; i <= NODE_COUNT; i++) { 50 if (i != nodeid) { 51 netInfo netinfo{ nodeid ,i,HEART_BREAT_TYPE ,term,0 }; 52 q.Put(netinfo); 53 } 54 } 55 } 56 } 57 58 59 void StatusHandler::HandleFollowerSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) { 60 bool isSendVoteNetInfo = false; 61 int nodeid = 0; 62 int term = 0; 63 //加鎖獲取本地當前狀態 64 { 65 //std::cout << "Enter " << __FUNCTION__ << std::endl; 66 std::lock_guard<std::mutex> lck(locInfolock.m); 67 if (locInfolock.locInfo.electionTimeout > 0) { 68 locInfolock.locInfo.electionTimeout -= 200; 69 } 70 //超過期間限制 71 if (locInfolock.locInfo.electionTimeout <= 0) { 72 std::cout << "electionTimeout .change to CANDIDATE_STATUS" << std::endl; 73 if (locInfolock.locInfo.IsRecvHeartbeat == 0) { 74 //心跳超時 切換到選舉模式 75 locInfolock.locInfo.term++; 76 locInfolock.locInfo.status = CANDIDATE_STATUS; 77 locInfolock.locInfo.voteRecord.clear(); 78 locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] = 79 locInfolock.locInfo.term; 80 isSendVoteNetInfo = true; 81 term = locInfolock.locInfo.term; 82 nodeid = locInfolock.locInfo.id; 83 locInfolock.locInfo.electionTimeout = dice(); 84 } 85 else { 86 locInfolock.locInfo.IsRecvHeartbeat = 0; 87 } 88 } 89 else if ( (locInfolock.locInfo.electionTimeout > 0) && 90 (locInfolock.locInfo.IsRecvHeartbeat == 1) && 91 (locInfolock.locInfo.status == FOLLOWER_STATUS) ) 92 { 93 std::cout << "Check hearbeat OK!!! Clear electionTimeout" << std::endl; 94 locInfolock.locInfo.IsRecvHeartbeat = 0; 95 locInfolock.locInfo.electionTimeout = dice(); 96 } 97 } 98 99 if (isSendVoteNetInfo) { 100 for (int i = 1; i <= NODE_COUNT; i++) { 101 if (i != nodeid) { 102 netInfo netinfo{ nodeid ,i,VOTE_LEADER_TYPE ,term,nodeid }; 103 q.Put(netinfo); 104 } 105 } 106 } 107 108 } 109 110 void StatusHandler::HandleCandidateSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) { 111 bool isSendVoteNetInfo = false; 112 int nodeid = 0; 113 int term = 0; 114 { 115 std::lock_guard<std::mutex> lck(locInfolock.m); 116 if (locInfolock.locInfo.electionTimeout > 0) { 117 locInfolock.locInfo.electionTimeout -= 200; 118 } 119 //超過期間限制 120 if (locInfolock.locInfo.electionTimeout <= 0) { 121 std::cout << "electionTimeout .CANDIDATE_STATUS too" << std::endl; 122 if (locInfolock.locInfo.IsRecvHeartbeat == 0) { 123 //心跳超時 切換到選舉模式 124 locInfolock.locInfo.term++; 125 locInfolock.locInfo.status = CANDIDATE_STATUS; 126 locInfolock.locInfo.voteRecord.clear(); 127 locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] = 128 locInfolock.locInfo.term; 129 } 130 isSendVoteNetInfo = true; 131 term = locInfolock.locInfo.term; 132 nodeid = locInfolock.locInfo.id; 133 locInfolock.locInfo.electionTimeout = dice(); 134 } 135 } 136 137 if (isSendVoteNetInfo) { 138 for (int i = 1; i <= NODE_COUNT; i++) { 139 if (i != nodeid) { 140 netInfo netinfo{ nodeid ,i,VOTE_LEADER_TYPE ,term,nodeid }; 141 q.Put(netinfo); 142 } 143 } 144 } 145 }
1 #include "NetInfoHandler.h" 2 #include "RaftManager.h" 3 4 void NetInfoHandler::DispatchByinfoType(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) { 5 { 6 std::lock_guard<std::mutex> lck(locInfolock.m); 7 if (netinf.term < locInfolock.locInfo.term) 8 return; 9 if (netinf.term > locInfolock.locInfo.term) { 10 locInfolock.locInfo.term = netinf.term; 11 locInfolock.locInfo.status = FOLLOWER_STATUS; 12 locInfolock.locInfo.isVote = 0; 13 locInfolock.locInfo.IsRecvHeartbeat = 0; 14 locInfolock.locInfo.electionTimeout = dice(); 15 locInfolock.locInfo.voteRecord.clear(); 16 } 17 } 18 switch (netinf.infotype) { 19 case HEART_BREAT_TYPE: 20 HandleHeartBeatTypeRecv(netinf,q, locInfolock); 21 break; 22 case VOTE_LEADER_TYPE: 23 HandleVoteTypeRecv(netinf,q, locInfolock); 24 break; 25 case VOTE_LEADER_RESP_TYPE: 26 HandleVoteRespTypeRecv(netinf,q, locInfolock); 27 break; 28 default: 29 std::cerr << "Recv Unknown info type." << std::endl; 30 } 31 } 32 33 void NetInfoHandler::HandleVoteRespTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q,LocalInfoWithLock& locInfolock) { 34 35 { 36 std::lock_guard<std::mutex> lck(locInfolock.m); 37 if (netinf.term < locInfolock.locInfo.term) 38 return; 39 if (netinf.term > locInfolock.locInfo.term) { 40 locInfolock.locInfo.term = netinf.term; 41 locInfolock.locInfo.status = FOLLOWER_STATUS; 42 locInfolock.locInfo.isVote = 0; 43 locInfolock.locInfo.IsRecvHeartbeat = 0; 44 locInfolock.locInfo.voteRecord.clear(); 45 } 46 if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == locInfolock.locInfo.id && netinf.voteId == locInfolock.locInfo.id) { 47 //更新本地map記錄 48 locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term; 49 } 50 int count = 0; 51 std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin(); 52 //查看本term的投票是否達半數以上 53 while (it != locInfolock.locInfo.voteRecord.end()) { 54 if (it->second == locInfolock.locInfo.term) 55 count++; 56 it++; 57 } 58 if (count > NODE_COUNT / 2) { 59 //達到半數以上 轉化爲leader模式 不然繼續選舉 60 locInfolock.locInfo.leaderID = locInfolock.locInfo.id; 61 locInfolock.locInfo.IsRecvHeartbeat = 0; 62 locInfolock.locInfo.status = LEADER_STATUS; 63 locInfolock.locInfo.electionTimeout = 1000; 64 std::cout << "I am the leader term = " << 65 locInfolock.locInfo.term << std::endl; 66 } 67 } 68 69 } 70 71 void NetInfoHandler::HandleVoteTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) { 72 73 NetInfo respNetInfo; 74 bool isSend = false; 75 { 76 std::lock_guard<std::mutex> lck(locInfolock.m); 77 if (netinf.term < locInfolock.locInfo.term) 78 return; 79 if (netinf.term > locInfolock.locInfo.term) { 80 locInfolock.locInfo.term = netinf.term; 81 locInfolock.locInfo.status = FOLLOWER_STATUS; 82 locInfolock.locInfo.isVote = 0; 83 locInfolock.locInfo.IsRecvHeartbeat = 0; 84 locInfolock.locInfo.voteRecord.clear(); 85 } 86 if (locInfolock.locInfo.isVote == 0 && locInfolock.locInfo.status == FOLLOWER_STATUS) { 87 respNetInfo.fromID = locInfolock.locInfo.id; 88 respNetInfo.toID = netinf.fromID; 89 respNetInfo.term = netinf.term; 90 respNetInfo.infotype = VOTE_LEADER_RESP_TYPE; 91 respNetInfo.voteId = netinf.voteId; 92 locInfolock.locInfo.isVote = 1; 93 isSend = true; 94 } 95 else if(locInfolock.locInfo.status == FOLLOWER_STATUS){ 96 respNetInfo.fromID = locInfolock.locInfo.id; 97 respNetInfo.toID = netinf.fromID; 98 respNetInfo.term = netinf.term; 99 respNetInfo.infotype = VOTE_LEADER_RESP_TYPE; 100 respNetInfo.voteId = 0; 101 isSend = true; 102 } 103 } 104 if(isSend == true) 105 q.Put(respNetInfo); 106 } 107 108 109 void NetInfoHandler::HandleHeartBeatTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) { 110 111 { 112 std::lock_guard<std::mutex> lck(locInfolock.m); 113 if (netinf.term < locInfolock.locInfo.term) 114 return; 115 if (netinf.term > locInfolock.locInfo.term) { 116 locInfolock.locInfo.term = netinf.term; 117 locInfolock.locInfo.status = FOLLOWER_STATUS; 118 locInfolock.locInfo.isVote = 0; 119 locInfolock.locInfo.IsRecvHeartbeat = 0; 120 locInfolock.locInfo.voteRecord.clear(); 121 } 122 123 locInfolock.locInfo.IsRecvHeartbeat = 1; 124 } 125 }