本文主要研究一下kingbus的starRaft
kingbus/server/server.go
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) error { var ( etcdRaftNode etcdraft.Node id types.ID cl *membership.RaftCluster remotes []*membership.Member appliedIndex uint64 ) prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout) if err != nil { return err } store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize) if err != nil { log.Log.Fatalf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir) } //store, err := storage.NewMemoryStorage(cfg.DataDir) //if err != nil { // log.Log.Fatalf("NewKingbusServer:NewMemoryStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir) //} defer func() { //close storage when occur error if err != nil { store.Close() } }() logExist := utils.ExistLog(cfg.DataDir) switch { case !logExist && !cfg.NewCluster: if err = cfg.VerifyJoinExisting(); err != nil { return err } cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap) if err != nil { return err } remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name) existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt) if gerr != nil { return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr) } if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } remotes = existingCluster.Members() cl.SetID(existingCluster.GetID()) cl.SetStore(store) id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil) case !logExist && cfg.NewCluster: if err = cfg.VerifyBootstrap(); err != nil { return err } cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap) if err != nil { return err } m := cl.MemberByName(cfg.Name) if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) { return fmt.Errorf("member %s has already been bootstrapped", m.ID) } cl.SetStore(store) id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs()) case logExist: if err = 
utils.IsDirWriteable(cfg.DataDir); err != nil { return fmt.Errorf("cannot write to member directory: %v", err) } //node restart, read states from storage //get applied index appliedIndex = raft.MustGetAppliedIndex(store) cfg.AppliedIndex = appliedIndex id, etcdRaftNode, cl = restartEtcdNode(cfg, store) cl.SetStore(store) default: return fmt.Errorf("unsupported bootstrap config") } s.raftNode = raft.NewNode( raft.NodeConfig{ IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: etcdRaftNode, Heartbeat: cfg.HeartbeatMs, Storage: store, }, ) //committedIndex,term will update by fsm(UpdateCommittedIndex,SetTerm) //set appliedIndex when applyEntries will check the entry continuity s.raftNode.SetAppliedIndex(appliedIndex) s.id = id s.wait = wait.New() s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now()) s.stopping = make(chan struct{}) s.errorc = make(chan error) s.applyBroadcast = utils.NewBroadcast() s.stats = stats.NewServerStats(cfg.Name, id.String()) s.lstats = stats.NewLeaderStats(id.String()) s.store = store tr := &rafthttp.Transport{ TLSInfo: transport.TLSInfo{}, DialTimeout: DialTimeout, ID: id, URLs: cfg.PeerURLs, ClusterID: cl.GetID(), Raft: s, ServerStats: s.stats, LeaderStats: s.lstats, ErrorC: s.errorc, } if err = tr.Start(); err != nil { return err } // add all remotes into transport //Add remotes to rafthttp, who help newly joined members catch up the //progress of the cluster. It supports basic message sending to remote, and //has no stream connection for simplicity. remotes will not be used //after the latest peers have been added into rafthttp. for _, m := range remotes { if m.ID != id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range cl.Members() { if m.ID != id { tr.AddPeer(m.ID, m.PeerURLs) } } s.raftNode.Transport = tr s.cluster = cl return nil }
kingbus/server/server.go
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) ( id types.ID, n etcdraft.Node) { member := cl.MemberByName(cfg.Name) peers := make([]etcdraft.Peer, len(ids)) for i, id := range ids { ctx, err := json.Marshal((*cl).Member(id)) if err != nil { log.Log.Panicf("marshal member should never fail: %v", err) } peers[i] = etcdraft.Peer{ID: uint64(id), Context: ctx} } id = member.ID log.Log.Infof("starting member %s in cluster %s", id, cl.GetID()) c := &etcdraft.Config{ ID: uint64(id), ElectionTick: int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs), HeartbeatTick: 1, Storage: store, MaxSizePerMsg: cfg.MaxRequestBytes, MaxInflightMsgs: maxInflightMsgs, CheckQuorum: true, PreVote: cfg.PreVote, DisableProposalForwarding: true, Logger: log.Log, } n = etcdraft.StartNode(c, peers) raft.AdvanceTicks(n, c.ElectionTick) return id, n }
kingbus/server/server.go
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) ( types.ID, etcdraft.Node, *membership.RaftCluster) { cl, err := membership.GetRaftClusterFromStorage(store) if err != nil { if err != nil { log.Log.Panic("GetRaftClusterFromStorage error:%s", err.Error()) } } log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String()) //get id from raftCluster member := cl.MemberByName(cfg.Name) if member == nil { log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s", cl.String(), cfg.Name) } c := &etcdraft.Config{ ID: uint64(member.ID), ElectionTick: int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs), HeartbeatTick: 1, Applied: cfg.AppliedIndex, //set appliedIndex Storage: store, MaxSizePerMsg: cfg.MaxRequestBytes, MaxInflightMsgs: maxInflightMsgs, CheckQuorum: true, PreVote: cfg.PreVote, DisableProposalForwarding: true, Logger: log.Log, } n := etcdraft.RestartNode(c) return member.ID, n, cl }
starRaft方法先通過rafthttp.NewRoundTripper創建http.RoundTripper,之後通過storage.NewDiskStorage創建DiskStorage,然後根據logExist及cfg.NewCluster做不同處理;若兩者都爲false則更新membership.RaftCluster的id爲已存在的cluster的id,然後執行startEtcdRaftNode;若logExist爲false且cfg.NewCluster爲true則使用cl.MemberIDs()來執行startEtcdRaftNode;若logExist爲true則執行restartEtcdNode;最後創建rafthttp.Transport,執行tr.Start()、tr.AddRemote、tr.AddPeer