LogCabin 是 Raft 作者啓動的一個項目,以它爲基礎看一下文章描述的實現方法。
首先遞增任期,之後進入「候選人」狀態。
/**
 * Begin a new election (excerpt; "……" marks code the article omitted).
 *
 * The visible steps increment the term, switch this server to the
 * CANDIDATE state, clear the recorded leader, and record a vote for
 * this server itself.
 */
void
RaftConsensus::startNewElection()
{
    ……
    // A new election always runs in a fresh, higher term.
    ++currentTerm;
    state = State::CANDIDATE;
    // No leader is recorded for the new term (0 is used as "none").
    leaderId = 0;
    // Vote for ourselves.
    votedFor = serverId;
    ……
}
/**
 * Send a RequestVote RPC to a single peer (excerpt; "……" marks code the
 * article omitted, including the construction of `request` and the
 * handling of the reply).
 *
 * \param lockGuard
 *      Lock handle that is handed through to Peer::callRPC.
 * \param peer
 *      The server being asked for its vote.
 */
void
RaftConsensus::requestVote(std::unique_lock<Mutex>& lockGuard, Peer& peer)
{
    ……
    Protocol::Raft::RequestVote::Response response;
    VERBOSE("requestVote start");
    // Both values are sampled before the RPC is issued; their use falls
    // in the omitted part of the excerpt.
    TimePoint start = Clock::now();
    uint64_t epoch = currentEpoch;
    Peer::CallStatus status = peer.callRPC(
                Protocol::Raft::OpCode::REQUEST_VOTE,
                request, response,
                lockGuard);
    ……
}
當一個 server 爲某個 candidate 投票之後,會更新自己的 currentTerm 和 votedFor 字段。這樣,當不同 candidate 的定時器同時超時並發送 vote 請求時,它們的 term 雖然相同,但是先收到請求的 follower 在更新 currentTerm 之後還記錄了 votedFor;後到的請求其 server_id 與已記錄的 votedFor 不同(代碼中只有 votedFor == 0 時才會投票),因此不會在同一個 term 中給不同的 server 投票。
logcabin-master\Server\RaftConsensus.cc
/**
 * Handle an incoming RequestVote RPC and fill in the reply.
 *
 * The vote is granted only when the request is for the current term, the
 * caller's log is at least as complete as ours, and this server has not
 * yet voted in that term (votedFor == 0).
 *
 * \param request
 *      The caller's term, server id, and last log term/index.
 * \param[out] response
 *      Receives our current term, whether the vote was granted, and
 *      whether the caller's log was acceptable.
 */
void
RaftConsensus::handleRequestVote(
                    const Protocol::Raft::RequestVote::Request& request,
                    Protocol::Raft::RequestVote::Response& response)
{
    std::lock_guard<Mutex> lockGuard(mutex);
    assert(!exiting);

    // If the caller has a less complete log, we can't give it our vote.
    // "At least as complete" means a higher last term, or the same last
    // term with an index that is not behind ours.
    uint64_t lastLogIndex = log->getLastLogIndex();
    uint64_t lastLogTerm = getLastLogTerm();
    bool logIsOk = (request.last_log_term() > lastLogTerm ||
                    (request.last_log_term() == lastLogTerm &&
                     request.last_log_index() >= lastLogIndex));

    // While withholdVotesUntil lies in the future (we recently heard from
    // a leader), refuse outright without even looking at the term.
    if (withholdVotesUntil > Clock::now()) {
        NOTICE("Rejecting RequestVote for term %lu from server %lu, since "
               "this server (which is in term %lu) recently heard from a "
               "leader (%lu). Should server %lu be shut down?",
               request.term(), request.server_id(), currentTerm,
               leaderId, request.server_id());
        response.set_term(currentTerm);
        response.set_granted(false);
        response.set_log_ok(logIsOk);
        return;
    }

    // A request from a newer term makes us step down into that term. The
    // terms do not have to be consecutive: terms whose elections failed
    // are simply skipped over.
    if (request.term() > currentTerm) {
        NOTICE("Received RequestVote request from server %lu in term %lu "
               "(this server's term was %lu)",
               request.server_id(), request.term(), currentTerm);
        stepDown(request.term());
    }

    // At this point, if leaderId != 0, we could tell the caller to step down.
    // However, this is just an optimization that does not affect correctness
    // or really even efficiency, so it's not worth the trouble.

    if (request.term() == currentTerm) {
        if (logIsOk && votedFor == 0) {
            // Give caller our vote
            NOTICE("Voting for %lu in term %lu",
                   request.server_id(), currentTerm);
            stepDown(currentTerm);
            setElectionTimer();
            votedFor = request.server_id();
            updateLogMetadata();
            printElectionState();
        }
    }

    // Fill in response.
    response.set_term(currentTerm);
    // don't strictly need the first condition
    response.set_granted(request.term() == currentTerm &&
                         votedFor == request.server_id());
    response.set_log_ok(logIsOk);
}
通過一個 EntryType::NOOP 類型的同步包,讓所有的 server 更新 leader 的 ID。
/**
 * Switch this server from candidate to leader for currentTerm.
 * Asserts that the server is in the CANDIDATE state on entry. After
 * updating the election state it initializes per-server leadership state,
 * appends a no-op entry in the new term, and interrupts outstanding RPCs.
 */
void
RaftConsensus::becomeLeader()
{
assert(state == State::CANDIDATE);
NOTICE("Now leader for term %lu (appending no-op at index %lu)",
currentTerm,
log->getLastLogIndex() + 1);
state = State::LEADER;
leaderId = serverId;
printElectionState();
// Push both deadlines out to the maximum time point. handleRequestVote()
// rejects requests for as long as withholdVotesUntil is in the future.
startElectionAt = TimePoint::max();
withholdVotesUntil = TimePoint::max();
// Our local cluster time clock has been ticking ever since we got the last
// log entry/snapshot. Set the clock back to when that happened, since we
// don't really want to count that time (the cluster probably had no leader
// for most of it).
clusterClock.newEpoch(clusterClock.clusterTimeAtEpoch);
// The ordering is pretty important here: First set nextIndex and
// matchIndex for ourselves and each follower, then append the no op.
// Otherwise we'll set our localServer's last agree index too high.
configuration->forEach(&Server::beginLeadership);
// Append a new entry so that commitment is not delayed indefinitely.
// Otherwise, if the leader never gets anything to append, it will never
// return to read-only operations (it can't prove that its committed index
// is up-to-date).
Log::Entry entry;
entry.set_term(currentTerm);
entry.set_type(Protocol::Raft::EntryType::NOOP);
entry.set_cluster_time(clusterClock.leaderStamp());
append({&entry});
// Outstanding RequestVote RPCs are no longer needed.
interruptAll();
}
/**
 * Excerpt of the leader-side handling of one peer's AppendEntries reply
 * ("……" marks code the article omitted; `response`, `prevLogIndex` and
 * `numEntries` are defined in the omitted part).
 *
 * \param lockGuard
 *      Lock handle; not used in the visible part of the excerpt.
 * \param peer
 *      The follower whose matchIndex/nextIndex bookkeeping is updated.
 */
void
RaftConsensus::appendEntries(std::unique_lock<Mutex>& lockGuard,
Peer& peer)
{
……
if (response.success()) {
// The peer now holds everything up to prevLogIndex + numEntries. Only
// move matchIndex forward; a value that would go backwards is logged
// and ignored.
if (peer.matchIndex > prevLogIndex + numEntries) {
// Revisit this warning if we pipeline AppendEntries RPCs for
// performance.
WARNING("matchIndex should monotonically increase within a "
"term, since servers don't forget entries. But it "
"didn't.");
} else {
peer.matchIndex = prevLogIndex + numEntries;
advanceCommitIndex();
}
peer.nextIndex = peer.matchIndex + 1;
peer.suppressBulkData = false;
// Catch-up tracking: once the peer has reached the goal index of the
// current iteration, compare how long this iteration took with the
// previous one.
if (!peer.isCaughtUp_ &&
peer.thisCatchUpIterationGoalId <= peer.matchIndex) {
Clock::duration duration =
Clock::now() - peer.thisCatchUpIterationStart;
uint64_t thisCatchUpIterationMs =
uint64_t(std::chrono::duration_cast<
std::chrono::milliseconds>(duration).count());
// The difference is in milliseconds; multiplying by 1000 * 1000 puts
// it in nanoseconds for comparison against ELECTION_TIMEOUT.
if (labs(int64_t(peer.lastCatchUpIterationMs -
thisCatchUpIterationMs)) * 1000L * 1000L <
std::chrono::nanoseconds(ELECTION_TIMEOUT).count()) {
peer.isCaughtUp_ = true;
stateChanged.notify_all();
} else {
// Not there yet: record this iteration and start another one
// whose goal is the current end of our log.
peer.lastCatchUpIterationMs = thisCatchUpIterationMs;
peer.thisCatchUpIterationStart = Clock::now();
peer.thisCatchUpIterationGoalId = log->getLastLogIndex();
}
}
} else {
// Rejected: back nextIndex off by one, never going below 1.
if (peer.nextIndex > 1)
--peer.nextIndex;
// A server that hasn't been around for a while might have a much
// shorter log than ours. The AppendEntries reply contains the
// index of its last log entry, and there's no reason for us to
// set nextIndex to be more than 1 past that (that would leave a
// gap, so it will always be rejected).
if (response.has_last_log_index() &&
peer.nextIndex > response.last_log_index() + 1) {
peer.nextIndex = response.last_log_index() + 1;
}
}
……
}
/**
 * Excerpt of the follower-side AppendEntries handler ("……" marks code the
 * article omitted). The visible part is the stale-term check: a request
 * whose term is lower than currentTerm is logged and the handler returns
 * without further processing.
 *
 * \param request
 *      The caller's AppendEntries request (term and server id are read).
 * \param response
 *      The reply; per the original comment it was already set to a
 *      rejection in the omitted code above.
 */
void
RaftConsensus::handleAppendEntries(
const Protocol::Raft::AppendEntries::Request& request,
Protocol::Raft::AppendEntries::Response& response)
{
……
// If the caller's term is stale, just return our term to it.
if (request.term() < currentTerm) {
VERBOSE("Caller(%lu) is stale. Our term is %lu, theirs is %lu",
request.server_id(), currentTerm, request.term());
return; // response was set to a rejection above
}
……
}
如果超過半數同意,advanceCommitIndex 函數遞增 commitIndex,表示這個日誌編號可以進入提交狀態。
// Abbreviated sketch, not compilable code: it only marks that
// appendEntries() is the place where advanceCommitIndex gets invoked.
// "……" stands for the omitted surrounding code.
void
RaftConsensus::appendEntries(std::unique_lock<Mutex>& lockGuard,
Peer& peer)
{
……
advanceCommitIndex
……
}
/**
 * Excerpt ("……" marks code the article omitted): compute the highest log
 * index stored on a quorum of servers and bail out early unless it is
 * beyond the current commitIndex. What happens after that check is in the
 * omitted part.
 */
void
RaftConsensus::advanceCommitIndex()
{
if (state != State::LEADER) {
// getMatchIndex is undefined unless we're leader
WARNING("advanceCommitIndex called as %s",
Core::StringUtil::toString(state).c_str());
return;
}
// calculate the largest entry ID stored on a quorum of servers
uint64_t newCommitIndex =
configuration->quorumMin(&Server::getMatchIndex);
// Nothing to do unless the quorum index is past what is already committed.
if (commitIndex >= newCommitIndex)
return;
……
}
通過線程將已經提交的日誌逐個應用到狀態機中。
logcabin-master\Server\StateMachine.cc
void
StateMachine::applyThreadMain()
{
Core::ThreadId::setName("StateMachine");
try {
while (true) {
RaftConsensus::Entry entry = consensus->getNextEntry(lastApplied);
std::lock_guard<Core::Mutex> lockGuard(mutex);
switch (entry.type) {
case RaftConsensus::Entry::SKIP:
break;
case RaftConsensus::Entry::DATA:
apply(entry);
break;
case RaftConsensus::Entry::SNAPSHOT:
NOTICE("Loading snapshot through entry %lu into state "
"machine", entry.index);
loadSnapshot(*entry.snapshotReader);
NOTICE("Done loading snapshot");
break;
}
expireSessions(entry.clusterTime);
lastApplied = entry.index;
entriesApplied.notify_all();
if (shouldTakeSnapshot(lastApplied) &&
maySnapshotAt <= Clock::now()) {
snapshotSuggested.notify_all();
}
}
} catch (const Core::Util::ThreadInterruptedException&) {
NOTICE("exiting");
std::lock_guard<Core::Mutex> lockGuard(mutex);
exiting = true;
entriesApplied.notify_all();
snapshotSuggested.notify_all();
snapshotStarted.notify_all();
snapshotCompleted.notify_all();
killSnapshotProcess(Core::HoldingMutex(lockGuard), SIGTERM);
}
}
/**
 * Block until the entry following lastIndex is committed, then return it.
 *
 * \param lastIndex
 *      Index of the last entry the caller has already consumed.
 * \return
 *      A SNAPSHOT entry when the log no longer contains index
 *      lastIndex + 1 (the log starts after it); otherwise a DATA entry
 *      carrying a copy of the command, or a SKIP entry for any other log
 *      entry type.
 * \throw Core::Util::ThreadInterruptedException
 *      If the exiting flag is set when the loop checks it.
 */
RaftConsensus::Entry
RaftConsensus::getNextEntry(uint64_t lastIndex) const
{
    std::unique_lock<Mutex> lockGuard(mutex);
    uint64_t nextIndex = lastIndex + 1;
    while (true) {
        if (exiting)
            throw Core::Util::ThreadInterruptedException();
        if (commitIndex >= nextIndex) {
            RaftConsensus::Entry entry;

            // Make the state machine load a snapshot if we don't have the
            // next entry it needs in the log.
            if (log->getLogStartIndex() > nextIndex) {
                entry.type = Entry::SNAPSHOT;
                // For well-behaved state machines, we expect 'snapshotReader'
                // to contain a SnapshotFile::Reader that we can return
                // directly to the state machine. In the case that a State
                // Machine asks for the snapshot again, we have to build a new
                // SnapshotFile::Reader again.
                entry.snapshotReader = std::move(snapshotReader);
                if (!entry.snapshotReader) {
                    WARNING("State machine asked for same snapshot twice; "
                            "this shouldn't happen in normal operation. "
                            "Having to re-read it from disk.");
                    // readSnapshot() shouldn't have any side effects since the
                    // snapshot should have already been read, so const_cast
                    // should be ok (though ugly).
                    const_cast<RaftConsensus*>(this)->readSnapshot();
                    entry.snapshotReader = std::move(snapshotReader);
                }
                entry.index = lastSnapshotIndex;
                entry.clusterTime = lastSnapshotClusterTime;
            } else {
                // not a snapshot
                const Log::Entry& logEntry = log->getEntry(nextIndex);
                entry.index = nextIndex;
                if (logEntry.type() == Protocol::Raft::EntryType::DATA) {
                    entry.type = Entry::DATA;
                    const std::string& s = logEntry.data();
                    // Copy the command bytes into a fresh array; the
                    // deleteArrayFn<char> deleter is handed to the buffer
                    // along with it.
                    entry.command = Core::Buffer(
                        memcpy(new char[s.length()], s.data(), s.length()),
                        s.length(),
                        Core::Buffer::deleteArrayFn<char>);
                } else {
                    entry.type = Entry::SKIP;
                }
                entry.clusterTime = logEntry.cluster_time();
            }
            return entry;
        }
        // Nothing committed past lastIndex yet: wait for a state change
        // and re-check.
        stateChanged.wait(lockGuard);
    }
}