Raft official site
Chinese translation of the Raft paper
English original of the Raft paper
Raft feels like a streamlined, upgraded Paxos: it was designed from the start with understandability as a goal, and after reading the materials I had a rough outline of it in my head.
With such detailed materials and even animated demos already available, there is not much left to say; this post simply records the key points of my study as prompts for later review.
In a distributed system, consensus means that the multiple nodes of a cluster agree on their state. In practice, though, program crashes, network failures, hardware faults, power outages, and so on make consistency among nodes hard to guarantee, which is why consensus protocols such as Paxos and Raft are needed.
The Paxos protocol is a message-passing, highly fault-tolerant consensus algorithm proposed by Leslie Lamport in 1990. But Paxos has two obvious drawbacks: first, the algorithm is hard to understand; second, it does not provide a good foundation for building real systems.
There have been many attempts to engineer Paxos, but they all modify the algorithm substantially, and the resulting implementations differ considerably from one another.
Raft is a consensus algorithm for managing a replicated log. Ease of understanding was one of its design goals, which makes Raft easier to turn into a practical system and greatly reduces the engineering effort.
The Raft protocol runs in the familiar Leader-Follower mode: one Leader node and multiple Follower nodes. Each node is in one of three states: Leader, Follower, or Candidate.
The Leader handles client requests and replicates the results to the Follower nodes in the form of log entries.
In the Raft protocol, two timers control the progress of leader election.
One is the interval at which the Leader periodically sends heartbeats to the Followers.
The other is the election timeout: the time a node waits in the Follower state before entering the Candidate state.
The election timeout is usually a random value chosen between 150ms and 300ms (the randomness makes it improbable that several nodes enter the Candidate state at the same time).
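The demo code later in this post draws its timeouts in just this way; as a minimal sketch of the pattern (using the 150-300ms range from above rather than the demo's larger values):

    #include <random>
    #include <functional>

    std::random_device rd;
    std::default_random_engine engine(rd());
    std::uniform_int_distribution<> dis(150, 300);  // election timeout range, in ms
    auto electionTimeout = std::bind(dis, engine);  // each call draws a fresh random timeout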
If a node's election timeout expires without it having received a heartbeat from the Leader, it concludes there is no Leader and enters the Candidate state: it votes for itself and then sends vote requests to the other nodes.
其餘節點收到選舉請求後,若在當前請求中標記的任期(term)內比本身記錄的term相等或者更大,且未進行過投票,則回覆答應該投票請求,重置本身的選舉超時控制
Once the candidate has collected votes from more than half of the nodes, it enters the Leader state and starts sending heartbeats to the other nodes, now Followers, to assert its authority.
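Counting the majority is what HandleVoteRespTypeRecv does in the demo code below; the core logic, as an illustrative sketch (clusterSize is assumed to be known):

    #include <map>

    // voteRecord maps nodeID -> term of that node's vote, like the demo's LocalInfo::voteRecord
    bool HasMajority(const std::map<int, int>& voteRecord, int currentTerm, int clusterSize) {
        int votes = 0;
        for (const auto& kv : voteRecord)
            if (kv.second == currentTerm)
                ++votes;
        return votes > clusterSize / 2;  // strict majority, e.g. 3 out of 5
    }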
Now look at the case where several nodes stand for election at once: nodes B and D both start an election at the same time, and each wins one vote. The tie is broken by the randomized election timeout (again, a random value between 150ms and 300ms), which makes it improbable that the same nodes time out together in the next round.
Eventually, perhaps after several repeated rounds of voting (the probability of many repeats is small), one node wins more than half of the votes and becomes the Leader.
#pragma once
#include <iostream>
#include <fstream>
#include <cassert>
#include <string>
#include <vector>
#include <map>
using namespace std;
/*
* Author: itdef
* Reposting is welcome; please keep the text intact and cite the source.
* Tech blog: http://www.cnblogs.com/itdef/
* Tech QQ group: 432336863
* C/C++ and Windows driver enthusiasts and server programmers are welcome.
* Some older code is stored at:
* http://www.oschina.net/code/list_by_user?id=614253
*/
const string FILE_NAME = "config.txt";

class ReadConfig {
public:
    ReadConfig(string filename = "") {
        if (filename.empty()) {
            file_name = FILE_NAME;
        }
        else {
            file_name = filename;
        }
    }
    ~ReadConfig() {}
    // Parses "key=value" lines; lines starting with '#' or "//" are comments.
    map<string, string> Do() {
        tar_path.clear();
        ifstream fin;
        fin.open(file_name);
        if (false == fin.is_open()) {
            std::cerr << "open file failed!!" << std::endl;
            return tar_path;
        }
        string s;
        while (getline(fin, s))
        {
            if (s.empty())  // skip blank lines (avoids indexing into an empty string)
                continue;
            if ('#' == s[0] || (s.size() > 1 && '/' == s[0] && '/' == s[1]))
                continue;
            size_t pos = s.find_first_of("=");
            if (pos == std::string::npos || pos + 1 >= s.size())
                continue;
            string targetName = s.substr(0, pos);
            string path = s.substr(pos + 1);
            std::cout << targetName << " = " << path << std::endl;
            if (path[0] != ' ')  // ignore values that begin with a space
                tar_path[targetName] = path;
        }
        fin.close();
        return tar_path;
    }
private:
    map<string, string> tar_path;
    string file_name;
};
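Init() in the RaftManager code below reads a file named nodeCfg with this class; a hypothetical config and its usage (the key names are the ones Init() looks up):

    // nodeCfg might look like:
    //   ip=127.0.0.1
    //   portStart=9920
    //   nodeID=1
    //   heartbeatTime=5000
    ReadConfig cfg("nodeCfg");
    map<string, string> kv = cfg.Do();  // e.g. kv["nodeID"] == "1"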
#pragma once
#include <string>
#include <mutex>
#include <map>

enum STATUS {
    LEADER_STATUS = 1,
    FOLLOWER_STATUS,
    CANDIDATE_STATUS,
    PRE_VOTE_STAUS,
};

enum INFOTYPE {
    DEFAULT_TYPE = 0,
    HEART_BREAT_TYPE,
    VOTE_LEADER_TYPE,
    VOTE_LEADER_RESP_TYPE,
};

typedef struct netInfo {
    int fromID;
    int toID;
    INFOTYPE infotype;
    int term;
    int voteId;  // candidate ID; only meaningful when infotype is a vote type
} NetInfo;

typedef struct locaInfo {
    int id;
    int leaderID;
    STATUS status;
    int term;
    int isVote;
    int IsRecvHeartbeat;
    std::map<int, int> voteRecord;  // nodeID -> term; an entry means that node voted for us in that term
} LocalInfo;

typedef struct localInfoWithLock {
    LocalInfo locInfo;
    std::mutex m;
} LocalInfoWithLock;
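Note that the demo ships NetInfo over TCP as raw bytes (Session() below reads sizeof(NetInfo) straight into the struct), which silently assumes both ends agree on struct padding and endianness. A minimal matching send, as a sketch (s is assumed to be a connected boost::asio socket):

    // Ship one NetInfo as raw bytes, mirroring how Session() reads it
    NetInfo msg{ 1 /*fromID*/, 2 /*toID*/, HEART_BREAT_TYPE, 1 /*term*/, 0 /*voteId*/ };
    boost::asio::write(*s, boost::asio::buffer(&msg, sizeof(msg)));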
#pragma once
#include "CommonStruct.h"
#include "ReadConfig.h"
#include <memory>
#include <functional>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;
using namespace std;

class RaftManager : public enable_shared_from_this<RaftManager> {
public:
    // Lazily created singleton
    static std::shared_ptr<RaftManager> GetInstance() {
        if (p == nullptr)
            p.reset(new RaftManager());
        //p = std::make_shared<RaftManager>();
        return p;
    }
    ~RaftManager() {
        std::cout << "enter ~RaftManager()\n";
    }
    bool Init();
    bool Go();

private:
    boost::asio::io_service io_service;
    std::string ip; int portStart;
    int nodeID;
    int electionTimeout;
    int heartbeatTime;
    LocalInfoWithLock locInfolock;

    //=============================== send
    void DiapatchByStatus(int id, int& timeoutLimit);
    void HandleLeaderSend(int id, int& timeoutLimit);
    void HandleCandidateSend(int id, int& timeoutLimit);
    void HandleFollowerSend(int id, int& timeoutLimit);
    void HandlePreVoteSend(int id, int& timeoutLimit);

    //=================== recv
    void DiapatchByInfoType(const NetInfo& netinf);
    void HandleHeartbeatTypeRecv(const NetInfo& netinf);
    void HandleVoteTypeRecv(const NetInfo& netinf);
    void HandleVoteRespTypeRecv(const NetInfo& netinf);

    std::function<int()> dice;  // draws a random election timeout

    bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
    void Session(tcp::socket sock);
    void SendFunc(int id);

    RaftManager() {}
    RaftManager(const RaftManager&) = delete;
    RaftManager& operator=(const RaftManager&) = delete;
    static std::shared_ptr<RaftManager> p;
};
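One caveat with this header: GetInstance() is not thread-safe; two threads calling it concurrently could race on creating the instance. A sketch of one common fix, using std::call_once:

    static std::shared_ptr<RaftManager> GetInstance() {
        static std::once_flag flag;
        std::call_once(flag, [] { p.reset(new RaftManager()); });  // runs at most once
        return p;
    }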
1 #include "RaftManager.h" 2 #include <random> 3 #include <functional> 4 5 std::shared_ptr<RaftManager> RaftManager::p = nullptr; 6 7 8 9 10 bool RaftManager::Init() { 11 //可使用json 讀取配置 12 ReadConfig cfg("nodeCfg"); 13 map<string, string> kv = cfg.Do(); 14 15 if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) { 16 assert(0); 17 return false; 18 } 19 ip = kv["ip"]; portStart = stoi(kv["portStart"]); nodeID = stoi(kv["nodeID"]); 20 electionTimeout = 4000; 21 heartbeatTime = 5000; 22 if (kv.find("heartbeatTime") != kv.end()) 23 heartbeatTime = stoi(kv["heartbeatTime"]); 24 25 locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0; 26 locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0; 27 locInfolock.locInfo.status = FOLLOWER_STATUS; 28 locInfolock.locInfo.voteRecord.clear(); 29 30 std::random_device rd; 31 std::default_random_engine engine(rd()); 32 std::uniform_int_distribution<> dis(2001, 5000); 33 dice = std::bind(dis, engine); 34 35 return true; 36 } 37 38 void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) { 39 if (timeoutLimit > 0){ 40 timeoutLimit -= 200; 41 } 42 if (timeoutLimit <= 0) { 43 44 45 46 timeoutLimit = dice(); 47 } 48 } 49 void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) { 50 if (timeoutLimit > 0) { 51 timeoutLimit -= 200; 52 } 53 if (timeoutLimit <= 0) { 54 55 56 57 timeoutLimit = dice(); 58 } 59 60 } 61 62 63 void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) { 64 if (timeoutLimit > 0) { 65 timeoutLimit -= 200; 66 } 67 if (timeoutLimit <= 0) { 68 69 70 71 timeoutLimit = dice(); 72 } 73 74 } 75 76 void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) { 77 if (timeoutLimit > 0) { 78 timeoutLimit -= 200; 79 } 80 if (timeoutLimit <= 0) { 81 LocalInfo localInfo; 82 //加鎖獲取當前狀態 決定是否進行發送操做 83 { 84 //加鎖獲取本地當前狀態 85 std::lock_guard<std::mutex> lck(locInfolock.m); 86 localInfo = locInfolock.locInfo; 87 } 88 if (localInfo.IsRecvHeartbeat == 0) { 89 //心跳超時 切換到選舉模式 90 std::lock_guard<std::mutex> lck(locInfolock.m); 91 locInfolock.locInfo.term++; 92 locInfolock.locInfo.status = CANDIDATE_STATUS; 93 locInfolock.locInfo.voteRecord.clear(); 94 locInfolock.locInfo.voteRecord[nodeID] = locInfolock.locInfo.term; 95 } 96 97 timeoutLimit = dice(); 98 } 99 } 100 101 //=================== 102 void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) { 103 std::lock_guard<std::mutex> lck(locInfolock.m); 104 if (netinf.fromID != locInfolock.locInfo.leaderID) 105 locInfolock.locInfo.leaderID = netinf.fromID; 106 locInfolock.locInfo.IsRecvHeartbeat = 1; 107 108 } 109 void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) { 110 std::lock_guard<std::mutex> lck(locInfolock.m); 111 int voteid = netinf.fromID; 112 if (locInfolock.locInfo.isVote == 0) { 113 //回覆投票 todo 114 115 locInfolock.locInfo.isVote = 1; //標記該term已經投票 116 } 117 else { 118 //回覆不投票 todo 119 } 120 121 } 122 void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) { 123 std::lock_guard<std::mutex> lck(locInfolock.m); 124 if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == nodeID) { 125 //更新本地map記錄 126 locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term; 127 } 128 int count = 0; 129 std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin(); 130 //查看本term的投票是否達半數以上 131 while (it != locInfolock.locInfo.voteRecord.end()) { 132 if (it->second == locInfolock.locInfo.term) 133 count++; 134 it++; 135 } 136 if (count > 5 / 2) { 137 //達到半數以上 轉化爲leader模式 不然繼續選舉 138 
locInfolock.locInfo.leaderID = nodeID; 139 locInfolock.locInfo.IsRecvHeartbeat = 0; 140 locInfolock.locInfo.status = LEADER_STATUS; 141 } 142 } 143 144 145 //loop send 146 void RaftManager::DiapatchByStatus(int id,int& timeoutLimit) { 147 NetInfo netinf{ nodeID,id,DEFAULT_TYPE,0,0 }; 148 LocalInfo localInfo; 149 //加鎖獲取當前狀態 決定是否進行發送操做 150 { 151 //加鎖獲取本地當前狀態 152 std::lock_guard<std::mutex> lck(locInfolock.m); 153 localInfo = locInfolock.locInfo; 154 } 155 switch (localInfo.status) { 156 case LEADER_STATUS: 157 HandleLeaderSend(id,timeoutLimit); 158 break; 159 case FOLLOWER_STATUS: 160 HandleFollowerSend(id,timeoutLimit); 161 break; 162 case CANDIDATE_STATUS: 163 HandleCandidateSend(id,timeoutLimit); 164 break; 165 case PRE_VOTE_STAUS: 166 HandlePreVoteSend(id, timeoutLimit); 167 default: 168 std::cerr << "unknown status!!" << std::endl; 169 } 170 171 } 172 173 174 //handle recv 175 void RaftManager::DiapatchByInfoType(const NetInfo& netinf) { 176 { 177 std::lock_guard<std::mutex> lck(locInfolock.m); 178 if (netinf.term < locInfolock.locInfo.term) 179 return; 180 if (netinf.term > locInfolock.locInfo.term) { 181 locInfolock.locInfo.term = netinf.term; 182 locInfolock.locInfo.status = FOLLOWER_STATUS; 183 locInfolock.locInfo.isVote = 0; 184 locInfolock.locInfo.IsRecvHeartbeat = 0; 185 locInfolock.locInfo.voteRecord.clear(); 186 } 187 } 188 //======================================== 189 switch (netinf.infotype) { 190 case HEART_BREAT_TYPE: 191 HandleHeartbeatTypeRecv(netinf); 192 break; 193 case VOTE_LEADER_TYPE: 194 HandleVoteTypeRecv(netinf); 195 break; 196 case VOTE_LEADER_RESP_TYPE: 197 HandleVoteRespTypeRecv(netinf); 198 break; 199 default: 200 std::cerr << "Recv Unknown info type." << std::endl; 201 } 202 203 } 204 205 bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) { 206 int looptime = 200; 207 int timeoutlimit = dice(); 208 while (1) { 209 DiapatchByStatus(id, timeoutlimit); 210 std::this_thread::sleep_for(std::chrono::milliseconds(looptime)); 211 } 212 213 return false; 214 } 215 216 void RaftManager::SendFunc(int i) { 217 //todo 218 //示例 間隔200ms掃描 心跳間隔5000ms 選舉超時未 1001-4000ms 219 string port = "9920"; 220 port[port.size() - 1] += i; 221 int looptime = 4000; 222 while (1) { 223 std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service)); 224 tcp::resolver resolver(io_service); 225 try { 226 boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", port })); 227 } 228 catch (exception& e) { 229 //持續嘗試鏈接 230 continue; 231 } 232 LoopCheck(i, s); 233 std::this_thread::sleep_for(std::chrono::milliseconds(looptime)); 234 } 235 236 return; 237 } 238 239 void RaftManager::Session(tcp::socket sock) { 240 BYTE data[1024] = { 0 }; 241 boost::system::error_code error; 242 NetInfo netinf; 243 while (1) { 244 size_t length = sock.read_some(boost::asio::buffer(&netinf, sizeof(netinf)), error); 245 if (error == boost::asio::error::eof) 246 return; // Connection closed cleanly by peer. 247 else if (error) { 248 std::cerr << boost::system::system_error(error).what() << std::endl;// Some other error. 249 return; 250 } 251 if (length != sizeof(netinf)) { 252 std::cerr << __FUNCTION__ << " recv wrong lenth:" << length << std::endl;// Some other error. 
253 return; 254 } 255 256 DiapatchByInfoType(netinf); 257 258 } 259 } 260 261 bool RaftManager::Go() { 262 //創建網絡 原本可使用廣播 獲取和通知其餘節點 263 //演示版本假定 5個ID和端口分別爲1 2 3 4 5 和9921 9922 9923 9924 9925 264 if (ip == "" || portStart == 0 || nodeID == 0) 265 return false; 266 try { 267 //開啓4個與其餘線程發送信息的線程 268 for (int i = 1; i <= 2; i++) { 269 if (i == nodeID) 270 continue; 271 std::thread t = std::thread(&RaftManager::SendFunc, shared_from_this(), i); 272 t.detach(); 273 } 274 275 int port = portStart + nodeID; 276 tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port)); 277 for (;;) 278 { 279 for (;;) 280 { 281 tcp::socket sock(io_service); 282 a.accept(sock); 283 std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach(); 284 } 285 } 286 } 287 catch (exception& e) { 288 std::cerr << __FUNCTION__ << " : " << e.what() << std::endl; 289 return false; 290 } 291 292 return true; 293 }
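The post does not include the demo's entry point; a hypothetical main() would simply be:

    int main() {
        auto mgr = RaftManager::GetInstance();
        if (!mgr->Init())
            return 1;  // config file missing or incomplete
        mgr->Go();     // blocks in the accept loop
        return 0;
    }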
// QueueTemplate.cpp : This file contains the "main" function. Program execution begins and ends there.

#include "pch.h"
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
using namespace std;

template<typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
    {
    }

    void Put(const T& x)
    {
        Add(x);
    }

    void Put(T&& x)
    {
        Add(std::forward<T>(x));
    }

    // Take every queued element at once
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        list = std::move(m_queue);
        m_notFull.notify_one();
    }

    // Take a single element
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        t = m_queue.front();
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    int Count()
    {
        return m_queue.size();  // note: reads without the lock, so only a hint
    }
private:
    bool NotFull() const
    {
        bool full = m_queue.size() >= m_maxSize;
        if (full)
            cout << "full, waiting, thread id: " << this_thread::get_id() << endl;
        return !full;
    }

    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
            cout << "empty, waiting, thread id: " << this_thread::get_id() << endl;
        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
        if (m_needStop)
            return;

        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue;                // buffer
    std::mutex m_mutex;                  // mutex, used together with the condition variables
    std::condition_variable m_notEmpty;  // condition variable: queue not empty
    std::condition_variable m_notFull;   // condition variable: queue not full
    int m_maxSize;                       // maximum size of the queue

    bool m_needStop;                     // stop flag
};

int main()
{
    std::cout << "Hello World!\n";

    SyncQueue<int> q(1);
    q.Put(1);

    int a = 0;
    q.Take(a);

    q.Put(2);
    q.Take(a);

    q.Stop();
}
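main() above only exercises the queue from a single thread; a short sketch of the intended cross-thread use (the producer blocks while the queue is full, the consumer while it is empty):

    #include <thread>

    void Demo() {
        SyncQueue<int> q(10);
        std::thread producer([&q] {
            for (int i = 0; i < 5; ++i)
                q.Put(i);   // blocks while the queue is full
        });
        std::thread consumer([&q] {
            for (int i = 0; i < 5; ++i) {
                int v = 0;
                q.Take(v);  // blocks while the queue is empty
            }
        });
        producer.join();
        consumer.join();
        q.Stop();
    }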
My own attempt at a simplified Raft election demo:
It supports 2-5 nodes; IPs, ports, and node IDs are read from a config file.
Networking uses synchronous boost.asio calls.
One thread receives; four threads send.
1. The receiving thread inspects incoming data to tell whether it is a heartbeat, a vote request, or a vote response, and updates its logical term, its isVote flag, and the map<int,int> (nodeID -> term) recording which nodes voted for it in the latest term.
2. Each sending thread polls every 200ms and counts its wait time down according to the node's current state (the wait is either the 1000ms heartbeat interval or a random election timeout of 1500-5000ms).
Based on the current state, it then sends a heartbeat, a vote request, or a vote response.
Still to be filled in.
References:
《etcd技術內幕》