1. SOFAJRaft源碼分析— SOFAJRaft啓動時作了什麼?

咱們此次依然用上次的例子CounterServer來進行講解:html

我這裏就不貼整個代碼了java

public static void main(final String[] args) throws IOException {
    if (args.length != 4) {
        System.out
            .println("Useage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");
        System.out
            .println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer " +
                    "/tmp/server1 " +
                    "counter " +
                    "127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
        System.exit(1);
    }
    //日誌存儲的路徑
    final String dataPath = args[0];
    //SOFAJRaft集羣的名字
    final String groupId = args[1];
    //當前節點的ip和端口
    final String serverIdStr = args[2];
    //集羣節點的ip和端口
    final String initConfStr = args[3];

    final NodeOptions nodeOptions = new NodeOptions();
    // 爲了測試,調整 snapshot 間隔等參數
    // 設置選舉超時時間爲 1 秒
    nodeOptions.setElectionTimeoutMs(1000);
    // 關閉 CLI 服務。
    nodeOptions.setDisableCli(false);
    // 每隔30秒作一次 snapshot
    nodeOptions.setSnapshotIntervalSecs(30);
    // 解析參數
    final PeerId serverId = new PeerId();
    if (!serverId.parse(serverIdStr)) {
        throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
    }
    final Configuration initConf = new Configuration();
    //將raft分組加入到Configuration的peers數組中
    if (!initConf.parse(initConfStr)) {
        throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
    }
    // 設置初始集羣配置
    nodeOptions.setInitialConf(initConf);

    // 啓動
    final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
    System.out.println("Started counter server at port:"
                       + counterServer.getNode().getNodeId().getPeerId().getPort());
}
複製代碼

咱們在啓動server的main方法的時候會傳入日誌存儲的路徑、SOFAJRaft集羣的名字、當前節點的ip和端口、集羣節點的ip和端口並設值到NodeOptions中,做爲當前節點啓動的參數。node

這裏會將當前節點初始化爲一個PeerId對象 PeerId數組

//存放當前節點的ip和端口號
private Endpoint            endpoint         = new Endpoint(Utils.IP_ANY, 0);

//默認是0
private int                 idx; 
//是一個ip:端口的字符串
private String              str;
public PeerId() {
    super();
}

public boolean parse(final String s) {
    final String[] tmps = StringUtils.split(s, ':');
    if (tmps.length != 3 && tmps.length != 2) {
        return false;
    }
    try {
        final int port = Integer.parseInt(tmps[1]);
        this.endpoint = new Endpoint(tmps[0], port);
        if (tmps.length == 3) {
            this.idx = Integer.parseInt(tmps[2]);
        } else {
            this.idx = 0;
        }
        this.str = null;
        return true;
    } catch (final Exception e) {
        LOG.error("Parse peer from string failed: {}", s, e);
        return false;
    }
}
複製代碼

PeerId的parse方法會將傳入的ip:端口解析以後對變量進行一些賦值的操做。服務器

而後會調用到CounterServer的構造器中: CounterServerapp

public CounterServer(final String dataPath, final String groupId, final PeerId serverId, final NodeOptions nodeOptions) throws IOException {
    // 初始化路徑
    FileUtils.forceMkdir(new File(dataPath));

    // 這裏讓 raft RPC 和業務 RPC 使用同一個 RPC server, 一般也能夠分開
    final RpcServer rpcServer = new RpcServer(serverId.getPort());
    RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
    // 註冊業務處理器
    rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
    rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
    // 初始化狀態機
    this.fsm = new CounterStateMachine();
    // 設置狀態機到啓動參數
    nodeOptions.setFsm(this.fsm);
    // 設置存儲路徑
    // 日誌, 必須
    nodeOptions.setLogUri(dataPath + File.separator + "log");
    // 元信息, 必須
    nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
    // snapshot, 可選, 通常都推薦
    nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
    // 初始化 raft group 服務框架
    this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
    // 啓動
    this.node = this.raftGroupService.start();
}
複製代碼

這個方法主要是調用NodeOptions的各類方法進行設置,而後調用raftGroupService的start方法啓動raft節點。框架

RaftGroupService

咱們來到RaftGroupService的start方法: RaftGroupService#startdom

public synchronized Node start(final boolean startRpcServer) {
    //若是已經啓動了,那麼就返回
    if (this.started) {
        return this.node;
    }
    //校驗serverId和groupId
    if (this.serverId == null || this.serverId.getEndpoint() == null
            || this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) {
        throw new IllegalArgumentException("Blank serverId:" + this.serverId);
    }
    if (StringUtils.isBlank(this.groupId)) {
        throw new IllegalArgumentException("Blank group id" + this.groupId);
    }
    //Adds RPC server to Server.
    //設置當前node的ip和端口
    NodeManager.getInstance().addAddress(this.serverId.getEndpoint());

    //建立node
    this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
    if (startRpcServer) {
        //啓動遠程服務
        this.rpcServer.start();
    } else {
        LOG.warn("RPC server is not started in RaftGroupService.");
    }
    this.started = true;
    LOG.info("Start the RaftGroupService successfully.");
    return this.node;
}
複製代碼

這個方法會在一開始的時候對RaftGroupService在構造器實例化的參數進行校驗,而後把當前節點的Endpoint添加到NodeManager的addrSet變量中,接着調用RaftServiceFactory#createAndInitRaftNode實例化Node節點。ide

每一個節點都會啓動一個rpc的服務,由於每一個節點既能夠被選舉也能夠投票給其餘節點,節點之間須要互相通訊,因此須要啓動一個rpc服務。工具

RaftServiceFactory#createAndInitRaftNode

public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) {
    //實例化一個node節點
    final Node ret = createRaftNode(groupId, serverId);
    //爲node節點初始化
    if (!ret.init(opts)) {
        throw new IllegalStateException("Fail to init node, please see the logs to find the reason.");
    }
    return ret;
}

public static Node createRaftNode(final String groupId, final PeerId serverId) {
    return new NodeImpl(groupId, serverId);
}
複製代碼

createAndInitRaftNode方法首先調用createRaftNode實例化一個Node的實例NodeImpl,而後調用其init方法進行初始化,主要的配置都是在init方法中完成的。

NodeImpl

public NodeImpl(final String groupId, final PeerId serverId) {
    super();
    if (groupId != null) {
        //檢驗groupId是否符合格式規範
        Utils.verifyGroupId(groupId);
    }
    this.groupId = groupId;
    this.serverId = serverId != null ? serverId.copy() : null;
    //一開始的設置爲未初始化
    this.state = State.STATE_UNINITIALIZED;
    //設置新的任期爲0
    this.currTerm = 0;
    //設置最新的時間戳
    updateLastLeaderTimestamp(Utils.monotonicMs());
    this.confCtx = new ConfigurationCtx(this);
    this.wakingCandidate = null; 
    final int num = GLOBAL_NUM_NODES.incrementAndGet();
    LOG.info("The number of active nodes increment to {}.", num);
}
複製代碼

NodeImpl會在構造器中初始化一些參數。

Node的初始化

Node節點的全部的重要的配置都是在init方法中完成的,NodeImpl的init方法比較長因此分紅代碼塊來進行講解。

NodeImpl#init

//非空校驗
Requires.requireNonNull(opts, "Null node options");
Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
//目前就一個實現:DefaultJRaftServiceFactory
this.serviceFactory = opts.getServiceFactory();
this.options = opts;
this.raftOptions = opts.getRaftOptions();
//基於 Metrics 類庫的性能指標統計,具備豐富的性能統計指標,默認不開啓度量工具
this.metrics = new NodeMetrics(opts.isEnableMetrics());

if (this.serverId.getIp().equals(Utils.IP_ANY)) {
    LOG.error("Node can't started from IP_ANY.");
    return false;
}

if (!NodeManager.getInstance().serverExists(this.serverId.getEndpoint())) {
    LOG.error("No RPC server attached to, did you forget to call addService?");
    return false;
}
//定時任務管理器
this.timerManager = new TimerManager();
//初始化定時任務管理器的內置線程池
if (!this.timerManager.init(this.options.getTimerPoolSize())) {
    LOG.error("Fail to init timer manager.");
    return false;
}

//定時任務管理器
this.timerManager = new TimerManager();
//初始化定時任務管理器的內置線程池
if (!this.timerManager.init(this.options.getTimerPoolSize())) {
    LOG.error("Fail to init timer manager.");
    return false;
}
複製代碼

這段代碼主要是給各個變量賦值,而後進行校驗判斷一下serverId不能爲0.0.0.0,當前的Endpoint必需要在NodeManager裏面設置過等等(NodeManager的設置是在RaftGroupService的start方法裏)。

而後會初始化一個全局的的定時調度管理器TimerManager: TimerManager

private ScheduledExecutorService executor;

@Override
public boolean init(Integer coreSize) {
    this.executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory(
        "JRaft-Node-ScheduleThreadPool-", true));
    return true;
}
複製代碼

TimerManager的init方法就是初始化一個線程池,若是當前的服務器的cpu線程數3 大於20 ,那麼這個線程池的coreSize就是20,不然就是cpu線程數3。

往下走是計時器的初始化:

// Init timers
//設置投票計時器
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {

    @Override
    protected void onTrigger() {
        //處理投票超時
        handleVoteTimeout();
    }

    @Override
    protected int adjustTimeout(final int timeoutMs) {
        //在必定範圍內返回一個隨機的時間戳
        return randomTimeout(timeoutMs);
    }
};
//設置預投票計時器
//當leader在規定的一段時間內沒有與 Follower 艦船進行通訊時,
// Follower 就能夠認爲leader已經不能正常擔任旗艦的職責,則 Follower 能夠去嘗試接替leader的角色。
// 這段通訊超時被稱爲 Election Timeout
//候選者在發起投票以前,先發起預投票
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {

    @Override
    protected void onTrigger() {
        handleElectionTimeout();
    }

    @Override
    protected int adjustTimeout(final int timeoutMs) {
        //在必定範圍內返回一個隨機的時間戳
        //爲了不同時發起選舉而致使失敗
        return randomTimeout(timeoutMs);
    }
};
//leader下臺的計時器
//定時檢查是否須要從新選舉leader
this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {

    @Override
    protected void onTrigger() {
        handleStepDownTimeout();
    }
};
//快照計時器
this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer", this.options.getSnapshotIntervalSecs() * 1000) {

    @Override
    protected void onTrigger() {
        handleSnapshotTimeout();
    }
};
複製代碼

voteTimer是用來控制選舉的,若是選舉超時,當前的節點又是候選者角色,那麼就會發起選舉。 electionTimer是預投票計時器。候選者在發起投票以前,先發起預投票,若是沒有獲得半數以上節點的反饋,則候選者就會識趣的放棄參選。 stepDownTimer定時檢查是否須要從新選舉leader。當前的leader可能出現它的Follower可能並無整個集羣的1/2卻尚未下臺的狀況,那麼這個時候會按期的檢查看leader的Follower是否有那麼多,沒有那麼多的話會強制讓leader下臺。 snapshotTimer快照計時器。這個計時器會每隔1小時觸發一次生成一個快照。

這些計時器的具體實現如今暫時不表,等到要講具體功能的時候再進行梳理。

這些計時器有一個共同的特色就是會根據不一樣的計時器返回一個在必定範圍內隨機的時間。返回一個隨機的時間能夠防止多個節點在同一時間內同時發起投票選舉從而下降選舉失敗的機率。

繼續往下看:

this.configManager = new ConfigurationManager();
//初始化一個disruptor,採用多生產者模式
this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //
        //設置disruptor大小,默認16384
        .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
        .setEventFactory(new LogEntryAndClosureFactory()) //
        .setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //
        .setProducerType(ProducerType.MULTI) //
        .setWaitStrategy(new BlockingWaitStrategy()) //
        .build();
//設置事件處理器
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
//設置異常處理器
this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
// 啓動disruptor的線程
this.applyQueue = this.applyDisruptor.start();
//若是開啓了metrics統計
if (this.metrics.getMetricRegistry() != null) {
    this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
            new DisruptorMetricSet(this.applyQueue));
}
複製代碼

這裏初始化了一個Disruptor做爲消費隊列,不清楚Disruptor的朋友能夠去看我上一篇文章:Disruptor—核心概念及體驗。而後還校驗了metrics是否開啓,默認是不開啓的。

繼續往下看:

//fsmCaller封裝對業務 StateMachine 的狀態轉換的調用以及日誌的寫入等
this.fsmCaller = new FSMCallerImpl();
//初始化日誌存儲功能
if (!initLogStorage()) {
    LOG.error("Node {} initLogStorage failed.", getNodeId());
    return false;
}
//初始化元數據存儲功能
if (!initMetaStorage()) {
    LOG.error("Node {} initMetaStorage failed.", getNodeId());
    return false;
}
//對FSMCaller初始化
if (!initFSMCaller(new LogId(0, 0))) {
    LOG.error("Node {} initFSMCaller failed.", getNodeId());
    return false;
}
//實例化投票箱
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
//初始化ballotBox的屬性
if (!this.ballotBox.init(ballotBoxOpts)) {
    LOG.error("Node {} init ballotBox failed.", getNodeId());
    return false;
}
//初始化快照存儲功能
if (!initSnapshotStorage()) {
    LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
    return false;
}
//校驗日誌文件索引的一致性
final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
    LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
    return false;
}
//配置管理raft group中的信息
this.conf = new ConfigurationEntry();
this.conf.setId(new LogId());
// if have log using conf in log, else using conf in options
if (this.logManager.getLastLogIndex() > 0) {
    this.conf = this.logManager.checkAndSetConfiguration(this.conf);
} else {
    this.conf.setConf(this.options.getInitialConf());
}
複製代碼

這段代碼主要是對快照、日誌、元數據等功能初始化。

this.replicatorGroup = new ReplicatorGroupImpl();
//收其餘節點或者客戶端發過來的請求,轉交給對應服務處理
this.rpcService = new BoltRaftClientService(this.replicatorGroup);
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
rgOpts.setLogManager(this.logManager);
rgOpts.setBallotBox(this.ballotBox);
rgOpts.setNode(this);
rgOpts.setRaftRpcClientService(this.rpcService);
rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
rgOpts.setRaftOptions(this.raftOptions);
rgOpts.setTimerManager(this.timerManager);

// Adds metric registry to RPC service.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());
//初始化rpc服務
if (!this.rpcService.init(this.options)) {
    LOG.error("Fail to init rpc service.");
    return false;
}
this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);

this.readOnlyService = new ReadOnlyServiceImpl();
final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
rosOpts.setFsmCaller(this.fsmCaller);
rosOpts.setNode(this);
rosOpts.setRaftOptions(this.raftOptions);
//只讀服務初始化
if (!this.readOnlyService.init(rosOpts)) {
    LOG.error("Fail to init readOnlyService.");
    return false;
}
複製代碼

這段代碼主要是初始化replicatorGroup、rpcService以及readOnlyService。

接下來是最後一段的代碼:

// set state to follower
this.state = State.STATE_FOLLOWER;

if (LOG.isInfoEnabled()) {
    LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
            this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf());
}

//若是快照執行器不爲空,而且生成快照的時間間隔大於0,那麼就定時生成快照
if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
    LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
    this.snapshotTimer.start();
}

if (!this.conf.isEmpty()) {
    //新啓動的node須要從新選舉
    stepDown(this.currTerm, false, new Status());
}

if (!NodeManager.getInstance().add(this)) {
    LOG.error("NodeManager add {} failed.", getNodeId());
    return false;
}

// Now the raft node is started , have to acquire the writeLock to avoid race
// conditions
this.writeLock.lock();
//這個分支表示當前的jraft集羣裏只有一個節點,那麼個節點一定是leader直接進行選舉就行了
if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {
    // The group contains only this server which must be the LEADER, trigger
    // the timer immediately.
    electSelf();
} else {
    this.writeLock.unlock();
}

return true;
複製代碼

這段代碼裏會將當前的狀態設置爲Follower,而後啓動快照定時器定時生成快照。 若是當前的集羣不是單節點集羣須要作一下stepDown,表示新生成的Node節點須要從新進行選舉。 最下面有一個if分支,若是當前的jraft集羣裏只有一個節點,那麼個節點一定是leader直接進行選舉就行了,因此會直接調用electSelf進行選舉。 選舉的代碼咱們就暫時略過,要否則後面就沒得講了。

到這裏整個NodeImpl實例的init方法就分析完了,這個方法很長,可是仍是作了不少事情的。

好了,今天也不早了,各位晚安~

相關文章
相關標籤/搜索