Distributed Protocol Study Notes (1): Raft Election

Raft official website

Chinese translation of the paper

The paper in English

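Impressions: as an upgraded, slimmed-down take on Paxos, Raft was designed from the start to be easy to understand. After reading through the material, I had a rough outline of it in my head.

With such detailed material already available, even animated demos, there is not much left for me to add. This post simply records the key points I learned, as a reminder for later review.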

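In a distributed system, consistency means that the nodes of a cluster agree on their state. In practice, program crashes, network failures, hardware failures, power outages, and similar events make consistency between nodes hard to guarantee, which is why consensus protocols such as Paxos and Raft are needed.

Paxos, proposed by Leslie Lamport in 1990, is a message-passing consensus algorithm with a high degree of fault tolerance. It has two obvious drawbacks, though: first, it is hard to understand; second, it does not provide a good foundation for building real-world systems.

There have been many attempts to engineer Paxos, but they made substantial changes to the algorithm itself, and the resulting implementations differ considerably from one another.

Raft is a consensus algorithm for managing a replicated log. Understandability was one of the designers' explicit goals, which makes Raft easier to build real systems on and greatly reduces the engineering effort.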

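1 Leader election

Raft uses one Leader node and multiple Follower nodes, the familiar Leader-Follower model. Each node is in one of three states: Leader, Follower, or Candidate.

The Leader handles client requests and replicates the results to the other Follower nodes as log entries.

Two timers control the progress of Leader election in Raft.

One is the heartbeat: the Leader periodically sends heartbeat packets to the Followers.

The other is the election timeout: how long a Follower waits before it becomes a Candidate.

The election timeout is usually a random value between 150 ms and 300 ms, which makes it unlikely that several nodes become Candidates at the same moment.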
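As a small illustration of the randomized timeout, here is a minimal sketch. The helper name `randomElectionTimeout` and its parameters are assumptions made for this example; only the 150-300 ms range comes from the text above.

```cpp
#include <cassert>
#include <chrono>
#include <random>

// Hypothetical helper: draw a fresh election timeout in [minMs, maxMs].
// The defaults are the 150-300 ms range quoted in the text.
std::chrono::milliseconds randomElectionTimeout(std::mt19937& rng,
                                                int minMs = 150,
                                                int maxMs = 300) {
    assert(minMs <= maxMs);
    // uniform_int_distribution uses inclusive bounds on both ends.
    std::uniform_int_distribution<int> dist(minMs, maxMs);
    return std::chrono::milliseconds(dist(rng));
}
```

A node would call this every time it re-arms its election timer, so two nodes that tie once are unlikely to draw the same value again.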

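If a node receives no heartbeat from the Leader before its election timeout expires, it assumes there is no Leader, becomes a Candidate, votes for itself, and sends vote requests to the other nodes.

When another node receives a vote request, it grants the vote if the term in the request is equal to or greater than its own recorded term and it has not yet voted in that term. It then resets its own election timeout.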
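The vote rule above can be sketched as a single function. This is a simplified illustration, not the demo's code: `VoterState` and `handleVoteRequest` are hypothetical names, and the log up-to-date check that full Raft also applies before granting a vote is deliberately left out because this post only covers election.

```cpp
#include <cassert>

// Hypothetical per-node voting state (names are illustrative only).
struct VoterState {
    int currentTerm = 0;
    int votedFor = -1;  // -1 means "no vote cast in currentTerm"
};

// Returns true if this node grants its vote to candidateId for candidateTerm.
// On a granted vote the caller would also reset its election timer.
bool handleVoteRequest(VoterState& self, int candidateTerm, int candidateId) {
    assert(candidateId >= 0);
    if (candidateTerm < self.currentTerm)
        return false;                      // stale request from an old term
    if (candidateTerm > self.currentTerm) {
        self.currentTerm = candidateTerm;  // newer term: any earlier vote no longer counts
        self.votedFor = -1;
    }
    if (self.votedFor != -1 && self.votedFor != candidateId)
        return false;                      // already voted for someone else this term
    self.votedFor = candidateId;
    return true;
}
```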

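A Candidate that collects votes from more than half of the nodes becomes Leader and starts sending heartbeats to the other (Follower) nodes to maintain its authority.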
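"More than half" is a strict majority of the whole cluster, counting the Candidate's own vote. A minimal sketch, with `hasMajority` as a made-up helper name:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: true when `votes` (including the candidate's own vote)
// is a strict majority of a cluster with `clusterSize` nodes.
bool hasMajority(std::size_t votes, std::size_t clusterSize) {
    assert(votes <= clusterSize);
    return votes > clusterSize / 2;
}
```

With 5 nodes this requires 3 votes; with 4 nodes it also requires 3, so an even-sized cluster tolerates no more failures than the odd-sized cluster one node smaller.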

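Now consider several nodes standing for election at once. Nodes B and D start an election at the same time and each receives one vote, so neither reaches a majority. Both then wait out a new random election timeout and try again. Because the timeout is a random value, usually between 150 ms and 300 ms, the nodes are unlikely to become Candidates at the same moment a second time.

Eventually, possibly after several rounds (repeated ties are improbable), one node collects more than half of the votes and becomes Leader.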
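The role changes described in this section can be summarized as a small transition function. This is only a sketch: `Role`, `Event`, and `nextRole` are names invented for the illustration and do not appear in the demo code further down.

```cpp
#include <cassert>

enum class Role { Follower, Candidate, Leader };

// Hypothetical events that drive role changes during election.
enum class Event {
    ElectionTimeout,            // no heartbeat (or no election result) in time
    WonMajority,                // collected votes from more than half the nodes
    SawNewerTermOrValidLeader   // a message carried a newer term or came from the current Leader
};

Role nextRole(Role role, Event ev) {
    switch (ev) {
    case Event::ElectionTimeout:
        // Followers start an election; Candidates start a new one (split vote).
        // A Leader does not run an election timer, so it is unaffected.
        return role == Role::Leader ? Role::Leader : Role::Candidate;
    case Event::WonMajority:
        return role == Role::Candidate ? Role::Leader : role;
    case Event::SawNewerTermOrValidLeader:
        return Role::Follower;
    }
    assert(false && "unreachable");
    return role;
}
```

The Candidate-to-Candidate transition on `ElectionTimeout` is the split-vote retry described above.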

 

#pragma once
#include <cassert>
#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;
/*
* Author: itdef
* Reposting is welcome; please keep the text complete and credit the source.
* Tech blog: http://www.cnblogs.com/itdef/
* Tech discussion group number: 432336863
* C/C++ and Windows driver enthusiasts and server programmers are welcome to get in touch.
* Some older code is stored at
* http://www.oschina.net/code/list_by_user?id=614253
*/
const string FILE_NAME = "config.txt";

// Reads "key=value" lines from a text file into a map.
class ReadConfig {
public:
    ReadConfig(string filename = "") {
        if (filename.empty()) {
            file_name = FILE_NAME;
        }
        else {
            file_name = filename;
        }
    }
    ~ReadConfig() {}
    map<string, string> Do() {
        tar_path.clear();
        ifstream fin;
        fin.open(file_name);
        if (false == fin.is_open()) {
            std::cerr << "open file failed!!" << std::endl;
            return tar_path;
        }
        string s;
        while (getline(fin, s))
        {
            // Skip blank lines and comment lines that start with '#' or "//".
            if (s.empty() || '#' == s[0] || s.compare(0, 2, "//") == 0)
                continue;
            size_t pos = s.find_first_of("=");
            // Skip lines without '=' or with nothing after it.
            if (pos == std::string::npos || pos + 1 >= s.size())
                continue;
            string targetName = s.substr(0, pos);
            string path = s.substr(pos + 1);
            std::cout << targetName << " = " << path << std::endl;
            // Values that start with a space are ignored.
            if (path[0] != ' ')
                tar_path[targetName] = path;
        }
        fin.close();
        return tar_path;
    }
private:
    map<string, string> tar_path;
    string file_name;
};
ReadConfig.h
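#pragma once
#include <string>
#include <mutex>
#include <map>

// Node role. PRE_VOTE_STAUS (sic) is reserved for a pre-vote phase.
// Note: the original "const enum" is not standard C++; plain "enum" is used here.
enum STATUS {
    LEADER_STATUS = 1,
    FOLLOWER_STATUS,
    CANDIDATE_STATUS,
    PRE_VOTE_STAUS,
};

// Message type carried in NetInfo. HEART_BREAT_TYPE (sic) is the heartbeat.
enum INFOTYPE {
    DEFAULT_TYPE = 0,
    HEART_BREAT_TYPE,
    VOTE_LEADER_TYPE,
    VOTE_LEADER_RESP_TYPE,
};

// Message exchanged between nodes; it is sent over the socket as raw bytes.
typedef struct netInfo {
    int fromID;
    int toID;
    INFOTYPE infotype;
    int term;
    int voteId;    // ID being voted for; only meaningful for the vote message types
}NetInfo;

// Local state of this node.
typedef struct locaInfo {
    int id;
    int leaderID;
    STATUS status;
    int term;
    int isVote;             // 1 if this node has already voted in the current term
    int IsRecvHeartbeat;    // set to 1 when a heartbeat is received
    std::map<int, int> voteRecord;  // id -> term: node `id` voted for this node in `term`
}LocalInfo;

// Local state together with the mutex that guards it.
typedef struct localInfoWithLock {
    LocalInfo    locInfo;
    std::mutex m;
}LocalInfoWithLock;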
CommonStruct.h
#pragma once
#include "CommonStruct.h"
#include "ReadConfig.h"
#include <functional>
#include <memory>
#include <string>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;
using namespace std;

// Singleton that owns this node's state and its send/receive loops.
class RaftManager :public enable_shared_from_this<RaftManager> {
public:
    static std::shared_ptr<RaftManager> GetInstance() {
        // The constructor is private, so std::make_shared cannot be used here.
        if (p == nullptr)
            p.reset(new RaftManager());
        return p;
    }
    ~RaftManager() {
        std::cout << "enter ~RaftManager()\n";
    }
    bool Init();
    bool Go();

private:
    boost::asio::io_service io_service;
    std::string ip; int portStart;
    int nodeID;
    int electionTimeout;
    int heartbeatTime;
    LocalInfoWithLock locInfolock;

    //=============================== send side: one handler per node status
    void DiapatchByStatus(int id, int& timeoutLimit);
    void HandleLeaderSend(int id, int& timeoutLimit);
    void HandleCandidateSend(int id, int& timeoutLimit);
    void HandleFollowerSend(int id, int& timeoutLimit);
    void HandlePreVoteSend(int id, int& timeoutLimit);

    //=================== receive side: one handler per message type
    void DiapatchByInfoType(const NetInfo& netinf);
    void HandleHeartbeatTypeRecv(const NetInfo& netinf);
    void HandleVoteTypeRecv(const NetInfo& netinf);
    void HandleVoteRespTypeRecv(const NetInfo& netinf);

    // Returns a fresh random timeout in milliseconds (set up in Init()).
    std::function<int()> dice;

    bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
    void Session(tcp::socket sock);
    void SendFunc(int id);

    RaftManager() {}
    RaftManager(const RaftManager&) = delete;
    RaftManager& operator=(const RaftManager&) = delete;
    static std::shared_ptr<RaftManager> p;
};
RaftManager.h
#include "RaftManager.h"
#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <random>
#include <thread>

std::shared_ptr<RaftManager> RaftManager::p = nullptr;

// The demo assumes a fixed 5-node cluster when counting votes.
static const int kNodeCount = 5;

bool RaftManager::Init() {
    // The configuration could also be read from JSON.
    ReadConfig cfg("nodeCfg");
    map<string, string> kv = cfg.Do();

    if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
        assert(0);
        return false;
    }
    ip = kv["ip"];  portStart = stoi(kv["portStart"]);  nodeID = stoi(kv["nodeID"]);
    // Defaults; the send loop below does not consult these two values yet.
    electionTimeout = 4000;
    heartbeatTime = 5000;
    if (kv.find("heartbeatTime") != kv.end())
        heartbeatTime = stoi(kv["heartbeatTime"]);

    locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0;
    locInfolock.locInfo.term = 0;   // previously left uninitialized
    locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0;
    locInfolock.locInfo.status = FOLLOWER_STATUS;
    locInfolock.locInfo.voteRecord.clear();

    // dice() returns a random timeout between 2001 and 5000 ms.
    std::random_device rd;
    std::default_random_engine engine(rd());
    std::uniform_int_distribution<> dis(2001, 5000);
    dice = std::bind(dis, engine);

    return true;
}

void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0){
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // TODO: send a heartbeat to node `id`.
        timeoutLimit = dice();
    }
}
void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // TODO: send a vote request to node `id`.
        timeoutLimit = dice();
    }
}

void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // TODO: pre-vote handling.
        timeoutLimit = dice();
    }
}

void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        {
            // Check and update the local state under a single lock.
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
                // Heartbeat timed out: become a candidate and vote for ourselves.
                locInfolock.locInfo.term++;
                locInfolock.locInfo.status = CANDIDATE_STATUS;
                locInfolock.locInfo.isVote = 1;
                locInfolock.locInfo.voteRecord.clear();
                locInfolock.locInfo.voteRecord[nodeID] = locInfolock.locInfo.term;
            }
            // NOTE: IsRecvHeartbeat is never cleared here, so after the first
            // heartbeat this node will not time out again. Resetting it needs
            // care because every sender thread runs its own timer.
        }

        timeoutLimit = dice();
    }
}

//=================== receive handlers
void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (netinf.fromID != locInfolock.locInfo.leaderID)
        locInfolock.locInfo.leaderID = netinf.fromID;
    locInfolock.locInfo.IsRecvHeartbeat = 1;
}
void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    int voteid = netinf.fromID;
    (void)voteid;   // will be used once the replies below are implemented
    if (locInfolock.locInfo.isVote == 0) {
        // TODO: reply that the vote is granted.

        locInfolock.locInfo.isVote = 1;    // mark that this node has voted in this term
    }
    else {
        // TODO: reply that the vote is refused.
    }
}
void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == nodeID) {
        // Record the vote in the local map.
        locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
    }
    int count = 0;
    std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin();
    // Count the votes received in the current term.
    while (it != locInfolock.locInfo.voteRecord.end()) {
        if (it->second == locInfolock.locInfo.term)
            count++;
        it++;
    }
    if (count > kNodeCount / 2) {
        // More than half: switch to leader mode. Otherwise keep campaigning.
        locInfolock.locInfo.leaderID = nodeID;
        locInfolock.locInfo.IsRecvHeartbeat = 0;
        locInfolock.locInfo.status = LEADER_STATUS;
    }
}

// Send loop: dispatch on the node's current status.
void RaftManager::DiapatchByStatus(int id,int& timeoutLimit) {
    NetInfo netinf{ nodeID,id,DEFAULT_TYPE,0,0 };   // not sent yet
    (void)netinf;
    LocalInfo localInfo;
    {
        // Take a snapshot of the local state under the lock.
        std::lock_guard<std::mutex> lck(locInfolock.m);
        localInfo = locInfolock.locInfo;
    }
    switch (localInfo.status) {
    case LEADER_STATUS:
        HandleLeaderSend(id,timeoutLimit);
        break;
    case FOLLOWER_STATUS:
        HandleFollowerSend(id,timeoutLimit);
        break;
    case CANDIDATE_STATUS:
        HandleCandidateSend(id,timeoutLimit);
        break;
    case PRE_VOTE_STAUS:
        HandlePreVoteSend(id, timeoutLimit);
        break;  // was missing, so this case fell through to "unknown status"
    default:
        std::cerr << "unknown status!!" << std::endl;
        break;
    }
}

// Receive path: dispatch on the message type.
void RaftManager::DiapatchByInfoType(const NetInfo& netinf) {
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        // Ignore messages from an older term.
        if (netinf.term < locInfolock.locInfo.term)
            return;
        // A newer term resets this node to follower and clears its vote state.
        if (netinf.term > locInfolock.locInfo.term) {
            locInfolock.locInfo.term = netinf.term;
            locInfolock.locInfo.status = FOLLOWER_STATUS;
            locInfolock.locInfo.isVote = 0;
            locInfolock.locInfo.IsRecvHeartbeat = 0;
            locInfolock.locInfo.voteRecord.clear();
        }
    }
    switch (netinf.infotype) {
    case HEART_BREAT_TYPE:
        HandleHeartbeatTypeRecv(netinf);
        break;
    case VOTE_LEADER_TYPE:
        HandleVoteTypeRecv(netinf);
        break;
    case VOTE_LEADER_RESP_TYPE:
        HandleVoteRespTypeRecv(netinf);
        break;
    default:
        std::cerr << "Recv Unknown info type." << std::endl;
        break;
    }
}

bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) {
    int looptime = 200;             // polling interval in ms
    int timeoutlimit = dice();
    while (1) {
        DiapatchByStatus(id, timeoutlimit);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }

    return false;
}

void RaftManager::SendFunc(int i) {
    // TODO
    // Example: poll every 200 ms; heartbeat interval 5000 ms; randomized
    // election timeout (dice() in Init() draws 2001-5000 ms).
    // Peer i is assumed to listen on port 992i, so i must be a single digit.
    string port = "9920";
    port[port.size() - 1] += i;
    int looptime = 4000;
    while (1) {
        std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service));
        tcp::resolver resolver(io_service);
        try {
            boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", port }));
        }
        catch (const exception&) {
            // Keep retrying, with a short pause so the loop does not spin.
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            continue;
        }
        LoopCheck(i, s);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }

    return;
}

void RaftManager::Session(tcp::socket sock) {
    boost::system::error_code error;
    NetInfo netinf;
    while (1) {
        // boost::asio::read blocks until the whole struct has arrived or an
        // error occurs, whereas read_some may return after a partial read.
        size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
        if (error == boost::asio::error::eof)
            return; // Connection closed cleanly by peer.
        else if (error) {
            std::cerr << boost::system::system_error(error).what() << std::endl;// Some other error.
            return;
        }
        if (length != sizeof(netinf)) {
            std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
            return;
        }

        DiapatchByInfoType(netinf);
    }
}

bool RaftManager::Go() {
    // Set up the network. Broadcast could be used to discover and notify the other nodes.
    // The demo assumes 5 nodes with IDs 1 2 3 4 5 on ports 9921 9922 9923 9924 9925.
    if (ip.empty() || portStart == 0 || nodeID == 0)
        return false;
    try {
        // Start one sender thread per peer.
        // NOTE: the loop bound is 2 for now; the full demo assumes 5 nodes (4 peers).
        for (int i = 1; i <= 2; i++) {
            if (i == nodeID)
                continue;
            std::thread t = std::thread(&RaftManager::SendFunc, shared_from_this(), i);
            t.detach();
        }

        int port = portStart + nodeID;
        tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
        // Accept connections forever; each one gets its own receiving thread.
        for (;;)
        {
            tcp::socket sock(io_service);
            a.accept(sock);
            std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach();
        }
    }
    catch (exception& e) {
        std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
        return false;
    }

    return true;
}
RaftManager.cpp
// QueueTemplate.cpp : this file contains the "main" function. Program execution begins and ends there.
//

// #include "pch.h"   // only needed in MSVC projects that use precompiled headers
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
using namespace std;

// Bounded, blocking, thread-safe queue.
template<typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
    {
    }

    void Put(const T&x)
    {
        Add(x);
    }

    void Put(T&&x)
    {
        Add(std::move(x));
    }

    // Takes every queued element at once; blocks while the queue is empty.
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        list = std::move(m_queue);
        m_queue.clear();    // a moved-from list is only guaranteed to be valid, not empty
        m_notFull.notify_one();
    }

    // Takes one element; blocks while the queue is empty.
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        t = std::move(m_queue.front());
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    // Wakes up every waiting producer and consumer and makes them return.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    int Count()
    {
        // Takes the lock as well; the original read the size without it.
        std::lock_guard<std::mutex> locker(m_mutex);
        return static_cast<int>(m_queue.size());
    }
private:
    // Both predicates are only called with m_mutex held.
    bool NotFull() const
    {
        bool full = m_queue.size() >= m_maxSize;
        if (full)
            cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
        return !full;
    }

    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
            cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
        return !empty;
    }

    template<typename F>
    void Add(F&&x)
    {
        std::unique_lock< std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
        if (m_needStop)
            return;

        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue; // buffer
    std::mutex m_mutex; // mutex used together with the condition variables
    std::condition_variable m_notEmpty; // signalled when the queue is not empty
    std::condition_variable m_notFull; // signalled when the queue is not full
    size_t m_maxSize; // maximum size of the queue

    bool m_needStop; // stop flag
};

int main()
{
    std::cout << "Hello World!\n";

    SyncQueue<int> q(1);
    q.Put(1);

    int a = 0;
    q.Take(a);

    q.Put(2);
    q.Take(a);

    q.Stop();
}
syncqueue.h

 

 

 

 

 

 

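I tried to build a simplified Raft election demo myself.

The implementation defines 2 to 5 nodes and reads the IP, port, and node ID from a configuration file.

Networking uses Boost's synchronous (blocking) calls.

One thread receives and four threads send.

1 The receiving thread inspects the incoming data to tell whether it is a heartbeat, a vote request, or a vote response. It then updates the node's logical clock number (term), its isVote flag, and the record of which nodes voted for it in the latest term: map<int,int> // nodeid, term

2 The sending threads poll every 200 ms and reduce the remaining wait time according to the node's current state. The intended wait is either a 1000 ms heartbeat interval or a random election timeout of 1500-5000 ms; the code above currently re-arms every timer from dice(), which draws 2001-5000 ms.

Depending on the current state, a sending thread then sends a heartbeat, a vote request, or a vote response.

To be completed.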
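For reference, a configuration file for node 1 might look like the fragment below. The actual file is not shown in this post, so the values are assumptions inferred from the code: Init() opens a file named nodeCfg and requires the keys ip, portStart, and nodeID, heartbeatTime is optional, and the demo comments mention ports 9921-9925 for IDs 1-5, which suggests portStart=9920.

```
# nodeCfg for node 1 (illustrative values)
ip=127.0.0.1
portStart=9920
nodeID=1
heartbeatTime=5000
```

ReadConfig does not trim whitespace, so there should be no spaces around the `=`: a value that starts with a space is dropped, and a key with a trailing space would not match the lookup in Init().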
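The countdown that the four Handle...Send functions repeat could be factored into one helper. The sketch below is only a suggestion under that assumption; `tickTimer` is a made-up name and the demo code does not contain it.

```cpp
#include <cassert>
#include <functional>

// Hypothetical helper: subtract one polling step from the remaining time.
// When the timer expires, re-arm it with nextTimeoutMs() and return true so
// the caller can send a heartbeat or start an election.
bool tickTimer(int& remainingMs, int stepMs,
               const std::function<int()>& nextTimeoutMs) {
    assert(stepMs > 0);
    remainingMs -= stepMs;
    if (remainingMs > 0)
        return false;               // still waiting
    remainingMs = nextTimeoutMs();  // expired: re-arm for the next round
    return true;
}
```

In the demo, each handler would call this with a step of 200 and `dice` as the re-arm function, then perform its state-specific send when the call returns true.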

 

 

References:

《etcd技術內幕》 (a book on etcd internals)

http://thesecretlivesofdata.com/raft/#intro
