本文主要研究一下nacos的RaftPeerSetjava
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.javagit
@Component @DependsOn("serverListManager") public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware { @Autowired private ServerListManager serverListManager; private ApplicationContext applicationContext; private AtomicLong localTerm = new AtomicLong(0L); private RaftPeer leader = null; private Map<String, RaftPeer> peers = new HashMap<>(); private Set<String> sites = new HashSet<>(); private boolean ready = false; public RaftPeerSet() { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @PostConstruct public void init() { serverListManager.listen(this); } public RaftPeer getLeader() { if (STANDALONE_MODE) { return local(); } return leader; } public Set<String> allSites() { return sites; } public boolean isReady() { return ready; } public void remove(List<String> servers) { for (String server : servers) { peers.remove(server); } } public RaftPeer update(RaftPeer peer) { peers.put(peer.ip, peer); return peer; } public boolean isLeader(String ip) { if (STANDALONE_MODE) { return true; } if (leader == null) { Loggers.RAFT.warn("[IS LEADER] no leader is available now!"); return false; } return StringUtils.equals(leader.ip, ip); } public Set<String> allServersIncludeMyself() { return peers.keySet(); } public Set<String> allServersWithoutMySelf() { Set<String> servers = new HashSet<String>(peers.keySet()); // exclude myself servers.remove(local().ip); return servers; } public Collection<RaftPeer> allPeers() { return peers.values(); } public int size() { return peers.size(); } public RaftPeer decideLeader(RaftPeer candidate) { peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } if (maxApproveCount >= majorityCount()) { RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader)); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; } public RaftPeer makeLeader(RaftPeer candidate) { if (!Objects.equals(leader, candidate)) { leader = candidate; applicationContext.publishEvent(new MakeLeaderEvent(this, leader)); Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}", leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader)); } for (final RaftPeer peer : peers.values()) { Map<String, String> params = new HashMap<>(1); if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) { try { String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER); HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}", response.getResponseBody(), peer.ip); peer.state = RaftPeer.State.FOLLOWER; return 1; } update(JSON.parseObject(response.getResponseBody(), RaftPeer.class)); return 0; } }); } catch (Exception e) { peer.state = RaftPeer.State.FOLLOWER; Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip); } } } return update(candidate); } public RaftPeer local() { RaftPeer peer = peers.get(NetUtils.localServer()); if (peer == null && SystemUtils.STANDALONE_MODE) { RaftPeer localPeer = new RaftPeer(); localPeer.ip = NetUtils.localServer(); localPeer.term.set(localTerm.get()); peers.put(localPeer.ip, localPeer); return localPeer; } if (peer == null) { throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: " + Arrays.toString(peers.keySet().toArray())); } return peer; } public RaftPeer get(String server) { return peers.get(server); } public int majorityCount() { return peers.size() / 2 + 1; } public void reset() { leader = null; for (RaftPeer peer : peers.values()) { peer.voteFor = null; } } public void setTerm(long term) { localTerm.set(term); } public long getTerm() { return localTerm.get(); } public boolean contains(RaftPeer remote) { return peers.containsKey(remote.ip); } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.javagithub
public class MasterElection implements Runnable { @Override public void run() { try { if (!peers.isReady()) { return; } RaftPeer local = peers.local(); local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; if (local.leaderDueMs > 0) { return; } // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } public void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JSON.toJSONString(getLeader()), local.term); peers.reset(); local.term.incrementAndGet(); local.voteFor = local.ip; local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JSON.toJSONString(local)); for (final String server : peers.allServersWithoutMySelf()) { final String url = buildURL(server, API_VOTE); try { HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url); return 1; } RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer)); peers.decideLeader(peer); return 0; } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.javaapp
@Component public class RaftCore { //...... public RaftPeer receivedBeat(JSONObject beat) throws Exception { final RaftPeer local = peers.local(); final RaftPeer remote = new RaftPeer(); remote.ip = beat.getJSONObject("peer").getString("ip"); remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state")); remote.term.set(beat.getJSONObject("peer").getLongValue("term")); remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs"); remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs"); remote.voteFor = beat.getJSONObject("peer").getString("voteFor"); if (remote.state != RaftPeer.State.LEADER) { Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state, JSON.toJSONString(remote)); throw new IllegalArgumentException("invalid state from master, state: " + remote.state); } if (local.term.get() > remote.term.get()) { Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}" , remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs); throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get()); } if (local.state != RaftPeer.State.FOLLOWER) { Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote)); // mk follower local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; } final JSONArray beatDatums = beat.getJSONArray("datums"); local.resetLeaderDue(); local.resetHeartbeatDue(); peers.makeLeader(remote); Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size()); for (Map.Entry<String, Datum> entry : datums.entrySet()) { receivedKeysMap.put(entry.getKey(), 0); } // now check datums List<String> batch = new ArrayList<>(); if (!switchDomain.isSendBeatOnly()) { int processedCount = 0; if (Loggers.RAFT.isDebugEnabled()) { Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}", beatDatums.size(), datums.size(), remote.ip, remote.term, local.term); } for (Object object : beatDatums) { processedCount = processedCount + 1; JSONObject entry = (JSONObject) object; String key = entry.getString("key"); final String datumKey; if (KeyBuilder.matchServiceMetaKey(key)) { datumKey = KeyBuilder.detailServiceMetaKey(key); } else if (KeyBuilder.matchInstanceListKey(key)) { datumKey = KeyBuilder.detailInstanceListkey(key); } else { // ignore corrupted key: continue; } long timestamp = entry.getLong("timestamp"); receivedKeysMap.put(datumKey, 1); try { if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) { continue; } if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) { batch.add(datumKey); } if (batch.size() < 50 && processedCount < beatDatums.size()) { continue; } String keys = StringUtils.join(batch, ","); if (batch.size() <= 0) { continue; } Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}" , getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size()); // update datum entry String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8"); HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { return 1; } List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() { }); for (JSONObject datumJson : datumList) { OPERATE_LOCK.lock(); Datum newDatum = null; try { Datum oldDatum = getDatum(datumJson.getString("key")); if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) { Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}", datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp); continue; } if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) { Datum<Service> serviceDatum = new Datum<>(); serviceDatum.key = datumJson.getString("key"); serviceDatum.timestamp.set(datumJson.getLongValue("timestamp")); serviceDatum.value = JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class); newDatum = serviceDatum; } if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) { Datum<Instances> instancesDatum = new Datum<>(); instancesDatum.key = datumJson.getString("key"); instancesDatum.timestamp.set(datumJson.getLongValue("timestamp")); instancesDatum.value = JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class); newDatum = instancesDatum; } if (newDatum == null || newDatum.value == null) { Loggers.RAFT.error("receive null datum: {}", datumJson); continue; } raftStore.write(newDatum); datums.put(newDatum.key, newDatum); notifier.addTask(newDatum.key, ApplyAction.CHANGE); local.resetLeaderDue(); if (local.term.get() + 100 > remote.term.get()) { getLeader().term.set(remote.term.get()); local.term.set(getLeader().term.get()); } else { local.term.addAndGet(100); } raftStore.updateTerm(local.term.get()); Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}", newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term); } catch (Throwable e) { Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e); } finally { OPERATE_LOCK.unlock(); } } TimeUnit.MILLISECONDS.sleep(200); return 0; } }); batch.clear(); } catch (Exception e) { Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey); } } List<String> deadKeys = new ArrayList<>(); for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) { if (entry.getValue() == 0) { deadKeys.add(entry.getKey()); } } for (String deadKey : deadKeys) { try { deleteDatum(deadKey); } catch (Exception e) { Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e); } } } return local; } //...... }